Kumi
1ae1a88975
This commit adds a new method called `write_event` to the `Database` class in `database.py`. The method takes an `Event` object as a parameter and adds it to the session before committing the changes to the database. This change simplifies the code in the `create_event` method by making use of the new `write_event` method. The `write_event` method is used in the `Tracker` class in `tracker.py` to replace the call to `create_event`. This change improves the clarity and readability of the code. Additionally, in the `Tracker` class, the `api` object is no longer created using `KeyDelivery.from_config`, as the `api` object is no longer referenced in the code. This change is made to remove unused code and improve code quality. This commit also adds some new import statements to the `Tracker` class in `tracker.py` to support the new changes. Lastly, two new tracker classes `KeyDelivery` and `PostAT` are added under the `trackers` directory. These new classes inherit from the `BaseTracker` class and implement the `get_status` method. The `KeyDelivery` class makes use of the `pykeydelivery.KeyDelivery` API to fetch tracking events, while the `PostAT` class uses the `postat.classes.api.PostAPI` API to fetch tracking events. The new tracker classes also define the supported carriers and their priorities. Overall, these changes improve the functionality and structure of the codebase.
116 lines
No EOL
4.2 KiB
Python
116 lines
No EOL
4.2 KiB
Python
import logging
|
|
import subprocess
|
|
import time
|
|
import importlib
|
|
from pathlib import Path
|
|
from typing import Optional, Tuple, Never
|
|
|
|
from .database import Database
|
|
from trackers.base import BaseTracker
|
|
|
|
from pykeydelivery import KeyDelivery
|
|
|
|
class Tracker:
|
|
def __init__(self):
|
|
logging.basicConfig(
|
|
format="%(asctime)s %(levelname)s: %(message)s",
|
|
level=logging.DEBUG,
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
|
|
self.find_apis()
|
|
|
|
def find_apis(self):
|
|
logging.debug("Finding APIs")
|
|
|
|
self.apis = []
|
|
|
|
for api in Path(__file__).parent.parent.glob("trackers/*.py"):
|
|
if api.name in ("__init__.py", "base.py"):
|
|
continue
|
|
|
|
logging.debug(f"Found API {api.stem}")
|
|
|
|
module = importlib.import_module(f"trackers.{api.stem}")
|
|
|
|
if "tracker" in module.__dict__:
|
|
tracker = module.tracker
|
|
logging.debug(f"Found tracker {api.stem}")
|
|
try:
|
|
carriers = tracker.supported_carriers()
|
|
api = tracker()
|
|
|
|
for carrier, priority in carriers:
|
|
self.apis.append((carrier, priority, api))
|
|
except:
|
|
logging.exception(f"Error loading tracker {name}")
|
|
|
|
def query_api(self, tracking_number: str, carrier: str) -> list:
|
|
logging.debug(f"Querying API for {tracking_number} with carrier {carrier}")
|
|
|
|
for api_carrier, _, api in sorted(self.apis, key=lambda x: x[1], reverse=True):
|
|
if api_carrier == "*" or api_carrier == carrier:
|
|
logging.debug(f"Using API {api.__class__.__name__} for {tracking_number} with carrier {carrier}")
|
|
return list(api.get_status(tracking_number, carrier))
|
|
|
|
def notify(self, title: str, message: str, urgency: str = "normal", timeout: Optional[int] = 5000) -> None:
|
|
logging.debug(f"Sending notification: {title} - {message}")
|
|
|
|
command = [
|
|
"notify-send",
|
|
"-a", "trackbert",
|
|
"-u", urgency,
|
|
"-i", str(Path(__file__).parent / "assets" / "parcel-delivery-icon.webp"),
|
|
]
|
|
|
|
if timeout:
|
|
command += ["-t", str(timeout)]
|
|
|
|
command = command + [title, message]
|
|
|
|
try:
|
|
subprocess.run(command)
|
|
|
|
except FileNotFoundError:
|
|
logging.warning("notify-send not found, not sending notification")
|
|
|
|
def start_loop(self) -> Never:
|
|
logging.debug("Starting loop")
|
|
|
|
while True:
|
|
for shipment in self.db.get_shipments():
|
|
shipment_id = shipment.id
|
|
tracking_number = shipment.tracking_number
|
|
carrier = shipment.carrier
|
|
description = shipment.description
|
|
|
|
logging.debug(f"Checking shipment {tracking_number} with carrier {carrier}")
|
|
|
|
latest_known_event = self.db.get_latest_event(shipment_id)
|
|
|
|
events = self.query_api(tracking_number, carrier)
|
|
|
|
if latest_known_event:
|
|
logging.debug(f"Latest known event for {tracking_number}: {latest_known_event.event_description} - {latest_known_event.event_time}")
|
|
else:
|
|
logging.debug(f"No known events for {tracking_number}")
|
|
|
|
logging.debug(f"Latest upstream event for {tracking_number}: {events[0].event_description} - {events[0].event_time}")
|
|
|
|
latest = True
|
|
|
|
for event in events:
|
|
if latest_known_event is None or event.event_time > latest_known_event.event_time:
|
|
self.db.write_event(event)
|
|
|
|
logging.info(f"New event for {tracking_number}: {event.event_description} - {event.event_time}")
|
|
self.notify(f"New event for {description or tracking_number}", event.event_description + " - " + event.event_time, urgency="critical" if latest else "normal")
|
|
|
|
latest = False
|
|
|
|
time.sleep(300)
|
|
|
|
def start(self):
|
|
self.db = Database('sqlite:///trackbert.db')
|
|
self.notify("Trackbert", "Starting up")
|
|
self.start_loop() |