Add PyFiche pastebin service with servers and configs

Introducing PyFiche, a pastebin service optimized for command-line usage, including a full suite of servers – a TCP-based 'Fiche' server, an HTTP 'Lines' server, and a 'Recup' server for file retrieval via netcat. The initial release features configuration via pyproject.toml, a comprehensive README, and the MIT license.

This robust solution allows for easy code snippet sharing, file uploads, and downloads without dependencies outside the Python standard library. Setup involves environment configuration and pip installation with the project's handle to git. The 'Fiche' server accepts specific command-line options for customizability, and banlists/allowlists manage server access control, with detailed logging available per user request. Users should note that HTTPS support requires a reverse proxy setup, as PyFiche itself does not handle SSL/TLS.

The '.gitignore' file ensures that local development directories, bytecode and sensitive files are excluded from the repository to maintain a clean working environment. This addition represents a significant milestone for the Private.coffee team in providing a self-contained pastebin service.
This commit is contained in:
Kumi 2024-01-21 21:38:17 +01:00
commit 9c485c7875
Signed by: kumi
GPG key ID: ECBCC9082395383F
12 changed files with 1064 additions and 0 deletions

6
.gitignore vendored Normal file
View file

@ -0,0 +1,6 @@
data/
*.pyc
__pycache__/
dist/
*.txt
venv/

19
LICENSE Normal file
View file

@ -0,0 +1,19 @@
Copyright (c) 2023, 2024 Private.coffee Team <support@private.coffee>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

57
README.md Normal file
View file

@ -0,0 +1,57 @@
# PyFiche
PyFiche is a simple pastebin optimized for the command line, written in Python
and heavily inspired by [fiche](https://github.com/solusipse/fiche/), or rather
a shameless translation.
It also comes with a re-implementation of Lines, the HTTP server that comes
with Fiche, which this time allows you to upload files, and comes doesn't have
dependencies outside the standard library. Additionally, PyFiche also comes
with a simple TCP server, Recup, to download pastes through netcat without
using HTTP(S), in the same way you upload them.
## Installation
### Dependencies
* Python 3 (tested with 3.11)
### Local Installation
```bash
$ python -m venv venv
$ source venv/bin/activate
$ pip install -U git+https://kumig.it/PrivateCoffee/pyfiche.git
```
## Usage
### Fiche Server
```bash
$ source venv/bin/activate
$ pyfiche-server # try --help for options
```
With the exception of the `-u` option, all arguments of the original Fiche
should work as expected. `-u` is not implemented because, well, just use the
right user in the first place. 🤷‍♀️
### Recup Server
```bash
$ source venv/bin/activate
$ pyfiche-recup # try --help for options
```
### Lines Server
```bash
$ source venv/bin/activate
$ pyfiche-lines # try --help for options
```
## License
PyFiche is licensed under the MIT license. See the [LICENSE](LICENSE) file for
more information.

52
pyproject.toml Normal file
View file

@ -0,0 +1,52 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.metadata]
allow-direct-references = true
[project]
name = "pyfiche"
version = "0.0.1"
authors = [
{ name="Private.coffee Team", email="support@private.coffee" },
]
description = "Pastebin for command-line use"
readme = "README.md"
license = { file="LICENSE" }
requires-python = ">=3.10"
packages = [
"src/gptbot"
]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
]
[project.optional-dependencies]
dev = [
"black",
"hatchling",
"twine",
"build",
]
[project.urls]
"Homepage" = "https://kumig.it/PrivateCoffee/pyfiche"
"Bug Tracker" = "https://kumig.it/PrivateCoffee/pyfiche/issues"
[project.scripts]
pyfiche-server = "pyfiche.fiche_server:main"
pyfiche-recup = "pyfiche.recup_server:main"
pyfiche-lines = "pyfiche.lines_server:main"
[tool.hatch.build.targets.wheel]
packages = ["src/pyfiche"]

1
src/pyfiche/__init__.py Normal file
View file

@ -0,0 +1 @@
from .classes import *

View file

@ -0,0 +1,3 @@
from .fiche import FicheServer
from .recup import RecupServer
from .lines import LinesServer

View file

@ -0,0 +1,296 @@
import pathlib
import os
import socket
import time
import datetime
import threading
import random
import string
import logging
import ipaddress
from typing import Optional, Tuple
class FicheServer:
FICHE_SYMBOLS = string.ascii_letters + string.digits
OUTPUT_FILE_NAME = 'index.txt'
domain: str = 'localhost'
port: int = 9999
listen_addr: str = '0.0.0.0'
slug_size: int = 8
https: bool = False
buffer_size: int = 4096
_output_dir: pathlib.Path = pathlib.Path('data/')
_log_file: Optional[pathlib.Path] = None
_banlist: Optional[pathlib.Path] = None
_allowlist: Optional[pathlib.Path] = None
logger: Optional[logging.Logger] = None
@property
def output_dir(self) -> pathlib.Path:
return self._output_dir
@output_dir.setter
def output_dir(self, value: str|pathlib.Path) -> None:
self._output_dir = pathlib.Path(value) if isinstance(value, str) else value
@property
def output_dir_path(self) -> str:
return str(self.output_dir.absolute())
@property
def log_file(self) -> Optional[pathlib.Path]:
return self._log_file
@log_file.setter
def log_file(self, value: str|pathlib.Path) -> None:
self._log_file = pathlib.Path(value) if isinstance(value, str) else value
@property
def log_file_path(self) -> Optional[str]:
return str(self.log_file.absolute()) if self.log_file else None
@property
def banlist(self) -> Optional[pathlib.Path]:
return self._banlist
@banlist.setter
def banlist(self, value: str|pathlib.Path) -> None:
self._banlist = pathlib.Path(value) if isinstance(value, str) else value
@property
def banlist_path(self) -> Optional[str]:
return str(self.banlist.absolute()) if self.banlist else None
@property
def allowlist(self) -> Optional[pathlib.Path]:
return self._allowlist
@allowlist.setter
def allowlist(self, value: str|pathlib.Path) -> None:
self._allowlist = pathlib.Path(value) if isinstance(value, str) else value
@property
def allowlist_path(self) -> Optional[str]:
return str(self.allowlist.absolute()) if self.allowlist else None
@property
def base_url(self) -> str:
return f"{'https' if self.https else 'http'}://{self.domain}"
@classmethod
def from_args(cls, args) -> 'FicheServer':
fiche = cls()
fiche.domain = args.domain or fiche.domain
fiche.port = args.port or fiche.port
fiche.listen_addr = args.listen_addr or fiche.listen_addr
fiche.slug_size = args.slug_size or fiche.slug_size
fiche.https = args.https or fiche.https
fiche.output_dir = args.output_dir or fiche.output_dir
fiche.log_file = args.log_file or fiche.log_file
fiche.banlist = args.banlist or fiche.banlist
fiche.allowlist = args.allowlist or fiche.allowlist
fiche.buffer_size = args.buffer_size or fiche.buffer_size
fiche.logger = logging.getLogger('pyfiche')
fiche.logger.setLevel(logging.INFO if not args.debug else logging.DEBUG)
handler = logging.StreamHandler() if not args.log_file else logging.FileHandler(args.log_file)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
fiche.logger.addHandler(handler)
if args.user_name:
fiche.logger.fatal("PyFiche does not support switching to a different user. Please run as the appropriate user directly.")
return fiche
def get_date(self):
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def start_server(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((self.listen_addr, self.port))
s.listen()
self.logger.info(f"Server started listening on: {self.listen_addr}:{self.port}")
while True:
conn, addr = s.accept()
threading.Thread(target=self.handle_connection, args=(conn, addr,)).start()
def generate_slug(self, length: Optional[int] = None):
slug = ''.join(random.choice(self.FICHE_SYMBOLS) for _ in range(length or self.slug_size))
if self.output_dir:
while os.path.exists(os.path.join(self.output_dir, slug)):
slug = ''.join(random.choice(self.FICHE_SYMBOLS) for _ in range(length + extra_length))
return slug
def create_directory(self, output_dir, slug):
path = os.path.join(output_dir, slug)
try:
if not os.path.exists(path):
os.makedirs(path)
return path
except Exception as e:
self.logger.error(f"Error creating directory {path}: {e}")
return None
def save_to_file(self, data, slug):
path = os.path.join(self.output_dir, slug, self.OUTPUT_FILE_NAME)
os.makedirs(os.path.dirname(path), exist_ok=True)
try:
with open(path, 'wb') as file:
file.write(data)
return path
except Exception as e:
self.logger.error(f"Error saving file {path}: {e}")
return None
def handle_connection(self, conn: socket.socket, addr: Tuple[str, int]):
self.logger.info(f"Incoming connection from: {addr}")
if self.check_banlist(addr[0]):
conn.sendall(b"Your IP address is banned from this server.\n")
self.logger.info(f"Connections from {addr} are banned.")
conn.close()
return
if not self.check_allowlist(addr[0]):
conn.sendall(b"Your IP address is not allowed to connect to this server.\n")
self.logger.info(f"Connection from {addr} is not allowed.")
conn.close()
return
conn.setblocking(False)
conn.settimeout(3)
try:
received_data = bytearray()
try:
while True:
data = conn.recv(self.buffer_size)
self.logger.debug(f"Read {len(data)} bytes from {addr}")
if not data:
break
received_data.extend(data)
except socket.timeout:
pass
data = bytes(received_data)
self.logger.debug(f"Received {len(data)} bytes in total from {addr}")
if not data:
self.logger.error("No data received from the client!")
return
slug = self.generate_slug(self.slug_size)
dir_path = self.create_directory(self.output_dir, slug)
if dir_path is None:
return
file_path = self.save_to_file(data, slug)
if file_path:
url = f"{self.base_url}/{slug}\n"
conn.sendall(url.encode('utf-8'))
self.logger.info(f"Received {len(data)} bytes, saved to: {slug}")
else:
self.logger.error("Failed to save data to file.")
except Exception as e:
self.logger.error(f"An error occurred: {e}")
raise
finally:
conn.close()
def run(self):
if not self.logger:
self.logger = logging.getLogger('pyfiche')
self.logger.setLevel(logging.INFO)
handler = logging.StreamHandler() if not self.log_file else logging.FileHandler(self.log_file)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
self.logger.addHandler(handler)
if self.banlist and self.allowlist:
self.logger.fatal("Banlist and allowlist cannot be used together!")
exit(1)
if self.banlist_path and not os.path.exists(self.banlist_path):
self.logger.fatal(f"Banlist file ({self.banlist_path}) does not exist!")
exit(1)
if self.allowlist_path and not os.path.exists(self.allowlist_path):
self.logger.fatal(f"Allowlist file ({self.allowlist_path}) does not exist!")
exit(1)
self.logger.info(f"Starting PyFiche...")
if self.output_dir.exists() and not os.access(self.output_dir_path, os.W_OK):
self.logger.fatal(f"Output directory ({self.output_dir}) not writable!")
exit(1)
elif not self.output_dir.exists():
try:
self.output_dir.mkdir(parents=True)
except Exception as e:
self.logger.fatal(f"Error creating output directory ({self.output_dir}): {e}")
exit(1)
if self.log_file_path:
try:
with open(self.log_file_path, 'a'):
pass
except IOError:
self.logger.fatal("Log file not writable!")
exit(1)
self.start_server()
return 0
def check_allowlist(self, addr):
if not self.allowlist_path:
return True
try:
ip = ipaddress.ip_address(addr)
with open(self.allowlist_path, 'r') as file:
for line in file:
line = line.strip()
if line:
network = ipaddress.ip_network(line, strict=False)
if ip in network:
return True
except ValueError as e:
self.logger.error(f"Invalid IP address or network: {e}")
return False
return False
def check_banlist(self, addr):
if not self.banlist_path:
return False
try:
ip = ipaddress.ip_address(addr)
with open(self.banlist_path, 'r') as file:
for line in file:
line = line.strip()
if line:
network = ipaddress.ip_network(line, strict=False)
if ip in network:
return True
except ValueError as e:
self.logger.error(f"Invalid IP address or network: {e}")
return False
return False

View file

@ -0,0 +1,276 @@
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse
from typing import Union, Optional
import logging
import pathlib
from .fiche import FicheServer
class LinesHTTPRequestHandler(BaseHTTPRequestHandler):
FICHE_SYMBOLS = FicheServer.FICHE_SYMBOLS
DATA_FILE_NAME = FicheServer.OUTPUT_FILE_NAME
BASE_HTML = """<!DOCTYPE html>
<html>
<head>
<title>PyFiche Lines</title>
<style>
code {{
white-space: pre-wrap;
word-wrap: break-word;
font-family: monospace;
font-size: 1em;
font-weight: 400;
color: #212529;
background-color: #f8f9fa;
border-radius: 0.25rem;
padding: 0.2rem 0.4rem;
margin: 0.2rem 0;
display: inline-block;
overflow: auto;
}}
body {{
font-family: sans-serif;
font-size: 1em;
}}
</style>
</head>
<body>
<pre>{content}</pre>
</body>
</html>
"""
server_version = "PyFiche Lines/dev"
def not_found(self):
self.send_response(404)
self.end_headers()
self.wfile.write(b"Not found")
def check_allowlist(self, addr):
if not self.allowlist:
return True
try:
ip = ipaddress.ip_address(addr)
with open(self.allowlist, "r") as file:
for line in file:
line = line.strip()
if line:
network = ipaddress.ip_network(line, strict=False)
if ip in network:
return True
except ValueError as e:
self.logger.error(f"Invalid IP address or network: {e}")
return False
return False
def check_banlist(self, addr):
if not self.banlist:
return False
try:
ip = ipaddress.ip_address(addr)
with open(self.banlist, "r") as file:
for line in file:
line = line.strip()
if line:
network = ipaddress.ip_network(line, strict=False)
if ip in network:
return True
except ValueError as e:
self.logger.error(f"Invalid IP address or network: {e}")
return False
return False
def do_GET(self):
client_ip, client_port = self.client_address
self.logger.info(f"GET request from {client_ip}:{client_port}")
url = urlparse(self.path.rstrip("/"))
# Discard any URLs that aren't of the form /<slug> or /<slug>/raw
if not len(url.path.split("/")) in [2, 3]:
return self.not_found()
raw = False
if len(url.path.split("/")) == 3:
if url.path.split("/")[2] != "raw":
return self.not_found()
raw = True
slug = url.path.split("/")[1]
# Prevent any invalid characters from being used.
# This should prevent directory traversal attacks.
if any([c not in self.FICHE_SYMBOLS for c in slug]):
return self.not_found()
file_path = self.data_dir / slug / self.DATA_FILE_NAME
if not file_path.exists():
return self.not_found()
with open(file_path, "rb") as f:
content = f.read()
try:
content.decode("utf-8")
binary = False
except UnicodeDecodeError:
binary = True
self.send_response(200)
if raw:
# Veeeeery basic MIME type detection - TODO?
self.send_header(
"Content-Type",
"application/octet-stream" if binary else "text/plain",
)
self.send_header("Content-Length", len(content))
self.send_header(
"Content-Disposition",
f'attachment; filename="{slug}.{"bin" if binary else "txt"}"',
)
self.end_headers()
self.wfile.write(content)
return
if binary:
content = (
f'Binary file - cannot display. <a href="{slug}/raw">Download</a>'
)
else:
content = f'Displaying text file content below. <a href="{slug}/raw">Download</a><br><br><code>{content.decode("utf-8")}</code>'
full_html = self.BASE_HTML.format(content=content)
self.send_header("Content-Type", "text/html")
self.send_header("Content-Length", len(full_html))
self.end_headers()
self.wfile.write(full_html.encode("utf-8"))
def make_lines_handler(data_dir, logger, banlist=None, allowlist=None):
class CustomHandler(LinesHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.data_dir: pathlib.Path = data_dir
self.logger: logging.Logger = logger
self.banlist: Optional[pathlib.Path] = banlist
self.allowlist: Optional[pathlib.Path] = allowlist
super().__init__(*args, **kwargs)
return CustomHandler
class LinesServer:
port: int = 9997
listen_addr: str = "0.0.0.0"
_data_dir: pathlib.Path = pathlib.Path("data/")
_log_file: Optional[pathlib.Path] = None
_banlist: Optional[pathlib.Path] = None
_allowlist: Optional[pathlib.Path] = None
logger: Optional[logging.Logger] = None
@property
def data_dir(self) -> pathlib.Path:
return self._data_dir
@data_dir.setter
def data_dir(self, value: Union[str, pathlib.Path]) -> None:
if isinstance(value, str):
value = pathlib.Path(value)
self._data_dir = value
@property
def data_dir_path(self) -> str:
return str(self.data_dir.absolute())
@property
def log_file(self) -> Optional[pathlib.Path]:
return self._log_file
@log_file.setter
def log_file(self, value: Union[str, pathlib.Path]) -> None:
if isinstance(value, str):
value = pathlib.Path(value)
self._log_file = value
@property
def log_file_path(self) -> Optional[str]:
return str(self.log_file.absolute()) if self.log_file else None
@property
def banlist(self) -> Optional[pathlib.Path]:
return self._banlist
@banlist.setter
def banlist(self, value: Union[str, pathlib.Path]) -> None:
if isinstance(value, str):
value = pathlib.Path(value)
self._banlist = value
@property
def banlist_path(self) -> Optional[str]:
return str(self.banlist.absolute()) if self.banlist else None
@property
def allowlist(self) -> Optional[pathlib.Path]:
return self._allowlist
@allowlist.setter
def allowlist(self, value: Union[str, pathlib.Path]) -> None:
if isinstance(value, str):
value = pathlib.Path(value)
self._allowlist = value
@property
def allowlist_path(self) -> Optional[str]:
return str(self.allowlist.absolute()) if self.allowlist else None
@classmethod
def from_args(cls, args):
lines = cls()
lines.port = args.port or lines.port
lines.listen_addr = args.listen_addr or lines.listen_addr
lines.data_dir = args.data_dir or lines.data_dir
lines.log_file = args.log_file or lines.log_file
lines.banlist = args.banlist or lines.banlist
lines.allowlist = args.allowlist or lines.allowlist
lines.logger = logging.getLogger("pyfiche")
lines.logger.setLevel(logging.INFO if not args.debug else logging.DEBUG)
handler = (
logging.StreamHandler()
if not args.log_file
else logging.FileHandler(args.log_file)
)
handler.setFormatter(
logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
)
lines.logger.addHandler(handler)
return lines
def run(self):
handler_class = make_lines_handler(
self.data_dir, self.logger, self.banlist, self.allowlist
)
with HTTPServer((self.listen_addr, self.port), handler_class) as httpd:
self.logger.info(f"Listening on {self.listen_addr}:{self.port}")
httpd.serve_forever()

View file

@ -0,0 +1,250 @@
import re
import socket
import pathlib
import logging
import argparse
import ipaddress
import os
import sys
import threading
from typing import Optional, Union
from .fiche import FicheServer
class RecupServer:
FICHE_SYMBOLS = FicheServer.FICHE_SYMBOLS
DATA_FILE_NAME = FicheServer.OUTPUT_FILE_NAME
port: int = 9998
listen_addr: str = '0.0.0.0'
buffer_size: int = 16
_data_dir: pathlib.Path = pathlib.Path('data/')
_log_file: Optional[pathlib.Path] = None
_banlist: Optional[pathlib.Path] = None
_allowlist: Optional[pathlib.Path] = None
logger: Optional[logging.Logger] = None
@property
def data_dir(self) -> pathlib.Path:
return self._data_dir
@data_dir.setter
def data_dir(self, value: Union[str, pathlib.Path]) -> None:
if isinstance(value, str):
value = pathlib.Path(value)
self._data_dir = value
@property
def data_dir_path(self) -> str:
return str(self.data_dir.absolute())
@property
def log_file(self) -> Optional[pathlib.Path]:
return self._log_file
@log_file.setter
def log_file(self, value: Union[str, pathlib.Path]) -> None:
if isinstance(value, str):
value = pathlib.Path(value)
self._log_file = value
@property
def log_file_path(self) -> Optional[str]:
return str(self.log_file.absolute()) if self.log_file else None
@property
def banlist(self) -> Optional[pathlib.Path]:
return self._banlist
@banlist.setter
def banlist(self, value: Union[str, pathlib.Path]) -> None:
if isinstance(value, str):
value = pathlib.Path(value)
self._banlist = value
@property
def banlist_path(self) -> Optional[str]:
return str(self.banlist.absolute()) if self.banlist else None
@property
def allowlist(self) -> Optional[pathlib.Path]:
return self._allowlist
@allowlist.setter
def allowlist(self, value: Union[str, pathlib.Path]) -> None:
if isinstance(value, str):
value = pathlib.Path(value)
self._allowlist = value
@property
def allowlist_path(self) -> Optional[str]:
return str(self.allowlist.absolute()) if self.allowlist else None
@classmethod
def from_args(cls, args: argparse.Namespace) -> 'RecupServer':
recup = cls()
recup.port = args.port or recup.port
recup.listen_addr = args.listen_addr or recup.listen_addr
recup.data_dir = args.data_dir or recup.data_dir
recup.buffer_size = args.buffer_size or recup.buffer_size
recup.log_file = args.log_file or recup.log_file
recup.banlist = args.banlist or recup.banlist
recup.allowlist = args.allowlist or recup.allowlist
recup.logger = logging.getLogger('pyfiche')
recup.logger.setLevel(logging.INFO if not args.debug else logging.DEBUG)
handler = logging.StreamHandler() if not args.log_file else logging.FileHandler(args.log_file)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
recup.logger.addHandler(handler)
return recup
def handle_connection(self, conn, addr):
self.logger.info(f"Incoming connection from: {addr}")
if self.check_banlist(addr[0]):
conn.sendall(b"Your IP address is banned from this server.\n")
self.logger.info(f"Connections from {addr} are banned.")
conn.close()
return
if not self.check_allowlist(addr[0]):
conn.sendall(b"Your IP address is not allowed to connect to this server.\n")
self.logger.info(f"Connection from {addr} is not allowed.")
conn.close()
return
conn.setblocking(False)
conn.settimeout(3)
with conn:
self.logger.debug(f"New connection by {addr}")
if not self.check_allowlist(addr[0]):
conn.sendall(b"Your IP address is not allowed to connect to this server.\n")
self.logger.info(f"Connection from {addr} is not allowed.")
conn.close()
return
if self.check_banlist(addr[0]):
conn.sendall(b"Your IP address is banned from this server.\n")
self.logger.info(f"Connections from {addr} are banned.")
conn.close()
return
try:
slug = conn.recv(self.buffer_size).decode().strip()
if not slug:
raise ValueError('No slug received, terminating connection.')
# Check if the received slug matches the allowed pattern.
# This should effectively prevent directory traversal attacks.
if any([c not in self.FICHE_SYMBOLS for c in slug]):
raise ValueError('Invalid slug received, terminating connection.')
file_path = self.data_dir / slug / self.DATA_FILE_NAME
if not file_path.is_file():
raise FileNotFoundError(f"File with slug '{slug}' not found.")
with open(file_path, 'rb') as file:
content = file.read()
conn.sendall(content)
except (ValueError, FileNotFoundError) as e:
self.logger.error(e)
conn.close()
def start_server(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((self.listen_addr, self.port))
s.listen()
self.logger.info(f"Server started listening on: {self.listen_addr}:{self.port}")
while True:
conn, addr = s.accept()
threading.Thread(target=self.handle_connection, args=(conn, addr,)).start()
def run(self):
if not self.logger:
self.logger = logging.getLogger('pyfiche')
self.logger.setLevel(logging.INFO)
handler = logging.StreamHandler() if not self.log_file else logging.FileHandler(self.log_file)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
self.logger.addHandler(handler)
if self.banlist and self.allowlist:
self.logger.fatal("Banlist and allowlist cannot be used together!")
sys.exit(1)
if self.banlist_path and not os.path.exists(self.banlist_path):
self.logger.fatal(f"Banlist file ({self.banlist_path}) does not exist!")
sys.exit(1)
if self.allowlist_path and not os.path.exists(self.allowlist_path):
self.logger.fatal(f"Allowlist file ({self.allowlist_path}) does not exist!")
sys.exit(1)
self.logger.info(f"Starting PyFiche-Recup...")
if self.data_dir.exists() and not os.access(self.data_dir_path, os.R_OK):
self.logger.fatal(f"Data directory ({self.data_dir}) not readable!")
sys.exit(1)
elif not self.data_dir.exists():
self.logger.fatal(f"Data directory ({self.data_dir}) does not exist!")
sys.exit(1)
if self.log_file_path:
try:
with open(self.log_file_path, 'a'):
pass
except IOError:
self.logger.fatal("Log file not writable!")
sys.exit(1)
self.start_server()
return 0
def check_allowlist(self, addr):
if not self.allowlist_path:
return True
try:
ip = ipaddress.ip_address(addr)
with open(self.allowlist_path, 'r') as file:
for line in file:
line = line.strip()
if line:
network = ipaddress.ip_network(line, strict=False)
if ip in network:
return True
except ValueError as e:
self.logger.error(f"Invalid IP address or network: {e}")
return False
return False
def check_banlist(self, addr):
if not self.banlist_path:
return False
try:
ip = ipaddress.ip_address(addr)
with open(self.banlist_path, 'r') as file:
for line in file:
line = line.strip()
if line:
network = ipaddress.ip_network(line, strict=False)
if ip in network:
return True
except ValueError as e:
self.logger.error(f"Invalid IP address or network: {e}")
return False
return False

View file

@ -0,0 +1,38 @@
import argparse
import sys
import threading
from . import FicheServer
# Define the main function
def main():
# Create an argument parser
parser = argparse.ArgumentParser(description='PyFiche Server - upload and share files through the terminal')
# Add arguments to the parser
parser.add_argument('-d', '--domain', help='Domain to use in URLs (default: localhost)')
parser.add_argument('-p', '--port', type=int, help='Port of Fiche server (default: 9999)')
parser.add_argument('-L', '--listen_addr', help='Listen Address (default: 0.0.0.0)')
parser.add_argument('-s', '--slug_size', type=int, help='Length of slugs to generate (default: 8)')
parser.add_argument('-S', '--https', action='store_true', help='HTTPS (requires reverse proxy)')
parser.add_argument('-o', '--output_dir', help='Output directory path (default: data/)')
parser.add_argument('-B', '--buffer_size', type=int, help='Buffer size (default: 4096)')
parser.add_argument('-l', '--log_file', help='Log file path (default: None - log to stdout)')
parser.add_argument('-b', '--banlist', help='Banlist file path')
parser.add_argument('-w', '--allowlist', help='Allowlist file path')
parser.add_argument('-D', '--debug', action='store_true', help='Debug mode')
parser.add_argument('-t', '--timeout', type=int, help='Timeout for incoming connections (in seconds)')
parser.add_argument('-u', '--user_name', help=argparse.SUPPRESS)
# Parse the arguments
args = parser.parse_args()
# Create a Fiche object
fiche = FicheServer.from_args(args)
# Run the server
fiche.run()
# Check if the script is run directly
if __name__ == '__main__':
main()

View file

@ -0,0 +1,32 @@
import argparse
import sys
import threading
from . import LinesServer
# Define the main function
def main():
# Create an argument parser
parser = argparse.ArgumentParser(description='PyFiche Lines - HTTP server for PyFiche')
# Add arguments to the parser
parser.add_argument('-p', '--port', type=int, help='Port of Recup server (default: 9997)')
parser.add_argument('-L', '--listen_addr', help='Listen Address (default: 0.0.0.0)')
parser.add_argument('-o', '--data_dir', help='Fiche server output directory path (default: data/)')
parser.add_argument('-l', '--log_file', help='Log file path (default: None - log to stdout)')
parser.add_argument('-b', '--banlist', help='Banlist file path')
parser.add_argument('-w', '--allowlist', help='Allowlist file path')
parser.add_argument('-D', '--debug', action='store_true', help='Debug mode')
# Parse the arguments
args = parser.parse_args()
# Create a Lines object
lines = LinesServer.from_args(args)
# Run the server
lines.run()
# Check if the script is run directly
if __name__ == '__main__':
main()

View file

@ -0,0 +1,34 @@
import argparse
import sys
import threading
from . import RecupServer
# Define the main function
def main():
# Create an argument parser
parser = argparse.ArgumentParser(description='PyRecup Server - returns files uploaded through PyFiche')
# Add arguments to the parser
parser.add_argument('-p', '--port', type=int, help='Port of Recup server (default: 9998)')
parser.add_argument('-L', '--listen_addr', help='Listen Address (default: 0.0.0.0)')
parser.add_argument('-o', '--data_dir', help='Fiche server output directory path (default: data/)')
parser.add_argument('-B', '--buffer_size', type=int, help='Buffer size (default: 16)') # TODO: Do we *really* need this?
parser.add_argument('-l', '--log_file', help='Log file path (default: None - log to stdout)')
parser.add_argument('-b', '--banlist', help='Banlist file path')
parser.add_argument('-w', '--allowlist', help='Allowlist file path')
parser.add_argument('-D', '--debug', action='store_true', help='Debug mode')
parser.add_argument('-t', '--timeout', type=int, help='Timeout for incoming connections (in seconds)')
# Parse the arguments
args = parser.parse_args()
# Create a Recup object
recup = RecupServer.from_args(args)
# Run the server
recup.run()
# Check if the script is run directly
if __name__ == '__main__':
main()