feat: Add asynchronous support for tracking shipments
This commit adds support for asynchronous tracking of shipments in the `Tracker` class. The `start_loop_async` method uses asyncio to run the `process_shipment` method concurrently for each shipment, improving performance. The `start_async` method is called from the main entry point, `trackbert.py`, to start the asynchronous tracking loop.
This commit is contained in:
parent
208762ec38
commit
080b019efc
2 changed files with 74 additions and 44 deletions
|
@ -2,6 +2,7 @@ import logging
|
|||
import subprocess
|
||||
import time
|
||||
import importlib
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple, Never
|
||||
|
||||
|
@ -12,6 +13,8 @@ from pykeydelivery import KeyDelivery
|
|||
|
||||
|
||||
class Tracker:
|
||||
loop_interval = 60
|
||||
|
||||
def __init__(self):
|
||||
logging.basicConfig(
|
||||
format="%(asctime)s %(levelname)s: %(message)s",
|
||||
|
@ -86,61 +89,87 @@ class Tracker:
|
|||
except FileNotFoundError:
|
||||
logging.warning("notify-send not found, not sending notification")
|
||||
|
||||
def notify_event(self, shipment, event, critical = False) -> None:
|
||||
logging.info(
|
||||
f"New event for {shipment.tracking_number}: {event.event_description} - {event.event_time}"
|
||||
)
|
||||
self.notify(
|
||||
f"New event for {shipment.description or shipment.tracking_number}",
|
||||
event.event_description + " - " + event.event_time,
|
||||
urgency="critical" if critical else "normal",
|
||||
)
|
||||
|
||||
def process_shipment(self, shipment) -> None:
|
||||
if not shipment.carrier:
|
||||
logging.warning(
|
||||
f"Shipment {shipment.tracking_number} has no carrier, skipping"
|
||||
)
|
||||
return
|
||||
|
||||
logging.debug(
|
||||
f"Checking shipment {shipment.tracking_number} with carrier {shipment.carrier}"
|
||||
)
|
||||
|
||||
latest_known_event = self.db.get_latest_event(shipment.id)
|
||||
|
||||
try:
|
||||
events = self.query_api(shipment.tracking_number, shipment.carrier)
|
||||
except Exception as e:
|
||||
logging.exception(f"Error querying API for {shipment.tracking_number}")
|
||||
return
|
||||
|
||||
events = sorted(events, key=lambda x: x.event_time, reverse=True)
|
||||
|
||||
if latest_known_event:
|
||||
logging.debug(
|
||||
f"Latest known event for {shipment.tracking_number}: {latest_known_event.event_description} - {latest_known_event.event_time}"
|
||||
)
|
||||
else:
|
||||
logging.debug(f"No known events for {shipment.tracking_number}")
|
||||
|
||||
logging.debug(
|
||||
f"Latest upstream event for {shipment.tracking_number}: {events[0].event_description} - {events[0].event_time}"
|
||||
)
|
||||
|
||||
for event in events:
|
||||
if (
|
||||
latest_known_event is None
|
||||
or event.event_time > latest_known_event.event_time
|
||||
):
|
||||
event.shipment_id = shipment.id
|
||||
self.db.write_event(event)
|
||||
self.notify_event(shipment, event, event == events[0])
|
||||
|
||||
|
||||
def start_loop(self) -> Never:
|
||||
logging.debug("Starting loop")
|
||||
|
||||
while True:
|
||||
for shipment in self.db.get_shipments():
|
||||
if not shipment.carrier:
|
||||
logging.warning(
|
||||
f"Shipment {shipment.tracking_number} has no carrier, skipping"
|
||||
)
|
||||
continue
|
||||
self.process_shipment(shipment)
|
||||
|
||||
logging.debug(
|
||||
f"Checking shipment {shipment.tracking_number} with carrier {shipment.carrier}"
|
||||
)
|
||||
time.sleep(self.loop_interval)
|
||||
|
||||
latest_known_event = self.db.get_latest_event(shipment.id)
|
||||
async def start_loop_async(self) -> Never:
|
||||
logging.debug("Starting loop")
|
||||
|
||||
events = self.query_api(shipment.tracking_number, shipment.carrier)
|
||||
events = sorted(events, key=lambda x: x.event_time, reverse=True)
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
if latest_known_event:
|
||||
logging.debug(
|
||||
f"Latest known event for {shipment.tracking_number}: {latest_known_event.event_description} - {latest_known_event.event_time}"
|
||||
)
|
||||
else:
|
||||
logging.debug(f"No known events for {shipment.tracking_number}")
|
||||
while True:
|
||||
tasks = []
|
||||
for shipment in self.db.get_shipments():
|
||||
task = asyncio.create_task(asyncio.to_thread(self.process_shipment, shipment))
|
||||
tasks.append(task)
|
||||
|
||||
logging.debug(
|
||||
f"Latest upstream event for {shipment.tracking_number}: {events[0].event_description} - {events[0].event_time}"
|
||||
)
|
||||
|
||||
latest = True
|
||||
|
||||
for event in events:
|
||||
if (
|
||||
latest_known_event is None
|
||||
or event.event_time > latest_known_event.event_time
|
||||
):
|
||||
event.shipment_id = shipment.id
|
||||
self.db.write_event(event)
|
||||
|
||||
logging.info(
|
||||
f"New event for {shipment.tracking_number}: {event.event_description} - {event.event_time}"
|
||||
)
|
||||
self.notify(
|
||||
f"New event for {shipment.description or shipment.tracking_number}",
|
||||
event.event_description + " - " + event.event_time,
|
||||
urgency="critical" if latest else "normal",
|
||||
)
|
||||
|
||||
latest = False
|
||||
|
||||
time.sleep(300)
|
||||
await asyncio.gather(*tasks)
|
||||
await asyncio.sleep(self.loop_interval)
|
||||
|
||||
def start(self):
|
||||
self.db = Database("sqlite:///trackbert.db")
|
||||
self.notify("Trackbert", "Starting up")
|
||||
self.start_loop()
|
||||
|
||||
async def start_async(self):
|
||||
self.db = Database("sqlite:///trackbert.db")
|
||||
self.notify("Trackbert", "Starting up")
|
||||
await self.start_loop_async()
|
|
@ -5,6 +5,7 @@ import time
|
|||
import subprocess
|
||||
import argparse
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Tuple, Never, Optional
|
||||
|
||||
from classes.database import Database
|
||||
|
@ -74,4 +75,4 @@ if __name__ == "__main__":
|
|||
exit(1)
|
||||
|
||||
tracker = Tracker()
|
||||
tracker.start()
|
||||
asyncio.run(tracker.start_async())
|
||||
|
|
Loading…
Reference in a new issue