Initial commit - supports post.at so far
This commit is contained in:
commit
ef6a842c90
13 changed files with 236 additions and 0 deletions
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
|
@ -0,0 +1,5 @@
|
|||
.vscode/
|
||||
__pycache__/
|
||||
*.__pycache__
|
||||
settings.py
|
||||
database.db
|
0
__init__.py
Normal file
0
__init__.py
Normal file
38
database.py
Normal file
38
database.py
Normal file
|
@ -0,0 +1,38 @@
|
|||
import sqlite3
|
||||
|
||||
from settings import DB_NAME
|
||||
|
||||
DATE_FORMAT = "%Y-%m-%d %H:%M:%S%z"
|
||||
|
||||
class Database:
|
||||
def __init__(self, path=DB_NAME):
|
||||
self.conn = sqlite3.connect(path)
|
||||
self.cur = self.conn.cursor()
|
||||
self.setup()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def setup(self):
|
||||
self.execute('CREATE TABLE IF NOT EXISTS "tracking" (`number` VARCHAR(128) PRIMARY KEY, `service` VARCHAR(128));')
|
||||
self.execute('CREATE TABLE IF NOT EXISTS "status" (`trackingnumber` VARCHAR(128), `timestamp` TIMESTAMP, `status` TEXT, FOREIGN KEY (`trackingnumber`) REFERENCES `tracking` (`number`) ON DELETE CASCADE);')
|
||||
self.commit()
|
||||
|
||||
def execute(self, *args, **kwargs):
|
||||
self.cur.execute(*args, **kwargs)
|
||||
|
||||
def fetchone(self):
|
||||
return self.cur.fetchone()
|
||||
|
||||
def commit(self):
|
||||
self.conn.commit()
|
||||
|
||||
def close(self):
|
||||
self.cur.close()
|
||||
self.conn.close()
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
28
notify.py
Normal file
28
notify.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
from notifypy import Notify
|
||||
|
||||
import telegram
|
||||
|
||||
import settings
|
||||
|
||||
def notify_local(title, message, icon=None):
|
||||
notification = Notify()
|
||||
notification.title = title
|
||||
notification.message = message
|
||||
|
||||
if icon:
|
||||
notification.icon = icon
|
||||
|
||||
notification.send()
|
||||
|
||||
def notify_telegram(title, message, icon=None, token=settings.TELEGRAM_TOKEN, recipients=settings.TELEGRAM_RECIPIENT_IDS):
|
||||
bot = telegram.Bot(token=token)
|
||||
|
||||
for chat_id in recipients:
|
||||
bot.sendMessage(chat_id=chat_id, text="%s: %s" % (title, message))
|
||||
|
||||
def notify(title, message, icon=None):
|
||||
for handler in (notify_local, notify_telegram, ):
|
||||
try:
|
||||
handler(title, message, icon)
|
||||
except:
|
||||
pass
|
6
request.py
Normal file
6
request.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
from urllib.request import Request as UrllibRequest
|
||||
|
||||
class Request(UrllibRequest):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.headers["user-agent"] = "PostTrack (https://kumig.it/kumitterer/posttrack)"
|
2
requirements.txt
Normal file
2
requirements.txt
Normal file
|
@ -0,0 +1,2 @@
|
|||
notify-py
|
||||
python-telegram-bot
|
18
runner.py
Executable file
18
runner.py
Executable file
|
@ -0,0 +1,18 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from tracking import Tracking
|
||||
from status import Status
|
||||
from services import get_status
|
||||
|
||||
def run():
|
||||
for tracking in Tracking.all():
|
||||
try:
|
||||
timestamp = Status.from_database(tracking.trackingnumber).timestamp
|
||||
except ValueError:
|
||||
timestamp = None
|
||||
|
||||
status = get_status(tracking.trackingnumber, tracking.service)
|
||||
|
||||
if (not timestamp) or timestamp < status.timestamp:
|
||||
status.to_database()
|
||||
status.notify()
|
14
services/__init__.py
Normal file
14
services/__init__.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
import importlib
|
||||
|
||||
SERVICES = ["austrianpost"]
|
||||
|
||||
def get_status(trackingnumber, service=None, *args, **kwargs):
|
||||
if not service:
|
||||
raise NotImplementedError("Auto-discovery of courier service is not yet supported.")
|
||||
|
||||
if not service in SERVICES:
|
||||
raise NotImplementedError("Service %s is not supported." % service)
|
||||
|
||||
handler = importlib.import_module("services.%s" % service)
|
||||
|
||||
return handler.service(*args, **kwargs).get_status(trackingnumber)
|
36
services/austrianpost.py
Normal file
36
services/austrianpost.py
Normal file
|
@ -0,0 +1,36 @@
|
|||
from services.base import PostalService
|
||||
|
||||
from request import Request
|
||||
from status import Status
|
||||
|
||||
import urllib.request
|
||||
import json
|
||||
import datetime
|
||||
|
||||
API_URL = "https://api.post.at/sendungen/sv/graphqlPublic"
|
||||
STATUS_REQUEST = "query { sendung(sendungsnummer: \"%s\") { sendungsnummer branchkey estimatedDelivery { startDate endDate startTime endTime } dimensions { height width length } status weight sendungsEvents { timestamp status reasontypecode text textEn eventpostalcode eventcountry } } }"
|
||||
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
|
||||
|
||||
class AustrianPost(PostalService):
|
||||
regex = [r"\d{22}"]
|
||||
|
||||
def get_data(self, trackingcode):
|
||||
query = {"query": STATUS_REQUEST % trackingcode}
|
||||
|
||||
payload = json.dumps(query).encode("utf8")
|
||||
req = Request(API_URL, data=payload, headers={"content-type": "application/json"})
|
||||
|
||||
res = urllib.request.urlopen(req)
|
||||
|
||||
return json.load(res)
|
||||
|
||||
def get_status(self, trackingcode):
|
||||
data = self.get_data(trackingcode)
|
||||
events = data["data"]["sendung"][0]["sendungsEvents"]
|
||||
latest = events[-1]
|
||||
|
||||
timestamp = datetime.datetime.strptime(latest["timestamp"][:-3] + latest["timestamp"][-2:], DATE_FORMAT)
|
||||
|
||||
return Status(trackingcode, timestamp, latest["text"])
|
||||
|
||||
service = AustrianPost
|
10
services/base.py
Normal file
10
services/base.py
Normal file
|
@ -0,0 +1,10 @@
|
|||
class PostalService:
|
||||
@property
|
||||
def regex(self):
|
||||
return NotImplementedError("%s does not implement regex" % self.__class__)
|
||||
|
||||
def get_status(self, trackingcode):
|
||||
return NotImplementedError("%s does not implement get_status()" % self.__class__)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
14
settings.dist.py
Normal file
14
settings.dist.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
from pathlib import Path
|
||||
|
||||
# The base directory of the project. Don't change unless you know why.
|
||||
|
||||
BASE_DIR = Path(__file__).resolve().parent
|
||||
|
||||
# File name of the sqlite3 database. Don't change unless you know why.
|
||||
|
||||
DB_NAME = BASE_DIR / "database.db"
|
||||
|
||||
# To enable Telegram notifications, insert a bot token and a list of recipient chat IDs
|
||||
|
||||
TELEGRAM_TOKEN = ""
|
||||
TELEGRAM_RECIPIENT_IDS = []
|
32
status.py
Normal file
32
status.py
Normal file
|
@ -0,0 +1,32 @@
|
|||
from database import Database, DATE_FORMAT
|
||||
|
||||
import notify
|
||||
|
||||
import datetime
|
||||
|
||||
class Status:
|
||||
def __init__(self, trackingcode, timestamp, status):
|
||||
self.trackingcode = trackingcode
|
||||
self.timestamp = timestamp
|
||||
self.status = status
|
||||
|
||||
@classmethod
|
||||
def from_database(cls, trackingcode):
|
||||
with Database() as db:
|
||||
db.execute("SELECT timestamp, status FROM status WHERE trackingnumber=? ORDER BY timestamp DESC LIMIT 1", (trackingcode,))
|
||||
status = db.fetchone()
|
||||
|
||||
if not status:
|
||||
raise ValueError("No status for tracking code %s in database" % trackingcode)
|
||||
|
||||
timestamp = datetime.datetime.strptime(status[0][:-3] + status[0][-2:], DATE_FORMAT)
|
||||
|
||||
return cls(trackingcode, timestamp, status[1])
|
||||
|
||||
def to_database(self):
|
||||
with Database() as db:
|
||||
db.execute("INSERT INTO status VALUES (?, ?, ?)", (self.trackingcode, self.timestamp, self.status))
|
||||
db.commit()
|
||||
|
||||
def notify(self):
|
||||
notify.notify("Status changed", "Status for %s has changed at %s: %s" % (self.trackingcode, self.timestamp, self.status))
|
33
tracking.py
Normal file
33
tracking.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
from database import Database
|
||||
|
||||
class Tracking:
|
||||
def __init__(self, trackingnumber, service=None):
|
||||
self.trackingnumber = trackingnumber
|
||||
self.service = service
|
||||
|
||||
@classmethod
|
||||
def from_database(cls, trackingnumber):
|
||||
with Database() as db:
|
||||
db.execute("SELECT service FROM tracking WHERE number=?", (trackingnumber,))
|
||||
tracking = db.fetchone()
|
||||
|
||||
if not tracking:
|
||||
raise ValueError("No tracking for tracking code %s in database" % trackingnumber)
|
||||
|
||||
return cls(trackingnumber, tracking[0])
|
||||
|
||||
def to_database(self):
|
||||
with Database() as db:
|
||||
db.execute("INSERT INTO tracking VALUES (?, ?)", (self.trackingnumber, self.service))
|
||||
db.commit()
|
||||
|
||||
@classmethod
|
||||
def all(cls):
|
||||
trackings = list()
|
||||
with Database() as db:
|
||||
db.execute("SELECT number, service FROM tracking")
|
||||
|
||||
while current := db.fetchone():
|
||||
trackings.append(cls(current[0], current[1]))
|
||||
|
||||
return trackings
|
Loading…
Reference in a new issue