From 48bc92653d7c613055c10c46bebf724ad179e496 Mon Sep 17 00:00:00 2001 From: Klaus-Uwe Mitterer Date: Thu, 25 Nov 2021 10:40:25 +0100 Subject: [PATCH] Fixing Dog Handler usage Lots of documentation --- classes/chunk.py | 46 +++++++++- classes/connection.py | 192 +++++++++++++++++++++++++++++++++++------ classes/database.py | 43 +++++---- classes/doghandler.py | 20 +++-- classes/file.py | 70 ++++++++++++--- classes/remotefile.py | 13 +-- classes/shorethread.py | 20 +++-- const.py | 4 + worker.py | 15 ++-- 9 files changed, 345 insertions(+), 78 deletions(-) create mode 100644 const.py diff --git a/classes/chunk.py b/classes/chunk.py index fe33592..c1df1bd 100644 --- a/classes/chunk.py +++ b/classes/chunk.py @@ -1,13 +1,51 @@ import hashlib + class Chunk: - def __init__(self, fileobj, count, data): + """A class defining a single chunk of a file to be uploaded""" + + @staticmethod + def fromFile(fileobj, count: int, chunksize: int) -> "Chunk": + """Create a new Chunk object from a File + + Args: + fileobj (classes.file.File): The file object from local storage + count (int): Position of the current chunk in the list of total + chunks (first index: 0) or -1 to get the complete file + chunksize (int): Size of each chunk in bytes + + Returns: + A Chunk object containing the portion of the File object beginning + at (count * chunksize) bytes and ending at ((count + 1) * chunksize + - 1) bytes + """ + return fileobj.getChunk(count, chunksize) + + def __init__(self, fileobj, count: int, data: bytes) -> None: + """Initialize a new Chunk object + + Args: + fileobj (classes.file.File): The file object from local storage + count (int): Position of the current chunk in the list of total + chunks (first index: 0) or -1 to get the complete file + data (bytes): Content of the chunk + """ self.file = fileobj self.count = count if count >= 0 else "complete" self.data = data - def getTempName(self): + def getTempName(self) -> str: + """Get filename for this Chunk in the temp directory on the Vessel + + Returns: + str: Filename to use for this chunk in the Vessel tempdir + """ return f"{self.file.uuid}_{self.count}.part" - def getHash(self): - return hashlib.sha256(self.data).hexdigest() \ No newline at end of file + def getHash(self) -> str: + """Generate a hash for this Chunk + + Returns: + str: SHA256 hash of Chunk.data + """ + return hashlib.sha256(self.data).hexdigest() diff --git a/classes/connection.py b/classes/connection.py index 1ed3abc..af2f95f 100644 --- a/classes/connection.py +++ b/classes/connection.py @@ -1,14 +1,25 @@ -import paramiko as pikuniku # :P +import paramiko as pikuniku # :P from paramiko.client import SSHClient from io import BytesIO +from pathlib import Path +from typing import Union, Optional import errno import stat + class Connection: + """Class representing an SSH/SFTP connection to a Vessel + """ + def __init__(self, vessel): + """Initialize a new Connection to a Vessel + + Args: + vessel (classes.vessel.Vessel): Vessel object to open connection to + """ self._vessel = vessel self._client = SSHClient() self._client.load_system_host_keys() @@ -17,59 +28,170 @@ class Connection: self._transport.set_keepalive(10) self._sftp = self._client.open_sftp() - def _exists(self, path): + def _exists(self, path: Union[str, Path]) -> bool: + """Check if a path exists on the Vessel. Symlinks are followed.
+ + Args: + path (str, pathlib.Path): Path to check on the vessel + + Returns: + bool: True if the path exists on the Vessel, else False + """ try: self._sftp.stat(str(path)) return True except FileNotFoundError: return False - def _isdir(self, path): + def _isdir(self, path: Union[str, Path]) -> bool: + """Check if a path is a directory on the Vessel. Symlinks are not followed. + + Args: + path (str, pathlib.Path): Path to check on the vessel + + Returns: + bool: True if the path provided is a directory on the Vessel, False + if it is a different kind of file. + + Raises: + FileNotFoundError: Raised if the path does not exist on the Vessel + """ return stat.S_ISDIR(self._sftp.lstat(str(path)).st_mode) - def _mkdir(self, path): - return self._sftp.mkdir(str(path)) + def _mkdir(self, path: Union[str, Path]) -> None: + """Create new directory on the Vessel - def _listdir(self, path=None): + Args: + path (str, pathlib.Path): Path at which to create a new directory + on the Vessel + + Raises: + IOError: Raised if the directory cannot be created + """ + self._sftp.mkdir(str(path)) + + def _listdir(self, path: Optional[Union[str, Path]] = None) -> list[str]: + """List files in a directory on the Vessel + + Args: + path (str, pathlib.Path, optional): Path at which to list contents. + Will use current working directory if None. Defaults to None. + + Returns: + list: List of the names of files (str) located at the provided path + """ return self._sftp.listdir(str(path) if path else None) - def _remove(self, path): + def _remove(self, path: Union[str, Path]) -> None: + """Remove a file from the Vessel + + Args: + path (str, pathlib.Path): Path of the file to delete + + Raises: + FileNotFoundError: Raised if no file is found at the given path + IOError: Raised if the file cannot be deleted + """ return self._sftp.remove(str(path)) - def assertDirectories(self, directory): + def assertDirectories(self, directory) -> None: + """Make sure that destination and temp directories exist on the Vessel + + Args: + directory (classes.directory.Directory): Directory object + representing the directory to check + + Raises: + ValueError: Raised if a path is already in use on the vessel but + not a directory. + IOError: Raised if a directory that does not exist cannot be + created. + """ for d in [directory, self._vessel.tempdir]: if not self._exists(d): self._mkdir(d) elif not self._isdir(d): - raise ValueError(f"{d} exists but is not a directory on Vessel {self._vessel.name}!") + raise ValueError( + f"{d} exists but is not a directory on Vessel {self._vessel.name}!") - def assertChunkComplete(self, chunk, path=None): + def assertChunkComplete(self, chunk, path: Optional[Union[str, Path]] = None) -> bool: + """Check if a Chunk has been uploaded correctly + + Args: + chunk (classes.chunk.Chunk): Chunk object to verify upload for + path (str, pathlib.Path, optional): Optional path at which to + check. If None, will get default path from Chunk object. + Defaults to None.
+ + Returns: + bool: True if file has been uploaded correctly, else False + """ path = path or self._vessel.tempdir / chunk.getTempName() if self._exists(path): - _,o,_ = self._client.exec_command("sha256sum -b " + str(path)) + # "-b" should not be required, but makes sure to use binary mode + _, o, _ = self._client.exec_command("sha256sum -b " + str(path)) + + # Block until the command completes o.channel.recv_exit_status() + + # Remove the file if it is broken if not o.readline().split()[0] == chunk.getHash(): self._remove(path) + else: return True return False - def pushChunk(self, chunk): - path = self._vessel.tempdir / chunk.getTempName() + def pushChunk(self, chunk, path: Optional[Union[str, Path]] = None) -> None: + """Push the content of a Chunk object to the Vessel + + Args: + chunk (classes.chunk.Chunk): Chunk object containing the data to + push to the Vessel + path (str, pathlib.Path, optional): Path at which to store the + Chunk on the Vessel. If None, use default location provided by + Chunk object. Defaults to None. + """ + path = path or (self._vessel.tempdir / chunk.getTempName()) flo = BytesIO(chunk.data) self._sftp.putfo(flo, path, len(chunk.data)) - def compileComplete(self, remotefile): + def compileComplete(self, remotefile) -> None: + """Build a complete File from uploaded Chunks. + + Args: + remotefile (classes.remotefile.RemoteFile): RemoteFile object + describing the uploaded File + """ numchunks = remotefile.getStatus() + 1 - files = " ".join([str(self._vessel.tempdir / f"{remotefile.file.uuid}_{i}.part") for i in range(numchunks)]) + + # Get files in correct order to concatenate + files = " ".join( + [str(self._vessel.tempdir / f"{remotefile.file.uuid}_{i}.part") for i in range(numchunks)]) + completefile = remotefile.file.getChunk(-1) outname = completefile.getTempName() outpath = self._vessel.tempdir / outname - _,o,_ = self._client.exec_command(f"cat {files} > {outpath}") + _, o, _ = self._client.exec_command(f"cat {files} > {outpath}") + + # Block until the command completes o.channel.recv_exit_status() - def assertComplete(self, remotefile, allow_retry=False): + def assertComplete(self, remotefile, allow_retry: bool = False) -> bool: + """Check if File has been reassembled from Chunks correctly + + Args: + remotefile (classes.remotefile.RemoteFile): RemoteFile object + describing the uploaded File + allow_retry (bool, optional): If True, assume that compileComplete + failed for some other reason than corrupt Chunks, and only delete + compiled file, else clear entire temporary directory. Defaults to + False. + + Returns: + bool: True if file was reassembled correctly, else False + """ completefile = remotefile.file.getChunk(-1) outname = completefile.getTempName() outpath = self._vessel.tempdir / outname @@ -83,24 +205,44 @@ class Connection: else: self.clearTempDir() return False - + return True - def moveComplete(self, remotefile): + def moveComplete(self, remotefile) -> None: + """Move the reassembled file to its output destination + + Args: + remotefile (classes.remotefile.RemoteFile): RemoteFile object + describing the uploaded File.
+ """ completefile = remotefile.file.getChunk(-1) destination = remotefile.getFullPath() - self._sftp.rename(str(self._vessel.tempdir / completefile.getTempName()), str(destination)) - self._sftp.stat(str(destination)) - return True + self._sftp.rename( + str(self._vessel.tempdir / completefile.getTempName()), str(destination)) - def getCurrentUploadUUID(self): + # Make sure that file has actually been created at destination + self._sftp.stat(str(destination)) + + def getCurrentUploadUUID(self) -> Optional[str]: + """Get UUID of file currently being uploaded + + Returns: + str, optional: UUID of the File being uploaded, if any, else None + """ for f in self._listdir(self._vessel.tempdir): if f.endswith(".part"): return f.split("_")[0] - def clearTempDir(self): + def clearTempDir(self) -> None: + """Clean up the temporary directory on the Vessel + """ for f in self._listdir(self._vessel.tempdir): self._remove(self._vessel.tempdir / f) def __del__(self): - self._client.close() \ No newline at end of file + """Close SSH connection when ending Connection + """ + self._client.close() diff --git a/classes/database.py b/classes/database.py index 9cca443..f3423eb 100644 --- a/classes/database.py +++ b/classes/database.py @@ -2,9 +2,11 @@ import sqlite3 import pathlib import uuid + class Database: def __init__(self, filename=None): - filename = filename or pathlib.Path(__file__).parent.parent.absolute() / "database.sqlite3" + filename = filename or pathlib.Path( + __file__).parent.parent.absolute() / "database.sqlite3" self._con = sqlite3.connect(filename) self.migrate() @@ -22,7 +24,8 @@ class Database: def getVersion(self): cur = self.getCursor() try: - cur.execute("SELECT value FROM contentmonster_settings WHERE key = 'dbversion'") + cur.execute( + "SELECT value FROM contentmonster_settings WHERE key = 'dbversion'") assert (version := cur.fetchone()) return int(version[0]) except (sqlite3.OperationalError, AssertionError): @@ -32,7 +35,8 @@ class Database: hash = fileobj.getHash() cur = self.getCursor() - cur.execute("SELECT uuid, checksum FROM contentmonster_file WHERE directory = ? AND name = ?", (fileobj.directory.name, fileobj.name)) + cur.execute("SELECT uuid, checksum FROM contentmonster_file WHERE directory = ?
AND name = ?", + (fileobj.directory.name, fileobj.name)) fileuuid = None for result in cur.fetchall(): @@ -46,38 +50,47 @@ class Database: def addFile(self, fileobj, hash=None): hash = hash or fileobj.getHash() fileuuid = str(uuid.uuid4()) - self._execute("INSERT INTO contentmonster_file(uuid, directory, name, checksum) VALUES (?, ?, ?, ?)", (fileuuid, fileobj.directory.name, fileobj.name, hash)) + self._execute("INSERT INTO contentmonster_file(uuid, directory, name, checksum) VALUES (?, ?, ?, ?)", + (fileuuid, fileobj.directory.name, fileobj.name, hash)) return fileuuid def getFileByUUID(self, fileuuid): cur = self.getCursor() - cur.execute("SELECT directory, name, checksum FROM contentmonster_file WHERE uuid = ?", (fileuuid ,)) + cur.execute( + "SELECT directory, name, checksum FROM contentmonster_file WHERE uuid = ?", (fileuuid,)) if (result := cur.fetchone()): return result - + def removeFileByUUID(self, fileuuid): - self._execute("DELETE FROM contentmonster_file WHERE uuid = ?", (fileuuid,)) + self._execute( + "DELETE FROM contentmonster_file WHERE uuid = ?", (fileuuid,)) def logCompletion(self, file, vessel): - self._execute("INSERT INTO contentmonster_file_log(file, vessel) VALUES(?, ?)", (file.uuid, vessel.name)) + self._execute( + "INSERT INTO contentmonster_file_log(file, vessel) VALUES(?, ?)", (file.uuid, vessel.name)) def getCompletionForVessel(self, vessel): cur = self.getCursor() - cur.execute("SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,)) + cur.execute( + "SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,)) def migrate(self): cur = self.getCursor() if self.getVersion() == 0: - cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_settings(key VARCHAR(64) PRIMARY KEY, value TEXT)") - cur.execute("INSERT INTO contentmonster_settings(key, value) VALUES ('dbversion', '1')") + cur.execute( + "CREATE TABLE IF NOT EXISTS contentmonster_settings(key VARCHAR(64) PRIMARY KEY, value TEXT)") + cur.execute( + "INSERT INTO contentmonster_settings(key, value) VALUES ('dbversion', '1')") self.commit() if self.getVersion() == 1: - cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file(uuid VARCHAR(36) PRIMARY KEY, directory VARCHAR(128), name VARCHAR(128), checksum VARCHAR(64))") - cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_files(uuid) ON DELETE CASCADE)") - cur.execute("UPDATE contentmonster_settings SET value = '2' WHERE key = 'dbversion'") + cur.execute( + "CREATE TABLE IF NOT EXISTS contentmonster_file(uuid VARCHAR(36) PRIMARY KEY, directory VARCHAR(128), name VARCHAR(128), checksum VARCHAR(64))") + cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_files(uuid) ON DELETE CASCADE)") + cur.execute( + "UPDATE contentmonster_settings SET value = '2' WHERE key = 'dbversion'") self.commit() def __del__(self): - self._con.close() \ No newline at end of file + self._con.close() diff --git a/classes/doghandler.py b/classes/doghandler.py index af6843b..c71159e 100644 --- a/classes/doghandler.py +++ b/classes/doghandler.py @@ -1,18 +1,28 @@ from watchdog.events import FileSystemEventHandler +import os.path + + class DogHandler(FileSystemEventHandler): - def __init__(self, queue, *args, **kwargs): + def __init__(self, directory, queue, *args, **kwargs): + print("Initialized") 
super().__init__(*args, **kwargs) + self._directory = directory self._queue = queue + def dispatch(self, event): + if not event.is_directory: + super().dispatch(event) + def on_created(self, event): - self._queue.put("Created: " + event.src_path) + self._queue.put((self._directory, os.path.basename(event.src_path))) def on_modified(self, event): - self._queue.put("Modified: " + event.src_path) + self._queue.put((self._directory, os.path.basename(event.src_path))) def on_moved(self, event): - self._queue.put("Moved: " + event.src_path + " to: " + event.dest_path) + self._queue.put((self._directory, os.path.basename(event.src_path))) + self._queue.put((self._directory, os.path.basename(event.dest_path))) def on_deleted(self, event): - self._queue.put("Deleted: " + event.src_path) \ No newline at end of file + self._queue.put((self._directory, os.path.basename(event.src_path))) diff --git a/classes/file.py b/classes/file.py index 8755cb4..d4a75c7 100644 --- a/classes/file.py +++ b/classes/file.py @@ -1,27 +1,77 @@ from classes.chunk import Chunk from classes.database import Database +from typing import Optional + import hashlib -class File: - def getUUID(self): - db = Database() - db.getFileUUID(self) - def __init__(self, name, directory, uuid=None): +class File: + def __init__(self, name: str, directory, uuid: Optional[str] = None) -> None: + """Initialize new File object + + Args: + name (str): Filename (basename without path) of the File to create + directory (classes.directory.Directory): Directory object the File + is located within + uuid (str, optional): Unique identifier of this File object. Will + be retrieved from database if None. Defaults to None. + """ self.name = name self.directory = directory self.uuid = uuid or self.getUUID() - def getFullPath(self): - return self.directory / self.name + """Object representing a file found in a local Directory + """ - def getHash(self): + def getUUID(self) -> str: + """Return unique identifier for this File object + + Returns: + str: File object's UUID retrieved from Database + """ + db = Database() + return db.getFileUUID(self) + + def getFullPath(self) -> str: + """Get the full path of the File + + Returns: + str: Full path of the File on the local file system + """ + return self.directory.location / self.name + + def getHash(self) -> str: + """Get hash for this File + + Returns: + str: SHA256 for the full content of this File + """ return self.getChunk(-1).getHash() - def getChunk(self, count, size=1048576): + def getChunk(self, count: int, size: Optional[int] = None) -> Chunk: + """Get a Chunk of this File + + Args: + count (int): Position of the Chunk in a list of equally large + Chunks (first index: 0). -1 represents a Chunk containing the + complete file. + size (int, optional): Size of the Chunk to create in Bytes. Must + be set if count is not -1. Defaults to None. 
+ + Returns: + classes.chunk.Chunk: Chunk object containing File content from + (count * size) bytes to ((count + 1) * size - 1) bytes + + Raises: + ValueError: Raised if size is not set for a count that is not -1 + """ + if count != -1 and not size: + raise ValueError( + "A Chunk size needs to be passed to getChunk() if not using the complete file (-1)!") + with open(self.getFullPath(), "rb") as binary: binary.seek((count * size) if count > 0 else 0) data = binary.read(size if count >= 0 else None) - return Chunk(self, count, data) \ No newline at end of file + return Chunk(self, count, data) diff --git a/classes/remotefile.py b/classes/remotefile.py index 276dcc0..41e347d 100644 --- a/classes/remotefile.py +++ b/classes/remotefile.py @@ -1,5 +1,5 @@ -STATUS_START = -1 -STATUS_COMPLETE = -2 +from const import STATUS_COMPLETE, STATUS_START + class RemoteFile: def __init__(self, fileobj, vessel, chunksize=1048576): @@ -9,7 +9,8 @@ class RemoteFile: def getStatus(self): ls = self.vessel.connection._listdir(self.vessel.tempdir) - files = [f for f in ls if f.startswith(self.file.uuid) and f.endswith(".part")] + files = [f for f in ls if f.startswith( + self.file.uuid) and f.endswith(".part")] ids = [-1] @@ -25,8 +26,8 @@ class RemoteFile: while count >= 0: if self.validateChunk(count): return count - count -=1 - + count -= 1 + return STATUS_START def validateChunk(self, count): @@ -39,4 +40,4 @@ class RemoteFile: self.vessel.connection.compileComplete(self) def getChunk(self, count): - return self.file.getChunk(count, self.chunksize) \ No newline at end of file + return self.file.getChunk(count, self.chunksize) diff --git a/classes/shorethread.py b/classes/shorethread.py index db36230..0d1c19b 100644 --- a/classes/shorethread.py +++ b/classes/shorethread.py @@ -6,6 +6,8 @@ from watchdog.observers import Observer from multiprocessing import Process, Queue import time +import os.path + class ShoreThread: def __init__(self, files, directories): @@ -29,27 +31,33 @@ class ShoreThread: def monitor(self): for directory in self.directories: print("Creating dog for " + str(directory.location)) - handler = DogHandler(self.queue) + handler = DogHandler(directory, self.queue) dog = Observer() - dog.schedule(handler, str(directory.location)) + dog.schedule(handler, str(directory.location), False) dog.start() self._dogs.append(dog) def run(self): print("Launched Shore Thread") + self.getAllFiles() self.monitor() try: while True: + self.joinDogs() self.processQueue() except KeyboardInterrupt: self.stop() raise + def joinDogs(self): + for dog in self._dogs: + dog.join(1) + def processQueue(self): - if not self.queue.empty: - event = self.queue.get() - print(event) + event = self.queue.get() + print(event) def stop(self): for dog in self._dogs: - dog.kill() \ No newline at end of file + dog.stop() + dog.join() diff --git a/const.py b/const.py new file mode 100644 index 0000000..dcd67b1 --- /dev/null +++ b/const.py @@ -0,0 +1,4 @@ +# Constants for remote file status + +STATUS_START = -1 +STATUS_COMPLETE = -2 \ No newline at end of file diff --git a/worker.py b/worker.py index 569bec2..57435e6 100644 --- a/worker.py +++ b/worker.py @@ -14,20 +14,21 @@ if __name__ == '__main__': config = MonsterConfig.fromFile(config_path) with Manager() as manager: - files = manager.list() + state = manager.dict() + state["files"] = manager.list() threads = [] for vessel in config.vessels: - thread = VesselThread(vessel, files) + thread = VesselThread(vessel, state) thread.start() threads.append(thread) try: - shore = 
ShoreThread(files, config.directories) + shore = ShoreThread(state, config.directories) shore.run() except KeyboardInterrupt: - print("Keyboard interrupt received - stopping threads") - for thread in threads: - thread.kill() - exit() \ No newline at end of file + print("Keyboard interrupt received - stopping threads") + for thread in threads: + thread.kill() + exit()
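
The chunked-upload API documented in this patch can be exercised roughly as follows. This is an illustrative sketch, not part of the patch: the Directory constructor arguments, the sample file name and the 1 MiB chunk size are assumptions made for the example.

# Illustrative sketch of the Chunk/File API added above (assumptions:
# Directory("uploads", "/srv/uploads") signature, file "example.bin").
from classes.chunk import Chunk
from classes.directory import Directory
from classes.file import File

directory = Directory("uploads", "/srv/uploads")  # hypothetical name and path
fileobj = File("example.bin", directory)          # UUID is fetched or created via Database

# A "complete file" Chunk (count == -1) needs no explicit chunk size ...
complete = fileobj.getChunk(-1)
print(complete.getHash())       # SHA256 of the whole file

# ... while a regular Chunk does; getChunk() raises ValueError otherwise.
first = Chunk.fromFile(fileobj, 0, 1048576)
print(first.getTempName())      # "<uuid>_0.part", the name used in the Vessel tempdir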