chore: Run black on code

This commit is contained in:
Kumi 2023-08-24 17:43:23 +02:00
parent 9374b1284e
commit 1d90de9205
Signed by: kumi
GPG key ID: ECBCC9082395383F
4 changed files with 29 additions and 20 deletions

View file

@ -90,7 +90,7 @@ class Tracker:
except FileNotFoundError:
logging.warning("notify-send not found, not sending notification")
def notify_event(self, shipment, event, critical = False) -> None:
def notify_event(self, shipment, event, critical=False) -> None:
logging.info(
f"New event for {shipment.tracking_number}: {event.event_description} - {event.event_time}"
)
@ -141,7 +141,6 @@ class Tracker:
self.db.write_event(event)
self.notify_event(shipment, event, event == events[0])
def start_loop(self) -> Never:
logging.debug("Starting loop")
@ -159,14 +158,17 @@ class Tracker:
while True:
tasks = []
for shipment in self.db.get_shipments():
task = asyncio.wait_for(asyncio.to_thread(self.process_shipment, shipment), timeout=self.loop_timeout)
task = asyncio.wait_for(
asyncio.to_thread(self.process_shipment, shipment),
timeout=self.loop_timeout,
)
tasks.append(task)
try:
await asyncio.gather(*tasks)
except asyncio.TimeoutError:
logging.warning("Timeout while processing shipments")
await asyncio.sleep(self.loop_interval)
def start(self):
@ -177,4 +179,4 @@ class Tracker:
async def start_async(self):
self.db = Database("sqlite:///trackbert.db")
self.notify("Trackbert", "Starting up")
await self.start_loop_async()
await self.start_loop_async()

View file

@ -10,7 +10,7 @@ class BaseTracker:
"""Defines the carriers supported by this tracker.
Returns:
list: List of supported carriers as tuples of (carrier_code, priority),
list: List of supported carriers as tuples of (carrier_code, priority),
where priority is an integer. The carrier with the highest priority
will be used when tracking a shipment. "*" can be used as a wildcard
to match all carriers.
@ -18,4 +18,4 @@ class BaseTracker:
Raises:
NotImplementedError: When this method is not implemented by the subclass.
"""
raise NotImplementedError()
raise NotImplementedError()

View file

@ -15,25 +15,30 @@ class KeyDelivery(BaseTracker):
all_events = self.api.realtime(carrier, tracking_number)
try:
logging.debug(f"Got events for {tracking_number}: {len(all_events['data']['items'])}")
logging.debug(
f"Got events for {tracking_number}: {len(all_events['data']['items'])}"
)
except KeyError:
print(f"Error getting events for {tracking_number}: {all_events}")
return
events = sorted(all_events["data"]["items"], key=lambda x: x["time"], reverse=True)
events = sorted(
all_events["data"]["items"], key=lambda x: x["time"], reverse=True
)
for event in events:
yield Event(
shipment_id = 0,
event_time = event["time"],
event_description = event["context"],
raw_event = json.dumps(event)
shipment_id=0,
event_time=event["time"],
event_description=event["context"],
raw_event=json.dumps(event),
)
@staticmethod
def supported_carriers():
return [
("*", 1),
]
tracker = KeyDelivery
tracker = KeyDelivery

View file

@ -6,6 +6,7 @@ import json
from dateutil.parser import parse
from postat.classes.api import PostAPI
class PostAT(BaseTracker):
def __init__(self, *args, **kwargs):
pass
@ -21,10 +22,10 @@ class PostAT(BaseTracker):
py_timestamp = parse(timestamp)
event_time = py_timestamp.strftime("%Y-%m-%d %H:%M:%S")
yield Event(
shipment_id = 0,
event_time = event_time,
event_description = event["text"],
raw_event = json.dumps(event)
shipment_id=0,
event_time=event_time,
event_description=event["text"],
raw_event=json.dumps(event),
)
@staticmethod
@ -33,4 +34,5 @@ class PostAT(BaseTracker):
("austrian_post", 100),
]
tracker = PostAT
tracker = PostAT