Bump version to 0.3.0-dev, replace Tracker class with Core class.

This commit is contained in:
Kumi 2023-09-06 12:51:07 +02:00
parent 98e610a2a1
commit 66cee293ec
Signed by: kumi
GPG key ID: ECBCC9082395383F
11 changed files with 265 additions and 265 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "trackbert" name = "trackbert"
version = "0.2.6" version = "0.3.0-dev"
authors = [ authors = [
{ name="Kumi Mitterer", email="trackbert@kumi.email" }, { name="Kumi Mitterer", email="trackbert@kumi.email" },
] ]

View file

@ -11,7 +11,7 @@ import asyncio
from typing import Tuple, Never, Optional from typing import Tuple, Never, Optional
from .classes.database import Database from .classes.database import Database
from .classes.tracker import Tracker from .classes.core import Core
def main(): def main():
@ -102,7 +102,7 @@ def main():
print(f"Config file {config_file} does not exist. Use -g to generate it.") print(f"Config file {config_file} does not exist. Use -g to generate it.")
exit(1) exit(1)
tracker = Tracker(config_file) tracker = Core(config_file)
# List carriers if requested # List carriers if requested

View file

@ -0,0 +1,238 @@
import logging
import subprocess
import time
import importlib
import asyncio
import sqlalchemy.exc
from pathlib import Path
from typing import Optional, Tuple, Never
from os import PathLike
from configparser import ConfigParser
from .database import Database
from .tracker import BaseTracker
class Core:
loop_interval = 60
loop_timeout = 30
def __init__(self, config: Optional[PathLike] = None):
logging.basicConfig(
format="%(asctime)s %(levelname)s: %(message)s",
level=logging.WARN,
datefmt="%Y-%m-%d %H:%M:%S",
)
self._pre_start(config)
self.find_apis()
def find_apis(self):
logging.debug("Finding APIs")
self.apis = []
for api in Path(__file__).parent.parent.glob("trackers/*.py"):
if api.name in ("__init__.py", "base.py"):
continue
logging.debug(f"Found API {api.stem}")
try:
module = importlib.import_module(f"trackbert.trackers.{api.stem}")
except:
logging.error(f"Error loading class {api.stem}")
if "tracker" in module.__dict__:
tracker = module.tracker
logging.debug(f"Found tracker {api.stem}")
try:
api = tracker(config=self.config_path)
carriers = api.supported_carriers()
for carrier in carriers:
self.apis.append((carrier[0], carrier[1], api, (carrier[2] if len(carrier) > 2 else None)))
except Exception as e:
logging.error(f"Error loading tracker {api.__class__.__name__}: {e}")
def query_api(self, tracking_number: str, carrier: str) -> list:
logging.debug(f"Querying API for {tracking_number} with carrier {carrier}")
for api_entry in sorted(self.apis, key=lambda x: x[1], reverse=True):
api_carrier = api_entry[0]
priority = api_entry[1]
api = api_entry[2]
name = api_entry[3] if len(api_entry) > 3 else None
if api_carrier == "*" or api_carrier == carrier:
logging.debug(
f"Using API {api.__class__.__name__} for {tracking_number} with carrier {carrier}"
)
return list(api.get_status(tracking_number, carrier))
def notify(
self,
title: str,
message: str,
urgency: str = "normal",
timeout: Optional[int] = 5000,
) -> None:
logging.debug(f"Sending notification: {title} - {message}")
command = [
"notify-send",
"-a",
"trackbert",
"-u",
urgency,
"-i",
str(Path(__file__).parent.parent / "assets" / "parcel-delivery-icon.webp"),
]
if timeout:
command += ["-t", str(timeout)]
command = command + [title, message]
try:
subprocess.run(command)
except FileNotFoundError:
logging.warning("notify-send not found, not sending notification")
def notify_event(self, shipment, event, critical=False) -> None:
logging.info(
f"New event for {shipment.tracking_number}: {event.event_description} - {event.event_time}"
)
self.notify(
f"New event for {shipment.description or shipment.tracking_number}",
event.event_description + " - " + event.event_time,
urgency="critical" if critical else "normal",
)
def process_shipment(self, shipment) -> None:
if not shipment.carrier:
logging.info(
f"Shipment {shipment.tracking_number} has no carrier, skipping"
)
return
logging.debug(
f"Checking shipment {shipment.tracking_number} with carrier {shipment.carrier}"
)
latest_known_event = self.db.get_latest_event(shipment.id)
try:
events = self.query_api(shipment.tracking_number, shipment.carrier)
except Exception as e:
logging.exception(f"Error querying API for {shipment.tracking_number}")
return
events = sorted(events, key=lambda x: x.event_time)
if not events:
logging.debug(f"No events found for {shipment.tracking_number}")
return
if latest_known_event:
logging.debug(
f"Latest known event for {shipment.tracking_number}: {latest_known_event.event_description} - {latest_known_event.event_time}"
)
else:
logging.debug(f"No known events for {shipment.tracking_number}")
logging.debug(
f"Latest upstream event for {shipment.tracking_number}: {events[-1].event_description} - {events[-1].event_time}"
)
for event in events:
if (
latest_known_event is None
or event.event_time > latest_known_event.event_time
):
event.shipment_id = shipment.id
self.db.write_event(event)
self.notify_event(shipment, event, event == events[-1])
def start_loop(self) -> Never:
logging.debug("Starting loop")
while True:
try:
for shipment in self.db.get_shipments():
self.process_shipment(shipment)
time.sleep(self.loop_interval)
except sqlalchemy.exc.TimeoutError:
logging.warning("Database timeout while processing shipments")
self.db.engine.dispose()
except KeyboardInterrupt:
logging.info("Keyboard interrupt, exiting")
exit(0)
except Exception as e:
logging.exception(f"Unknown error in loop: {e}")
async def start_loop_async(self) -> Never:
logging.debug("Starting loop")
loop = asyncio.get_running_loop()
while True:
tasks = []
for shipment in self.db.get_shipments():
task = asyncio.wait_for(
asyncio.to_thread(self.process_shipment, shipment),
timeout=self.loop_timeout,
)
tasks.append(task)
try:
await asyncio.gather(*tasks)
except asyncio.TimeoutError:
logging.warning("Timeout while processing shipments")
except sqlalchemy.exc.TimeoutError:
logging.warning("Database timeout while processing shipments")
except (KeyboardInterrupt, asyncio.CancelledError):
logging.info("Keyboard interrupt, exiting")
exit(0)
except Exception as e:
logging.exception(f"Unknown error in loop: {e}")
await asyncio.sleep(self.loop_interval)
def _pre_start(self, config: Optional[PathLike] = None):
self.config_path = config
parser = ConfigParser()
parser.read(config or [])
self.debug = parser.getboolean("Trackbert", "debug", fallback=False)
if self.debug:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
self.database_uri = parser.get(
"Trackbert", "database", fallback="sqlite:///trackbert.db"
)
self.db = Database(self.database_uri)
self.loop_interval = parser.getint("Trackbert", "interval", fallback=60)
def start(self, config: Optional[PathLike] = None):
self.notify("Trackbert", "Starting up")
self.start_loop()
async def start_async(self, config: Optional[PathLike] = None):
self.notify("Trackbert", "Starting up")
await self.start_loop_async()

View file

@ -1,238 +1,24 @@
import logging from typing import Optional, Tuple, List, Generator
import subprocess
import time
import importlib
import asyncio
import sqlalchemy.exc
from pathlib import Path from ..classes.database import Event
from typing import Optional, Tuple, Never
from os import PathLike
from configparser import ConfigParser
from .database import Database class BaseTracker:
from ..trackers.base import BaseTracker def __init__(self, *args, **kwargs):
pass
def get_status(self, tracking_number: str, carrier: str) -> Generator[Event, None, None]:
raise NotImplementedError()
class Tracker: def supported_carriers(self) -> List[Tuple[str, int, Optional[str]]]:
loop_interval = 60 """Defines the carriers supported by this tracker.
loop_timeout = 30
def __init__(self, config: Optional[PathLike] = None): Returns:
logging.basicConfig( list: List of supported carriers as tuples of (carrier_code, priority,
format="%(asctime)s %(levelname)s: %(message)s", carrier_name (optional)), where priority is an integer. The carrier
level=logging.WARN, with the highest priority will be used when tracking a shipment.
datefmt="%Y-%m-%d %H:%M:%S", "*" can be used as a wildcard to match all carriers.
)
self._pre_start(config) Raises:
NotImplementedError: When this method is not implemented by the subclass.
self.find_apis() """
raise NotImplementedError()
def find_apis(self):
logging.debug("Finding APIs")
self.apis = []
for api in Path(__file__).parent.parent.glob("trackers/*.py"):
if api.name in ("__init__.py", "base.py"):
continue
logging.debug(f"Found API {api.stem}")
try:
module = importlib.import_module(f"trackbert.trackers.{api.stem}")
except:
logging.error(f"Error loading class {api.stem}")
if "tracker" in module.__dict__:
tracker = module.tracker
logging.debug(f"Found tracker {api.stem}")
try:
api = tracker(config=self.config_path)
carriers = api.supported_carriers()
for carrier in carriers:
self.apis.append((carrier[0], carrier[1], api, (carrier[2] if len(carrier) > 2 else None)))
except Exception as e:
logging.error(f"Error loading tracker {api.__class__.__name__}: {e}")
def query_api(self, tracking_number: str, carrier: str) -> list:
logging.debug(f"Querying API for {tracking_number} with carrier {carrier}")
for api_entry in sorted(self.apis, key=lambda x: x[1], reverse=True):
api_carrier = api_entry[0]
priority = api_entry[1]
api = api_entry[2]
name = api_entry[3] if len(api_entry) > 3 else None
if api_carrier == "*" or api_carrier == carrier:
logging.debug(
f"Using API {api.__class__.__name__} for {tracking_number} with carrier {carrier}"
)
return list(api.get_status(tracking_number, carrier))
def notify(
self,
title: str,
message: str,
urgency: str = "normal",
timeout: Optional[int] = 5000,
) -> None:
logging.debug(f"Sending notification: {title} - {message}")
command = [
"notify-send",
"-a",
"trackbert",
"-u",
urgency,
"-i",
str(Path(__file__).parent.parent / "assets" / "parcel-delivery-icon.webp"),
]
if timeout:
command += ["-t", str(timeout)]
command = command + [title, message]
try:
subprocess.run(command)
except FileNotFoundError:
logging.warning("notify-send not found, not sending notification")
def notify_event(self, shipment, event, critical=False) -> None:
logging.info(
f"New event for {shipment.tracking_number}: {event.event_description} - {event.event_time}"
)
self.notify(
f"New event for {shipment.description or shipment.tracking_number}",
event.event_description + " - " + event.event_time,
urgency="critical" if critical else "normal",
)
def process_shipment(self, shipment) -> None:
if not shipment.carrier:
logging.info(
f"Shipment {shipment.tracking_number} has no carrier, skipping"
)
return
logging.debug(
f"Checking shipment {shipment.tracking_number} with carrier {shipment.carrier}"
)
latest_known_event = self.db.get_latest_event(shipment.id)
try:
events = self.query_api(shipment.tracking_number, shipment.carrier)
except Exception as e:
logging.exception(f"Error querying API for {shipment.tracking_number}")
return
events = sorted(events, key=lambda x: x.event_time)
if not events:
logging.debug(f"No events found for {shipment.tracking_number}")
return
if latest_known_event:
logging.debug(
f"Latest known event for {shipment.tracking_number}: {latest_known_event.event_description} - {latest_known_event.event_time}"
)
else:
logging.debug(f"No known events for {shipment.tracking_number}")
logging.debug(
f"Latest upstream event for {shipment.tracking_number}: {events[-1].event_description} - {events[-1].event_time}"
)
for event in events:
if (
latest_known_event is None
or event.event_time > latest_known_event.event_time
):
event.shipment_id = shipment.id
self.db.write_event(event)
self.notify_event(shipment, event, event == events[-1])
def start_loop(self) -> Never:
logging.debug("Starting loop")
while True:
try:
for shipment in self.db.get_shipments():
self.process_shipment(shipment)
time.sleep(self.loop_interval)
except sqlalchemy.exc.TimeoutError:
logging.warning("Database timeout while processing shipments")
self.db.engine.dispose()
except KeyboardInterrupt:
logging.info("Keyboard interrupt, exiting")
exit(0)
except Exception as e:
logging.exception(f"Unknown error in loop: {e}")
async def start_loop_async(self) -> Never:
logging.debug("Starting loop")
loop = asyncio.get_running_loop()
while True:
tasks = []
for shipment in self.db.get_shipments():
task = asyncio.wait_for(
asyncio.to_thread(self.process_shipment, shipment),
timeout=self.loop_timeout,
)
tasks.append(task)
try:
await asyncio.gather(*tasks)
except asyncio.TimeoutError:
logging.warning("Timeout while processing shipments")
except sqlalchemy.exc.TimeoutError:
logging.warning("Database timeout while processing shipments")
except (KeyboardInterrupt, asyncio.CancelledError):
logging.info("Keyboard interrupt, exiting")
exit(0)
except Exception as e:
logging.exception(f"Unknown error in loop: {e}")
await asyncio.sleep(self.loop_interval)
def _pre_start(self, config: Optional[PathLike] = None):
self.config_path = config
parser = ConfigParser()
parser.read(config or [])
self.debug = parser.getboolean("Trackbert", "debug", fallback=False)
if self.debug:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
self.database_uri = parser.get(
"Trackbert", "database", fallback="sqlite:///trackbert.db"
)
self.db = Database(self.database_uri)
self.loop_interval = parser.getint("Trackbert", "interval", fallback=60)
def start(self, config: Optional[PathLike] = None):
self.notify("Trackbert", "Starting up")
self.start_loop()
async def start_async(self, config: Optional[PathLike] = None):
self.notify("Trackbert", "Starting up")
await self.start_loop_async()

View file

@ -1,24 +0,0 @@
from typing import Optional, Tuple, List, Generator
from ..classes.database import Event
class BaseTracker:
def __init__(self, *args, **kwargs):
pass
def get_status(self, tracking_number: str, carrier: str) -> Generator[Event, None, None]:
raise NotImplementedError()
def supported_carriers(self) -> List[Tuple[str, int, Optional[str]]]:
"""Defines the carriers supported by this tracker.
Returns:
list: List of supported carriers as tuples of (carrier_code, priority,
carrier_name (optional)), where priority is an integer. The carrier
with the highest priority will be used when tracking a shipment.
"*" can be used as a wildcard to match all carriers.
Raises:
NotImplementedError: When this method is not implemented by the subclass.
"""
raise NotImplementedError()

View file

@ -1,4 +1,4 @@
from .base import BaseTracker from ..classes.tracker import BaseTracker
from ..classes.database import Event from ..classes.database import Event
from dhltrack import DHL as DHLAPI from dhltrack import DHL as DHLAPI

View file

@ -1,4 +1,4 @@
from .base import BaseTracker from ..classes.tracker import BaseTracker
from ..classes.database import Event from ..classes.database import Event
import json import json

View file

@ -1,4 +1,4 @@
from .base import BaseTracker from ..classes.tracker import BaseTracker
from ..classes.database import Event from ..classes.database import Event
from fedextrack import FedEx as FedExAPI from fedextrack import FedEx as FedExAPI

View file

@ -1,4 +1,4 @@
from .base import BaseTracker from ..classes.tracker import BaseTracker
from ..classes.database import Event from ..classes.database import Event
import json import json

View file

@ -1,4 +1,4 @@
from .base import BaseTracker from ..classes.tracker import BaseTracker
from ..classes.database import Event from ..classes.database import Event
from ..classes.http import HTTPRequest from ..classes.http import HTTPRequest

View file

@ -1,4 +1,4 @@
from .base import BaseTracker from ..classes.tracker import BaseTracker
from ..classes.database import Event from ..classes.database import Event
import json import json