Logic to add and delete files to processing queue

This commit is contained in:
Kumi 2021-11-25 17:02:09 +01:00
parent f3f6e74c89
commit 7cfcc99909
3 changed files with 74 additions and 8 deletions

View file

@ -133,6 +133,17 @@ class Database:
if (result := cur.fetchone()): if (result := cur.fetchone()):
return result return result
def removeFile(self, directory, name: str) -> None:
"""Remove a File from the database based on Directory and filename
Args:
directory (classes.directory.Directory): Directory object
containing the File to remove
name (str): Filename of the File to remove
"""
self._execute(
"DELETE FROM contentmonster_file WHERE directory = ? AND name = ?", (directory.name, name))
def removeFileByUUID(self, fileuuid: str) -> None: def removeFileByUUID(self, fileuuid: str) -> None:
"""Remove a File from the database based on UUID """Remove a File from the database based on UUID
@ -166,7 +177,7 @@ class Database:
cur = self.getCursor() cur = self.getCursor()
cur.execute( cur.execute(
"SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,)) "SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,))
return [f[0] for f in cur.fetchall()] return [f[0] for f in cur.fetchall()]
def migrate(self) -> None: def migrate(self) -> None:

View file

@ -1,5 +1,7 @@
from classes.config import MonsterConfig from classes.config import MonsterConfig
from classes.doghandler import DogHandler from classes.doghandler import DogHandler
from classes.directory import Directory
from classes.database import Database
from watchdog.observers import Observer from watchdog.observers import Observer
@ -60,7 +62,7 @@ class ShoreThread(Process):
print("Launched Shore Thread") print("Launched Shore Thread")
self.getAllFiles() self.getAllFiles()
self.monitor() self.monitor()
while True: while True:
self.joinDogs() self.joinDogs()
self.processQueue() self.processQueue()
@ -71,13 +73,65 @@ class ShoreThread(Process):
for dog in self._dogs: for dog in self._dogs:
dog.join(1) dog.join(1)
def purgeFile(self, directory: Directory, name: str) -> None:
"""Purge a removed File from the processing queue and database
Args:
directory (Directory): Directory (previously) containing the File
name (str): Filename of the deleted File
"""
# Remove file from processing queue
for f in self._state["files"]:
if f.directory == directory and f.name == name:
del(f)
# Remove file from database
db = Database()
db.removeFile(directory, name)
def addFile(self, fileobj):
"""Add a File object to the processing queue, if not already there
Args:
fileobj (classes.file.File): File object to add to the queue
"""
found = False
for f in self._state["files"]:
if f.directory == fileobj.directory and f.name == fileobj.name:
if f.uuid != fileobj.uuid:
del(f)
else:
found = True
if not found:
self._state["files"].append(fileobj)
def processFile(self, directory: Directory, name: str) -> None:
"""Process a file entry from the observer queue
Args:
directory (classes.directory.Directory): Directory containing the
created, deleted, modified or moved File
name (str): Filename of the created, deleted, modified or moved
File
"""
if (fileobj := directory.getFile(name)):
self.addFile(fileobj)
else:
self.purgeFile(directory, name)
def processQueue(self) -> None: def processQueue(self) -> None:
"""Handle events currently on the queue """Handle events currently on the queue
"""
event = self.queue.get() # Will block until an event is found
print(event)
def terminate(self, *args, **kwargs) -> None: N.B.: An event on the queue is a (directory, basename) tuple, where
"directory" is a Directory object, and "basename" is the name of a
File that has been created, moved, modified or deleted.
"""
directory, basename = self.queue.get() # Will block until an event is found
self.processFile(directory, basename)
def terminate(self, *args, **kwargs) -> NoReturn:
"""Terminate observer threads, then terminate self """Terminate observer threads, then terminate self
""" """
for dog in self._dogs: for dog in self._dogs:

View file

@ -1,4 +1,5 @@
from multiprocessing import Process from multiprocessing import Process
from typing import NoReturn
from classes.vessel import Vessel from classes.vessel import Vessel
@ -7,7 +8,7 @@ import time
class VesselThread(Process): class VesselThread(Process):
"""Thread processing uploads to a single vessel """Thread processing uploads to a single vessel
""" """
def __init__(self, vessel: Vessel, state: dict): def __init__(self, vessel: Vessel, state: dict) -> None:
"""Initialize a new VesselThread """Initialize a new VesselThread
Args: Args:
@ -18,7 +19,7 @@ class VesselThread(Process):
self.vessel = vessel self.vessel = vessel
self._state = state self._state = state
def run(self): def run(self) -> NoReturn:
"""Run thread and process uploads to the vessel """Run thread and process uploads to the vessel
""" """
print("Launched Vessel Thread for " + self.vessel.name) print("Launched Vessel Thread for " + self.vessel.name)