From 82737215feb08ff05848e575cbc19286ed73c163 Mon Sep 17 00:00:00 2001 From: Klaus-Uwe Mitterer Date: Fri, 26 Nov 2021 11:23:39 +0100 Subject: [PATCH] Survive reboot and directory loss "log" to stdout --- classes/config.py | 2 +- classes/connection.py | 24 ++++++++++-- classes/doghandler.py | 23 +++++++++-- classes/file.py | 2 +- classes/logger.py | 9 +++++ classes/retry.py | 7 ++-- classes/shorethread.py | 64 ++++++++++++++++++++++-------- classes/vessel.py | 57 ++++++++++++++++++++------- classes/vesselthread.py | 86 +++++++++++++++++++++++++++++++++++++++-- 9 files changed, 230 insertions(+), 44 deletions(-) create mode 100644 classes/logger.py diff --git a/classes/config.py b/classes/config.py index 94a7ba9..cf5d493 100644 --- a/classes/config.py +++ b/classes/config.py @@ -26,7 +26,7 @@ class MonsterConfig: raise ValueError("Config file does not contain a MONSTER section!") try: - self.chunksize = parser["MONSTER"]["ChunkSize"] + self.chunksize = int(parser["MONSTER"]["ChunkSize"]) except KeyError: pass diff --git a/classes/connection.py b/classes/connection.py index 9716890..ca3d4f2 100644 --- a/classes/connection.py +++ b/classes/connection.py @@ -25,7 +25,8 @@ class Connection: self._client.load_system_host_keys() self._client.set_missing_host_key_policy(WarningPolicy) self._client.connect(vessel.address, vessel.port, vessel.username, - vessel.password, passphrase=vessel.passphrase) + vessel.password, timeout=vessel.timeout, + passphrase=vessel.passphrase) self._transport = self._client.get_transport() self._transport.set_keepalive(10) self._sftp = self._client.open_sftp() @@ -96,6 +97,21 @@ class Connection: """ return self._sftp.remove(str(path)) + def assertTempDirectory(self) -> None: + """Make sure that the temp directory exists on the Vessel + + Raises: + ValueError: Raised if the path is already in use on the vessel but + is not a directory. + IOError: Raised if the directory does not exist but cannot be + created. + """ + if not self._exists(self._vessel.tempdir): + self._mkdir(self._vessel.tempdir) + elif not self._isdir(self._vessel.tempdir): + raise ValueError( + f"{self._vessel.tempdir} exists but is not a directory on Vessel {self._vessel.name}!") + def assertDirectories(self, directory) -> None: """Make sure that destination and temp directories exist on the Vessel @@ -157,7 +173,7 @@ class Connection: """ path = path or (self._vessel.tempdir / chunk.getTempName()) flo = BytesIO(chunk.data) - self._sftp.putfo(flo, path, len(chunk.data)) + self._sftp.putfo(flo, str(path), len(chunk.data)) def compileComplete(self, remotefile) -> None: """Build a complete File from uploaded Chunks. @@ -221,8 +237,8 @@ class Connection: [type]: [description] """ completefile = remotefile.file.getChunk(-1) - destination = remotefile.getFullPath() - self._sftp.rename( + destination = remotefile.file.getFullPath() + self._sftp.posix_rename( str(self._vessel.tempdir / completefile.getTempName()), str(destination)) # Make sure that file has actually been created at destination diff --git a/classes/doghandler.py b/classes/doghandler.py index a585c18..2fe2bb9 100644 --- a/classes/doghandler.py +++ b/classes/doghandler.py @@ -1,10 +1,13 @@ from watchdog.events import (FileSystemEventHandler, FileSystemEvent, - FileCreatedEvent, FileDeletedEvent, + FileCreatedEvent, FileDeletedEvent, FileModifiedEvent, FileMovedEvent) from multiprocessing import Queue +from classes.logger import Logger + import os.path +import time class DogHandler(FileSystemEventHandler): @@ -22,6 +25,7 @@ class DogHandler(FileSystemEventHandler): super().__init__(*args, **kwargs) self._directory = directory self._queue = queue + self._logger = Logger() def dispatch(self, event: FileSystemEvent): """Dispatch events to the appropriate event handlers @@ -39,7 +43,12 @@ class DogHandler(FileSystemEventHandler): event (watchdog.events.FileCreatedEvent): Event describing the created file """ - self._queue.put((self._directory, os.path.basename(event.src_path))) + self._logger.debug(f"Detected creation event of {event.src_path}") + + size = os.path.getsize(event.src_path) + time.sleep(5) + if size == os.path.getsize(event.src_path): + self._queue.put((self._directory, os.path.basename(event.src_path))) def on_modified(self, event: FileModifiedEvent): """Put file modification events on the queue @@ -48,7 +57,12 @@ class DogHandler(FileSystemEventHandler): event (watchdog.events.FileModifiedEvent): Event describing the modified file """ - self._queue.put((self._directory, os.path.basename(event.src_path))) + self._logger.debug(f"Detected modification event of {event.src_path}") + + size = os.path.getsize(event.src_path) + time.sleep(5) + if size == os.path.getsize(event.src_path): + self._queue.put((self._directory, os.path.basename(event.src_path))) def on_moved(self, event: FileMovedEvent): """Put file move events on the queue @@ -57,6 +71,8 @@ class DogHandler(FileSystemEventHandler): event (watchdog.events.FileMovedEvent): Event describing the moved file (source and destination) """ + self._logger.debug( + f"Detected move event of {event.src_path} to {event.dest_path}") self._queue.put((self._directory, os.path.basename(event.src_path))) self._queue.put((self._directory, os.path.basename(event.dest_path))) @@ -67,4 +83,5 @@ class DogHandler(FileSystemEventHandler): event (watchdog.events.FileDeletedEvent): Event describing the deleted file """ + self._logger.debug(f"Detected deletion event of {event.src_path}") self._queue.put((self._directory, os.path.basename(event.src_path))) diff --git a/classes/file.py b/classes/file.py index 0c161e0..f494a3b 100644 --- a/classes/file.py +++ b/classes/file.py @@ -28,7 +28,7 @@ class File: self.directory = directory if not self.exists(): - raise FileNotFoundError(f"File {self.name} does not exist in {self.directory}!") + raise FileNotFoundError(f"File {self.name} does not exist in {self.directory.name}!") self.uuid = uuid or self.getUUID() diff --git a/classes/logger.py b/classes/logger.py new file mode 100644 index 0000000..b1e8f42 --- /dev/null +++ b/classes/logger.py @@ -0,0 +1,9 @@ +import logging + + +class Logger: + def debug(self, message): + print(message) + + def info(self, message): + print(message) \ No newline at end of file diff --git a/classes/retry.py b/classes/retry.py index 17504e1..a845b5d 100644 --- a/classes/retry.py +++ b/classes/retry.py @@ -1,4 +1,4 @@ -from paramiko.ssh_exception import SSHException +from paramiko.ssh_exception import SSHException, NoValidConnectionsError class retry: """Decorator used to automatically retry operations throwing exceptions @@ -9,9 +9,10 @@ class retry: Args: exceptions (tuple, optional): A tuple containing exception classes that should be handled by the decorator. If none, handle only - paramiko.ssh_exception.SSHException. Defaults to None. + paramiko.ssh_exception.SSHException/NoValidConnectionsError. + Defaults to None. """ - self.exceptions = exceptions or (SSHException,) + self.exceptions = exceptions or (SSHException, NoValidConnectionsError) def __call__(self, f): """Return a function through the retry decorator diff --git a/classes/shorethread.py b/classes/shorethread.py index eb1bcee..6b147db 100644 --- a/classes/shorethread.py +++ b/classes/shorethread.py @@ -2,6 +2,7 @@ from classes.config import MonsterConfig from classes.doghandler import DogHandler from classes.directory import Directory from classes.database import Database +from classes.logger import Logger from watchdog.observers import Observer @@ -25,6 +26,7 @@ class ShoreThread(Process): self._dogs = [] self._state = state self.queue = Queue() + self._logger = Logger() def getAllFiles(self) -> list: """Return File objects for all files in all Directories @@ -32,9 +34,12 @@ class ShoreThread(Process): Returns: list: List of all File objects discovered """ + self._logger.debug("Getting all files from Shore") + files = [] for directory in self._state["config"].directories: + self._logger.debug(f"Getting all files in {directory.name}") for f in directory.getFiles(): files.append(f) @@ -43,13 +48,14 @@ class ShoreThread(Process): def clearFiles(self) -> None: """Clear the files variable in the application state """ + self._logger.debug("Clearing global files variable") del self._state["files"][:] def monitor(self) -> None: """Initialize monitoring of Directories specified in configuration """ for directory in self._state["config"].directories: - print("Creating dog for " + str(directory.location)) + self._logger.debug("Creating dog for " + str(directory.location)) handler = DogHandler(directory, self.queue) dog = Observer() dog.schedule(handler, str(directory.location), False) @@ -59,8 +65,11 @@ class ShoreThread(Process): def run(self) -> NoReturn: """Launch the ShoreThread and start monitoring for file changes """ - print("Launched Shore Thread") - self.getAllFiles() + self._logger.info("Launched Shore Thread") + + for f in self.getAllFiles(): + self._state["files"].append(f) + self.monitor() while True: @@ -70,6 +79,7 @@ class ShoreThread(Process): def joinDogs(self) -> None: """Join observers to receive updates on the queue """ + self._logger.debug("Joining dogs") for dog in self._dogs: dog.join(1) @@ -80,12 +90,23 @@ class ShoreThread(Process): directory (Directory): Directory (previously) containing the File name (str): Filename of the deleted File """ - # Remove file from processing queue - for f in self._state["files"]: - if f.directory == directory and f.name == name: - del(f) + self._logger.debug(f"Purging file {name} from {directory.name}") - # Remove file from database + # Remove file from processing queue + outlist = [] + for f in self._state["files"]: + if f.directory.name == directory.name and f.name == name: + self._logger.debug(f"Found {name} in files queue, deleting.") + else: + outlist.append(f) + + self.clearFiles() + + for f in outlist: + self._state["files"].append(f) + + # Remove file from database + self._logger.debug(f"Purging file {name} from database") db = Database() db.removeFile(directory, name) @@ -95,19 +116,27 @@ class ShoreThread(Process): Args: fileobj (classes.file.File): File object to add to the queue """ - found = False + self._logger.debug(f"Adding file {fileobj.name} to directory {fileobj.directory.name}") + + outlist = [] for f in self._state["files"]: - if f.directory == fileobj.directory and f.name == fileobj.name: + if f.directory.name == fileobj.directory.name and f.name == fileobj.name: + self._logger.debug(f"File {fileobj.name} already in processing queue") if f.uuid != fileobj.uuid: - del(f) + self._logger.debug("UUID does not match - deleting entry") else: - found = True + self._logger.debug("Found duplicate - deleting") + else: + outlist.append(f) - if not found: - # Do not queue file if it is of size 0 - if os.path.getsize(str(fileobj.getFullPath())): - self._state["files"].append(fileobj) + self._logger.debug(f"File {fileobj.name} not in processing queue (anymore) - adding") + outlist.append(fileobj) + + self.clearFiles() + + for f in outlist: + self._state["files"].append(f) def processFile(self, directory: Directory, name: str) -> None: """Process a file entry from the observer queue @@ -118,6 +147,7 @@ class ShoreThread(Process): name (str): Filename of the created, deleted, modified or moved File """ + self._logger.debug(f"Processing change to file {name} in directory {directory.name}") if (fileobj := directory.getFile(name)): self.addFile(fileobj) else: @@ -130,12 +160,14 @@ class ShoreThread(Process): "directory" is a Directory object, and "basename" is the name of a File that has been created, moved, modified or deleted. """ + self._logger.debug("Waiting for new changes...") directory, basename = self.queue.get() # Will block until an event is found self.processFile(directory, basename) def terminate(self, *args, **kwargs) -> NoReturn: """Terminate observer threads, then terminate self """ + self._logger.info("Terminating dogs and shore thread") for dog in self._dogs: dog.terminate() dog.join() diff --git a/classes/vessel.py b/classes/vessel.py index 00c0063..6f78275 100644 --- a/classes/vessel.py +++ b/classes/vessel.py @@ -59,7 +59,8 @@ class Vessel: def __init__(self, name: str, address: str, username: Optional[str] = None, password: Optional[str] = None, passphrase: Optional[str] = None, - port: Optional[int] = None, tempdir: Optional[Union[str, pathlib.Path]] = None) -> None: + port: Optional[int] = None, timeout: Optional[int] = None, + tempdir: Optional[Union[str, pathlib.Path]] = None) -> None: """Initialize new Vessel object Args: @@ -75,6 +76,7 @@ class Vessel: self.password = password self.passphrase = passphrase self.port = port or 22 + self.timeout = timeout or 10 self._connection = None self._uploaded = self.getUploadedFromDB() # Files already uploaded @@ -89,8 +91,8 @@ class Vessel: if self._connection: try: # ... check if it is up - self._connection._listdir() - except SSHException: + self._connection._listdir(".") + except (SSHException, OSError): # ... and throw it away if it isn't self._connection = None @@ -107,20 +109,23 @@ class Vessel: db = Database() return db.getCompletionForVessel(self) - def currentUpload(self) -> Optional[File]: + def currentUpload(self) -> Optional[tuple[str, str, str]]: """Get the File that is currently being uploaded to this Vessel Returns: - classes.file.File: File object representing the file currently - being uploaded, if any + tuple: A tuple consisting of (directory, name, checksum), where + "directory" is the name of the Directory object the File is + located in, "name" is the filename (basename) of the File and + checksum is the SHA256 hash of the file at the time of insertion + into the database. None is returned if no such record is found. """ + self.assertTempDirectory() # After a reboot, the tempdir may be gone + db = Database() - output = db.getFileByUUID( - fileuuid := self.connection.getCurrentUploadUUID()) - - if output: - directory, name, _ = output - return File(name, directory, fileuuid) + output = db.getFileByUUID(self.connection.getCurrentUploadUUID()) + del db + + return output def clearTempDir(self) -> None: """Clean up the temporary directory on the Vessel @@ -138,7 +143,7 @@ class Vessel: Vessel configuration and name provided by Chunk object. Defaults to None. """ - self.connection.pushChunk(chunk, path) + self.connection.pushChunk(chunk, str(path) if path else None) def compileComplete(self, remotefile) -> None: """Build a complete File from uploaded Chunks. @@ -148,3 +153,29 @@ class Vessel: describing the uploaded File """ self.connection.compileComplete(remotefile) + + def assertDirectories(self, directory) -> None: + """Make sure that destination and temp directories exist on the Vessel + + Args: + directory (classes.directory.Directory): Directory object + representing the directory to check + + Raises: + ValueError: Raised if a path is already in use on the vessel but + not a directory. + IOError: Raised if a directory that does not exist cannot be + created. + """ + self.connection.assertDirectories(directory) + + def assertTempDirectory(self) -> None: + """Make sure that the temp directory exists on the Vessel + + Raises: + ValueError: Raised if the path is already in use on the vessel but + is not a directory. + IOError: Raised if the directory does not exist but cannot be + created. + """ + self.connection.assertTempDirectory() \ No newline at end of file diff --git a/classes/vesselthread.py b/classes/vesselthread.py index d35b4d5..f3c4c2e 100644 --- a/classes/vesselthread.py +++ b/classes/vesselthread.py @@ -5,6 +5,8 @@ from classes.vessel import Vessel from classes.remotefile import RemoteFile from classes.retry import retry from classes.database import Database +from classes.logger import Logger +from classes.file import File from const import STATUS_COMPLETE, STATUS_START import time @@ -24,53 +26,131 @@ class VesselThread(Process): super().__init__() self.vessel = vessel self._state = state + self._logger = Logger() def run(self) -> NoReturn: """Run thread and process uploads to the vessel """ print("Launched Vessel Thread for " + self.vessel.name) + self.assertDirectories() while True: try: self.upload() + time.sleep(5) except Exception as e: print("An exception occurred in the Vessel Thread for " + self.vessel.name) print(repr(e)) + @retry() + def assertDirectories(self) -> None: + for directory in self._state["config"].directories: + print( + f"Making sure directory {directory.name} exists on Vessel {self.vessel.name}") + self.vessel.connection.assertDirectories(directory) + @retry() def upload(self) -> None: """Continue uploading process """ - if not (current := self.vessel.currentUpload() or self.processQueue()): + if not (current := (self.vessel.currentUpload() or self.processQueue())): + self._logger.debug( + f"No file needs to be uploaded to Vessel {self.vessel.name} at the moment") return - remotefile = RemoteFile(current, self.vessel, + if isinstance(current, tuple): + dirname, name, _ = current + self._logger.debug( + f"Found file {name} in directory {dirname} for vessel {self.vessel.name}") + + directory = None + + for d in self._state["config"].directories: + if d.name == dirname: + directory = d + break + + if not directory: + self._logger.debug( + f"Directory {dirname} not specified in config - deleting File from Vessel {self.name}") + self.vessel.clearTempDir() + return + + try: + fileobj = File(name, directory) + except FileNotFoundError: + self._logger.debug( + f"File {name} does not exist in Directory {dirname} on shore - deleting from Vessel {self.name}") + self.vessel.clearTempDir() + return + + else: + fileobj = current + + remotefile = RemoteFile(fileobj, self.vessel, self._state["config"].chunksize) + self._logger.debug( + f"Start processing file {fileobj.name} in directory {fileobj.directory.name} on vessel {self.vessel.name}") + while True: + db = Database() + if not db.getFileByUUID(fileobj.uuid): + self._logger.debug( + f"File {fileobj.name} in directory {fileobj.directory.name} does not exist anymore - deleting from {self.vessel.name}") + self.vessel.clearTempDir() + del(db) + + self.vessel.assertDirectories(fileobj.directory) + status = remotefile.getStatus() if status == STATUS_COMPLETE: + self._logger.debug( + f"File {fileobj.name} uploaded to vessel {self.vessel.name} completely - finalizing") remotefile.finalizeUpload() + db = Database() - db.logCompletion(current, self.vessel) + db.logCompletion(fileobj, self.vessel) + del(db) + + self.vessel._uploaded.append(fileobj.uuid) + self._logger.debug( + f"Moved {fileobj.name} to its final destination on {self.vessel.name} - done!") return nextchunk = 0 if status == STATUS_START else status + 1 + self._logger.debug( + f"Getting chunk #{nextchunk} for file {fileobj.name} for vessel {self.vessel.name}") chunk = remotefile.getChunk(nextchunk) + self._logger.debug("Got chunk") + # If the Chunk has no data, the selected range is beyond the end # of the file, i.e. the complete file has already been uploaded if chunk.data: + self._logger.debug( + f"Uploading chunk to vessel {self.vessel.name}") self.vessel.pushChunk(chunk) else: + self._logger.debug( + f"No more data to upload to vessel {self.vessel.name} for file {fileobj.name} - compiling") self.vessel.compileComplete(remotefile) def processQueue(self) -> Optional[str]: """Return a file from the processing queue """ + self._logger.debug( + f"Trying to fetch new file for vessel {self.vessel.name} from queue") for f in self._state["files"]: if not f.uuid in self.vessel._uploaded: + self._logger.debug( + f"Using file {f.name} for vessel {self.vessel.name}") return f + self._logger.debug( + f"Disregarding file {f.name} for vessel {self.vessel.name} - already uploaded") + + self._logger.debug( + f"Didn't find any new files for vessel {self.vessel.name}")