feat: Add logging and error handling for notifications

In this commit, logging and error handling have been added to the `notify` function in order to provide more visibility into notification sending. If the `notify-send` command is not found, a warning message is logged instead of sending a notification. Additionally, logging statements have been added to various functions throughout the script to provide visibility into the execution flow.
This commit is contained in:
Kumi 2023-08-18 09:13:10 +02:00
parent 44d854653f
commit 9802e28454
Signed by: kumi
GPG key ID: ECBCC9082395383F

View file

@ -5,19 +5,27 @@ import json
import time
import subprocess
import argparse
import logging
logging.basicConfig(level=logging.DEBUG)
def notify(title, message):
subprocess.run(
[
"notify-send",
title,
message,
]
)
logging.debug(f"Sending notification: {title} - {message}")
try:
subprocess.run(
[
"notify-send",
title,
message,
]
)
except FileNotFoundError:
logging.warning("notify-send not found, not sending notification")
def create_shipment(db, tracking_number: str, carrier: str, description=""):
logging.debug(f"Creating shipment for {tracking_number} with carrier {carrier}")
db.execute(
"INSERT INTO shipments (tracking_number, carrier, description) VALUES (?, ?, ?)",
(tracking_number, carrier, description),
@ -26,18 +34,21 @@ def create_shipment(db, tracking_number: str, carrier: str, description=""):
def get_shipment(db: sqlite3.Connection, tracking_number: str):
logging.debug(f"Getting shipment for {tracking_number}")
cur = db.cursor()
cur.execute("SELECT * FROM shipments WHERE tracking_number = ?", (tracking_number,))
return cur.fetchone()
def get_shipments(db: sqlite3.Connection):
logging.debug(f"Getting all shipments")
cur = db.cursor()
cur.execute("SELECT * FROM shipments")
return cur.fetchall()
def get_shipment_events(db, shipment_id):
logging.debug(f"Getting events for shipment {shipment_id}")
cur = db.cursor()
cur.execute("SELECT * FROM events WHERE shipment_id = ?", (shipment_id,))
return cur.fetchall()
@ -50,6 +61,7 @@ def create_event(
event_description,
raw_event,
):
logging.debug(f"Creating event for shipment {shipment_id}: {event_description} - {event_time}")
db.execute(
"INSERT INTO events (shipment_id, event_time, event_description, raw_event) VALUES (?, ?, ?, ?)",
(
@ -63,6 +75,7 @@ def create_event(
def get_latest_event(db, shipment_id):
logging.debug(f"Getting latest event for shipment {shipment_id}")
cur = db.cursor()
cur.execute(
"SELECT * FROM events WHERE shipment_id = ? ORDER BY event_time DESC LIMIT 1",
@ -72,6 +85,7 @@ def get_latest_event(db, shipment_id):
def initialize_db(db):
logging.debug("Initializing database")
db.execute(
"CREATE TABLE IF NOT EXISTS shipments (id INTEGER PRIMARY KEY AUTOINCREMENT, tracking_number TEXT, carrier TEXT, description TEXT)"
)
@ -82,18 +96,23 @@ def initialize_db(db):
def get_db():
logging.debug("Connecting to database")
db = sqlite3.connect("trackbert.db")
initialize_db(db)
return db
def start_loop(db, api: KeyDelivery):
logging.debug("Starting loop")
while True:
for shipment in get_shipments(db):
shipment_id = shipment[0]
tracking_number = shipment[1]
carrier = shipment[2]
description = shipment[3]
logging.debug(f"Checking shipment {tracking_number} with carrier {carrier}")
latest_known_event = get_latest_event(db, shipment_id)
all_events = api.realtime(carrier, tracking_number)
@ -113,7 +132,7 @@ def start_loop(db, api: KeyDelivery):
event,
)
print(f"New event for {tracking_number}: {event['context']} - {event['time']}")
logging.info(f"New event for {tracking_number}: {event['context']} - {event['time']}")
notify(f"New event for {description or tracking_number}", event["context"] + " - " + event["time"])
time.sleep(300)