classes/database.py: Updated table names and added methods to update and disable shipments

classes/tracker.py: Added handling for disabled shipments and improved logging statements
trackbert.py: Added commands to update and disable shipments
This commit is contained in:
Kumi 2023-08-24 16:05:46 +02:00
parent 0e0b24daba
commit 208762ec38
Signed by: kumi
GPG key ID: ECBCC9082395383F
3 changed files with 145 additions and 37 deletions

View file

@ -7,8 +7,9 @@ import json
Base = declarative_base()
class Shipment(Base):
__tablename__ = 'shipments'
__tablename__ = "shipments"
id = Column(Integer, primary_key=True)
tracking_number = Column(String)
@ -17,15 +18,17 @@ class Shipment(Base):
events = relationship("Event")
class Event(Base):
__tablename__ = 'events'
__tablename__ = "events"
id = Column(Integer, primary_key=True)
shipment_id = Column(Integer, ForeignKey('shipments.id'))
shipment_id = Column(Integer, ForeignKey("shipments.id"))
event_time = Column(String)
event_description = Column(String)
raw_event = Column(String)
class Database:
def __init__(self, database_uri):
self.engine = create_engine(database_uri)
@ -33,12 +36,35 @@ class Database:
self.session = Session()
def create_shipment(self, tracking_number, carrier, description=""):
new_shipment = Shipment(tracking_number=tracking_number, carrier=carrier, description=description)
new_shipment = Shipment(
tracking_number=tracking_number, carrier=carrier, description=description
)
self.session.add(new_shipment)
self.session.commit()
def update_shipment(self, tracking_number, carrier, description=""):
shipment = self.get_shipment(tracking_number)
if shipment:
shipment.carrier = carrier
shipment.description = description
self.session.commit()
else:
raise ValueError(f"Shipment {tracking_number} does not exist")
def disable_shipment(self, tracking_number):
shipment = self.get_shipment(tracking_number)
if shipment:
shipment.carrier = ""
self.session.commit()
else:
raise ValueError(f"Shipment {tracking_number} does not exist")
def get_shipment(self, tracking_number):
shipment = self.session.query(Shipment).filter(Shipment.tracking_number == tracking_number).first()
shipment = (
self.session.query(Shipment)
.filter(Shipment.tracking_number == tracking_number)
.first()
)
return shipment
def get_shipments(self):
@ -49,7 +75,12 @@ class Database:
if isinstance(raw_event, dict):
raw_event = json.dumps(raw_event)
new_event = Event(shipment_id=shipment_id, event_time=event_time, event_description=event_description, raw_event=raw_event)
new_event = Event(
shipment_id=shipment_id,
event_time=event_time,
event_description=event_description,
raw_event=raw_event,
)
self.write_event(new_event)
def write_event(self, event):
@ -57,11 +88,18 @@ class Database:
self.session.commit()
def get_shipment_events(self, shipment_id):
shipment = self.session.query(Shipment).filter(Shipment.id == shipment_id).first()
shipment = (
self.session.query(Shipment).filter(Shipment.id == shipment_id).first()
)
return shipment.events if shipment else None
def get_latest_event(self, shipment_id):
event = self.session.query(Event).filter(Event.shipment_id == shipment_id).order_by(Event.event_time.desc()).first()
event = (
self.session.query(Event)
.filter(Event.shipment_id == shipment_id)
.order_by(Event.event_time.desc())
.first()
)
return event
def initialize_db(self):

View file

@ -10,6 +10,7 @@ from trackers.base import BaseTracker
from pykeydelivery import KeyDelivery
class Tracker:
def __init__(self):
logging.basicConfig(
@ -44,24 +45,35 @@ class Tracker:
self.apis.append((carrier, priority, api))
except:
logging.exception(f"Error loading tracker {name}")
def query_api(self, tracking_number: str, carrier: str) -> list:
logging.debug(f"Querying API for {tracking_number} with carrier {carrier}")
for api_carrier, _, api in sorted(self.apis, key=lambda x: x[1], reverse=True):
if api_carrier == "*" or api_carrier == carrier:
logging.debug(f"Using API {api.__class__.__name__} for {tracking_number} with carrier {carrier}")
logging.debug(
f"Using API {api.__class__.__name__} for {tracking_number} with carrier {carrier}"
)
return list(api.get_status(tracking_number, carrier))
def notify(self, title: str, message: str, urgency: str = "normal", timeout: Optional[int] = 5000) -> None:
def notify(
self,
title: str,
message: str,
urgency: str = "normal",
timeout: Optional[int] = 5000,
) -> None:
logging.debug(f"Sending notification: {title} - {message}")
command = [
"notify-send",
"-a", "trackbert",
"-u", urgency,
"-i", str(Path(__file__).parent.parent / "assets" / "parcel-delivery-icon.webp"),
]
"notify-send",
"-a",
"trackbert",
"-u",
urgency,
"-i",
str(Path(__file__).parent.parent / "assets" / "parcel-delivery-icon.webp"),
]
if timeout:
command += ["-t", str(timeout)]
@ -79,40 +91,56 @@ class Tracker:
while True:
for shipment in self.db.get_shipments():
shipment_id = shipment.id
tracking_number = shipment.tracking_number
carrier = shipment.carrier
description = shipment.description
if not shipment.carrier:
logging.warning(
f"Shipment {shipment.tracking_number} has no carrier, skipping"
)
continue
logging.debug(f"Checking shipment {tracking_number} with carrier {carrier}")
logging.debug(
f"Checking shipment {shipment.tracking_number} with carrier {shipment.carrier}"
)
latest_known_event = self.db.get_latest_event(shipment_id)
latest_known_event = self.db.get_latest_event(shipment.id)
events = self.query_api(tracking_number, carrier)
events = self.query_api(shipment.tracking_number, shipment.carrier)
events = sorted(events, key=lambda x: x.event_time, reverse=True)
if latest_known_event:
logging.debug(f"Latest known event for {tracking_number}: {latest_known_event.event_description} - {latest_known_event.event_time}")
logging.debug(
f"Latest known event for {shipment.tracking_number}: {latest_known_event.event_description} - {latest_known_event.event_time}"
)
else:
logging.debug(f"No known events for {tracking_number}")
logging.debug(f"No known events for {shipment.tracking_number}")
logging.debug(f"Latest upstream event for {tracking_number}: {events[0].event_description} - {events[0].event_time}")
logging.debug(
f"Latest upstream event for {shipment.tracking_number}: {events[0].event_description} - {events[0].event_time}"
)
latest = True
for event in events:
if latest_known_event is None or event.event_time > latest_known_event.event_time:
event.shipment_id = shipment_id
if (
latest_known_event is None
or event.event_time > latest_known_event.event_time
):
event.shipment_id = shipment.id
self.db.write_event(event)
logging.info(f"New event for {tracking_number}: {event.event_description} - {event.event_time}")
self.notify(f"New event for {description or tracking_number}", event.event_description + " - " + event.event_time, urgency="critical" if latest else "normal")
logging.info(
f"New event for {shipment.tracking_number}: {event.event_description} - {event.event_time}"
)
self.notify(
f"New event for {shipment.description or shipment.tracking_number}",
event.event_description + " - " + event.event_time,
urgency="critical" if latest else "normal",
)
latest = False
time.sleep(300)
def start(self):
self.db = Database('sqlite:///trackbert.db')
self.db = Database("sqlite:///trackbert.db")
self.notify("Trackbert", "Starting up")
self.start_loop()
self.start_loop()

View file

@ -16,20 +16,62 @@ if __name__ == "__main__":
parser.add_argument("--tracking-number", "-n", type=str, required=False)
parser.add_argument("--carrier", "-c", type=str, required=False)
parser.add_argument("--description", "-d", type=str, required=False)
parser.add_argument("--timeout", "-t", type=int, required=False, default=30, help="Notification timeout in seconds")
parser.add_argument(
"--update",
"-u",
action="store_true",
required=False,
help="Update existing shipment",
)
parser.add_argument(
"--disable",
"-D",
action="store_true",
required=False,
help="Disable existing shipment",
)
parser.add_argument(
"--timeout",
"-t",
type=int,
required=False,
default=30,
help="Notification timeout in seconds",
)
args = parser.parse_args()
tracker = Tracker()
db = Database("sqlite:///trackbert.db")
if args.tracking_number is not None and args.carrier is not None:
db = Database('sqlite:///trackbert.db')
db.create_shipment(args.tracking_number, args.carrier, args.description)
print(f"Created shipment for {args.tracking_number} with carrier {args.carrier}")
if shipment := db.get_shipment(args.tracking_number) and not args.update:
print(f"Shipment {args.tracking_number} already exists. Use -u to update.")
exit(1)
if shipment:
db.update_shipment(args.tracking_number, args.carrier, args.description)
print(
f"Updated shipment for {args.tracking_number} with carrier {args.carrier}"
)
else:
db.create_shipment(args.tracking_number, args.carrier, args.description)
print(
f"Created shipment for {args.tracking_number} with carrier {args.carrier}"
)
exit(0)
if args.tracking_number is not None:
if args.disable:
if not db.get_shipment(args.tracking_number):
print(f"Shipment {args.tracking_number} does not exist.")
exit(1)
db.disable_shipment(args.tracking_number)
print(f"Disabled shipment for {args.tracking_number}")
exit(0)
print("You must specify a carrier with -c")
exit(1)
tracker = Tracker()
tracker.start()