Fixing Dog Handler usage

Lots of documentation
This commit is contained in:
Kumi 2021-11-25 10:40:25 +01:00
parent cd2dd051e9
commit 48bc92653d
9 changed files with 345 additions and 78 deletions

View file

@ -1,13 +1,51 @@
import hashlib import hashlib
class Chunk:
    """A class defining a single chunk of a file to be uploaded"""

    @staticmethod
    def fromFile(fileobj, count: int, chunksize: int) -> "Chunk":
        """Create a new Chunk object from a File

        Args:
            fileobj (classes.file.File): The file object from local storage
            count (int): Position of the current chunk in the list of total
                chunks (first index: 0) or -1 to get the complete file
            chunksize (int): Size of each chunk in bytes

        Returns:
            Chunk: A Chunk object containing the portion of the File object
                beginning at (count * chunksize) bytes and ending at
                ((count + 1) * chunksize - 1) bytes
        """
        # NB: the return annotation must be the forward reference "Chunk" --
        # "type[Chunk]" would be evaluated while the class body is still
        # executing (NameError) and would claim the class itself is returned,
        # not an instance.
        return fileobj.getChunk(count, chunksize)

    def __init__(self, fileobj, count: int, data: bytes) -> None:
        """Initialize a new Chunk object

        Args:
            fileobj (classes.file.File): The file object from local storage
            count (int): Position of the current chunk in the list of total
                chunks (first index: 0) or -1 to get the complete file
            data (bytes): Content of the chunk
        """
        self.file = fileobj
        # A negative count marks a Chunk holding the complete file content
        self.count = count if count >= 0 else "complete"
        self.data = data

    def getTempName(self) -> str:
        """Get filename for this Chunk in the temp directory on the Vessel

        Returns:
            str: Filename to use for this chunk in the Vessel tempdir
        """
        return f"{self.file.uuid}_{self.count}.part"

    def getHash(self) -> str:
        """Generate a hash for this Chunk

        Returns:
            str: SHA256 hash of Chunk.data
        """
        return hashlib.sha256(self.data).hexdigest()

View file

@ -1,14 +1,25 @@
import paramiko as pikuniku # :P import paramiko as pikuniku # :P
from paramiko.client import SSHClient from paramiko.client import SSHClient
from io import BytesIO from io import BytesIO
from pathlib import Path
from typing import Union, Optional
import errno import errno
import stat import stat
class Connection: class Connection:
"""Class representing an SSH/SFTP connection to a Vessel
"""
def __init__(self, vessel): def __init__(self, vessel):
"""Initialize a new Connection to a Vessel
Args:
vessel (classes.vessel.Vessel): Vessel object to open connection to
"""
self._vessel = vessel self._vessel = vessel
self._client = SSHClient() self._client = SSHClient()
self._client.load_system_host_keys() self._client.load_system_host_keys()
@ -17,59 +28,170 @@ class Connection:
self._transport.set_keepalive(10) self._transport.set_keepalive(10)
self._sftp = self._client.open_sftp() self._sftp = self._client.open_sftp()
def _exists(self, path: Union[str, Path]) -> bool:
    """Check if a path exists on the Vessel. Symlinks are followed.

    NB: SFTP stat() follows symbolic links (lstat() would not), so the
    original claim that symlinks are not followed was incorrect.

    Args:
        path (str, pathlib.Path): Path to check on the vessel

    Returns:
        bool: True if the path exists on the Vessel, else False
    """
    try:
        self._sftp.stat(str(path))
        return True
    except FileNotFoundError:
        return False
def _isdir(self, path: Union[str, Path]) -> bool:
    """Check if a path is a directory on the Vessel.

    Symlinks are NOT followed: lstat() is used, so a symlink pointing at a
    directory reports False. (The previous docstring claimed the opposite.)

    Args:
        path (str, pathlib.Path): Path to check on the vessel

    Returns:
        bool: True if the path provided is a directory on the Vessel, False
            if it is a different kind of file.

    Raises:
        FileNotFoundError: Raised if the path does not exist on the Vessel
    """
    return stat.S_ISDIR(self._sftp.lstat(str(path)).st_mode)
def _mkdir(self, path: Union[str, Path]) -> None:
    """Create a new directory on the Vessel

    Args:
        path (str, pathlib.Path): Path at which to create a new directory
            on the Vessel

    Raises:
        IOError: Raised if the directory cannot be created
    """
    target = str(path)
    self._sftp.mkdir(target)
def _listdir(self, path: Optional[Union[str, Path]] = None) -> list[str]:
    """List files in a directory on the Vessel

    Args:
        path (str, pathlib.Path, optional): Path at which to list contents.
            Will use current working directory if None. Defaults to None.

    Returns:
        list: List of the names of files (str) located at the provided path
    """
    # The annotation list[Optional[str]] was wrong: SFTP listdir yields
    # plain file names, never None entries.
    # NOTE(review): passing None through to paramiko's listdir looks
    # dubious (its default is "."); confirm the path=None case is exercised.
    return self._sftp.listdir(str(path) if path else None)
def _remove(self, path: Union[str, Path]) -> None:
    """Delete a file from the Vessel

    Args:
        path (str, pathlib.Path): Path of the file to delete

    Raises:
        FileNotFoundError: Raised if no file is found at the given path
        IOError: Raised if the file cannot be deleted
    """
    self._sftp.remove(str(path))
def assertDirectories(self, directory) -> None:
    """Ensure that destination and temp directories exist on the Vessel

    Args:
        directory (classes.directory.Directory): Directory object
            representing the directory to check

    Raises:
        ValueError: Raised if a path is already in use on the vessel but
            not a directory.
        IOError: Raised if a directory that does not exist cannot be
            created.
    """
    for d in (directory, self._vessel.tempdir):
        if not self._exists(d):
            self._mkdir(d)
            continue
        # Path exists -- it must be a directory to be usable
        if not self._isdir(d):
            raise ValueError(
                f"{d} exists but is not a directory on Vessel {self._vessel.name}!")
def assertChunkComplete(self, chunk, path: Optional[Union[str, Path]] = None) -> bool:
    """Verify that a Chunk has been uploaded to the Vessel correctly

    Args:
        chunk (classes.chunk.Chunk): Chunk object to verify upload for
        path (str, pathlib.Path, optional): Optional path at which to
            check. If None, will get default path from Chunk object.
            Defaults to None.

    Returns:
        bool: True if file has been uploaded correctly, else False
    """
    path = path or self._vessel.tempdir / chunk.getTempName()

    if not self._exists(path):
        return False

    # "-b" should not be required, but makes sure to use binary mode
    _, o, _ = self._client.exec_command("sha256sum -b " + str(path))

    # Block until the remote command has finished
    o.channel.recv_exit_status()

    if o.readline().split()[0] != chunk.getHash():
        # Checksum mismatch: drop the broken upload so it can be retried
        self._remove(path)
        return False

    return True
def pushChunk(self, chunk, path: Optional[Union[str, Path]] = None) -> None:
    """Push the content of a Chunk object to the Vessel

    Args:
        chunk (classes.chunk.Chunk): Chunk object containing the data to
            push to the Vessel
        path (str, pathlib.Path, optional): Path at which to store the
            Chunk on the Vessel. If None, use default location provided by
            Chunk object. Defaults to None.
    """
    # Bug fix: "Optional[str, Path]" is invalid typing syntax and raises
    # TypeError as soon as the module is imported; Optional takes a single
    # type, so the union must be spelled Optional[Union[str, Path]].
    path = path or (self._vessel.tempdir / chunk.getTempName())
    flo = BytesIO(chunk.data)
    # Stringify the path for consistency with the other SFTP helpers,
    # which all pass str paths to paramiko
    self._sftp.putfo(flo, str(path), len(chunk.data))
def compileComplete(self, remotefile) -> None:
    """Reassemble a complete File on the Vessel from its uploaded Chunks

    Args:
        remotefile (classes.remotefile.RemoteFile): RemoteFile object
            describing the uploaded File
    """
    numchunks = remotefile.getStatus() + 1

    # Chunk files, listed in upload order, to be concatenated
    files = " ".join(
        str(self._vessel.tempdir / f"{remotefile.file.uuid}_{i}.part")
        for i in range(numchunks))

    outname = remotefile.file.getChunk(-1).getTempName()
    outpath = self._vessel.tempdir / outname

    _, o, _ = self._client.exec_command(f"cat {files} > {outpath}")

    # Block until the remote command has finished
    o.channel.recv_exit_status()
def assertComplete(self, remotefile, allow_retry=False): def assertComplete(self, remotefile, allow_retry: bool = False) -> bool:
"""Check if File has been reassembled from Chunks correctly
Args:
remotefile (classes.remotefile.RemoteFile): RemoteFile object
describing the uploaded File
allow_retry (bool, optional): If True, assume that compileComplete
failed for some other reason than corrupt Chunks, and only delete
compiled file, else clear entire temporary directory. Defaults to
False.
Returns:
bool: True if file was reassembled correctly, else False
"""
completefile = remotefile.file.getChunk(-1) completefile = remotefile.file.getChunk(-1)
outname = completefile.getTempName() outname = completefile.getTempName()
outpath = self._vessel.tempdir / outname outpath = self._vessel.tempdir / outname
@ -83,24 +205,44 @@ class Connection:
else: else:
self.clearTempDir() self.clearTempDir()
return False return False
return True return True
def moveComplete(self, remotefile) -> None:
    """Move a reassembled file from the temp dir to its final destination

    Args:
        remotefile (classes.remotefile.RemoteFile): RemoteFile object
            describing the uploaded File.

    Raises:
        FileNotFoundError: Raised by the trailing stat() if the file did
            not actually end up at the destination after the rename.
    """
    # (Docstring fix: the previous version carried an unfilled
    # "Returns: [type]: [description]" placeholder although the method
    # returns nothing.)
    completefile = remotefile.file.getChunk(-1)
    destination = remotefile.getFullPath()
    self._sftp.rename(
        str(self._vessel.tempdir / completefile.getTempName()), str(destination))

    # Make sure that file has actually been created at destination
    self._sftp.stat(str(destination))
def getCurrentUploadUUID(self) -> Optional[str]:
    """Get the UUID of the file currently being uploaded, if any

    Returns:
        str, optional: UUID of the File being uploaded, if any, else None
    """
    for entry in self._listdir(self._vessel.tempdir):
        if entry.endswith(".part"):
            # Chunk files are named "<uuid>_<count>.part"
            return entry.split("_")[0]
    return None
def clearTempDir(self) -> None:
    """Delete every file in the temporary directory on the Vessel"""
    for entry in self._listdir(self._vessel.tempdir):
        self._remove(self._vessel.tempdir / entry)
def __del__(self):
    """Close the SSH connection when the Connection object is destroyed"""
    self._client.close()

View file

@ -2,9 +2,11 @@ import sqlite3
import pathlib import pathlib
import uuid import uuid
class Database: class Database:
def __init__(self, filename=None): def __init__(self, filename=None):
filename = filename or pathlib.Path(__file__).parent.parent.absolute() / "database.sqlite3" filename = filename or pathlib.Path(
__file__).parent.parent.absolute() / "database.sqlite3"
self._con = sqlite3.connect(filename) self._con = sqlite3.connect(filename)
self.migrate() self.migrate()
@ -22,7 +24,8 @@ class Database:
def getVersion(self): def getVersion(self):
cur = self.getCursor() cur = self.getCursor()
try: try:
cur.execute("SELECT value FROM contentmonster_settings WHERE key = 'dbversion'") cur.execute(
"SELECT value FROM contentmonster_settings WHERE key = 'dbversion'")
assert (version := cur.fetchone()) assert (version := cur.fetchone())
return int(version[0]) return int(version[0])
except (sqlite3.OperationalError, AssertionError): except (sqlite3.OperationalError, AssertionError):
@ -32,7 +35,8 @@ class Database:
hash = fileobj.getHash() hash = fileobj.getHash()
cur = self.getCursor() cur = self.getCursor()
cur.execute("SELECT uuid, checksum FROM contentmonster_file WHERE directory = ? AND name = ?", (fileobj.directory.name, fileobj.name)) cur.execute("SELECT uuid, checksum FROM contentmonster_file WHERE directory = ? AND name = ?",
(fileobj.directory.name, fileobj.name))
fileuuid = None fileuuid = None
for result in cur.fetchall(): for result in cur.fetchall():
@ -46,38 +50,47 @@ class Database:
def addFile(self, fileobj, hash=None):
    """Insert a new file record and return its generated UUID.

    Args:
        fileobj (classes.file.File): File object to store
        hash (str, optional): Pre-computed checksum of the file; computed
            from the file content if None. Defaults to None.

    Returns:
        str: UUID assigned to the new file record
    """
    # Bind to a local that does not shadow the builtin past this point
    checksum = hash or fileobj.getHash()
    fileuuid = str(uuid.uuid4())
    self._execute("INSERT INTO contentmonster_file(uuid, directory, name, checksum) VALUES (?, ?, ?, ?)",
                  (fileuuid, fileobj.directory.name, fileobj.name, checksum))
    return fileuuid
def getFileByUUID(self, fileuuid):
    """Fetch directory, name and checksum of a file record by its UUID.

    Args:
        fileuuid (str): UUID of the file record to look up

    Returns:
        tuple, optional: (directory, name, checksum) row if found, else None
    """
    cur = self.getCursor()
    cur.execute(
        "SELECT directory, name, checksum FROM contentmonster_file WHERE uuid = ?", (fileuuid,))
    row = cur.fetchone()
    if row:
        return row
def removeFileByUUID(self, fileuuid):
    """Delete the file record with the given UUID, if present.

    Args:
        fileuuid (str): UUID of the file record to delete
    """
    self._execute(
        "DELETE FROM contentmonster_file WHERE uuid = ?", (fileuuid,))
def logCompletion(self, file, vessel):
    """Record that a file has been fully uploaded to a Vessel.

    Args:
        file (classes.file.File): File that completed uploading
        vessel (classes.vessel.Vessel): Vessel the file was uploaded to
    """
    self._execute(
        "INSERT INTO contentmonster_file_log(file, vessel) VALUES(?, ?)", (file.uuid, vessel.name))
def getCompletionForVessel(self, vessel):
    """Get UUIDs of all files logged as completely uploaded to a Vessel.

    Args:
        vessel (classes.vessel.Vessel): Vessel to query completions for

    Returns:
        list: UUIDs (str) of files logged as complete for this Vessel
    """
    cur = self.getCursor()
    cur.execute(
        "SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,))

    # Bug fix: the query result was executed but discarded, so the method
    # always returned None; return the fetched file UUIDs instead.
    return [row[0] for row in cur.fetchall()]
def migrate(self):
    """Apply schema migrations, stepping the database version forward.

    Each block upgrades the schema by one version and commits, so a fresh
    database runs through all migrations in one call.
    """
    cur = self.getCursor()

    if self.getVersion() == 0:
        cur.execute(
            "CREATE TABLE IF NOT EXISTS contentmonster_settings(key VARCHAR(64) PRIMARY KEY, value TEXT)")
        cur.execute(
            "INSERT INTO contentmonster_settings(key, value) VALUES ('dbversion', '1')")
        self.commit()

    if self.getVersion() == 1:
        cur.execute(
            "CREATE TABLE IF NOT EXISTS contentmonster_file(uuid VARCHAR(36) PRIMARY KEY, directory VARCHAR(128), name VARCHAR(128), checksum VARCHAR(64))")
        # Bug fix: the foreign key previously referenced
        # "contentmonster_files" (plural), a table that does not exist;
        # the table created above is "contentmonster_file".
        cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_file(uuid) ON DELETE CASCADE)")
        cur.execute(
            "UPDATE contentmonster_settings SET value = '2' WHERE key = 'dbversion'")
        self.commit()
def __del__(self):
    # Close the sqlite connection when the Database object is destroyed
    self._con.close()

View file

@ -1,18 +1,28 @@
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
import os.path
class DogHandler(FileSystemEventHandler):
    """Watchdog event handler forwarding file system events into a queue.

    Every queued item is a (directory, basename) tuple, where directory is
    the classes.directory.Directory being watched.
    """

    def __init__(self, directory, queue, *args, **kwargs):
        """Initialize a new DogHandler object

        Args:
            directory (classes.directory.Directory): Directory this handler
                watches
            queue (multiprocessing.Queue): Queue to put file events into
        """
        # Bug fix: removed leftover debug print("Initialized")
        super().__init__(*args, **kwargs)
        self._directory = directory
        self._queue = queue

    def dispatch(self, event):
        # Only file events are of interest -- ignore directory events
        if not event.is_directory:
            super().dispatch(event)

    def on_created(self, event):
        self._queue.put((self._directory, os.path.basename(event.src_path)))

    def on_modified(self, event):
        self._queue.put((self._directory, os.path.basename(event.src_path)))

    def on_moved(self, event):
        # A move affects both the source and the destination name
        self._queue.put((self._directory, os.path.basename(event.src_path)))
        self._queue.put((self._directory, os.path.basename(event.dest_path)))

    def on_deleted(self, event):
        self._queue.put((self._directory, os.path.basename(event.src_path)))

View file

@ -1,27 +1,77 @@
from classes.chunk import Chunk from classes.chunk import Chunk
from classes.database import Database from classes.database import Database
from typing import Optional
import hashlib import hashlib
class File:
    """Object representing a file found in a local Directory"""

    def __init__(self, name: str, directory, uuid: Optional[str] = None) -> None:
        """Initialize new File object

        Args:
            name (str): Filename (basename without path) of the File to create
            directory (classes.directory.Directory): Directory object the File
                is located within
            uuid (str, optional): Unique identifier of this File object. Will
                be retrieved from database if None. Defaults to None.
        """
        self.name = name
        self.directory = directory
        self.uuid = uuid or self.getUUID()

    def getUUID(self) -> str:
        """Return unique identifier for this File object

        Returns:
            str: File object's UUID retrieved from Database
        """
        db = Database()
        return db.getFileUUID(self)

    def getFullPath(self):
        """Get the full path of the File

        Returns:
            pathlib.Path: Full path of the File on the local file system
        """
        # Annotation fix: this returns a pathlib.Path (Directory.location is
        # a Path and "/" produces a Path), not a str as previously annotated.
        return self.directory.location / self.name

    def getHash(self) -> str:
        """Get hash for this File

        Returns:
            str: SHA256 for the full content of this File
        """
        return self.getChunk(-1).getHash()

    def getChunk(self, count: int, size: Optional[int] = None) -> "Chunk":
        """Get a Chunk of this File

        Args:
            count (int): Position of the Chunk in a list of equally large
                Chunks (first index: 0). -1 represents a Chunk containing the
                complete file.
            size (int, optional): Size of the Chunk to create in Bytes. Must
                be set if count is not -1. Defaults to None.

        Returns:
            classes.chunk.Chunk: Chunk object containing File content from
                (count * size) bytes to ((count + 1) * size - 1) bytes

        Raises:
            ValueError: Raised if size is not set for a count that is not -1
        """
        if count != -1 and not size:
            raise ValueError(
                "A Chunk size needs to be passed to getChunk() if not using the complete file (-1)!")

        with open(self.getFullPath(), "rb") as binary:
            # For the complete file (-1), read everything from the start
            binary.seek((count * size) if count > 0 else 0)
            data = binary.read(size if count >= 0 else None)
            return Chunk(self, count, data)

View file

@ -1,5 +1,5 @@
STATUS_START = -1 from const import STATUS_COMPLETE, STATUS_START
STATUS_COMPLETE = -2
class RemoteFile: class RemoteFile:
def __init__(self, fileobj, vessel, chunksize=1048576): def __init__(self, fileobj, vessel, chunksize=1048576):
@ -9,7 +9,8 @@ class RemoteFile:
def getStatus(self): def getStatus(self):
ls = self.vessel.connection._listdir(self.vessel.tempdir) ls = self.vessel.connection._listdir(self.vessel.tempdir)
files = [f for f in ls if f.startswith(self.file.uuid) and f.endswith(".part")] files = [f for f in ls if f.startswith(
self.file.uuid) and f.endswith(".part")]
ids = [-1] ids = [-1]
@ -25,8 +26,8 @@ class RemoteFile:
while count >= 0: while count >= 0:
if self.validateChunk(count): if self.validateChunk(count):
return count return count
count -=1 count -= 1
return STATUS_START return STATUS_START
def validateChunk(self, count): def validateChunk(self, count):
@ -39,4 +40,4 @@ class RemoteFile:
self.vessel.connection.compileComplete(self) self.vessel.connection.compileComplete(self)
def getChunk(self, count):
    """Return the Chunk of the underlying File at the given position.

    Args:
        count (int): Chunk index (first index: 0), or -1 for the whole file

    Returns:
        classes.chunk.Chunk: Chunk produced with this RemoteFile's chunksize
    """
    return self.file.getChunk(count, self.chunksize)

View file

@ -6,6 +6,8 @@ from watchdog.observers import Observer
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
import time import time
import os.path
class ShoreThread: class ShoreThread:
def __init__(self, files, directories): def __init__(self, files, directories):
@ -29,27 +31,33 @@ class ShoreThread:
def monitor(self):
    """Create and start a watchdog observer for every monitored directory."""
    for directory in self.directories:
        print("Creating dog for " + str(directory.location))
        event_handler = DogHandler(directory, self.queue)
        observer = Observer()
        # Non-recursive: only watch the directory itself
        observer.schedule(event_handler, str(directory.location), False)
        observer.start()
        self._dogs.append(observer)
def run(self):
    """Main loop: pick up existing files, start watchers, process events.

    Runs until interrupted; on KeyboardInterrupt the observers are stopped
    and the exception is re-raised for the caller to handle.
    """
    print("Launched Shore Thread")
    self.getAllFiles()
    self.monitor()

    try:
        while True:
            self.joinDogs()
            self.processQueue()
    except KeyboardInterrupt:
        self.stop()
        raise
def joinDogs(self):
    """Briefly join each observer thread (1s timeout per dog).

    NOTE(review): join() with a timeout keeps the main loop cycling while
    letting observer threads surface termination — confirm intent.
    """
    for dog in self._dogs:
        dog.join(1)
def processQueue(self):
    """Block until the next file system event arrives, then print it."""
    print(self.queue.get())
def stop(self):
    """Stop all watchdog observers and wait for their threads to exit."""
    for dog in self._dogs:
        dog.stop()
        dog.join()

4
const.py Normal file
View file

@ -0,0 +1,4 @@
# Constants for remote file status

# No chunks of the file are present on the vessel yet
# (RemoteFile.getStatus falls back to this value)
STATUS_START = -1
# File upload finished
# NOTE(review): semantics inferred from the name and import in
# remotefile.py; usage is not visible here -- confirm
STATUS_COMPLETE = -2

View file

@ -14,20 +14,21 @@ if __name__ == '__main__':
config = MonsterConfig.fromFile(config_path) config = MonsterConfig.fromFile(config_path)
with Manager() as manager: with Manager() as manager:
files = manager.list() state = manager.dict()
state["files"] = manager.list()
threads = [] threads = []
for vessel in config.vessels: for vessel in config.vessels:
thread = VesselThread(vessel, files) thread = VesselThread(vessel, state)
thread.start() thread.start()
threads.append(thread) threads.append(thread)
try: try:
shore = ShoreThread(files, config.directories) shore = ShoreThread(state, config.directories)
shore.run() shore.run()
except KeyboardInterrupt: except KeyboardInterrupt:
print("Keyboard interrupt received - stopping threads") print("Keyboard interrupt received - stopping threads")
for thread in threads: for thread in threads:
thread.kill() thread.kill()
exit() exit()