Enhance FicheServer robustness and add form UI

Refactored FicheServer to streamline setter methods, improving type annotations for clarity. Adjusted code formatting and string quoting for consistency across the board, promoting adherence to PEP 8 guidelines. Enhanced `generate_slug` method to recursively resolve file naming conflicts and extended its functionality to use custom inputs. Introduced a form interface to the PyFiche Lines HTML template, allowing easy uploading of pastes through the web UI. This commit improves code maintainability and provides a more user-friendly way for submitting content to the server.

Resolves issue with slug regeneration and implements feature request for web-based content submission.
This commit is contained in:
Kumi 2024-02-04 08:56:35 +01:00
parent dbb4aff4e7
commit e76d05caf9
Signed by: kumi
GPG key ID: ECBCC9082395383F
2 changed files with 121 additions and 47 deletions

View file

@ -11,18 +11,19 @@ import ipaddress
from typing import Optional, Tuple from typing import Optional, Tuple
class FicheServer: class FicheServer:
FICHE_SYMBOLS = string.ascii_letters + string.digits FICHE_SYMBOLS = string.ascii_letters + string.digits
OUTPUT_FILE_NAME = 'index.txt' OUTPUT_FILE_NAME = "index.txt"
domain: str = 'localhost' domain: str = "localhost"
port: int = 9999 port: int = 9999
listen_addr: str = '0.0.0.0' listen_addr: str = "0.0.0.0"
slug_size: int = 8 slug_size: int = 8
https: bool = False https: bool = False
buffer_size: int = 4096 buffer_size: int = 4096
max_size: int = 5242880 # 5 MB by default max_size: int = 5242880 # 5 MB by default
_output_dir: pathlib.Path = pathlib.Path('data/') _output_dir: pathlib.Path = pathlib.Path("data/")
_log_file: Optional[pathlib.Path] = None _log_file: Optional[pathlib.Path] = None
_banlist: Optional[pathlib.Path] = None _banlist: Optional[pathlib.Path] = None
_allowlist: Optional[pathlib.Path] = None _allowlist: Optional[pathlib.Path] = None
@ -33,7 +34,7 @@ class FicheServer:
return self._output_dir return self._output_dir
@output_dir.setter @output_dir.setter
def output_dir(self, value: str|pathlib.Path) -> None: def output_dir(self, value: str | pathlib.Path) -> None:
self._output_dir = pathlib.Path(value) if isinstance(value, str) else value self._output_dir = pathlib.Path(value) if isinstance(value, str) else value
@property @property
@ -45,7 +46,7 @@ class FicheServer:
return self._log_file return self._log_file
@log_file.setter @log_file.setter
def log_file(self, value: str|pathlib.Path) -> None: def log_file(self, value: str | pathlib.Path) -> None:
self._log_file = pathlib.Path(value) if isinstance(value, str) else value self._log_file = pathlib.Path(value) if isinstance(value, str) else value
@property @property
@ -57,7 +58,7 @@ class FicheServer:
return self._banlist return self._banlist
@banlist.setter @banlist.setter
def banlist(self, value: str|pathlib.Path) -> None: def banlist(self, value: str | pathlib.Path) -> None:
self._banlist = pathlib.Path(value) if isinstance(value, str) else value self._banlist = pathlib.Path(value) if isinstance(value, str) else value
@property @property
@ -69,7 +70,7 @@ class FicheServer:
return self._allowlist return self._allowlist
@allowlist.setter @allowlist.setter
def allowlist(self, value: str|pathlib.Path) -> None: def allowlist(self, value: str | pathlib.Path) -> None:
self._allowlist = pathlib.Path(value) if isinstance(value, str) else value self._allowlist = pathlib.Path(value) if isinstance(value, str) else value
@property @property
@ -81,7 +82,7 @@ class FicheServer:
return f"{'https' if self.https else 'http'}://{self.domain}" return f"{'https' if self.https else 'http'}://{self.domain}"
@classmethod @classmethod
def from_args(cls, args) -> 'FicheServer': def from_args(cls, args) -> "FicheServer":
fiche = cls() fiche = cls()
fiche.domain = args.domain or fiche.domain fiche.domain = args.domain or fiche.domain
fiche.port = args.port or fiche.port fiche.port = args.port or fiche.port
@ -95,14 +96,22 @@ class FicheServer:
fiche.buffer_size = args.buffer_size or fiche.buffer_size fiche.buffer_size = args.buffer_size or fiche.buffer_size
fiche.max_size = args.max_size or fiche.max_size fiche.max_size = args.max_size or fiche.max_size
fiche.logger = logging.getLogger('pyfiche') fiche.logger = logging.getLogger("pyfiche")
fiche.logger.setLevel(logging.INFO if not args.debug else logging.DEBUG) fiche.logger.setLevel(logging.INFO if not args.debug else logging.DEBUG)
handler = logging.StreamHandler() if not args.log_file else logging.FileHandler(args.log_file) handler = (
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logging.StreamHandler()
if not args.log_file
else logging.FileHandler(args.log_file)
)
handler.setFormatter(
logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
)
fiche.logger.addHandler(handler) fiche.logger.addHandler(handler)
if args.user_name: if args.user_name:
fiche.logger.fatal("PyFiche does not support switching to a different user. Please run as the appropriate user directly.") fiche.logger.fatal(
"PyFiche does not support switching to a different user. Please run as the appropriate user directly."
)
return fiche return fiche
@ -115,18 +124,37 @@ class FicheServer:
s.bind((self.listen_addr, self.port)) s.bind((self.listen_addr, self.port))
s.listen() s.listen()
self.logger.info(f"Server started listening on: {self.listen_addr}:{self.port}") self.logger.info(
f"Server started listening on: {self.listen_addr}:{self.port}"
)
while True: while True:
conn, addr = s.accept() conn, addr = s.accept()
threading.Thread(target=self.handle_connection, args=(conn, addr,)).start() threading.Thread(
target=self.handle_connection,
args=(
conn,
addr,
),
).start()
def generate_slug(self, length: Optional[int] = None): def generate_slug(
slug = ''.join(random.choice(self.FICHE_SYMBOLS) for _ in range(length or self.slug_size)) self,
length: Optional[int] = None,
symbols: Optional[str] = None,
output_dir: Optional[str] = None,
):
symbols = symbols or self.FICHE_SYMBOLS
if self.output_dir: slug = "".join(
while os.path.exists(os.path.join(self.output_dir, slug)): random.choice(symbols) for _ in range(length or self.slug_size)
slug = ''.join(random.choice(self.FICHE_SYMBOLS) for _ in range(length + extra_length)) )
output_dir = output_dir or self.output_dir
if output_dir:
if os.path.exists(os.path.join(output_dir, slug)):
return self.generate_slug(length, symbols, output_dir)
return slug return slug
@ -146,7 +174,7 @@ class FicheServer:
os.makedirs(os.path.dirname(path), exist_ok=True) os.makedirs(os.path.dirname(path), exist_ok=True)
try: try:
with open(path, 'wb') as file: with open(path, "wb") as file:
file.write(data) file.write(data)
return path return path
except Exception as e: except Exception as e:
@ -184,7 +212,9 @@ class FicheServer:
received_data.extend(data) received_data.extend(data)
if len(received_data) > self.max_size: if len(received_data) > self.max_size:
self.logger.error(f"Received data exceeds maximum size ({self.max_size} bytes), terminating connection.") self.logger.error(
f"Received data exceeds maximum size ({self.max_size} bytes), terminating connection."
)
conn.sendall(b"Data exceeds maximum size.\n") conn.sendall(b"Data exceeds maximum size.\n")
return return
@ -208,7 +238,7 @@ class FicheServer:
if file_path: if file_path:
url = f"{self.base_url}/{slug}\n" url = f"{self.base_url}/{slug}\n"
conn.sendall(url.encode('utf-8')) conn.sendall(url.encode("utf-8"))
self.logger.info(f"Received {len(data)} bytes, saved to: {slug}") self.logger.info(f"Received {len(data)} bytes, saved to: {slug}")
else: else:
self.logger.error("Failed to save data to file.") self.logger.error("Failed to save data to file.")
@ -221,10 +251,16 @@ class FicheServer:
def run(self): def run(self):
if not self.logger: if not self.logger:
self.logger = logging.getLogger('pyfiche') self.logger = logging.getLogger("pyfiche")
self.logger.setLevel(logging.INFO) self.logger.setLevel(logging.INFO)
handler = logging.StreamHandler() if not self.log_file else logging.FileHandler(self.log_file) handler = (
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logging.StreamHandler()
if not self.log_file
else logging.FileHandler(self.log_file)
)
handler.setFormatter(
logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
)
self.logger.addHandler(handler) self.logger.addHandler(handler)
if self.banlist and self.allowlist: if self.banlist and self.allowlist:
@ -249,12 +285,14 @@ class FicheServer:
try: try:
self.output_dir.mkdir(parents=True) self.output_dir.mkdir(parents=True)
except Exception as e: except Exception as e:
self.logger.fatal(f"Error creating output directory ({self.output_dir}): {e}") self.logger.fatal(
f"Error creating output directory ({self.output_dir}): {e}"
)
exit(1) exit(1)
if self.log_file_path: if self.log_file_path:
try: try:
with open(self.log_file_path, 'a'): with open(self.log_file_path, "a"):
pass pass
except IOError: except IOError:
self.logger.fatal("Log file not writable!") self.logger.fatal("Log file not writable!")
@ -270,7 +308,7 @@ class FicheServer:
try: try:
ip = ipaddress.ip_address(addr) ip = ipaddress.ip_address(addr)
with open(self.allowlist_path, 'r') as file: with open(self.allowlist_path, "r") as file:
for line in file: for line in file:
line = line.strip() line = line.strip()
if line: if line:
@ -289,7 +327,7 @@ class FicheServer:
try: try:
ip = ipaddress.ip_address(addr) ip = ipaddress.ip_address(addr)
with open(self.banlist_path, 'r') as file: with open(self.banlist_path, "r") as file:
for line in file: for line in file:
line = line.strip() line = line.strip()
if line: if line:

View file

@ -4,6 +4,8 @@ from typing import Union, Optional
import logging import logging
import pathlib import pathlib
import cgi
import ipaddress
from .fiche import FicheServer from .fiche import FicheServer
@ -50,7 +52,16 @@ body {{
content="""<h1>PyFiche Lines</h1> content="""<h1>PyFiche Lines</h1>
<p>Welcome to PyFiche Lines, a HTTP server for PyFiche.</p> <p>Welcome to PyFiche Lines, a HTTP server for PyFiche.</p>
<p>PyFiche Lines is a HTTP server for PyFiche. It allows you to view files uploaded through PyFiche in your browser.</p> <p>PyFiche Lines is a HTTP server for PyFiche. It allows you to view files uploaded through PyFiche in your browser.</p>
<p>For more information, see <a href="https://kumig.it/PrivateCoffee/pyfiche">the PyFiche Git repo</a>.</p>""" <p>For more information, see <a href="https://kumig.it/PrivateCoffee/pyfiche">the PyFiche Git repo</a>.</p>
<h2>Upload a paste</h2>
<form action="" method="post" enctype="multipart/form-data">
<label for="file">Paste your content here:</label><br>
<textarea id="file" name="file" rows="10" cols="50"></textarea><br>
<input type="submit" value="Upload">
</form>
"""
) )
server_version = "PyFiche Lines/dev" server_version = "PyFiche Lines/dev"
@ -68,34 +79,58 @@ body {{
url = urlparse(self.path.rstrip("/")) url = urlparse(self.path.rstrip("/"))
if url.path != "/": if url.path != "":
return self.not_found() return self.not_found()
# Reject any POST requests that don't have a Content-Length header # Check if we are handling form data
if (
"Content-Type" in self.headers
and "multipart/form-data" in self.headers["Content-Type"]
):
form_data = cgi.FieldStorage(
fp=self.rfile,
headers=self.headers,
environ={
"REQUEST_METHOD": "POST",
"CONTENT_TYPE": self.headers["Content-Type"],
},
)
if "Content-Length" not in self.headers: content = form_data.getvalue("file")
return self.invalid_request()
if not self.headers["Content-Length"].isdigit(): if len(content) > self.max_size:
return self.invalid_request() return self.file_too_large()
if int(self.headers["Content-Length"]) > self.max_size: else:
return self.file_too_large() if "Content-Length" not in self.headers:
return self.invalid_request()
# Upload the file if not self.headers["Content-Length"].isdigit():
return self.invalid_request()
content_length = int(self.headers["Content-Length"]) if int(self.headers["Content-Length"]) > self.max_size:
content = self.rfile.read(content_length) return self.file_too_large()
content_length = int(self.headers["Content-Length"])
content = self.rfile.read(content_length)
if len(content) != content_length:
return self.invalid_request()
if not content: if not content:
return self.not_found() return self.not_found()
slug = FicheServer.generate_slug(self.data_dir, self.FICHE_SYMBOLS) slug = FicheServer().generate_slug(
self.slug_size, self.FICHE_SYMBOLS, self.data_dir
)
file_path = self.data_dir / slug / self.DATA_FILE_NAME file_path = self.data_dir / slug / self.DATA_FILE_NAME
file_path.parent.mkdir(parents=True, exist_ok=True) file_path.parent.mkdir(parents=True, exist_ok=True)
if isinstance(content, str):
content = content.encode("utf-8")
with file_path.open("wb") as f: with file_path.open("wb") as f:
f.write(content) f.write(content)
@ -170,7 +205,7 @@ body {{
url = urlparse(self.path.rstrip("/")) url = urlparse(self.path.rstrip("/"))
# If the URL is /, display the index page # If the URL is /, display the index page
if url.path == "/": if url.path == "":
self.send_response(200) self.send_response(200)
self.send_header("Content-Type", "text/html") self.send_header("Content-Type", "text/html")
self.send_header("Content-Length", len(self.INDEX_CONTENT)) self.send_header("Content-Length", len(self.INDEX_CONTENT))
@ -245,7 +280,7 @@ body {{
def make_lines_handler( def make_lines_handler(
data_dir, logger, banlist=None, allowlist=None, max_size=5242880 data_dir, logger, banlist=None, allowlist=None, max_size=5242880, slug_size=8
): ):
class CustomHandler(LinesHTTPRequestHandler): class CustomHandler(LinesHTTPRequestHandler):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -254,6 +289,7 @@ def make_lines_handler(
self.banlist: Optional[pathlib.Path] = banlist self.banlist: Optional[pathlib.Path] = banlist
self.allowlist: Optional[pathlib.Path] = allowlist self.allowlist: Optional[pathlib.Path] = allowlist
self.max_size: int = max_size self.max_size: int = max_size
self.slug_size: int = slug_size
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)