refactor: improve code readability and structure

Enhanced readability and consistency by reformatting `pyproject.toml` and other Python files. Removed unnecessary imports and ensured consistent styling across the codebase. These changes boost maintainability and reduce potential errors.
This commit is contained in:
Kumi 2024-07-02 08:21:34 +02:00
parent b4f0da0d1c
commit 76b77c17ba
Signed by: kumi
GPG key ID: ECBCC9082395383F
11 changed files with 67 additions and 64 deletions

View file

@ -5,29 +5,27 @@ build-backend = "hatchling.build"
[project]
name = "trackbert"
version = "0.3.0-dev"
authors = [
{ name="Kumi Mitterer", email="trackbert@kumi.email" },
]
authors = [{ name = "Kumi Mitterer", email = "trackbert@kumi.email" }]
description = "Python application tracking your shipments"
readme = "README.md"
license = { file="LICENSE" }
license = { file = "LICENSE" }
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"pykeydelivery >= 0.9.1",
"postat",
"glsapi",
"fedextrack",
"dhltrack",
"dpdtrack",
"sqlalchemy",
"alembic",
"python-dateutil",
"tabulate",
"pykeydelivery >= 0.9.1",
"postat",
"glsapi",
"fedextrack",
"dhltrack",
"dpdtrack",
"sqlalchemy",
"alembic",
"python-dateutil",
"tabulate",
]
[project.urls]
@ -36,4 +34,4 @@ dependencies = [
"Source Code" = "https://git.private.coffee/kumi/trackbert"
[project.scripts]
trackbert = "trackbert.__main__:main"
trackbert = "trackbert.__main__:main"

View file

@ -1,16 +1,10 @@
from pathlib import Path
from tabulate import tabulate
import json
import time
import subprocess
import argparse
import logging
import asyncio
from typing import Tuple, Never, Optional
from .classes.database import Database
from .classes.core import Core
@ -116,7 +110,9 @@ def main():
if not any(
[
others[1] > provider[1]
for others in filter(lambda x: x[0] == provider[0], tracker.providers)
for others in filter(
lambda x: x[0] == provider[0], tracker.providers
)
]
)
]
@ -127,9 +123,8 @@ def main():
if args.tracking_number is not None and args.carrier is not None:
if (
(shipment := tracker.db.get_shipment(args.tracking_number))
and not args.update
):
shipment := tracker.db.get_shipment(args.tracking_number)
) and not args.update:
print(f"Shipment {args.tracking_number} already exists. Use -u to update.")
exit(1)
@ -143,7 +138,9 @@ def main():
exit(0)
if not shipment and args.update:
print(f"Shipment {args.tracking_number} does not exist. Remove -u to create.")
print(
f"Shipment {args.tracking_number} does not exist. Remove -u to create."
)
exit(1)
if not shipment and not args.update:

View file

@ -1,5 +1,4 @@
import logging
import subprocess
import time
import importlib
import asyncio
@ -8,12 +7,11 @@ import importlib.metadata
import sqlalchemy.exc
from pathlib import Path
from typing import Optional, Tuple, Never
from typing import Optional, Never
from os import PathLike
from configparser import ConfigParser
from .database import Database
from .provider import BaseProvider
class Core:
@ -76,11 +74,13 @@ class Core:
logging.debug("Finding external notifiers")
notifiers = []
for entry_point in importlib.metadata.entry_points().get("trackbert.notifiers", []):
for entry_point in importlib.metadata.entry_points().get(
"trackbert.notifiers", []
):
logging.debug(f"Considering external notifier {entry_point.name}")
try:
notifier = entry_point.load()
notifier_class = entry_point.load()
except Exception as e:
logging.error(f"Error loading class {entry_point.name}: {e}")
continue
@ -97,9 +97,7 @@ class Core:
notifiers.append(nobj)
except Exception as e:
logging.error(
f"Error loading notifier {notifier_class.__name__}: {e}"
)
logging.error(f"Error loading notifier {notifier_class.__name__}: {e}")
return notifiers
@ -151,7 +149,9 @@ class Core:
providers = []
for entry_point in importlib.metadata.entry_points().get("trackbert.providers", []):
for entry_point in importlib.metadata.entry_points().get(
"trackbert.providers", []
):
logging.debug(f"Considering external provider {entry_point.name}")
try:
@ -188,9 +188,9 @@ class Core:
for api_entry in sorted(self.providers, key=lambda x: x[1], reverse=True):
api_carrier = api_entry[0]
priority = api_entry[1]
priority = api_entry[1] # noqa: F841
provider = api_entry[2]
name = api_entry[3] if len(api_entry) > 3 else None
name = api_entry[3] if len(api_entry) > 3 else None # noqa: F841
if api_carrier == "*" or api_carrier == carrier:
logging.debug(
@ -283,8 +283,6 @@ class Core:
async def start_loop_async(self) -> Never:
logging.debug("Starting loop")
loop = asyncio.get_running_loop()
while True:
tasks = []
for shipment in self.db.get_shipments():

View file

@ -1,4 +1,12 @@
from sqlalchemy import Column, Integer, String, Boolean, create_engine, ForeignKey, event
from sqlalchemy import (
Column,
Integer,
String,
Boolean,
create_engine,
ForeignKey,
event,
)
from sqlalchemy.orm import sessionmaker, relationship, scoped_session
from sqlalchemy.ext.declarative import declarative_base
@ -6,7 +14,6 @@ from alembic.config import Config
from alembic import command
import json
import time
import logging
from functools import wraps
@ -57,8 +64,12 @@ class Database:
self.engine = create_engine(database_uri, pool_size=20, max_overflow=20)
self.session = scoped_session(sessionmaker(bind=self.engine))
event.listen(self.engine, "connect", lambda _, __: logging.debug("DB connected"))
event.listen(self.engine, "close", lambda _, __: logging.debug("DB connection closed"))
event.listen(
self.engine, "connect", lambda _, __: logging.debug("DB connected")
)
event.listen(
self.engine, "close", lambda _, __: logging.debug("DB connection closed")
)
self.run_migrations()

View file

@ -7,4 +7,4 @@ class BaseNotifier:
@property
def enabled(self) -> bool:
raise NotImplementedError
raise NotImplementedError

View file

@ -2,20 +2,23 @@ from typing import Optional, Tuple, List, Generator
from ..classes.database import Event
class BaseProvider:
def __init__(self, *args, **kwargs):
pass
def get_status(self, tracking_number: str, carrier: str) -> Generator[Event, None, None]:
def get_status(
self, tracking_number: str, carrier: str
) -> Generator[Event, None, None]:
raise NotImplementedError()
def supported_carriers(self) -> List[Tuple[str, int, Optional[str]]]:
"""Defines the carriers supported by this tracker.
Returns:
list: List of supported carriers as tuples of (carrier_code, priority,
carrier_name (optional)), where priority is an integer. The carrier
with the highest priority will be used when tracking a shipment.
list: List of supported carriers as tuples of (carrier_code, priority,
carrier_name (optional)), where priority is an integer. The carrier
with the highest priority will be used when tracking a shipment.
"*" can be used as a wildcard to match all carriers.
Raises:

View file

@ -6,6 +6,7 @@ import logging
from typing import Optional
from pathlib import Path
class NotifySend(BaseNotifier):
def __init__(self, *args, **kwargs):
pass
@ -48,4 +49,5 @@ class NotifySend(BaseNotifier):
except FileNotFoundError:
return False
notifier = NotifySend
notifier = NotifySend

View file

@ -15,7 +15,7 @@ class DPD(BaseProvider):
def get_status(self, tracking_number, carrier):
api = DPDAPI()
status = api.tracking(tracking_number)
events = status["data"][0]["lifecycle"]["entries"]
for event in events:

View file

@ -17,7 +17,7 @@ class FedEx(BaseProvider):
try:
all_results = response["output"]["completeTrackResults"][0]["trackResults"]
all_events = []
for result in all_results:
@ -30,9 +30,7 @@ class FedEx(BaseProvider):
logging.error(f"Error getting events for {tracking_number}: {all_events}")
return
events = sorted(
all_events, key=lambda x: x["date"], reverse=True
)
events = sorted(all_events, key=lambda x: x["date"], reverse=True)
for event in events:
event_time = parse(event["date"]).strftime("%Y-%m-%d %H:%M:%S")

View file

@ -3,7 +3,6 @@ from ..classes.database import Event
import json
from dateutil.parser import parse
from glsapi.classes.api import GLSAPI

View file

@ -1,6 +1,5 @@
from ..classes.provider import BaseProvider
from ..classes.database import Event
from ..classes.http import HTTPRequest
from pykeydelivery import KeyDelivery as KeyDeliveryAPI
@ -20,8 +19,7 @@ class KeyDelivery(BaseProvider):
f"Got events for {tracking_number}: {len(all_events['data']['items'])}"
)
except KeyError:
logging.error(
f"Error getting events for {tracking_number}: {all_events}")
logging.error(f"Error getting events for {tracking_number}: {all_events}")
return
events = sorted(
@ -40,11 +38,10 @@ class KeyDelivery(BaseProvider):
try:
response = self.api.list_carriers()
carriers = [
(carrier["code"], 1, carrier["name"])
for carrier in response["data"]
(carrier["code"], 1, carrier["name"]) for carrier in response["data"]
]
return carriers
except:
except Exception:
return [
("*", 1),
]