Current state
This commit is contained in:
parent
5f8f94284f
commit
71a48efefb
11 changed files with 176 additions and 36 deletions
|
@ -12,14 +12,16 @@ class MonsterConfig:
|
|||
if not "MONSTER" in parser.sections():
|
||||
raise ValueError("Config file does not contain a MONSTER section!")
|
||||
|
||||
directories = []
|
||||
vessels = []
|
||||
config = cls()
|
||||
|
||||
for section in parser.sections():
|
||||
if section.startswith("Directory"):
|
||||
directories.append(Directory.fromConfig(parser[section]))
|
||||
config.directories.append(Directory.fromConfig(parser[section]))
|
||||
elif section.startswith("Vessel"):
|
||||
vessels.append(Vessel.fromConfig(parser[section]))
|
||||
config.vessels.append(Vessel.fromConfig(parser[section]))
|
||||
|
||||
return config
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
self.directories = []
|
||||
self.vessels = []
|
||||
|
|
|
@ -30,21 +30,21 @@ class Connection:
|
|||
def _mkdir(self, path):
|
||||
return self._sftp.mkdir(str(path))
|
||||
|
||||
def _listdir(self, path):
|
||||
return self._sftp.listdir(str(path))
|
||||
def _listdir(self, path=None):
|
||||
return self._sftp.listdir(str(path) if path else None)
|
||||
|
||||
def _remove(self, path):
|
||||
return self._sftp.remove(str(path))
|
||||
|
||||
def assertTempDirectory(self, directory):
|
||||
for d in [directory, directory.tempdir]:
|
||||
def assertDirectories(self, directory):
|
||||
for d in [directory, self._vessel.tempdir]:
|
||||
if not self._exists(d):
|
||||
self._mkdir(d)
|
||||
elif not self._isdir(d):
|
||||
raise ValueError(f"{d} exists but is not a directory on Vessel {self._vessel.name}!")
|
||||
|
||||
def assertChunkComplete(self, chunk):
|
||||
path = chunk.file.directory.tempdir / chunk.getTempName()
|
||||
def assertChunkComplete(self, chunk, path=None):
|
||||
path = path or self._vessel.tempdir / chunk.getTempName()
|
||||
|
||||
if self._exists(path):
|
||||
_,o,_ = self._client.exec_command("sha256sum -b " + str(path))
|
||||
|
@ -56,20 +56,51 @@ class Connection:
|
|||
return False
|
||||
|
||||
def pushChunk(self, chunk):
|
||||
path = chunk.file.directory.tempdir / chunk.getTempName()
|
||||
path = self._vessel.tempdir / chunk.getTempName()
|
||||
flo = BytesIO(chunk.data)
|
||||
self._sftp.putfo(flo, path, len(chunk.data))
|
||||
|
||||
def compileComplete(self, remotefile):
|
||||
numchunks = remotefile.getStatus() + 1
|
||||
files = " ".join([str(remotefile.file.directory.tempdir / f"{remotefile.file.uuid}_{i}.part") for i in range(numchunks)])
|
||||
files = " ".join([str(self._vessel.tempdir / f"{remotefile.file.uuid}_{i}.part") for i in range(numchunks)])
|
||||
completefile = remotefile.file.getChunk(-1)
|
||||
outname = completefile.getTempName()
|
||||
outpath = remotefile.file.directory.tempdir / outname
|
||||
outpath = self._vessel.tempdir / outname
|
||||
_,o,_ = self._client.exec_command(f"cat {files} > {outpath}")
|
||||
o.channel.recv_exit_status()
|
||||
|
||||
return self.assertChunkComplete(completefile)
|
||||
def assertComplete(self, remotefile, allow_retry=False):
|
||||
completefile = remotefile.file.getChunk(-1)
|
||||
outname = completefile.getTempName()
|
||||
outpath = self._vessel.tempdir / outname
|
||||
|
||||
if not self._exists(outpath):
|
||||
return False
|
||||
|
||||
if not self.assertChunkComplete(completefile):
|
||||
if allow_retry:
|
||||
self._remove(outpath)
|
||||
else:
|
||||
self.clearTempDir()
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def moveComplete(self, remotefile):
|
||||
completefile = remotefile.file.getChunk(-1)
|
||||
destination = remotefile.getFullPath()
|
||||
self._sftp.rename(str(self._vessel.tempdir / completefile.getTempName()), str(destination))
|
||||
self._sftp.stat(str(destination))
|
||||
return True
|
||||
|
||||
def getCurrentUploadUUID(self):
|
||||
for f in self._listdir(self._vessel.tempdir):
|
||||
if f.endswith(".part"):
|
||||
return f.split("_")[0]
|
||||
|
||||
def clearTempDir(self):
|
||||
for f in self._listdir(self._vessel.tempdir):
|
||||
self._remove(self._vessel.tempdir / f)
|
||||
|
||||
def __del__(self):
|
||||
self._client.close()
|
|
@ -49,14 +49,21 @@ class Database:
|
|||
self._execute("INSERT INTO contentmonster_file(uuid, directory, name, checksum) VALUES (?, ?, ?, ?)", (fileuuid, fileobj.directory.name, fileobj.name, hash))
|
||||
return fileuuid
|
||||
|
||||
def getFileByUUID(self, fileuuid):
|
||||
cur = self.getCursor()
|
||||
cur.execute("SELECT directory, name, checksum FROM contentmonster_file WHERE uuid = ?", (fileuuid ,))
|
||||
if (result := cur.fetchone()):
|
||||
return result
|
||||
|
||||
def removeFileByUUID(self, fileuuid):
|
||||
self._execute("DELETE FROM contentmonster_file WHERE uuid = ?", (fileuuid,))
|
||||
|
||||
def logStart(self, file, vessel):
|
||||
self._execute("INSERT INTO contentmonster_file_log(file, vessel, status) VALUES(?, ?, ?)", (file.uuid, vessel.name, False))
|
||||
|
||||
def logCompletion(self, file, vessel):
|
||||
self._execute("UPDATE contentmonster_file_log SET status = ? WHERE file = ? AND vessel = ?", (True, file.uuid, vessel.name))
|
||||
self._execute("INSERT INTO contentmonster_file_log(file, vessel) VALUES(?, ?)", (file.uuid, vessel.name))
|
||||
|
||||
def getCompletionForVessel(self, vessel):
|
||||
cur = self.getCursor()
|
||||
cur.execute("SELECT file FROM contentmonster_file_log WHERE vessel = ?", (vessel.name,))
|
||||
|
||||
def migrate(self):
|
||||
cur = self.getCursor()
|
||||
|
@ -68,7 +75,7 @@ class Database:
|
|||
|
||||
if self.getVersion() == 1:
|
||||
cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file(uuid VARCHAR(36) PRIMARY KEY, directory VARCHAR(128), name VARCHAR(128), checksum VARCHAR(64))")
|
||||
cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), status BOOLEAN, PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_files(uuid) ON DELETE CASCADE)")
|
||||
cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_files(uuid) ON DELETE CASCADE)")
|
||||
cur.execute("UPDATE contentmonster_settings SET value = '2' WHERE key = 'dbversion'")
|
||||
self.commit()
|
||||
|
||||
|
|
|
@ -22,7 +22,3 @@ class Directory:
|
|||
def getFiles(self):
|
||||
files = [f for f in os.listdir(self.location) if os.path.isfile]
|
||||
return [File(f, self) for f in files]
|
||||
|
||||
@property
|
||||
def tempdir(self):
|
||||
return self.location / ".temp"
|
18
classes/doghandler.py
Normal file
18
classes/doghandler.py
Normal file
|
@ -0,0 +1,18 @@
|
|||
from watchdog.events import FileSystemEventHandler
|
||||
|
||||
class DogHandler(FileSystemEventHandler):
|
||||
def __init__(self, queue, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._queue = queue
|
||||
|
||||
def on_created(self, event):
|
||||
pass
|
||||
|
||||
def on_modified(self, event):
|
||||
pass
|
||||
|
||||
def on_moved(self, event):
|
||||
pass
|
||||
|
||||
def on_deleted(self, event):
|
||||
pass
|
|
@ -5,11 +5,10 @@ class RemoteFile:
|
|||
def __init__(self, fileobj, vessel, chunksize=1048576):
|
||||
self.file = fileobj
|
||||
self.vessel = vessel
|
||||
self.tempdir = self.vessel.connection.assertTempDirectory(self.file.directory)
|
||||
self.chunksize = chunksize
|
||||
|
||||
def getStatus(self):
|
||||
ls = self.vessel.connection._listdir(self.tempdir)
|
||||
ls = self.vessel.connection._listdir(self.vessel.tempdir)
|
||||
files = [f for f in ls if f.startswith(self.file.uuid) and f.endswith(".part")]
|
||||
|
||||
ids = [-1]
|
||||
|
|
37
classes/shorethread.py
Normal file
37
classes/shorethread.py
Normal file
|
@ -0,0 +1,37 @@
|
|||
from classes.config import MonsterConfig
|
||||
from classes.doghandler import DogHandler
|
||||
|
||||
from watchdog.observers import Observer
|
||||
|
||||
from multiprocessing import Process, Queue
|
||||
|
||||
import time
|
||||
|
||||
class ShoreThread:
|
||||
def __init__(self, files):
|
||||
super().__init__()
|
||||
self._config = MonsterConfig()
|
||||
self._dogs = []
|
||||
self.files = files
|
||||
self.queue = Queue()
|
||||
|
||||
def getAllFiles(self):
|
||||
files = []
|
||||
|
||||
for directory in self._config.directories:
|
||||
files.append(directory.getFiles())
|
||||
|
||||
return files
|
||||
|
||||
def clearFiles(self):
|
||||
del self.files[:]
|
||||
|
||||
def monitor(self):
|
||||
for directory in self._config.directories:
|
||||
dog = DogHandler(self.queue)
|
||||
|
||||
self._dogs.append(dog)
|
||||
|
||||
def run(self):
|
||||
print("Launched Shore Thread")
|
||||
self.clearFiles()
|
|
@ -1,28 +1,48 @@
|
|||
from classes.connection import Connection
|
||||
from classes.database import Database
|
||||
from classes.file import File
|
||||
|
||||
from paramiko.ssh_exception import SSHException
|
||||
|
||||
import pathlib
|
||||
|
||||
class Vessel:
|
||||
@classmethod
|
||||
def fromConfig(cls, config):
|
||||
if "TempDir" in config.keys():
|
||||
tempdir = config["TempDir"]
|
||||
else:
|
||||
tempdir = "/tmp/.ContentMonster/"
|
||||
if "Address" in config.keys():
|
||||
return cls(config.name.split()[1], config["Address"])
|
||||
return cls(config.name.split()[1], config["Address"], pathlib.Path(tempdir))
|
||||
else:
|
||||
raise ValueError("Definition for Vessel " + config.name.split()[1] + " does not contain Address!")
|
||||
|
||||
def __init__(self, name: str, address: str):
|
||||
def __init__(self, name: str, address: str, tempdir: pathlib.Path):
|
||||
self.name = name
|
||||
self.address = address
|
||||
self.tempdir = tempdir
|
||||
self._connection = None
|
||||
self._uploaded = self.getUploadedFromDB()
|
||||
|
||||
@property
|
||||
def connection(self):
|
||||
if self._connection:
|
||||
try:
|
||||
self._connection._listdir()
|
||||
return self._connection
|
||||
except SSHException:
|
||||
self._connection = None
|
||||
self._connection = Connection(self)
|
||||
self._connection = self._connection or Connection(self)
|
||||
return self._connection
|
||||
|
||||
def currentUpload()
|
||||
def getUploadedFromDB(self):
|
||||
db = Database()
|
||||
return db.getCompletionForVessel(self)
|
||||
|
||||
def currentUpload(self):
|
||||
db = Database()
|
||||
directory, name, _ = db.getFileByUUID(fileuuid := self.connection.getCurrentUploadUUID())
|
||||
return File(name, directory, fileuuid)
|
||||
|
||||
def clearTempDir(self):
|
||||
return self.connection.clearTempDir()
|
|
@ -1,9 +1,18 @@
|
|||
from multiprocessing import Process
|
||||
|
||||
import time
|
||||
|
||||
class VesselThread(Process):
|
||||
def __init__(self, vessel, files):
|
||||
super().__init__()
|
||||
self.vessel = vessel
|
||||
self.files = files
|
||||
|
||||
def run(self):
|
||||
pass
|
||||
print("Launched Vessel Thread for " + self.vessel.name)
|
||||
while True:
|
||||
try:
|
||||
print(self.files[0])
|
||||
except:
|
||||
print("Nothing.")
|
||||
time.sleep(10)
|
|
@ -1 +1,2 @@
|
|||
paramiko
|
||||
watchdog
|
22
worker.py
22
worker.py
|
@ -2,12 +2,32 @@
|
|||
|
||||
from classes.config import MonsterConfig
|
||||
from classes.vesselthread import VesselThread
|
||||
from classes.shorethread import ShoreThread
|
||||
|
||||
from multiprocessing import Manager
|
||||
|
||||
import pathlib
|
||||
import time
|
||||
|
||||
if __name__ == '__main__':
|
||||
config_path = pathlib.Path(__file__).parent.absolute() / "settings.ini"
|
||||
config = MonsterConfig.fromFile(config_path)
|
||||
|
||||
config = MonsterConfig.fromFile(settings_path)
|
||||
with Manager() as manager:
|
||||
files = manager.list()
|
||||
|
||||
threads = []
|
||||
|
||||
for vessel in config.vessels:
|
||||
thread = VesselThread(vessel, files)
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
|
||||
try:
|
||||
shore = ShoreThread(files)
|
||||
shore.run()
|
||||
except KeyboardInterrupt:
|
||||
print("Keyboard interrupt received - stopping threads")
|
||||
for thread in threads:
|
||||
thread.kill()
|
||||
exit()
|
Loading…
Reference in a new issue