From f3f6e74c8984fc36199b4ece1b07cdac517414ed Mon Sep 17 00:00:00 2001 From: Klaus-Uwe Mitterer Date: Thu, 25 Nov 2021 16:31:49 +0100 Subject: [PATCH] Logic improvements More documentation --- classes/chunk.py | 8 ++-- classes/connection.py | 2 +- classes/database.py | 6 ++- classes/directory.py | 21 ++++++++-- classes/file.py | 20 +++++++++- classes/remotefile.py | 82 ++++++++++++++++++++++++++++++++------- classes/retry.py | 31 ++++++++++++--- classes/shorethread.py | 72 ++++++++++++++++++++++------------ classes/vessel.py | 86 +++++++++++++++++++++++++++++++++-------- classes/vesselthread.py | 22 ++++++++--- worker.py | 24 +++++++----- 11 files changed, 290 insertions(+), 84 deletions(-) mode change 100644 => 100755 worker.py diff --git a/classes/chunk.py b/classes/chunk.py index c1df1bd..4548740 100644 --- a/classes/chunk.py +++ b/classes/chunk.py @@ -5,7 +5,7 @@ class Chunk: """A class defining a single chunk of a file to be uploaded""" @staticmethod - def fromFile(fileobj, count: int, chunksize: int) -> type[Chunk]: + def fromFile(fileobj, count: int, chunksize: int): """Create a new Chunk object from a File Args: @@ -15,9 +15,9 @@ class Chunk: chunksize (int): Size of each chunk in bytes Returns: - A Chunk object containing the portion of the File object beginning - at (count * chunksize) bytes and ending at ((count + 1) * chunksize - - 1) bytes + classes.chunk.Chunk: A Chunk object containing the portion of the + File object beginning at (count * chunksize) bytes and ending at + ((count + 1) * chunksize - 1) bytes """ return fileobj.getChunk(count, chunksize) diff --git a/classes/connection.py b/classes/connection.py index af2f95f..092e97a 100644 --- a/classes/connection.py +++ b/classes/connection.py @@ -143,7 +143,7 @@ class Connection: return True return False - def pushChunk(self, chunk, path: Optional[str, Path] = None) -> None: + def pushChunk(self, chunk, path: Optional[Union[str, Path]] = None) -> None: """Push the content of a Chunk object to the Vessel Args: diff --git a/classes/database.py b/classes/database.py index b442706..d4e5e7d 100644 --- a/classes/database.py +++ b/classes/database.py @@ -32,10 +32,14 @@ class Database: """ cur = self.getCursor() cur.execute(query, parameters) - self.commit() # Instantly commit after every write action + self.commit() # Instantly commit after every (potential) write action def commit(self) -> None: """Commit the current database transaction + + N.B.: Commit instantly after every write action to make the database + "thread-safe". Connections will time out if the database is locked for + more than five seconds. """ self._con.commit() diff --git a/classes/directory.py b/classes/directory.py index 9633f8b..c3c2cf4 100644 --- a/classes/directory.py +++ b/classes/directory.py @@ -4,14 +4,14 @@ import os import pathlib from configparser import SectionProxy -from typing import Union +from typing import Union, Optional class Directory: """Class representing a Directory on the local filesystem """ @classmethod - def fromConfig(cls, config: SectionProxy) -> Directory: # pylint: disable=undefined-variable + def fromConfig(cls, config: SectionProxy): """Create Directory object from a Directory section in the Config file Args: @@ -54,7 +54,22 @@ class Directory: """Get all Files in Directory Returns: - list: List of names (str) of files within the Directory + list: List of File objects for files within the Directory """ files = [f for f in os.listdir(self.location) if os.path.isfile] return [File(f, self) for f in files] + + def getFile(self, name: str) -> Optional[File]: + """Get a file in the Directory by name + + Args: + name (str): Filename of the File to get + + Returns: + File, optional: File object if the file was found, else None + """ + + try: + return File(name, self) + except FileNotFoundError: + return None \ No newline at end of file diff --git a/classes/file.py b/classes/file.py index d4a75c7..0c161e0 100644 --- a/classes/file.py +++ b/classes/file.py @@ -4,9 +4,13 @@ from classes.database import Database from typing import Optional import hashlib +import os.path class File: + """Object representing a file found in a local Directory + """ + def __init__(self, name: str, directory, uuid: Optional[str] = None) -> None: """Initialize new File object @@ -16,13 +20,25 @@ class File: is located within uuid (str, optional): Unique identifier of this File object. Will be retrieved from database if None. Defaults to None. + + Raises: + FileNotFoundError: Raised if the specified File does not exist """ self.name = name self.directory = directory + + if not self.exists(): + raise FileNotFoundError(f"File {self.name} does not exist in {self.directory}!") + self.uuid = uuid or self.getUUID() - """Object representing a file found in a local Directory - """ + def exists(self) -> bool: + """Check if the File exists on the local file system + + Returns: + bool: True if the File exists, else False + """ + return os.path.isfile(self.directory.location / self.name) def getUUID(self) -> str: """Return unique identifier for this File object diff --git a/classes/remotefile.py b/classes/remotefile.py index 41e347d..684c1c5 100644 --- a/classes/remotefile.py +++ b/classes/remotefile.py @@ -2,42 +2,98 @@ from const import STATUS_COMPLETE, STATUS_START class RemoteFile: - def __init__(self, fileobj, vessel, chunksize=1048576): + """Class describing the transfer status of a File to a Vessel + """ + + def __init__(self, fileobj, vessel, chunksize: int) -> None: + """Initialize a new RemoteFile object + + Args: + fileobj (classes.file.File): File object to transfer to a Vessel + vessel (classes.vessel.Vessel): Vessel to transfer the File to + chunksize (int): Size of a single Chunk to transfer + """ self.file = fileobj self.vessel = vessel self.chunksize = chunksize - def getStatus(self): + def getStatus(self) -> int: + """Get the current transfer status + + Returns: + int: Number of the last Chunk that was uploaded, or STATUS_COMPLETE + (-1) if a file upload is complete and waiting for finalization, + or STATUS_START (-2) if no Chunk has been uploaded yet + """ + + # Get all files in the vessel's tempdir + ls = self.vessel.connection._listdir(self.vessel.tempdir) files = [f for f in ls if f.startswith( self.file.uuid) and f.endswith(".part")] - ids = [-1] + # Find the file with the largest chunk number + + count = -1 for f in files: part = f.split("_")[1].split(".")[0] - if part == "complete": - if self.validateComplete(): - return STATUS_COMPLETE - ids.append(int(part)) + if part == "complete": # If a reassembled file is found + if self.validateComplete(True): # and it is not broken + return STATUS_COMPLETE # the upload is complete - count = max(ids) + # Else save the chunk number if it is larger than the previous + count = max(count, int(part)) + # Find and return the largest non-corrupt chunk while count >= 0: if self.validateChunk(count): return count count -= 1 + # If no (more) files exist, we are just getting started return STATUS_START - def validateChunk(self, count): + def validateChunk(self, count: int) -> bool: + """Validate that a Chunk was uploaded correctly + + Args: + count (int): Chunk number to validate + + Returns: + bool: True if file has been uploaded correctly, else False + """ return self.vessel.connection.assertChunkComplete(self.getChunk(count)) - def validateComplete(self): - return self.validateChunk(-1) + def validateComplete(self, allow_retry: bool = False): + """Validate that the complete File was reassembled correctly - def compileComplete(self): + Args: + allow_retry (bool, optional): If True, assume that compileComplete + failed for some other reason than corrupt Chunks, and only delete + compiled file, else clear entire temporary directory. Defaults to + False. + + Returns: + bool: True if file was reassembled correctly, else False + """ + return self.vessel.connection.assertComplete(self, allow_retry) + + def compileComplete(self) -> None: + """Reassemble a complete File from the uploaded Chunks + """ self.vessel.connection.compileComplete(self) - def getChunk(self, count): + def getChunk(self, count: int): + """Get a Chunk of the source file + + Args: + count (int): Number of the Chunk to generate + + Returns: + classes.chunk.Chunk: A Chunk object containing the portion of the + File object beginning at (count * chunksize) bytes and ending at + ((count + 1) * chunksize - 1) bytes, with chunksize taken from the + RemoteFile initialization value + """ return self.file.getChunk(count, self.chunksize) diff --git a/classes/retry.py b/classes/retry.py index 298e7bf..17504e1 100644 --- a/classes/retry.py +++ b/classes/retry.py @@ -1,13 +1,32 @@ from paramiko.ssh_exception import SSHException class retry: - def __init__(self, exceptions=None): + """Decorator used to automatically retry operations throwing exceptions + """ + def __init__(self, exceptions: tuple[BaseException] = None): + """Initializing the retry decorator + + Args: + exceptions (tuple, optional): A tuple containing exception classes + that should be handled by the decorator. If none, handle only + paramiko.ssh_exception.SSHException. Defaults to None. + """ self.exceptions = exceptions or (SSHException,) def __call__(self, f): - def wrapped_f(*args): - try: - f(*args) - except self.exceptions as e: - print("Caught expected exception: " + e) + """Return a function through the retry decorator + + Args: + f (function): Function to wrap in the decorator + + Returns: + function: Function wrapping the passed function + """ + def wrapped_f(*args, **kwargs): + while True: + try: + return f(*args, **kwargs) + except self.exceptions as e: + print("Caught expected exception: " + repr(e)) + return wrapped_f \ No newline at end of file diff --git a/classes/shorethread.py b/classes/shorethread.py index 0d1c19b..5fcff9b 100644 --- a/classes/shorethread.py +++ b/classes/shorethread.py @@ -4,32 +4,49 @@ from classes.doghandler import DogHandler from watchdog.observers import Observer from multiprocessing import Process, Queue +from typing import NoReturn import time import os.path -class ShoreThread: - def __init__(self, files, directories): +class ShoreThread(Process): + """Thread handling the discovery of shore-side file changes + """ + def __init__(self, state: dict) -> None: + """Create a new ShoreThread object + + Args: + state (dict): Dictionary containing the application state + """ super().__init__() self._dogs = [] - self.files = files + self._state = state self.queue = Queue() - self.directories = directories - def getAllFiles(self): + def getAllFiles(self) -> list: + """Return File objects for all files in all Directories + + Returns: + list: List of all File objects discovered + """ files = [] - for directory in self.directories: - files.append(directory.getFiles()) + for directory in self._state["config"].directories: + for f in directory.getFiles(): + files.append(f) return files - def clearFiles(self): - del self.files[:] + def clearFiles(self) -> None: + """Clear the files variable in the application state + """ + del self._state["files"][:] - def monitor(self): - for directory in self.directories: + def monitor(self) -> None: + """Initialize monitoring of Directories specified in configuration + """ + for directory in self._state["config"].directories: print("Creating dog for " + str(directory.location)) handler = DogHandler(directory, self.queue) dog = Observer() @@ -37,27 +54,34 @@ class ShoreThread: dog.start() self._dogs.append(dog) - def run(self): + def run(self) -> NoReturn: + """Launch the ShoreThread and start monitoring for file changes + """ print("Launched Shore Thread") self.getAllFiles() self.monitor() - try: - while True: - self.joinDogs() - self.processQueue() - except KeyboardInterrupt: - self.stop() - raise + + while True: + self.joinDogs() + self.processQueue() - def joinDogs(self): + def joinDogs(self) -> None: + """Join observers to receive updates on the queue + """ for dog in self._dogs: dog.join(1) - def processQueue(self): - event = self.queue.get() + def processQueue(self) -> None: + """Handle events currently on the queue + """ + event = self.queue.get() # Will block until an event is found print(event) - def stop(self): + def terminate(self, *args, **kwargs) -> None: + """Terminate observer threads, then terminate self + """ for dog in self._dogs: - dog.stop() + dog.terminate() dog.join() + + super().terminate(*args, **kwargs) diff --git a/classes/vessel.py b/classes/vessel.py index 5d03f10..e788f9b 100644 --- a/classes/vessel.py +++ b/classes/vessel.py @@ -4,45 +4,99 @@ from classes.file import File from paramiko.ssh_exception import SSHException +from configparser import SectionProxy +from typing import Optional, Union + import pathlib + class Vessel: + """Class describing a Vessel (= a replication destination) + """ @classmethod - def fromConfig(cls, config): + def fromConfig(cls, config: SectionProxy): + """Create Vessel object from a Vessel section in the Config file + + Args: + config (configparser.SectionProxy): Vessel section defining a + Vessel + + Raises: + ValueError: Raised if section does not contain Address parameter + + Returns: + classes.vessel.Vessel: Vessel object for the vessel specified in + the config section + """ + + tempdir = None + if "TempDir" in config.keys(): tempdir = config["TempDir"] - else: - tempdir = "/tmp/.ContentMonster/" - if "Address" in config.keys(): - return cls(config.name.split()[1], config["Address"], pathlib.Path(tempdir)) - else: - raise ValueError("Definition for Vessel " + config.name.split()[1] + " does not contain Address!") - def __init__(self, name: str, address: str, tempdir: pathlib.Path): + if "Address" in config.keys(): + return cls(config.name.split()[1], config["Address"], tempdir) + else: + raise ValueError("Definition for Vessel " + + config.name.split()[1] + " does not contain Address!") + + def __init__(self, name: str, address: str, tempdir: Optional[Union[str, pathlib.Path]]) -> None: + """Initialize new Vessel object + + Args: + name (str): Name of the Vessel + address (str): Address (IP or resolvable hostname) of the Vessel + tempdir (pathlib.Path, optional): Temporary upload location on the + Vessel, to store Chunks in + """ self.name = name self.address = address - self.tempdir = tempdir + self.tempdir = pathlib.Path(tempdir or "/tmp/.ContentMonster/") self._connection = None - self._uploaded = self.getUploadedFromDB() + self._uploaded = self.getUploadedFromDB() # Files already uploaded @property - def connection(self): + def connection(self) -> Connection: + """Get a Connection to the Vessel + + Returns: + classes.connection.Connection: SSH/SFTP connection to the Vessel + """ + # If a connection exists if self._connection: try: + # ... check if it is up self._connection._listdir() except SSHException: + # ... and throw it away if it isn't self._connection = None + + # If no connection exists (anymore), set up a new one self._connection = self._connection or Connection(self) return self._connection - def getUploadedFromDB(self): + def getUploadedFromDB(self) -> list[str]: + """Get a list of files that have previously been uploaded to the Vessel + + Returns: + list: List of UUIDs of Files that have been successfully uploaded + """ db = Database() return db.getCompletionForVessel(self) - def currentUpload(self): + def currentUpload(self) -> File: + """Get the File that is currently being uploaded to this Vessel + + Returns: + classes.file.File: File object representing the file currently + being uploaded + """ db = Database() - directory, name, _ = db.getFileByUUID(fileuuid := self.connection.getCurrentUploadUUID()) + directory, name, _ = db.getFileByUUID( + fileuuid := self.connection.getCurrentUploadUUID()) return File(name, directory, fileuuid) - def clearTempDir(self): - return self.connection.clearTempDir() \ No newline at end of file + def clearTempDir(self) -> None: + """Clean up the temporary directory on the Vessel + """ + self.connection.clearTempDir() diff --git a/classes/vesselthread.py b/classes/vesselthread.py index f573df7..d9db5a3 100644 --- a/classes/vesselthread.py +++ b/classes/vesselthread.py @@ -1,18 +1,30 @@ from multiprocessing import Process +from classes.vessel import Vessel + import time class VesselThread(Process): - def __init__(self, vessel, files): + """Thread processing uploads to a single vessel + """ + def __init__(self, vessel: Vessel, state: dict): + """Initialize a new VesselThread + + Args: + vessel (classes.vessel.Vessel): Vessel object to handle uploads for + state (dict): Dictionary containing the current application state + """ super().__init__() self.vessel = vessel - self.files = files + self._state = state def run(self): + """Run thread and process uploads to the vessel + """ print("Launched Vessel Thread for " + self.vessel.name) while True: try: - print(self.files[0]) + print(self._state["files"][0]) except: - print("Nothing.") - time.sleep(10) \ No newline at end of file + pass + time.sleep(1) \ No newline at end of file diff --git a/worker.py b/worker.py old mode 100644 new mode 100755 index 57435e6..5f4f98d --- a/worker.py +++ b/worker.py @@ -11,11 +11,13 @@ import time if __name__ == '__main__': config_path = pathlib.Path(__file__).parent.absolute() / "settings.ini" - config = MonsterConfig.fromFile(config_path) + config = MonsterConfig() + config.readFile(config_path) with Manager() as manager: state = manager.dict() state["files"] = manager.list() + state["config"] = config threads = [] @@ -24,11 +26,15 @@ if __name__ == '__main__': thread.start() threads.append(thread) - try: - shore = ShoreThread(state, config.directories) - shore.run() - except KeyboardInterrupt: - print("Keyboard interrupt received - stopping threads") - for thread in threads: - thread.kill() - exit() + shore = ShoreThread(state) + shore.start() + + while True: + try: + time.sleep(10) + except KeyboardInterrupt: + print("Keyboard interrupt received - stopping threads") + shore.terminate() + for thread in threads: + thread.terminate() + exit()