Logic improvements

More documentation
This commit is contained in:
Kumi 2021-11-25 16:31:49 +01:00
parent 313c24f727
commit f3f6e74c89
11 changed files with 290 additions and 84 deletions

View file

@ -5,7 +5,7 @@ class Chunk:
"""A class defining a single chunk of a file to be uploaded""" """A class defining a single chunk of a file to be uploaded"""
@staticmethod @staticmethod
def fromFile(fileobj, count: int, chunksize: int) -> type[Chunk]: def fromFile(fileobj, count: int, chunksize: int):
"""Create a new Chunk object from a File """Create a new Chunk object from a File
Args: Args:
@ -15,9 +15,9 @@ class Chunk:
chunksize (int): Size of each chunk in bytes chunksize (int): Size of each chunk in bytes
Returns: Returns:
A Chunk object containing the portion of the File object beginning classes.chunk.Chunk: A Chunk object containing the portion of the
at (count * chunksize) bytes and ending at ((count + 1) * chunksize File object beginning at (count * chunksize) bytes and ending at
- 1) bytes ((count + 1) * chunksize - 1) bytes
""" """
return fileobj.getChunk(count, chunksize) return fileobj.getChunk(count, chunksize)

View file

@ -143,7 +143,7 @@ class Connection:
return True return True
return False return False
def pushChunk(self, chunk, path: Optional[str, Path] = None) -> None: def pushChunk(self, chunk, path: Optional[Union[str, Path]] = None) -> None:
"""Push the content of a Chunk object to the Vessel """Push the content of a Chunk object to the Vessel
Args: Args:

View file

@ -32,10 +32,14 @@ class Database:
""" """
cur = self.getCursor() cur = self.getCursor()
cur.execute(query, parameters) cur.execute(query, parameters)
self.commit() # Instantly commit after every write action self.commit() # Instantly commit after every (potential) write action
def commit(self) -> None: def commit(self) -> None:
"""Commit the current database transaction """Commit the current database transaction
N.B.: Commit instantly after every write action to make the database
"thread-safe". Connections will time out if the database is locked for
more than five seconds.
""" """
self._con.commit() self._con.commit()

View file

@ -4,14 +4,14 @@ import os
import pathlib import pathlib
from configparser import SectionProxy from configparser import SectionProxy
from typing import Union from typing import Union, Optional
class Directory: class Directory:
"""Class representing a Directory on the local filesystem """Class representing a Directory on the local filesystem
""" """
@classmethod @classmethod
def fromConfig(cls, config: SectionProxy) -> Directory: # pylint: disable=undefined-variable def fromConfig(cls, config: SectionProxy):
"""Create Directory object from a Directory section in the Config file """Create Directory object from a Directory section in the Config file
Args: Args:
@ -54,7 +54,22 @@ class Directory:
"""Get all Files in Directory """Get all Files in Directory
Returns: Returns:
list: List of names (str) of files within the Directory list: List of File objects for files within the Directory
""" """
files = [f for f in os.listdir(self.location) if os.path.isfile] files = [f for f in os.listdir(self.location) if os.path.isfile]
return [File(f, self) for f in files] return [File(f, self) for f in files]
def getFile(self, name: str) -> Optional[File]:
"""Get a file in the Directory by name
Args:
name (str): Filename of the File to get
Returns:
File, optional: File object if the file was found, else None
"""
try:
return File(name, self)
except FileNotFoundError:
return None

View file

@ -4,9 +4,13 @@ from classes.database import Database
from typing import Optional from typing import Optional
import hashlib import hashlib
import os.path
class File: class File:
"""Object representing a file found in a local Directory
"""
def __init__(self, name: str, directory, uuid: Optional[str] = None) -> None: def __init__(self, name: str, directory, uuid: Optional[str] = None) -> None:
"""Initialize new File object """Initialize new File object
@ -16,13 +20,25 @@ class File:
is located within is located within
uuid (str, optional): Unique identifier of this File object. Will uuid (str, optional): Unique identifier of this File object. Will
be retrieved from database if None. Defaults to None. be retrieved from database if None. Defaults to None.
Raises:
FileNotFoundError: Raised if the specified File does not exist
""" """
self.name = name self.name = name
self.directory = directory self.directory = directory
if not self.exists():
raise FileNotFoundError(f"File {self.name} does not exist in {self.directory}!")
self.uuid = uuid or self.getUUID() self.uuid = uuid or self.getUUID()
"""Object representing a file found in a local Directory def exists(self) -> bool:
""" """Check if the File exists on the local file system
Returns:
bool: True if the File exists, else False
"""
return os.path.isfile(self.directory.location / self.name)
def getUUID(self) -> str: def getUUID(self) -> str:
"""Return unique identifier for this File object """Return unique identifier for this File object

View file

@ -2,42 +2,98 @@ from const import STATUS_COMPLETE, STATUS_START
class RemoteFile: class RemoteFile:
def __init__(self, fileobj, vessel, chunksize=1048576): """Class describing the transfer status of a File to a Vessel
"""
def __init__(self, fileobj, vessel, chunksize: int) -> None:
"""Initialize a new RemoteFile object
Args:
fileobj (classes.file.File): File object to transfer to a Vessel
vessel (classes.vessel.Vessel): Vessel to transfer the File to
chunksize (int): Size of a single Chunk to transfer
"""
self.file = fileobj self.file = fileobj
self.vessel = vessel self.vessel = vessel
self.chunksize = chunksize self.chunksize = chunksize
def getStatus(self): def getStatus(self) -> int:
"""Get the current transfer status
Returns:
int: Number of the last Chunk that was uploaded, or STATUS_COMPLETE
(-1) if a file upload is complete and waiting for finalization,
or STATUS_START (-2) if no Chunk has been uploaded yet
"""
# Get all files in the vessel's tempdir
ls = self.vessel.connection._listdir(self.vessel.tempdir) ls = self.vessel.connection._listdir(self.vessel.tempdir)
files = [f for f in ls if f.startswith( files = [f for f in ls if f.startswith(
self.file.uuid) and f.endswith(".part")] self.file.uuid) and f.endswith(".part")]
ids = [-1] # Find the file with the largest chunk number
count = -1
for f in files: for f in files:
part = f.split("_")[1].split(".")[0] part = f.split("_")[1].split(".")[0]
if part == "complete": if part == "complete": # If a reassembled file is found
if self.validateComplete(): if self.validateComplete(True): # and it is not broken
return STATUS_COMPLETE return STATUS_COMPLETE # the upload is complete
ids.append(int(part))
count = max(ids) # Else save the chunk number if it is larger than the previous
count = max(count, int(part))
# Find and return the largest non-corrupt chunk
while count >= 0: while count >= 0:
if self.validateChunk(count): if self.validateChunk(count):
return count return count
count -= 1 count -= 1
# If no (more) files exist, we are just getting started
return STATUS_START return STATUS_START
def validateChunk(self, count): def validateChunk(self, count: int) -> bool:
"""Validate that a Chunk was uploaded correctly
Args:
count (int): Chunk number to validate
Returns:
bool: True if file has been uploaded correctly, else False
"""
return self.vessel.connection.assertChunkComplete(self.getChunk(count)) return self.vessel.connection.assertChunkComplete(self.getChunk(count))
def validateComplete(self): def validateComplete(self, allow_retry: bool = False):
return self.validateChunk(-1) """Validate that the complete File was reassembled correctly
def compileComplete(self): Args:
allow_retry (bool, optional): If True, assume that compileComplete
failed for some other reason than corrupt Chunks, and only delete
compiled file, else clear entire temporary directory. Defaults to
False.
Returns:
bool: True if file was reassembled correctly, else False
"""
return self.vessel.connection.assertComplete(self, allow_retry)
def compileComplete(self) -> None:
"""Reassemble a complete File from the uploaded Chunks
"""
self.vessel.connection.compileComplete(self) self.vessel.connection.compileComplete(self)
def getChunk(self, count): def getChunk(self, count: int):
"""Get a Chunk of the source file
Args:
count (int): Number of the Chunk to generate
Returns:
classes.chunk.Chunk: A Chunk object containing the portion of the
File object beginning at (count * chunksize) bytes and ending at
((count + 1) * chunksize - 1) bytes, with chunksize taken from the
RemoteFile initialization value
"""
return self.file.getChunk(count, self.chunksize) return self.file.getChunk(count, self.chunksize)

View file

@ -1,13 +1,32 @@
from paramiko.ssh_exception import SSHException from paramiko.ssh_exception import SSHException
class retry: class retry:
def __init__(self, exceptions=None): """Decorator used to automatically retry operations throwing exceptions
"""
def __init__(self, exceptions: tuple[BaseException] = None):
"""Initializing the retry decorator
Args:
exceptions (tuple, optional): A tuple containing exception classes
that should be handled by the decorator. If none, handle only
paramiko.ssh_exception.SSHException. Defaults to None.
"""
self.exceptions = exceptions or (SSHException,) self.exceptions = exceptions or (SSHException,)
def __call__(self, f): def __call__(self, f):
def wrapped_f(*args): """Return a function through the retry decorator
try:
f(*args) Args:
except self.exceptions as e: f (function): Function to wrap in the decorator
print("Caught expected exception: " + e)
Returns:
function: Function wrapping the passed function
"""
def wrapped_f(*args, **kwargs):
while True:
try:
return f(*args, **kwargs)
except self.exceptions as e:
print("Caught expected exception: " + repr(e))
return wrapped_f return wrapped_f

View file

@ -4,32 +4,49 @@ from classes.doghandler import DogHandler
from watchdog.observers import Observer from watchdog.observers import Observer
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from typing import NoReturn
import time import time
import os.path import os.path
class ShoreThread: class ShoreThread(Process):
def __init__(self, files, directories): """Thread handling the discovery of shore-side file changes
"""
def __init__(self, state: dict) -> None:
"""Create a new ShoreThread object
Args:
state (dict): Dictionary containing the application state
"""
super().__init__() super().__init__()
self._dogs = [] self._dogs = []
self.files = files self._state = state
self.queue = Queue() self.queue = Queue()
self.directories = directories
def getAllFiles(self): def getAllFiles(self) -> list:
"""Return File objects for all files in all Directories
Returns:
list: List of all File objects discovered
"""
files = [] files = []
for directory in self.directories: for directory in self._state["config"].directories:
files.append(directory.getFiles()) for f in directory.getFiles():
files.append(f)
return files return files
def clearFiles(self): def clearFiles(self) -> None:
del self.files[:] """Clear the files variable in the application state
"""
del self._state["files"][:]
def monitor(self): def monitor(self) -> None:
for directory in self.directories: """Initialize monitoring of Directories specified in configuration
"""
for directory in self._state["config"].directories:
print("Creating dog for " + str(directory.location)) print("Creating dog for " + str(directory.location))
handler = DogHandler(directory, self.queue) handler = DogHandler(directory, self.queue)
dog = Observer() dog = Observer()
@ -37,27 +54,34 @@ class ShoreThread:
dog.start() dog.start()
self._dogs.append(dog) self._dogs.append(dog)
def run(self): def run(self) -> NoReturn:
"""Launch the ShoreThread and start monitoring for file changes
"""
print("Launched Shore Thread") print("Launched Shore Thread")
self.getAllFiles() self.getAllFiles()
self.monitor() self.monitor()
try:
while True: while True:
self.joinDogs() self.joinDogs()
self.processQueue() self.processQueue()
except KeyboardInterrupt:
self.stop()
raise
def joinDogs(self): def joinDogs(self) -> None:
"""Join observers to receive updates on the queue
"""
for dog in self._dogs: for dog in self._dogs:
dog.join(1) dog.join(1)
def processQueue(self): def processQueue(self) -> None:
event = self.queue.get() """Handle events currently on the queue
"""
event = self.queue.get() # Will block until an event is found
print(event) print(event)
def stop(self): def terminate(self, *args, **kwargs) -> None:
"""Terminate observer threads, then terminate self
"""
for dog in self._dogs: for dog in self._dogs:
dog.stop() dog.terminate()
dog.join() dog.join()
super().terminate(*args, **kwargs)

View file

@ -4,45 +4,99 @@ from classes.file import File
from paramiko.ssh_exception import SSHException from paramiko.ssh_exception import SSHException
from configparser import SectionProxy
from typing import Optional, Union
import pathlib import pathlib
class Vessel: class Vessel:
"""Class describing a Vessel (= a replication destination)
"""
@classmethod @classmethod
def fromConfig(cls, config): def fromConfig(cls, config: SectionProxy):
"""Create Vessel object from a Vessel section in the Config file
Args:
config (configparser.SectionProxy): Vessel section defining a
Vessel
Raises:
ValueError: Raised if section does not contain Address parameter
Returns:
classes.vessel.Vessel: Vessel object for the vessel specified in
the config section
"""
tempdir = None
if "TempDir" in config.keys(): if "TempDir" in config.keys():
tempdir = config["TempDir"] tempdir = config["TempDir"]
else:
tempdir = "/tmp/.ContentMonster/"
if "Address" in config.keys():
return cls(config.name.split()[1], config["Address"], pathlib.Path(tempdir))
else:
raise ValueError("Definition for Vessel " + config.name.split()[1] + " does not contain Address!")
def __init__(self, name: str, address: str, tempdir: pathlib.Path): if "Address" in config.keys():
return cls(config.name.split()[1], config["Address"], tempdir)
else:
raise ValueError("Definition for Vessel " +
config.name.split()[1] + " does not contain Address!")
def __init__(self, name: str, address: str, tempdir: Optional[Union[str, pathlib.Path]]) -> None:
"""Initialize new Vessel object
Args:
name (str): Name of the Vessel
address (str): Address (IP or resolvable hostname) of the Vessel
tempdir (pathlib.Path, optional): Temporary upload location on the
Vessel, to store Chunks in
"""
self.name = name self.name = name
self.address = address self.address = address
self.tempdir = tempdir self.tempdir = pathlib.Path(tempdir or "/tmp/.ContentMonster/")
self._connection = None self._connection = None
self._uploaded = self.getUploadedFromDB() self._uploaded = self.getUploadedFromDB() # Files already uploaded
@property @property
def connection(self): def connection(self) -> Connection:
"""Get a Connection to the Vessel
Returns:
classes.connection.Connection: SSH/SFTP connection to the Vessel
"""
# If a connection exists
if self._connection: if self._connection:
try: try:
# ... check if it is up
self._connection._listdir() self._connection._listdir()
except SSHException: except SSHException:
# ... and throw it away if it isn't
self._connection = None self._connection = None
# If no connection exists (anymore), set up a new one
self._connection = self._connection or Connection(self) self._connection = self._connection or Connection(self)
return self._connection return self._connection
def getUploadedFromDB(self): def getUploadedFromDB(self) -> list[str]:
"""Get a list of files that have previously been uploaded to the Vessel
Returns:
list: List of UUIDs of Files that have been successfully uploaded
"""
db = Database() db = Database()
return db.getCompletionForVessel(self) return db.getCompletionForVessel(self)
def currentUpload(self): def currentUpload(self) -> File:
"""Get the File that is currently being uploaded to this Vessel
Returns:
classes.file.File: File object representing the file currently
being uploaded
"""
db = Database() db = Database()
directory, name, _ = db.getFileByUUID(fileuuid := self.connection.getCurrentUploadUUID()) directory, name, _ = db.getFileByUUID(
fileuuid := self.connection.getCurrentUploadUUID())
return File(name, directory, fileuuid) return File(name, directory, fileuuid)
def clearTempDir(self): def clearTempDir(self) -> None:
return self.connection.clearTempDir() """Clean up the temporary directory on the Vessel
"""
self.connection.clearTempDir()

View file

@ -1,18 +1,30 @@
from multiprocessing import Process from multiprocessing import Process
from classes.vessel import Vessel
import time import time
class VesselThread(Process): class VesselThread(Process):
def __init__(self, vessel, files): """Thread processing uploads to a single vessel
"""
def __init__(self, vessel: Vessel, state: dict):
"""Initialize a new VesselThread
Args:
vessel (classes.vessel.Vessel): Vessel object to handle uploads for
state (dict): Dictionary containing the current application state
"""
super().__init__() super().__init__()
self.vessel = vessel self.vessel = vessel
self.files = files self._state = state
def run(self): def run(self):
"""Run thread and process uploads to the vessel
"""
print("Launched Vessel Thread for " + self.vessel.name) print("Launched Vessel Thread for " + self.vessel.name)
while True: while True:
try: try:
print(self.files[0]) print(self._state["files"][0])
except: except:
print("Nothing.") pass
time.sleep(10) time.sleep(1)

24
worker.py Normal file → Executable file
View file

@ -11,11 +11,13 @@ import time
if __name__ == '__main__': if __name__ == '__main__':
config_path = pathlib.Path(__file__).parent.absolute() / "settings.ini" config_path = pathlib.Path(__file__).parent.absolute() / "settings.ini"
config = MonsterConfig.fromFile(config_path) config = MonsterConfig()
config.readFile(config_path)
with Manager() as manager: with Manager() as manager:
state = manager.dict() state = manager.dict()
state["files"] = manager.list() state["files"] = manager.list()
state["config"] = config
threads = [] threads = []
@ -24,11 +26,15 @@ if __name__ == '__main__':
thread.start() thread.start()
threads.append(thread) threads.append(thread)
try: shore = ShoreThread(state)
shore = ShoreThread(state, config.directories) shore.start()
shore.run()
except KeyboardInterrupt: while True:
print("Keyboard interrupt received - stopping threads") try:
for thread in threads: time.sleep(10)
thread.kill() except KeyboardInterrupt:
exit() print("Keyboard interrupt received - stopping threads")
shore.terminate()
for thread in threads:
thread.terminate()
exit()