diff --git a/classes/database.py b/classes/database.py index d4e5e7d..2cbcef8 100644 --- a/classes/database.py +++ b/classes/database.py @@ -133,6 +133,17 @@ class Database: if (result := cur.fetchone()): return result + def removeFile(self, directory, name: str) -> None: + """Remove a File from the database based on Directory and filename + + Args: + directory (classes.directory.Directory): Directory object + containing the File to remove + name (str): Filename of the File to remove + """ + self._execute( + "DELETE FROM contentmonster_file WHERE directory = ? AND name = ?", (directory.name, name)) + def removeFileByUUID(self, fileuuid: str) -> None: """Remove a File from the database based on UUID @@ -166,7 +177,7 @@ class Database: cur = self.getCursor() cur.execute( "SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,)) - + return [f[0] for f in cur.fetchall()] def migrate(self) -> None: diff --git a/classes/shorethread.py b/classes/shorethread.py index 5fcff9b..3d26948 100644 --- a/classes/shorethread.py +++ b/classes/shorethread.py @@ -1,5 +1,7 @@ from classes.config import MonsterConfig from classes.doghandler import DogHandler +from classes.directory import Directory +from classes.database import Database from watchdog.observers import Observer @@ -60,7 +62,7 @@ class ShoreThread(Process): print("Launched Shore Thread") self.getAllFiles() self.monitor() - + while True: self.joinDogs() self.processQueue() @@ -71,13 +73,65 @@ class ShoreThread(Process): for dog in self._dogs: dog.join(1) + def purgeFile(self, directory: Directory, name: str) -> None: + """Purge a removed File from the processing queue and database + + Args: + directory (Directory): Directory (previously) containing the File + name (str): Filename of the deleted File + """ + # Remove file from processing queue + for f in self._state["files"]: + if f.directory == directory and f.name == name: + del(f) + + # Remove file from database + db = Database() + db.removeFile(directory, name) + + def addFile(self, fileobj): + """Add a File object to the processing queue, if not already there + + Args: + fileobj (classes.file.File): File object to add to the queue + """ + found = False + + for f in self._state["files"]: + if f.directory == fileobj.directory and f.name == fileobj.name: + if f.uuid != fileobj.uuid: + del(f) + else: + found = True + + if not found: + self._state["files"].append(fileobj) + + def processFile(self, directory: Directory, name: str) -> None: + """Process a file entry from the observer queue + + Args: + directory (classes.directory.Directory): Directory containing the + created, deleted, modified or moved File + name (str): Filename of the created, deleted, modified or moved + File + """ + if (fileobj := directory.getFile(name)): + self.addFile(fileobj) + else: + self.purgeFile(directory, name) + def processQueue(self) -> None: """Handle events currently on the queue - """ - event = self.queue.get() # Will block until an event is found - print(event) - def terminate(self, *args, **kwargs) -> None: + N.B.: An event on the queue is a (directory, basename) tuple, where + "directory" is a Directory object, and "basename" is the name of a + File that has been created, moved, modified or deleted. + """ + directory, basename = self.queue.get() # Will block until an event is found + self.processFile(directory, basename) + + def terminate(self, *args, **kwargs) -> NoReturn: """Terminate observer threads, then terminate self """ for dog in self._dogs: diff --git a/classes/vesselthread.py b/classes/vesselthread.py index d9db5a3..9270230 100644 --- a/classes/vesselthread.py +++ b/classes/vesselthread.py @@ -1,4 +1,5 @@ from multiprocessing import Process +from typing import NoReturn from classes.vessel import Vessel @@ -7,7 +8,7 @@ import time class VesselThread(Process): """Thread processing uploads to a single vessel """ - def __init__(self, vessel: Vessel, state: dict): + def __init__(self, vessel: Vessel, state: dict) -> None: """Initialize a new VesselThread Args: @@ -18,7 +19,7 @@ class VesselThread(Process): self.vessel = vessel self._state = state - def run(self): + def run(self) -> NoReturn: """Run thread and process uploads to the vessel """ print("Launched Vessel Thread for " + self.vessel.name)