From a45f20e87b5f39d85057dec859a7d938078a89b3 Mon Sep 17 00:00:00 2001 From: Kumi Date: Thu, 24 Aug 2023 08:44:21 +0200 Subject: [PATCH] Current status --- .gitignore | 5 + .gitlab-ci.yml | 26 ++ LICENSE | 19 ++ README.md | 36 +++ config.dist.ini | 3 + pyproject.toml | 23 ++ src/postat/__init__.py | 0 src/postat/classes/__init__.py | 0 src/postat/classes/api.py | 452 +++++++++++++++++++++++++++++++++ src/postat/classes/config.py | 14 + src/postat/classes/http.py | 30 +++ src/postat/classes/log.py | 58 +++++ src/postat/classes/parser.py | 23 ++ tests/__init__.py | 0 tests/authenticated.py | 26 ++ tests/public.py | 19 ++ 16 files changed, 734 insertions(+) create mode 100644 .gitignore create mode 100644 .gitlab-ci.yml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 config.dist.ini create mode 100644 pyproject.toml create mode 100644 src/postat/__init__.py create mode 100644 src/postat/classes/__init__.py create mode 100644 src/postat/classes/api.py create mode 100644 src/postat/classes/config.py create mode 100644 src/postat/classes/http.py create mode 100644 src/postat/classes/log.py create mode 100644 src/postat/classes/parser.py create mode 100644 tests/__init__.py create mode 100644 tests/authenticated.py create mode 100644 tests/public.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d15492e --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +config.ini +venv/ +.venv/ +__pycache__/ +*.pyc \ No newline at end of file diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..bb4c868 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,26 @@ +image: python:3.10 + +stages: + - test + - publish + +before_script: + - python -V + - python -m venv venv + - source venv/bin/activate + - pip install -U pip + - pip install . + +test: + stage: test + script: + - python -m unittest tests/public.py + +publish: + stage: publish + script: + - pip install -U hatchling twine build + - python -m build . + - python -m twine upload --username __token__ --password ${PYPI_TOKEN} dist/* + only: + - tags \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..350d27a --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2023 Kumi Mitterer + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..5541875 --- /dev/null +++ b/README.md @@ -0,0 +1,36 @@ +# Python wrapper for Austrian Post web services + +This is a Python wrapper for web services provided by [Austrian Post](https://www.post.at/). + +Currently, it only supports looking up shipment details by tracking number. + +Groundwork for services requiring authentication is laid out, but not working +yet. If you want to contribute, please feel free to do so. + +## Installation + +```bash +pip install git+https://kumig.it/kumitterer/postat.git +``` + +## Usage + +```python +from postat.classes.api import PostAPI + +api = PostAPI() + +# Get shipment details + +shipment_details = api.get_shipment_details("123456789012345678901234567890") + +shipment = shipment["data"]["einzelsendung"] + +# Get latest event + +latest_event = shipment["sendungsEvents"][-1] +``` + +## License + +This project is licensed under the MIT License. See [LICENSE](LICENSE) for details. \ No newline at end of file diff --git a/config.dist.ini b/config.dist.ini new file mode 100644 index 0000000..893f365 --- /dev/null +++ b/config.dist.ini @@ -0,0 +1,3 @@ +[POST] +Username = email@add.ress +Password = v3rys3cr3t! \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7e52096 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,23 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "postat" +version = "0.9.0" +authors = [ + { name="Kumi Mitterer", email="postat@kumi.email" }, +] +description = "Simple Python wrapper to fetch data from Austrian Post (post.at)" +readme = "README.md" +license = { file="LICENSE" } +requires-python = ">=3.10" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] + +[project.urls] +"Homepage" = "https://kumig.it/kumitterer/postat" +"Bug Tracker" = "https://kumig.it/kumitterer/postat/issues" \ No newline at end of file diff --git a/src/postat/__init__.py b/src/postat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/postat/classes/__init__.py b/src/postat/classes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/postat/classes/api.py b/src/postat/classes/api.py new file mode 100644 index 0000000..10b891d --- /dev/null +++ b/src/postat/classes/api.py @@ -0,0 +1,452 @@ +from typing import Optional, Dict +from http.client import HTTPResponse +from http.cookies import SimpleCookie +from random import SystemRandom + +import urllib.parse +import urllib.error +import hashlib +import re +import json + +from .http import HTTPRequest +from .parser import FormParser + +HOME_URL = "https://www.post.at/" +LOGIN_INIT_URL = "https://www.post.at/identity/externallogin?authenticationType=post.azureADB2C&ReturnUrl=%2fidentity%2fexternallogincallback%3fReturnUrl%3dhttps%253a%252f%252fwww.post.at%252fen%26sc_site%3dPostAT%26authenticationSource%3dDefault&sc_site=PostAT" +LOGIN_SELF_ASSERTED = "https://login.post.at%(tenant)s/SelfAsserted" +LOGIN_CONFIRMED = "https://login.post.at%(tenant)s/api/%(api)s/confirmed" +OAUTH_BASE = "https://login.post.at/%(tenant)s/%(api)s/oauth2/v2.0" +TOKEN = OAUTH_BASE + "token" +AUTHORIZE = OAUTH_BASE + "authorize" +GRAPHQL_AUTHENTICATED = "https://api.post.at/sendungen/sv/graphqlAuthenticated" +GRAPHQL_PUBLIC = "https://api.post.at/sendungen/sv/graphqlPublic" + + +class PostAPI: + """ A class providing a pseudo-API for the Austrian Post website. """ + + def __init__(self, username: Optional[str] = None, password: Optional[str] = None, login: bool = True): + """ Initialize the connection to the Austrian Post website. + + Parameters: + username: The username to use for login (optional) + password: The password to use for login (optional) + login: Whether to login immediately or not (default: True, only works if username and password are provided) + """ + + # Initialize variables + + self.cookies: Dict[str, str] = {} + self.username: Optional[str] = None + self.password: Optional[str] = None + self.login_data: Optional[dict] = None + self.login_settings: Optional[dict] = None + self.login_params: Optional[dict] = None + + # Login if credentials are provided and login is enabled + + if username and password: + self.username = username + self.password = password + + if login: + self.login() + + def update_cookies(self, response: HTTPResponse): + """ Update the cookies from the given response. + + Parameters: + response: The HTTPResponse object to update the cookies from + """ + + # Get the Set-Cookie header and parse it + + set_cookie_header = response.getheader("Set-Cookie") + if set_cookie_header: + cookies = SimpleCookie() + + # Split the header by comma and load the individual cookies + + for cookie in set_cookie_header.split(","): + cookies.load(cookie.strip()) + + # Update the cookies + + for value in cookies.values(): + self.cookies[value.key] = value.value + + def request(self, *args, **kwargs) -> HTTPRequest: + """ Create a new HTTPRequest with the current cookies. + + Returns: + The new HTTPRequest + """ + + req = HTTPRequest(*args, **kwargs) + req.cookies = self.cookies + return req + + def login(self) -> bool: + """ Login to the Austrian Post website. + + As there are no public APIs for the Austrian Post website, this method + basically just emulates a browser login. + + Returns: + True if the login was successful, False otherwise + """ + + assert self.username and self.password, "Username and password must be provided" + + # Get the home page and update the cookies + # Not sure if this is necessary, but it doesn't hurt + + # TODO: Fetch the login URL from the home page + + home_req = self.request(HOME_URL) + home_res = home_req.open() + + self.update_cookies(home_res) + + # Get the login page and update the cookies + # This is a POST request for some reason + + login_req = self.request(LOGIN_INIT_URL, method="POST") + login_res = login_req.open() + + self.update_cookies(login_res) + + # The previous request redirects to the actual login page + # Get the URL and parse the query parameters + + login_url_parts = urllib.parse.urlsplit(login_res.geturl()) + self.login_params = login_params = urllib.parse.parse_qs( + login_url_parts.query) + + # Generate a code verifier and code challenge + # The code verifier is a random string of 32 bytes + # The code challenge is the SHA256 hash of the code verifier + # This is used for PKCE for the token request + + self.code_verifier = SystemRandom().getrandbits(256).to_bytes(32, "big").hex() + code_challenge = hashlib.sha256( + self.code_verifier.encode()).digest().hex() + login_params["code_challenge"] = code_challenge + + # Reconstruct the login URL + + login_url = urllib.parse.urlunsplit((login_url_parts.scheme, login_url_parts.netloc, + login_url_parts.path, urllib.parse.urlencode(login_params), login_url_parts.fragment)) + + # Get the settings from the login page + + login_html = login_res.read().decode() + + login_settings_regex = re.compile( + r'var SETTINGS = ({.*?});', re.DOTALL) + login_settings_match = login_settings_regex.search(login_html) + self.login_settings = login_settings = json.loads( + login_settings_match.group(1)) + + # Get required parameters from the settings + + transaction_id = login_settings["transId"] + policy = login_settings["hosts"]["policy"] + tenant = login_settings["hosts"]["tenant"] + csrf = login_settings["csrf"] + api = login_settings["api"] + + # Prepare the parameters for the self-asserted login + + self_asserted_params = { + "tx": transaction_id, + "p": policy, + } + + # Get the self-asserted login URL + + self_asserted_url = LOGIN_SELF_ASSERTED % {"tenant": tenant} + self_asserted_query = urllib.parse.urlencode(self_asserted_params) + + # Encode the payload for the self-asserted login + + self_asserted_payload = urllib.parse.urlencode({ + "request_type": "RESPONSE", + "signInName": self.username, + "password": self.password, + }).encode() + + self_asserted_req = self.request( + f"{self_asserted_url}?{self_asserted_query}", data=self_asserted_payload, method="POST") + + # Add required headers for the self-asserted login + + self_asserted_req.add_header( + "Content-Type", "application/x-www-form-urlencoded") + self_asserted_req.add_header("X-CSRF-TOKEN", csrf) + self_asserted_req.add_header("X-Requested-With", "XMLHttpRequest") + + # Perform the self-asserted login + + self_asserted_res = self_asserted_req.open() + self_asserted_response = json.loads(self_asserted_res.read().decode()) + + # Check if the login was successful + + if self_asserted_response["status"] != "200": + return False + + # Update the cookies + + self.update_cookies(self_asserted_res) + + # Prepare the parameters for the confirmation page + + confirmation_params = { + "rememberMe": "false", + "csrf_token": csrf, + "tx": transaction_id, + "p": policy, + } + + # Prepare the URL for the confirmation page + + confirmation_url = LOGIN_CONFIRMED % { + "tenant": tenant, "api": api} + f"?{urllib.parse.urlencode(confirmation_params)}" + + # Get the confirmation page and update the cookies + + confirmation_req = self.request(confirmation_url) + + confirmation_res = confirmation_req.open() + + self.update_cookies(confirmation_res) + + # Use FormParser to parse the confirmation page + + confirmation_html = confirmation_res.read().decode() + + confirmation_parser = FormParser() + confirmation_parser.feed(confirmation_html) + + # Get the form data from the confirmation page + + form_action = confirmation_parser.form_action + form_data = confirmation_parser.form_fields + + # Check if the server returned an error + + if "error" in form_data.keys(): + print(form_data["error_description"]) + return False + + # Prepare the final request + + final_url = form_action + final_payload = urllib.parse.urlencode(form_data).encode() + + final_req = self.request(final_url, data=final_payload, method="POST") + + # Add required headers for the final request + + final_req.add_header( + "Content-Type", "application/x-www-form-urlencoded") + + # Perform the final request and update the cookies + + final_res = final_req.open() + self.update_cookies(final_res) + + # Store the login data + + self.login_data = form_data + + # If we got this far, the login was hopefully successful + + return self.logged_in() + + def logged_in(self) -> bool: + '''Check if the user is logged in + + TODO: Actually verify the login status instead of just checking if login_data is set + + Returns: + bool: True if the user is logged in, False otherwise + ''' + return bool(self.login_data) + + def get_token(self) -> str: + '''Exchange the id_token for an access_token + + Currently not working – may be replaced by more specific methods + + Returns: + str: The access_token + ''' + + # Assert that the user is logged in + + if not self.logged_in(): + raise Exception("Not authenticated.") + + # Prepare the request + + token_req = self.request(TOKEN, method="POST") + token_req.add_header( + "Content-Type", "application/x-www-form-urlencoded") + + # Prepare the payload + + print(self.login_data) + + token_payload = { + # TODO: Get this from somewhere dynamically + "client_id": "e2cfcb2a-7dc8-4110-ac2a-9647dd095d9e", + # TODO: Get this from somewhere dynamically if required + "redirect_uri": "https://services.post.at", + # TODO: Get this from somewhere dynamically if required + "scope": "openid profile offline_access", + "code": self.login_data["code"], # TODO: Is this correct? + "code_verifier": self.code_verifier, + # TODO: Get this from somewhere dynamically if required + "grant_type": "authorization_code", + "client_info": 1, # TODO: Get this from somewhere dynamically if required + # TODO: Get this from somewhere dynamically... + "client-request-id": "706a9572-5395-4e05-9d80-9317491696d6", + } + + # Perform the request + + token_req.data = urllib.parse.urlencode(token_payload).encode() + token_res = token_req.open() + + # Parse the response + + token_response = json.loads(token_res.read().decode()) + + # For now, just print the response + + print(token_response) + + return "" # TODO: Actually return the access_token + + def get_shipments_token(self): + + if not self.logged_in(): + raise Exception("Not authenticated.") + + # Prepare the request + + authorize_url = AUTHORIZE % ( + self.login_settings["hosts"]["tenant"], self.login_settings["api"]) + + print(authorize_url) + + token_req = self.request(authorize_url, method="GET") + + # Prepare the query string + + token_payload = { + "response_type": "token", + "scope": "https://login.post.at/sendungenapi-prod/Sendungen.All openid profile", + "client_id": self.login_params["client_id"], + "redirect_uri": self.login_params["redirect_uri"], + "state": self.login_params["state"], + "nonce": self.login_params["nonce"], + } + + + + def get_shipments(self): + if not self.logged_in(): + raise Exception("Not authenticated.") + + query = { + "query": """query + { sendungen: sendungen( + tagFilter: \"Empfangen\" + postProcessingOptions : {elementCount: 2} + ) + { sendungen { + sendungsnummer + estimatedDelivery { + startDate + endDate + startTime + endTime + } + sender + status + bezeichnung + sendungsEvents { + status + timestamp + reasontypecode + } + customsInformation { + customsDocumentAvailable + userDocumentNeeded + } + } + } + }""" + } + + req = self.request(GRAPHQL_AUTHENTICATED, + data=json.dumps(query).encode()) + req.add_header("Content-Type", "application/json") + req.add_header("Authorization", f"Bearer {self.id_token}") + + res = req.open() + return json.loads(res.read().decode()) + + def get_shipment_status_public(self, tracking_number): + req = self.request(GRAPHQL_PUBLIC) + + query = { + "query": """query { + einzelsendung(sendungsnummer: \"%s\") { + sendungsnummer + branchkey + estimatedDelivery { + startDate + endDate + startTime + endTime + } + dimensions { + height + width + length + } + status + weight + sendungsEvents { + timestamp + status + reasontypecode + text + textEn + eventpostalcode + eventcountry + } + customsInformation { + customsDocumentAvailable, + userDocumentNeeded + } + } + }""" % tracking_number + } + + req.add_json_payload(query) + + res = req.open() + return json.loads(res.read().decode()) + + def get_shipment_status(self, tracking_number): + if self.logged_in(): + # TODO: Implement function using graphqlAuthenticated + pass + + return self.get_shipment_status_public(tracking_number) \ No newline at end of file diff --git a/src/postat/classes/config.py b/src/postat/classes/config.py new file mode 100644 index 0000000..1ac7151 --- /dev/null +++ b/src/postat/classes/config.py @@ -0,0 +1,14 @@ +from configparser import ConfigParser +from pathlib import Path + +class Config(ConfigParser): + def __init__(self, path: str = "config.ini", *args, **kwargs): + super().__init__(*args, **kwargs) + + self.read(path) + + def username(self): + return self["POST"].get("Username") + + def password(self): + return self["POST"].get("Password") \ No newline at end of file diff --git a/src/postat/classes/http.py b/src/postat/classes/http.py new file mode 100644 index 0000000..aa8e2f4 --- /dev/null +++ b/src/postat/classes/http.py @@ -0,0 +1,30 @@ +from urllib.request import Request, urlopen + +import json + +USER_AGENT = "PostAT/dev (https://kumig.it/kumitterer/postat.git)" + +class HTTPRequest(Request): + def build_cookie_header(self): + cookiestrings = [f"{name}={value}" for name, value in self.cookies.items()] + return "; ".join(cookiestrings) + + def open(self): + if self.cookies: + self.add_header("Cookie", self.build_cookie_header()) + + return urlopen(self) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.cookies = dict() + self.add_header("User-Agent", USER_AGENT) + + def add_json_payload(self, payload: dict|str): + self.add_header("Content-Type", "application/json") + + if isinstance(payload, dict): + payload = json.dumps(payload) + + self.data = payload.encode("utf-8") \ No newline at end of file diff --git a/src/postat/classes/log.py b/src/postat/classes/log.py new file mode 100644 index 0000000..5636758 --- /dev/null +++ b/src/postat/classes/log.py @@ -0,0 +1,58 @@ +import logging +import datetime +from typing import Optional +from enum import Enum + +class LogLevel(Enum): + DEBUG = logging.DEBUG + INFO = logging.INFO + WARNING = logging.WARNING + ERROR = logging.ERROR + CRITICAL = logging.CRITICAL + +class Logger: + def __init__(self, name: Optional[str] = None, log_file: Optional[str] = None, level: LogLevel = LogLevel.WARNING): + self.logger = logging.getLogger(name or __name__) + + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + + if log_file: + fh = logging.FileHandler(log_file) + fh.setFormatter(formatter) + self.logger.addHandler(fh) + + ch = logging.StreamHandler() + ch.setFormatter(formatter) + self.logger.addHandler(ch) + + self.set_log_level(level) + + def set_log_level(self, level: LogLevel): + self.logger.setLevel(level.value) + + def log_message(self, message: str, level: LogLevel = LogLevel.INFO): + if level == LogLevel.DEBUG: + self.logger.debug(message) + elif level == LogLevel.INFO: + self.logger.info(message) + elif level == LogLevel.WARNING: + self.logger.warning(message) + elif level == LogLevel.ERROR: + self.logger.error(message) + elif level == LogLevel.CRITICAL: + self.logger.critical(message) + + def log_debug(self, message: str): + self.log_message(message, LogLevel.DEBUG) + + def log_info(self, message: str): + self.log_message(message, LogLevel.INFO) + + def log_warning(self, message: str): + self.log_message(message, LogLevel.WARNING) + + def log_error(self, message: str): + self.log_message(message, LogLevel.ERROR) + + def log_critical(self, message: str): + self.log_message(message, LogLevel.CRITICAL) \ No newline at end of file diff --git a/src/postat/classes/parser.py b/src/postat/classes/parser.py new file mode 100644 index 0000000..6f7afc9 --- /dev/null +++ b/src/postat/classes/parser.py @@ -0,0 +1,23 @@ +from html.parser import HTMLParser + +class FormParser(HTMLParser): + def __init__(self): + super().__init__() + self.form_fields = {} + self.form_action = None + + def handle_starttag(self, tag, attrs): + if tag == 'form': + for name, value in attrs: + if name == 'action': + self.form_action = value + elif tag == 'input': + field_name = None + field_value = None + for name, value in attrs: + if name == 'name': + field_name = value + elif name == 'value': + field_value = value + if field_name: + self.form_fields[field_name] = field_value \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/authenticated.py b/tests/authenticated.py new file mode 100644 index 0000000..48a1379 --- /dev/null +++ b/tests/authenticated.py @@ -0,0 +1,26 @@ +from postat.classes.api import PostAPI +from postat.classes.config import Config + +import unittest + +class TestConfig(unittest.TestCase): + def test_config(self): + config = Config() + config.read("config.ini") + self.assertTrue(config.username()) + self.assertTrue(config.password()) + +class TestAPI(unittest.TestCase): + def setUp(self): + self.config = Config() + self.config.read("config.ini") + self.api = PostAPI(self.config.username(), self.config.password(), False) + + def test_login(self): + login_status = self.api.login() + self.assertTrue(self.api.logged_in()) + + def test_token(self): + self.api.login() + token = self.api.get_token() + self.assertTrue(token) diff --git a/tests/public.py b/tests/public.py new file mode 100644 index 0000000..80adbf7 --- /dev/null +++ b/tests/public.py @@ -0,0 +1,19 @@ +import unittest + +from postat.classes.api import PostAPI + +class TestAPI(unittest.TestCase): + def setUp(self): + self.api = PostAPI() + + def test_login(self): + with self.assertRaises(AssertionError): + login_status = self.api.login() + + self.assertFalse(self.api.logged_in()) + + def test_shipment_status(self): + tracking_number = "1040906121766650280101" + shipment_status = self.api.get_shipment_status(tracking_number) + self.assertEqual(shipment_status["data"]["einzelsendung"]["sendungsnummer"], tracking_number) +