Start implementation of file replication using ContentMonster

This commit is contained in:
Kumi 2022-09-20 06:59:30 +00:00
parent 3c8c0dce25
commit a3f28a5b85
Signed by: kumi
GPG key ID: ECBCC9082395383F
10 changed files with 313 additions and 1 deletions

0
core/classes/__init__.py Normal file
View file

147
core/classes/replication.py Normal file
View file

@ -0,0 +1,147 @@
import sqlite3
import pathlib
import uuid
from typing import Union, Optional
from contentmonster.classes.file import File as ContentMonsterFile
from contentmonster.classes.directory import Directory as ContentMonsterDirectory
from contentmonster.classes.vessel import Vessel as ContentMonsterVessel
from ..models.replication import ReplicationFile, ReplicationSource, ReplicationFileLog, ReplicationTarget
class ContentMonsterDatabase:
    """Database adapter backing ContentMonster with Django models.

    Stores file records and upload logs through the Django ORM
    (ReplicationFile, ReplicationSource, ReplicationTarget and
    ReplicationFileLog). File UUIDs are always handed out as strings so
    that values returned by the different methods compare equal.

    NOTE(review): the method names presumably mirror ContentMonster's own
    database class - confirm against the contentmonster package.
    """

    def commit(self) -> None:
        """No-op; nothing is buffered in this class."""
        pass

    def getFileUUID(self, fileobj: ContentMonsterFile) -> str:
        """Retrieve unique identifier for ContentMonsterFile object

        Records with the same directory and name but a different checksum
        are treated as stale and deleted.

        Args:
            fileobj (ContentMonsterFile): ContentMonsterFile object to retrieve UUID for

        Returns:
            str: UUID for passed ContentMonsterFile object
        """
        checksum = fileobj.getHash()
        candidates = ReplicationFile.objects.filter(
            directory__name=fileobj.directory.name, name=fileobj.name
        )

        # Must be initialised: there may be no matching record at all.
        fileuuid = None

        for result in candidates:
            if result.checksum == checksum:
                # Same name, directory and hash -> same file, reuse its UUID.
                fileuuid = str(result.uuid)
            else:
                # Same location but different content -> the old file can
                # no longer exist, so drop its record.
                self.removeFileByUUID(result.uuid)

        # Return found UUID or generate a new one
        return fileuuid or self.addFile(fileobj, checksum)

    def addFile(self, fileobj: ContentMonsterFile, hash: Optional[str] = None) -> str:
        """Adds a new ReplicationFile object to the database

        Args:
            fileobj (ContentMonsterFile): ContentMonsterFile object to add to database
            hash (str, optional): Checksum of the file, if already known.
                Defaults to None and will use .getHash() to calculate checksum then.

        Returns:
            str: UUID of the new ReplicationFile record

        Raises:
            ReplicationSource.DoesNotExist: If no ReplicationSource is named
                like the file's directory.
        """
        hash = hash or fileobj.getHash()
        fileuuid = str(uuid.uuid4())
        directory = ReplicationSource.objects.get(name=fileobj.directory.name)
        ReplicationFile.objects.create(
            uuid=fileuuid, directory=directory, name=fileobj.name, checksum=hash
        )
        return fileuuid

    def getFileByUUID(self, fileuuid: str) -> Optional[tuple[str, str, str]]:
        """Get additional information on a ContentMonsterFile by its UUID

        Args:
            fileuuid (str): The UUID of the ReplicationFile to retrieve from the database

        Returns:
            tuple: A tuple consisting of (directory, name, checksum), where
                "directory" is the name of the Directory object the File is
                located in, "name" is the filename (basename) of the File and
                checksum is the hash of the file at the time of insertion
                into the database. None is returned if no such record is found.
        """
        try:
            # select_related fetches the directory in the same query.
            result = ReplicationFile.objects.select_related("directory").get(uuid=fileuuid)
        except ReplicationFile.DoesNotExist:
            return None
        return (result.directory.name, result.name, result.checksum)

    def removeFile(self, directory: ContentMonsterDirectory, name: str) -> None:
        """Remove a ReplicationFile from the database based on ContentMonsterDirectory and filename

        Args:
            directory (ContentMonsterDirectory): ContentMonsterDirectory object
                containing the ContentMonsterFile to remove
            name (str): Filename of the ContentMonsterFile to remove
        """
        ReplicationFile.objects.filter(directory__name=directory.name, name=name).delete()

    def removeFileByUUID(self, fileuuid: str) -> None:
        """Remove a ReplicationFile from the database based on UUID

        Args:
            fileuuid (str): The UUID of the ContentMonsterFile to remove from the database
        """
        ReplicationFile.objects.filter(uuid=fileuuid).delete()

    def logCompletion(self, file: ContentMonsterFile, vessel: ContentMonsterVessel) -> None:
        """Log the completion of a ContentMonsterFile upload

        Args:
            file (ContentMonsterFile): The ContentMonsterFile object that has been uploaded
            vessel (ContentMonsterVessel): The ContentMonsterVessel the File has been
                uploaded to

        Raises:
            ReplicationFile.DoesNotExist: If the file has no database record.
            ReplicationTarget.DoesNotExist: If no target is named like the vessel.
        """
        fileobj = ReplicationFile.objects.get(uuid=file.uuid)
        vesselobj = ReplicationTarget.objects.get(name=vessel.name)
        ReplicationFileLog.objects.create(file=fileobj, vessel=vesselobj)

    def getCompletionForVessel(self, vessel: ContentMonsterVessel) -> list[Optional[str]]:
        """Get completed uploads for a vessel

        Args:
            vessel (ContentMonsterVessel): The ContentMonsterVessel object to retrieve
                uploaded files for

        Returns:
            list: List of UUIDs (as strings) of ContentMonsterFiles that have
                been successfully uploaded

        Raises:
            ReplicationTarget.DoesNotExist: If no target is named like the vessel.
        """
        vesselobj = ReplicationTarget.objects.get(name=vessel.name)
        # "file_id" is the raw foreign key column, so no per-row lookup of the
        # related ReplicationFile is needed.
        file_ids = ReplicationFileLog.objects.filter(vessel=vesselobj).values_list(
            "file_id", flat=True
        )
        return [str(file_id) for file_id in file_ids]

    def getCompletionByFileUUID(self, fileuuid: str) -> list[Optional[str]]:
        """Get the vessels a file has been uploaded to

        Args:
            fileuuid (str): The UUID of the ReplicationFile to look up

        Returns:
            list: List of names of the ReplicationTargets that have a
                completion log entry for the file
        """
        return list(
            ReplicationFileLog.objects.filter(file__uuid=fileuuid).values_list(
                "vessel__name", flat=True
            )
        )

    def migrate(self) -> None:
        """No-op; this class performs no schema changes itself."""
        pass

    def __del__(self):
        """No-op; there is no connection owned by this object to close."""
        pass

View file

View file

View file

@ -0,0 +1,51 @@
from django.core.management.base import BaseCommand
from django.conf import settings
from ...models.replication import ReplicationSource, ReplicationTarget
from ...classes.replication import ContentMonsterDatabase
from contentmonster.classes.config import MonsterConfig
from contentmonster.classes.vesselthread import VesselThread
from contentmonster.classes.shorethread import ShoreThread
from multiprocessing import Manager
import time
class Command(BaseCommand):
    """Management command running the ContentMonster replication workers.

    Builds a MonsterConfig from the ReplicationSource and ReplicationTarget
    records, starts one VesselThread per vessel plus one ShoreThread, and
    then idles until interrupted from the keyboard.
    """

    help = 'Runs the file replication service (ContentMonster)'

    def handle(self, *args, **kwargs):
        """Start all workers and block until a KeyboardInterrupt arrives.

        Args:
            *args: Positional arguments passed by Django (unused).
            **kwargs: Command options passed by Django (unused).
        """
        config = MonsterConfig()

        for source in ReplicationSource.objects.all():
            config.directories.append(source.to_directory())

        for target in ReplicationTarget.objects.all():
            config.vessels.append(target.to_vessel(dbclass=ContentMonsterDatabase))

        with Manager() as manager:
            # State shared between the shore worker and the vessel workers.
            state = manager.dict()
            state["files"] = manager.list()
            state["config"] = config

            threads = []

            for vessel in config.vessels:
                thread = VesselThread(vessel, state, dbclass=ContentMonsterDatabase)
                thread.start()
                threads.append(thread)

            shore = ShoreThread(state, dbclass=ContentMonsterDatabase)
            shore.start()

            try:
                # Nothing to do in the foreground; just keep the manager alive.
                while True:
                    time.sleep(10)
            except KeyboardInterrupt:
                self.stdout.write("Keyboard interrupt received - stopping threads")
                shore.terminate()
                for thread in threads:
                    thread.terminate()
                # Fall through and return normally instead of calling exit(),
                # so call_command() callers are not killed with SystemExit.

View file

@ -0,0 +1,55 @@
# Generated by Django 4.1.1 on 2022-09-20 05:05
import core.validators
from django.db import migrations, models
import django.db.models.deletion
import uuid
class Migration(migrations.Migration):
    # Creates the four models used for file replication: ReplicationFile,
    # ReplicationSource, ReplicationTarget and ReplicationFileLog.

    # Has to be applied after the preceding migration of the "core" app.
    dependencies = [
        ('core', '0006_alter_vessel_imo_alter_vessel_mmsi'),
    ]
    operations = [
        # ReplicationFile is created without its "directory" foreign key here;
        # that field is added by the AddField operation at the end, after
        # ReplicationSource exists.
        migrations.CreateModel(
            name='ReplicationFile',
            fields=[
                ('uuid', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
                ('name', models.CharField(max_length=128)),
                ('checksum', models.CharField(max_length=64)),
            ],
        ),
        migrations.CreateModel(
            name='ReplicationSource',
            fields=[
                ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('name', models.CharField(max_length=128)),
                ('location', models.CharField(max_length=2048, validators=[core.validators.validate_directory])),
            ],
        ),
        migrations.CreateModel(
            name='ReplicationTarget',
            fields=[
                ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('name', models.CharField(max_length=128)),
                ('address', models.CharField(max_length=256)),
                # NOTE(review): the literal 'code' is presumably the result of
                # getuser() on the machine that generated this migration, since
                # the model declares default=getuser() - confirm and consider
                # a callable default.
                ('username', models.CharField(blank=True, default='code', max_length=64, null=True)),
            ],
        ),
        migrations.CreateModel(
            name='ReplicationFileLog',
            fields=[
                ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('timestamp', models.DateTimeField(auto_now_add=True)),
                # Log rows are removed together with the file or target they reference.
                ('file', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='core.replicationfile')),
                ('vessel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='core.replicationtarget')),
            ],
        ),
        # Links each ReplicationFile to the ReplicationSource it lives in.
        migrations.AddField(
            model_name='replicationfile',
            name='directory',
            field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='core.replicationsource'),
        ),
    ]

View file

@ -1 +1,3 @@
from .auth import User, OTPSession
from .vessel import Vessel
from .replication import ReplicationFile, ReplicationFileLog, ReplicationSource, ReplicationTarget

View file

@ -0,0 +1,48 @@
from django.db import models
from contentmonster.classes.directory import Directory
from contentmonster.classes.vessel import Vessel
from ..validators import validate_directory
from getpass import getuser
import uuid
class ReplicationSource(models.Model):
    """Represents Directory objects in ContentMonster"""

    # Name under which the directory is known to ContentMonster.
    name = models.CharField(max_length=128)
    # Filesystem path; validated to be an existing directory.
    location = models.CharField(max_length=2048, validators=[validate_directory])

    def to_directory(self) -> Directory:
        """Build the ContentMonster Directory described by this record.

        Returns:
            Directory: Directory created from this record's name and location
        """
        # The field values live on the instance; bare "name"/"location" are
        # not defined inside the method and would raise NameError.
        return Directory(self.name, self.location)
class ReplicationTarget(models.Model):
    """Represents Vessel objects in ContentMonster"""

    name = models.CharField(max_length=128)
    address = models.CharField(max_length=256)
    # Pass the callable, not its result: getuser() would be evaluated once at
    # import time and freeze whichever user happened to import this module.
    username = models.CharField(max_length=64, default=getuser, null=True, blank=True)

    def to_vessel(self, dbclass=None) -> Vessel:
        """Build the ContentMonster Vessel described by this record.

        Args:
            dbclass (optional): Database class handed to the Vessel.
                Defaults to None, in which case ContentMonsterDatabase is used.

        Returns:
            Vessel: Vessel created from this record's name, address and username
        """
        if dbclass is None:
            # Imported lazily because core.classes.replication imports this
            # module, so a top-level import would be circular.
            from ..classes.replication import ContentMonsterDatabase
            dbclass = ContentMonsterDatabase

        return Vessel(self.name, self.address, self.username, dbclass=dbclass)
class ReplicationFile(models.Model):
    """Represents File objects in ContentMonster"""

    # Primary key; a random UUID is generated when none is supplied.
    uuid = models.UUIDField(
        default=uuid.uuid4,
        primary_key=True,
    )
    # Source directory containing the file; files are deleted with it.
    directory = models.ForeignKey(
        ReplicationSource,
        on_delete=models.CASCADE,
    )
    # Filename of the file.
    name = models.CharField(max_length=128)
    # Checksum of the file contents, at most 64 characters.
    checksum = models.CharField(max_length=64)
class ReplicationFileLog(models.Model):
    """Represents File completion in ContentMonster"""

    # The file this log entry refers to; entries are deleted with the file.
    file = models.ForeignKey(
        ReplicationFile,
        on_delete=models.CASCADE,
    )
    # The target this log entry refers to; entries are deleted with it.
    vessel = models.ForeignKey(
        ReplicationTarget,
        on_delete=models.CASCADE,
    )
    # Set automatically when the log entry is created.
    timestamp = models.DateTimeField(auto_now_add=True)

8
core/validators.py Normal file
View file

@ -0,0 +1,8 @@
from django.core.exceptions import ValidationError
from pathlib import Path
def validate_directory(value):
    """Validate that *value* is the path of an existing directory.

    Args:
        value: Path to check, in any form accepted by pathlib.Path

    Raises:
        ValidationError: If the path is not a directory
    """
    candidate = Path(value)
    if candidate.is_dir():
        return
    raise ValidationError(f"{value} is not a directory")

View file

@ -11,6 +11,7 @@ django-crispy-forms
dbsettings
django-autosecretkey
pycruisemapper
contentmonster
git+https://kumig.it/kumisystems/reportmonster.git
git+https://kumig.it/kumisystems/pyadonis.git