diff --git a/classes/config.py b/classes/config.py index 63d113e..13f3b74 100644 --- a/classes/config.py +++ b/classes/config.py @@ -12,14 +12,16 @@ class MonsterConfig: if not "MONSTER" in parser.sections(): raise ValueError("Config file does not contain a MONSTER section!") - directories = [] - vessels = [] + config = cls() for section in parser.sections(): if section.startswith("Directory"): - directories.append(Directory.fromConfig(parser[section])) + config.directories.append(Directory.fromConfig(parser[section])) elif section.startswith("Vessel"): - vessels.append(Vessel.fromConfig(parser[section])) + config.vessels.append(Vessel.fromConfig(parser[section])) + + return config def __init__(self): - pass \ No newline at end of file + self.directories = [] + self.vessels = [] diff --git a/classes/connection.py b/classes/connection.py index 22c407a..1ed3abc 100644 --- a/classes/connection.py +++ b/classes/connection.py @@ -30,21 +30,21 @@ class Connection: def _mkdir(self, path): return self._sftp.mkdir(str(path)) - def _listdir(self, path): - return self._sftp.listdir(str(path)) + def _listdir(self, path=None): + return self._sftp.listdir(str(path) if path else None) def _remove(self, path): return self._sftp.remove(str(path)) - def assertTempDirectory(self, directory): - for d in [directory, directory.tempdir]: + def assertDirectories(self, directory): + for d in [directory, self._vessel.tempdir]: if not self._exists(d): self._mkdir(d) elif not self._isdir(d): raise ValueError(f"{d} exists but is not a directory on Vessel {self._vessel.name}!") - def assertChunkComplete(self, chunk): - path = chunk.file.directory.tempdir / chunk.getTempName() + def assertChunkComplete(self, chunk, path=None): + path = path or self._vessel.tempdir / chunk.getTempName() if self._exists(path): _,o,_ = self._client.exec_command("sha256sum -b " + str(path)) @@ -56,20 +56,51 @@ class Connection: return False def pushChunk(self, chunk): - path = chunk.file.directory.tempdir / chunk.getTempName() + path = self._vessel.tempdir / chunk.getTempName() flo = BytesIO(chunk.data) self._sftp.putfo(flo, path, len(chunk.data)) def compileComplete(self, remotefile): numchunks = remotefile.getStatus() + 1 - files = " ".join([str(remotefile.file.directory.tempdir / f"{remotefile.file.uuid}_{i}.part") for i in range(numchunks)]) + files = " ".join([str(self._vessel.tempdir / f"{remotefile.file.uuid}_{i}.part") for i in range(numchunks)]) completefile = remotefile.file.getChunk(-1) outname = completefile.getTempName() - outpath = remotefile.file.directory.tempdir / outname + outpath = self._vessel.tempdir / outname _,o,_ = self._client.exec_command(f"cat {files} > {outpath}") o.channel.recv_exit_status() - return self.assertChunkComplete(completefile) + def assertComplete(self, remotefile, allow_retry=False): + completefile = remotefile.file.getChunk(-1) + outname = completefile.getTempName() + outpath = self._vessel.tempdir / outname + + if not self._exists(outpath): + return False + + if not self.assertChunkComplete(completefile): + if allow_retry: + self._remove(outpath) + else: + self.clearTempDir() + return False + + return True + + def moveComplete(self, remotefile): + completefile = remotefile.file.getChunk(-1) + destination = remotefile.getFullPath() + self._sftp.rename(str(self._vessel.tempdir / completefile.getTempName()), str(destination)) + self._sftp.stat(str(destination)) + return True + + def getCurrentUploadUUID(self): + for f in self._listdir(self._vessel.tempdir): + if f.endswith(".part"): + return f.split("_")[0] + + def clearTempDir(self): + for f in self._listdir(self._vessel.tempdir): + self._remove(self._vessel.tempdir / f) def __del__(self): self._client.close() \ No newline at end of file diff --git a/classes/database.py b/classes/database.py index 0470d86..9cca443 100644 --- a/classes/database.py +++ b/classes/database.py @@ -48,15 +48,22 @@ class Database: fileuuid = str(uuid.uuid4()) self._execute("INSERT INTO contentmonster_file(uuid, directory, name, checksum) VALUES (?, ?, ?, ?)", (fileuuid, fileobj.directory.name, fileobj.name, hash)) return fileuuid + + def getFileByUUID(self, fileuuid): + cur = self.getCursor() + cur.execute("SELECT directory, name, checksum FROM contentmonster_file WHERE uuid = ?", (fileuuid ,)) + if (result := cur.fetchone()): + return result def removeFileByUUID(self, fileuuid): self._execute("DELETE FROM contentmonster_file WHERE uuid = ?", (fileuuid,)) - def logStart(self, file, vessel): - self._execute("INSERT INTO contentmonster_file_log(file, vessel, status) VALUES(?, ?, ?)", (file.uuid, vessel.name, False)) - def logCompletion(self, file, vessel): - self._execute("UPDATE contentmonster_file_log SET status = ? WHERE file = ? AND vessel = ?", (True, file.uuid, vessel.name)) + self._execute("INSERT INTO contentmonster_file_log(file, vessel) VALUES(?, ?)", (file.uuid, vessel.name)) + + def getCompletionForVessel(self, vessel): + cur = self.getCursor() + cur.execute("SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,)) def migrate(self): cur = self.getCursor() @@ -68,7 +75,7 @@ class Database: if self.getVersion() == 1: cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file(uuid VARCHAR(36) PRIMARY KEY, directory VARCHAR(128), name VARCHAR(128), checksum VARCHAR(64))") - cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), status BOOLEAN, PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_files(uuid) ON DELETE CASCADE)") + cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_files(uuid) ON DELETE CASCADE)") cur.execute("UPDATE contentmonster_settings SET value = '2' WHERE key = 'dbversion'") self.commit() diff --git a/classes/directory.py b/classes/directory.py index 03c6c05..b0fe374 100644 --- a/classes/directory.py +++ b/classes/directory.py @@ -21,8 +21,4 @@ class Directory: def getFiles(self): files = [f for f in os.listdir(self.location) if os.path.isfile] - return [File(f, self) for f in files] - - @property - def tempdir(self): - return self.location / ".temp" \ No newline at end of file + return [File(f, self) for f in files] \ No newline at end of file diff --git a/classes/doghandler.py b/classes/doghandler.py new file mode 100644 index 0000000..01177f1 --- /dev/null +++ b/classes/doghandler.py @@ -0,0 +1,18 @@ +from watchdog.events import FileSystemEventHandler + +class DogHandler(FileSystemEventHandler): + def __init__(self, queue, *args, **kwargs): + super().__init__(*args, **kwargs) + self._queue = queue + + def on_created(self, event): + pass + + def on_modified(self, event): + pass + + def on_moved(self, event): + pass + + def on_deleted(self, event): + pass \ No newline at end of file diff --git a/classes/remotefile.py b/classes/remotefile.py index 16c4691..276dcc0 100644 --- a/classes/remotefile.py +++ b/classes/remotefile.py @@ -5,11 +5,10 @@ class RemoteFile: def __init__(self, fileobj, vessel, chunksize=1048576): self.file = fileobj self.vessel = vessel - self.tempdir = self.vessel.connection.assertTempDirectory(self.file.directory) self.chunksize = chunksize def getStatus(self): - ls = self.vessel.connection._listdir(self.tempdir) + ls = self.vessel.connection._listdir(self.vessel.tempdir) files = [f for f in ls if f.startswith(self.file.uuid) and f.endswith(".part")] ids = [-1] diff --git a/classes/shorethread.py b/classes/shorethread.py new file mode 100644 index 0000000..38e2e5c --- /dev/null +++ b/classes/shorethread.py @@ -0,0 +1,37 @@ +from classes.config import MonsterConfig +from classes.doghandler import DogHandler + +from watchdog.observers import Observer + +from multiprocessing import Process, Queue + +import time + +class ShoreThread: + def __init__(self, files): + super().__init__() + self._config = MonsterConfig() + self._dogs = [] + self.files = files + self.queue = Queue() + + def getAllFiles(self): + files = [] + + for directory in self._config.directories: + files.append(directory.getFiles()) + + return files + + def clearFiles(self): + del self.files[:] + + def monitor(self): + for directory in self._config.directories: + dog = DogHandler(self.queue) + + self._dogs.append(dog) + + def run(self): + print("Launched Shore Thread") + self.clearFiles() diff --git a/classes/vessel.py b/classes/vessel.py index dcd6cf4..5d03f10 100644 --- a/classes/vessel.py +++ b/classes/vessel.py @@ -1,28 +1,48 @@ from classes.connection import Connection +from classes.database import Database +from classes.file import File from paramiko.ssh_exception import SSHException +import pathlib + class Vessel: @classmethod def fromConfig(cls, config): + if "TempDir" in config.keys(): + tempdir = config["TempDir"] + else: + tempdir = "/tmp/.ContentMonster/" if "Address" in config.keys(): - return cls(config.name.split()[1], config["Address"]) + return cls(config.name.split()[1], config["Address"], pathlib.Path(tempdir)) else: raise ValueError("Definition for Vessel " + config.name.split()[1] + " does not contain Address!") - def __init__(self, name: str, address: str): + def __init__(self, name: str, address: str, tempdir: pathlib.Path): self.name = name self.address = address + self.tempdir = tempdir self._connection = None + self._uploaded = self.getUploadedFromDB() @property def connection(self): if self._connection: try: self._connection._listdir() - return self._connection except SSHException: self._connection = None - self._connection = Connection(self) + self._connection = self._connection or Connection(self) + return self._connection - def currentUpload() \ No newline at end of file + def getUploadedFromDB(self): + db = Database() + return db.getCompletionForVessel(self) + + def currentUpload(self): + db = Database() + directory, name, _ = db.getFileByUUID(fileuuid := self.connection.getCurrentUploadUUID()) + return File(name, directory, fileuuid) + + def clearTempDir(self): + return self.connection.clearTempDir() \ No newline at end of file diff --git a/classes/vesselthread.py b/classes/vesselthread.py index 6d1793e..f573df7 100644 --- a/classes/vesselthread.py +++ b/classes/vesselthread.py @@ -1,9 +1,18 @@ from multiprocessing import Process +import time + class VesselThread(Process): def __init__(self, vessel, files): super().__init__() self.vessel = vessel + self.files = files def run(self): - pass \ No newline at end of file + print("Launched Vessel Thread for " + self.vessel.name) + while True: + try: + print(self.files[0]) + except: + print("Nothing.") + time.sleep(10) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index aa35c71..ca9e5d7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -paramiko \ No newline at end of file +paramiko +watchdog \ No newline at end of file diff --git a/worker.py b/worker.py index b543820..5ef0536 100644 --- a/worker.py +++ b/worker.py @@ -2,12 +2,32 @@ from classes.config import MonsterConfig from classes.vesselthread import VesselThread +from classes.shorethread import ShoreThread from multiprocessing import Manager import pathlib +import time -config_path = pathlib.Path(__file__).parent.absolute() / "settings.ini" +if __name__ == '__main__': + config_path = pathlib.Path(__file__).parent.absolute() / "settings.ini" + config = MonsterConfig.fromFile(config_path) -config = MonsterConfig.fromFile(settings_path) + with Manager() as manager: + files = manager.list() + threads = [] + + for vessel in config.vessels: + thread = VesselThread(vessel, files) + thread.start() + threads.append(thread) + + try: + shore = ShoreThread(files) + shore.run() + except KeyboardInterrupt: + print("Keyboard interrupt received - stopping threads") + for thread in threads: + thread.kill() + exit() \ No newline at end of file