Current code status
This commit is contained in:
commit
5f8f94284f
16 changed files with 367 additions and 0 deletions
7
.gitignore
vendored
Normal file
7
.gitignore
vendored
Normal file
|
@ -0,0 +1,7 @@
|
|||
__pycache__/
|
||||
*.pyc
|
||||
*.swp
|
||||
settings.ini
|
||||
database.sqlite3
|
||||
venv/
|
||||
.vscode
|
0
__main__.py
Normal file
0
__main__.py
Normal file
0
classes/__init__.py
Normal file
0
classes/__init__.py
Normal file
13
classes/chunk.py
Normal file
13
classes/chunk.py
Normal file
|
@ -0,0 +1,13 @@
|
|||
import hashlib
|
||||
|
||||
class Chunk:
|
||||
def __init__(self, fileobj, count, data):
|
||||
self.file = fileobj
|
||||
self.count = count if count >= 0 else "complete"
|
||||
self.data = data
|
||||
|
||||
def getTempName(self):
|
||||
return f"{self.file.uuid}_{self.count}.part"
|
||||
|
||||
def getHash(self):
|
||||
return hashlib.sha256(self.data).hexdigest()
|
25
classes/config.py
Normal file
25
classes/config.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
import configparser
|
||||
|
||||
from classes.vessel import Vessel
|
||||
from classes.directory import Directory
|
||||
|
||||
class MonsterConfig:
|
||||
@classmethod
|
||||
def fromFile(cls, path):
|
||||
parser = configparser.ConfigParser()
|
||||
parser.read(path)
|
||||
|
||||
if not "MONSTER" in parser.sections():
|
||||
raise ValueError("Config file does not contain a MONSTER section!")
|
||||
|
||||
directories = []
|
||||
vessels = []
|
||||
|
||||
for section in parser.sections():
|
||||
if section.startswith("Directory"):
|
||||
directories.append(Directory.fromConfig(parser[section]))
|
||||
elif section.startswith("Vessel"):
|
||||
vessels.append(Vessel.fromConfig(parser[section]))
|
||||
|
||||
def __init__(self):
|
||||
pass
|
75
classes/connection.py
Normal file
75
classes/connection.py
Normal file
|
@ -0,0 +1,75 @@
|
|||
import paramiko as pikuniku # :P
|
||||
|
||||
from paramiko.client import SSHClient
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
import errno
|
||||
import stat
|
||||
|
||||
class Connection:
|
||||
def __init__(self, vessel):
|
||||
self._vessel = vessel
|
||||
self._client = SSHClient()
|
||||
self._client.load_system_host_keys()
|
||||
self._client.connect(vessel.address)
|
||||
self._transport = self._client.get_transport()
|
||||
self._transport.set_keepalive(10)
|
||||
self._sftp = self._client.open_sftp()
|
||||
|
||||
def _exists(self, path):
|
||||
try:
|
||||
self._sftp.stat(str(path))
|
||||
return True
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
|
||||
def _isdir(self, path):
|
||||
return stat.S_ISDIR(self._sftp.lstat(str(path)).st_mode)
|
||||
|
||||
def _mkdir(self, path):
|
||||
return self._sftp.mkdir(str(path))
|
||||
|
||||
def _listdir(self, path):
|
||||
return self._sftp.listdir(str(path))
|
||||
|
||||
def _remove(self, path):
|
||||
return self._sftp.remove(str(path))
|
||||
|
||||
def assertTempDirectory(self, directory):
|
||||
for d in [directory, directory.tempdir]:
|
||||
if not self._exists(d):
|
||||
self._mkdir(d)
|
||||
elif not self._isdir(d):
|
||||
raise ValueError(f"{d} exists but is not a directory on Vessel {self._vessel.name}!")
|
||||
|
||||
def assertChunkComplete(self, chunk):
|
||||
path = chunk.file.directory.tempdir / chunk.getTempName()
|
||||
|
||||
if self._exists(path):
|
||||
_,o,_ = self._client.exec_command("sha256sum -b " + str(path))
|
||||
o.channel.recv_exit_status()
|
||||
if not o.readline().split()[0] == chunk.getHash():
|
||||
self._remove(path)
|
||||
else:
|
||||
return True
|
||||
return False
|
||||
|
||||
def pushChunk(self, chunk):
|
||||
path = chunk.file.directory.tempdir / chunk.getTempName()
|
||||
flo = BytesIO(chunk.data)
|
||||
self._sftp.putfo(flo, path, len(chunk.data))
|
||||
|
||||
def compileComplete(self, remotefile):
|
||||
numchunks = remotefile.getStatus() + 1
|
||||
files = " ".join([str(remotefile.file.directory.tempdir / f"{remotefile.file.uuid}_{i}.part") for i in range(numchunks)])
|
||||
completefile = remotefile.file.getChunk(-1)
|
||||
outname = completefile.getTempName()
|
||||
outpath = remotefile.file.directory.tempdir / outname
|
||||
_,o,_ = self._client.exec_command(f"cat {files} > {outpath}")
|
||||
o.channel.recv_exit_status()
|
||||
|
||||
return self.assertChunkComplete(completefile)
|
||||
|
||||
def __del__(self):
|
||||
self._client.close()
|
76
classes/database.py
Normal file
76
classes/database.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
import sqlite3
|
||||
import pathlib
|
||||
import uuid
|
||||
|
||||
class Database:
|
||||
def __init__(self, filename=None):
|
||||
filename = filename or pathlib.Path(__file__).parent.parent.absolute() / "database.sqlite3"
|
||||
self._con = sqlite3.connect(filename)
|
||||
self.migrate()
|
||||
|
||||
def _execute(self, query, parameters=None):
|
||||
cur = self.getCursor()
|
||||
cur.execute(query, parameters)
|
||||
self.commit()
|
||||
|
||||
def commit(self):
|
||||
return self._con.commit()
|
||||
|
||||
def getCursor(self):
|
||||
return self._con.cursor()
|
||||
|
||||
def getVersion(self):
|
||||
cur = self.getCursor()
|
||||
try:
|
||||
cur.execute("SELECT value FROM contentmonster_settings WHERE key = 'dbversion'")
|
||||
assert (version := cur.fetchone())
|
||||
return int(version[0])
|
||||
except (sqlite3.OperationalError, AssertionError):
|
||||
return 0
|
||||
|
||||
def getFileUUID(self, fileobj):
|
||||
hash = fileobj.getHash()
|
||||
|
||||
cur = self.getCursor()
|
||||
cur.execute("SELECT uuid, checksum FROM contentmonster_file WHERE directory = ? AND name = ?", (fileobj.directory.name, fileobj.name))
|
||||
|
||||
fileuuid = None
|
||||
for result in cur.fetchall():
|
||||
if result[1] == hash:
|
||||
fileuuid = result[0]
|
||||
else:
|
||||
self.removeFileByUUID(result[0])
|
||||
|
||||
return fileuuid or self.addFile(fileobj, hash)
|
||||
|
||||
def addFile(self, fileobj, hash=None):
|
||||
hash = hash or fileobj.getHash()
|
||||
fileuuid = str(uuid.uuid4())
|
||||
self._execute("INSERT INTO contentmonster_file(uuid, directory, name, checksum) VALUES (?, ?, ?, ?)", (fileuuid, fileobj.directory.name, fileobj.name, hash))
|
||||
return fileuuid
|
||||
|
||||
def removeFileByUUID(self, fileuuid):
|
||||
self._execute("DELETE FROM contentmonster_file WHERE uuid = ?", (fileuuid,))
|
||||
|
||||
def logStart(self, file, vessel):
|
||||
self._execute("INSERT INTO contentmonster_file_log(file, vessel, status) VALUES(?, ?, ?)", (file.uuid, vessel.name, False))
|
||||
|
||||
def logCompletion(self, file, vessel):
|
||||
self._execute("UPDATE contentmonster_file_log SET status = ? WHERE file = ? AND vessel = ?", (True, file.uuid, vessel.name))
|
||||
|
||||
def migrate(self):
|
||||
cur = self.getCursor()
|
||||
|
||||
if self.getVersion() == 0:
|
||||
cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_settings(key VARCHAR(64) PRIMARY KEY, value TEXT)")
|
||||
cur.execute("INSERT INTO contentmonster_settings(key, value) VALUES ('dbversion', '1')")
|
||||
self.commit()
|
||||
|
||||
if self.getVersion() == 1:
|
||||
cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file(uuid VARCHAR(36) PRIMARY KEY, directory VARCHAR(128), name VARCHAR(128), checksum VARCHAR(64))")
|
||||
cur.execute("CREATE TABLE IF NOT EXISTS contentmonster_file_log(file VARCHAR(36), vessel VARCHAR(128), status BOOLEAN, PRIMARY KEY (file, vessel), FOREIGN KEY (file) REFERENCES contentmonster_files(uuid) ON DELETE CASCADE)")
|
||||
cur.execute("UPDATE contentmonster_settings SET value = '2' WHERE key = 'dbversion'")
|
||||
self.commit()
|
||||
|
||||
def __del__(self):
|
||||
self._con.close()
|
28
classes/directory.py
Normal file
28
classes/directory.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
from classes.file import File
|
||||
|
||||
import os
|
||||
import pathlib
|
||||
|
||||
class Directory:
|
||||
@classmethod
|
||||
def fromConfig(cls, config):
|
||||
if "Location" in config.keys():
|
||||
return cls(config.name.split()[1], config["Location"])
|
||||
else:
|
||||
raise ValueError("Definition for Directory " + config.name.split()[1] + " does not contain Location!")
|
||||
|
||||
def __init__(self, name, location):
|
||||
self.name = name
|
||||
|
||||
if os.path.isdir(location):
|
||||
self.location = pathlib.Path(location)
|
||||
else:
|
||||
raise ValueError(f"Location {location} for Directory {name} does not exist or is not a directory.")
|
||||
|
||||
def getFiles(self):
|
||||
files = [f for f in os.listdir(self.location) if os.path.isfile]
|
||||
return [File(f, self) for f in files]
|
||||
|
||||
@property
|
||||
def tempdir(self):
|
||||
return self.location / ".temp"
|
27
classes/file.py
Normal file
27
classes/file.py
Normal file
|
@ -0,0 +1,27 @@
|
|||
from classes.chunk import Chunk
|
||||
from classes.database import Database
|
||||
|
||||
import hashlib
|
||||
|
||||
class File:
|
||||
def getUUID(self):
|
||||
db = Database()
|
||||
db.getFileUUID(self)
|
||||
|
||||
def __init__(self, name, directory, uuid=None):
|
||||
self.name = name
|
||||
self.directory = directory
|
||||
self.uuid = uuid or self.getUUID()
|
||||
|
||||
def getFullPath(self):
|
||||
return self.directory / self.name
|
||||
|
||||
def getHash(self):
|
||||
return self.getChunk(-1).getHash()
|
||||
|
||||
def getChunk(self, count, size=1048576):
|
||||
with open(self.getFullPath(), "rb") as binary:
|
||||
binary.seek((count * size) if count > 0 else 0)
|
||||
data = binary.read(size if count >= 0 else None)
|
||||
|
||||
return Chunk(self, count, data)
|
43
classes/remotefile.py
Normal file
43
classes/remotefile.py
Normal file
|
@ -0,0 +1,43 @@
|
|||
STATUS_START = -1
|
||||
STATUS_COMPLETE = -2
|
||||
|
||||
class RemoteFile:
|
||||
def __init__(self, fileobj, vessel, chunksize=1048576):
|
||||
self.file = fileobj
|
||||
self.vessel = vessel
|
||||
self.tempdir = self.vessel.connection.assertTempDirectory(self.file.directory)
|
||||
self.chunksize = chunksize
|
||||
|
||||
def getStatus(self):
|
||||
ls = self.vessel.connection._listdir(self.tempdir)
|
||||
files = [f for f in ls if f.startswith(self.file.uuid) and f.endswith(".part")]
|
||||
|
||||
ids = [-1]
|
||||
|
||||
for f in files:
|
||||
part = f.split("_")[1].split(".")[0]
|
||||
if part == "complete":
|
||||
if self.validateComplete():
|
||||
return STATUS_COMPLETE
|
||||
ids.append(int(part))
|
||||
|
||||
count = max(ids)
|
||||
|
||||
while count >= 0:
|
||||
if self.validateChunk(count):
|
||||
return count
|
||||
count -=1
|
||||
|
||||
return STATUS_START
|
||||
|
||||
def validateChunk(self, count):
|
||||
return self.vessel.connection.assertChunkComplete(self.getChunk(count))
|
||||
|
||||
def validateComplete(self):
|
||||
return self.validateChunk(-1)
|
||||
|
||||
def compileComplete(self):
|
||||
self.vessel.connection.compileComplete(self)
|
||||
|
||||
def getChunk(self, count):
|
||||
return self.file.getChunk(count, self.chunksize)
|
13
classes/retry.py
Normal file
13
classes/retry.py
Normal file
|
@ -0,0 +1,13 @@
|
|||
from paramiko.ssh_exception import SSHException
|
||||
|
||||
class retry:
|
||||
def __init__(self, exceptions=None):
|
||||
self.exceptions = exceptions or (SSHException,)
|
||||
|
||||
def __call__(self, f):
|
||||
def wrapped_f(*args):
|
||||
try:
|
||||
f(*args)
|
||||
except self.exceptions as e:
|
||||
print("Caught expected exception: " + e)
|
||||
return wrapped_f
|
28
classes/vessel.py
Normal file
28
classes/vessel.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
from classes.connection import Connection
|
||||
|
||||
from paramiko.ssh_exception import SSHException
|
||||
|
||||
class Vessel:
|
||||
@classmethod
|
||||
def fromConfig(cls, config):
|
||||
if "Address" in config.keys():
|
||||
return cls(config.name.split()[1], config["Address"])
|
||||
else:
|
||||
raise ValueError("Definition for Vessel " + config.name.split()[1] + " does not contain Address!")
|
||||
|
||||
def __init__(self, name: str, address: str):
|
||||
self.name = name
|
||||
self.address = address
|
||||
self._connection = None
|
||||
|
||||
@property
|
||||
def connection(self):
|
||||
if self._connection:
|
||||
try:
|
||||
self._connection._listdir()
|
||||
return self._connection
|
||||
except SSHException:
|
||||
self._connection = None
|
||||
self._connection = Connection(self)
|
||||
|
||||
def currentUpload()
|
9
classes/vesselthread.py
Normal file
9
classes/vesselthread.py
Normal file
|
@ -0,0 +1,9 @@
|
|||
from multiprocessing import Process
|
||||
|
||||
class VesselThread(Process):
|
||||
def __init__(self, vessel, files):
|
||||
super().__init__()
|
||||
self.vessel = vessel
|
||||
|
||||
def run(self):
|
||||
pass
|
1
requirements.txt
Normal file
1
requirements.txt
Normal file
|
@ -0,0 +1 @@
|
|||
paramiko
|
9
settings.example.ini
Normal file
9
settings.example.ini
Normal file
|
@ -0,0 +1,9 @@
|
|||
[MONSTER]
|
||||
Database = database.sqlite3
|
||||
ChunkSize = 1048576
|
||||
|
||||
[Directory sampledir]
|
||||
Location = /home/user/replication
|
||||
|
||||
[Vessel samplevessel]
|
||||
Address = example.com
|
13
worker.py
Normal file
13
worker.py
Normal file
|
@ -0,0 +1,13 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from classes.config import MonsterConfig
|
||||
from classes.vesselthread import VesselThread
|
||||
|
||||
from multiprocessing import Manager
|
||||
|
||||
import pathlib
|
||||
|
||||
config_path = pathlib.Path(__file__).parent.absolute() / "settings.ini"
|
||||
|
||||
config = MonsterConfig.fromFile(settings_path)
|
||||
|
Loading…
Add table
Reference in a new issue