Implement POST file upload and enhance security checks

Introduced the ability to handle file uploads via POST requests with necessary validations and directory management. Added security checks on both POST and GET methods which now verify client IP against allowlist and banlist. Implemented redirection after successful file upload, with a generated slug as the file identifier.

Refactored the 'do_POST' method, incorporating checks on request path and 'Content-Length' header, rejecting inappropriate requests swiftly. Created the upload functionality that reads the content and writes it to the designated data directory, using a new, unique slug for each file. Also added logging for requests, aiding in monitoring and debugging.

These enhancements create a more functional and secure system, laying the groundwork for future improvements and feature integrations.
This commit is contained in:
Kumi 2024-01-21 21:46:53 +01:00
parent b23e97e3b8
commit bf60f75769
Signed by: kumi
GPG key ID: ECBCC9082395383F

View file

@ -48,11 +48,49 @@ body {{
server_version = "PyFiche Lines/dev"
# TODO: Implement uploading
def do_POST(self):
self.send_response(501)
client_ip, client_port = self.client_address
self.logger.info(f"POST request from {client_ip}:{client_port}")
if (not self.check_allowlist(client_ip)) or self.check_banlist(client_ip):
self.logger.info(f"Rejected request from {client_ip}:{client_port}")
return self.not_found()
# Reject any POST requests that aren't to /
url = urlparse(self.path.rstrip("/"))
if url.path != "/":
return self.not_found()
# Reject any POST requests that don't have a Content-Length header
if "Content-Length" not in self.headers:
return self.not_found()
# Upload the file
content_length = int(self.headers["Content-Length"])
content = self.rfile.read(content_length)
if not content:
return self.not_found()
slug = FicheServer.generate_slug(self.data_dir, self.FICHE_SYMBOLS)
file_path = self.data_dir / slug / self.DATA_FILE_NAME
file_path.parent.mkdir(parents=True, exist_ok=True)
with file_path.open("wb") as f:
f.write(content)
# Redirect the user to the new file
self.send_response(303)
self.send_header("Location", f"/{slug}")
self.end_headers()
self.wfile.write(b"Not implemented")
def not_found(self):
self.send_response(404)
@ -100,6 +138,10 @@ body {{
def do_GET(self):
client_ip, client_port = self.client_address
if (not self.check_allowlist(client_ip)) or self.check_banlist(client_ip):
self.logger.info(f"Rejected request from {client_ip}:{client_port}")
return self.not_found()
self.logger.info(f"GET request from {client_ip}:{client_port}")
url = urlparse(self.path.rstrip("/"))