From 06f6f4ced691992d8190c19334fc06ed2a818069 Mon Sep 17 00:00:00 2001 From: Klaus-Uwe Mitterer Date: Thu, 25 Nov 2021 19:03:58 +0100 Subject: [PATCH] Upload logic finalized --- classes/config.py | 6 ++++++ classes/remotefile.py | 6 ++++++ classes/shorethread.py | 4 +++- classes/vessel.py | 24 +++++++++++++++++++++++- classes/vesselthread.py | 41 ++++++++++++++++++++++++++++++++++------- 5 files changed, 72 insertions(+), 9 deletions(-) diff --git a/classes/config.py b/classes/config.py index 890b1de..94a7ba9 100644 --- a/classes/config.py +++ b/classes/config.py @@ -25,6 +25,11 @@ class MonsterConfig: if not "MONSTER" in parser.sections(): raise ValueError("Config file does not contain a MONSTER section!") + try: + self.chunksize = parser["MONSTER"]["ChunkSize"] + except KeyError: + pass + for section in parser.sections(): # Read Directories from the config file if section.startswith("Directory"): @@ -40,3 +45,4 @@ class MonsterConfig: """ self.directories = [] self.vessels = [] + self.chunksize = 10485760 # Default: 10 MiB \ No newline at end of file diff --git a/classes/remotefile.py b/classes/remotefile.py index 684c1c5..6affbc0 100644 --- a/classes/remotefile.py +++ b/classes/remotefile.py @@ -97,3 +97,9 @@ class RemoteFile: RemoteFile initialization value """ return self.file.getChunk(count, self.chunksize) + + def finalizeUpload(self) -> None: + """Move complete file to its final destination and clean up + """ + self.vessel.connection.moveComplete(self) + self.vessel.connection.clearTempDir() \ No newline at end of file diff --git a/classes/shorethread.py b/classes/shorethread.py index 3d26948..eb1bcee 100644 --- a/classes/shorethread.py +++ b/classes/shorethread.py @@ -105,7 +105,9 @@ class ShoreThread(Process): found = True if not found: - self._state["files"].append(fileobj) + # Do not queue file if it is of size 0 + if os.path.getsize(str(fileobj.getFullPath())): + self._state["files"].append(fileobj) def processFile(self, directory: Directory, name: str) -> None: """Process a file entry from the observer queue diff --git a/classes/vessel.py b/classes/vessel.py index e788f9b..1aace0a 100644 --- a/classes/vessel.py +++ b/classes/vessel.py @@ -53,7 +53,7 @@ class Vessel: self.address = address self.tempdir = pathlib.Path(tempdir or "/tmp/.ContentMonster/") self._connection = None - self._uploaded = self.getUploadedFromDB() # Files already uploaded + self._uploaded = self.getUploadedFromDB() # Files already uploaded @property def connection(self) -> Connection: @@ -100,3 +100,25 @@ class Vessel: """Clean up the temporary directory on the Vessel """ self.connection.clearTempDir() + + def pushChunk(self, chunk, path: Optional[Union[str, pathlib.Path]] = None) -> None: + """Push the content of a Chunk object to the Vessel + + Args: + chunk (classes.chunk.Chunk): Chunk object containing the data to + push to the Vessel + path (str, pathlib.Path, optional): Path at which to store the + Chunk on the Vessel. If None, use default location provided by + Vessel configuration and name provided by Chunk object. Defaults + to None. + """ + self.connection.pushChunk(chunk, path) + + def compileComplete(self, remotefile) -> None: + """Build a complete File from uploaded Chunks. + + Args: + remotefile (classes.remotefile.RemoteFile): RemoteFile object + describing the uploaded File + """ + self.connection.compileComplete(remotefile) \ No newline at end of file diff --git a/classes/vesselthread.py b/classes/vesselthread.py index 0e6d294..d35b4d5 100644 --- a/classes/vesselthread.py +++ b/classes/vesselthread.py @@ -1,7 +1,11 @@ from multiprocessing import Process -from typing import NoReturn +from typing import NoReturn, Optional from classes.vessel import Vessel +from classes.remotefile import RemoteFile +from classes.retry import retry +from classes.database import Database +from const import STATUS_COMPLETE, STATUS_START import time @@ -33,17 +37,40 @@ class VesselThread(Process): self.vessel.name) print(repr(e)) + @retry() def upload(self) -> None: """Continue uploading process """ - if not (current := self.vessel.currentUpload): - self.processQueue() + if not (current := self.vessel.currentUpload() or self.processQueue()): return - pass - def processQueue(self) -> None: - """Start uploading a file from the processing queue + remotefile = RemoteFile(current, self.vessel, + self._state["config"].chunksize) + + while True: + status = remotefile.getStatus() + + if status == STATUS_COMPLETE: + remotefile.finalizeUpload() + db = Database() + db.logCompletion(current, self.vessel) + return + + nextchunk = 0 if status == STATUS_START else status + 1 + + chunk = remotefile.getChunk(nextchunk) + + # If the Chunk has no data, the selected range is beyond the end + # of the file, i.e. the complete file has already been uploaded + + if chunk.data: + self.vessel.pushChunk(chunk) + else: + self.vessel.compileComplete(remotefile) + + def processQueue(self) -> Optional[str]: + """Return a file from the processing queue """ for f in self._state["files"]: if not f.uuid in self.vessel._uploaded: - pass \ No newline at end of file + return f