Kumi
488aea5d3b
Use single gauge Added a README Added a lot of comments Get more specific message for HTTP errors on startup Add command line arguments Get rid of default metrics Add error messages to assertions Make importable by adding main function
334 lines
12 KiB
Python
Executable file
334 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
###################################################################
|
|
# allkeyshop.py - Prometheus exporter for allkeyshop.com
|
|
# 2022 - 2023 kumitterer (https://kumig.it/kumitterer)
|
|
#
|
|
# This program is free software under the terms of the MIT License,
|
|
# except where otherwise noted.
|
|
###################################################################
|
|
|
|
from prometheus_client import start_http_server, Gauge, CollectorRegistry
|
|
|
|
from configparser import ConfigParser
|
|
from argparse import ArgumentParser
|
|
from pathlib import Path
|
|
from urllib.request import urlopen, Request
|
|
from urllib.error import HTTPError
|
|
from html.parser import HTMLParser
|
|
from typing import List, Tuple, Optional
|
|
|
|
import re
|
|
import json
|
|
import time
|
|
|
|
|
|
class AllKeyShop:
|
|
"""A class abstracting interaction with allkeyshop.com
|
|
"""
|
|
|
|
# Define AllKeyShop's internal names for platforms as they are used in URLs
|
|
|
|
PLATFORM_PC = "cd-key"
|
|
PLATFORM_PS5 = "ps5"
|
|
PLATFORM_PS4 = "ps4"
|
|
PLATFORM_XB1 = "xbox-one"
|
|
PLATFORM_XBSX = "xbox-series-x"
|
|
PLATFORM_SWITCH = "nintendo-switch"
|
|
|
|
class ProductParser(HTMLParser):
|
|
"""A parser for the product page of allkeyshop.com
|
|
Yields the product ID of the product in its result attribute
|
|
"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.reset()
|
|
self.result: int
|
|
|
|
def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]):
|
|
# Basically, we're looking for a tag with the "data-product-id"
|
|
# attribute and parse the value of that attribute as an integer
|
|
|
|
for attr in attrs:
|
|
if attr[0] == "data-product-id":
|
|
try:
|
|
self.result = int(attr[1])
|
|
except (ValueError, IndexError):
|
|
# Not sure if this can even happen,
|
|
# but better safe than sorry
|
|
|
|
pass
|
|
except Exception as e:
|
|
# If this happens, something is seriously wrong
|
|
|
|
print(f"Error while parsing product ID: {e}")
|
|
|
|
class HTTPRequest(Request):
|
|
"""Custom HTTP request class with a custom user agent
|
|
"""
|
|
|
|
def __init__(self, url: str, *args, **kwargs):
|
|
super().__init__(url, *args, **kwargs)
|
|
self.headers["user-agent"] = "allkeyshop.com prometheus exporter (https://kumig.it/kumitterer/prometheus-allkeyshop)"
|
|
|
|
class ProductPageRequest(HTTPRequest):
|
|
"""Class for generating requests to the product page of allkeyshop.com
|
|
"""
|
|
@staticmethod
|
|
def to_slug(string: str) -> str:
|
|
"""Helper function for generating slugs from strings
|
|
|
|
Shamelessly stolen from https://www.30secondsofcode.org/python/s/slugify
|
|
Website, name & logo © 2017-2022 30 seconds of code (https://github.com/30-seconds)
|
|
Individual snippets licensed under CC-BY-4.0 (https://creativecommons.org/licenses/by/4.0/)
|
|
|
|
Args:
|
|
string (str): The string to generate a slug from
|
|
|
|
Returns:
|
|
str: The generated slug
|
|
"""
|
|
|
|
string = string.lower().strip()
|
|
string = re.sub(r'[^\w\s-]', '', string)
|
|
string = re.sub(r'[\s_-]+', '-', string)
|
|
string = re.sub(r'^-+|-+$', '', string)
|
|
|
|
return string
|
|
|
|
def __init__(self, product: str, platform: str, *args, **kwargs):
|
|
|
|
# Get slug to use in URL
|
|
|
|
slug = self.__class__.to_slug(product)
|
|
|
|
# Allow "pc" as shorthand platform name for PCs
|
|
|
|
if platform == "pc":
|
|
platform = AllKeyShop.PLATFORM_PC
|
|
|
|
# Set request URL
|
|
|
|
url = f"https://www.allkeyshop.com/blog/buy-{slug}-{platform}-compare-prices/"
|
|
super().__init__(url, *args, **kwargs)
|
|
|
|
class OffersRequest(HTTPRequest):
|
|
"""Class for generating requests to the offers API of allkeyshop.com
|
|
"""
|
|
|
|
def __init__(self, product: int, *args, currency: str, **kwargs):
|
|
"""Initializes the request
|
|
|
|
Args:
|
|
product (int): Product ID of the product to get offers for
|
|
currency (str): Currency to get offers in (e.g. "eur")
|
|
region (str, optional): Region to get offers for (e.g. "eu"). Defaults to "".
|
|
edition (str, optional): Edition to get offers for (e.g. "standard"). Defaults to "".
|
|
moreq (str, optional): Additional query parameters. Defaults to "".
|
|
"""
|
|
region: str = kwargs.pop("region", "")
|
|
edition: str = kwargs.pop("edition", "")
|
|
moreq: str = kwargs.pop("moreq", "")
|
|
|
|
url = f"https://www.allkeyshop.com/blog/wp-admin/admin-ajax.php?action=get_offers&product={product}¤cy={currency}®ion={region}&edition={edition}&moreq={moreq}&use_beta_offers_display=1"
|
|
|
|
super().__init__(url, *args, **kwargs)
|
|
|
|
def __init__(self, product: int | str, platform: Optional[str] = None, **kwargs):
|
|
"""Initializes the AllKeyShop object
|
|
|
|
Args:
|
|
product (int | str): Product ID or name of the product to get offers for
|
|
platform (Optional[str], optional): Platform to get offers for, if a product name is passed. Defaults to None.
|
|
"""
|
|
self.product: int
|
|
self.kwargs: dict = kwargs
|
|
|
|
if isinstance(product, int):
|
|
# Product ID is already known - no need to resolve it
|
|
self.product = product
|
|
else:
|
|
# Resolve product ID from product name and platform
|
|
assert platform, "Platform must be specified if product name is passed"
|
|
self.product = self.__class__.resolve_product(product, platform)
|
|
|
|
@classmethod
|
|
def resolve_product(cls, product: str, platform: str) -> int:
|
|
"""Resolves a product ID from a product name and platform
|
|
|
|
Args:
|
|
product (str): Name of the product to resolve
|
|
platform (str): Platform to get the product ID for
|
|
|
|
Returns:
|
|
int: Product ID matching the given product name and platform
|
|
"""
|
|
|
|
# Get product page
|
|
|
|
content = urlopen(cls.ProductPageRequest(product, platform))
|
|
html = content.read().decode()
|
|
|
|
# Pass the content to the custom HTML parser
|
|
|
|
parser = cls.ProductParser()
|
|
parser.feed(html)
|
|
|
|
# Return the result, or raise an exception if no result was found
|
|
|
|
assert parser.result, f"Could not resolve product ID for product {product} on platform {platform}"
|
|
return parser.result
|
|
|
|
def get_offers(self) -> dict:
|
|
"""Gets all offers for the product
|
|
|
|
Returns:
|
|
dict: Offers for the product
|
|
"""
|
|
|
|
# Get offers
|
|
|
|
content = urlopen(self.__class__.OffersRequest(
|
|
self.product, **self.kwargs))
|
|
raw = content.read()
|
|
|
|
content = json.loads(raw)
|
|
|
|
# Return the offers, or raise an exception if the request failed
|
|
|
|
assert content["success"], "Something went wrong while getting offers"
|
|
return content["offers"]
|
|
|
|
|
|
def main():
|
|
# Parse command line arguments
|
|
|
|
parser = ArgumentParser(
|
|
description="Prometheus exporter for allkeyshop.com")
|
|
parser.add_argument("-c", "--config", type=Path, default=Path(__file__).parent /
|
|
"settings.ini", help="Path to config file (default: settings.ini in script directory)")
|
|
parser.add_argument("-p", "--port", type=int, default=8090,
|
|
help="Port to listen on (default: 8090)")
|
|
parser.add_argument("-a", "--address", type=str, default="0.0.0.0",
|
|
help="Address to listen on (default: 0.0.0.0)")
|
|
args = parser.parse_args()
|
|
|
|
# Read configuration file
|
|
|
|
config = ConfigParser()
|
|
config.read(args.config)
|
|
|
|
if config.has_section("DEFAULT"):
|
|
defaults = config["DEFAULT"]
|
|
else:
|
|
defaults = dict()
|
|
|
|
# Initialize a custom CollectorRegistry so we don't get the default metrics
|
|
|
|
registry = CollectorRegistry()
|
|
|
|
# Initialize Gauge
|
|
|
|
gauge = Gauge(
|
|
f"allkeyshop_best_price", f"Best price for a product on allkeyshop.com",
|
|
["product_name", "currency"], registry=registry)
|
|
|
|
# Initialize products
|
|
|
|
products: List[Tuple[AllKeyShop, str, str]] = list()
|
|
|
|
for section, settings in filter(lambda x: x[0] != "DEFAULT", config.items()):
|
|
try:
|
|
# Assert that we know the currency we want to use
|
|
|
|
currency: str
|
|
assert (currency := settings.get(
|
|
"Currency", fallback=defaults.get("Currency"))), "Currency not set for section {section}"
|
|
currency = currency.lower()
|
|
|
|
# Check if we need a specific region or edition
|
|
|
|
region: str = settings.get(
|
|
"Region", fallback=defaults.get("Region", ""))
|
|
edition: str = settings.get("Edition", fallback="")
|
|
|
|
product: str | int
|
|
platform: Optional[str]
|
|
name: str
|
|
|
|
# Initialize AllKeyShop object
|
|
|
|
try:
|
|
product = int(section)
|
|
name = settings.get("Name", fallback=section)
|
|
aks: AllKeyShop = AllKeyShop(
|
|
product, currency=currency, region=region, edition=edition)
|
|
|
|
except ValueError:
|
|
product = name = section
|
|
platform = settings.get(
|
|
"Platform", fallback=defaults.get("Platform"))
|
|
assert platform
|
|
aks: AllKeyShop = AllKeyShop(
|
|
product, platform, currency=currency,
|
|
region=region, edition=edition)
|
|
|
|
# Finally, add the product to the list
|
|
|
|
products.append((aks, name, currency))
|
|
|
|
# If something goes wrong at this point, we assume that there is a
|
|
# problem with the configuration file and exit
|
|
|
|
except HTTPError as e:
|
|
print(f"Error calling URL {e.url} for section {section}: {e}")
|
|
exit(1)
|
|
except Exception as e:
|
|
print(f"Error setting up gauge for section {section}: {e}")
|
|
exit(1)
|
|
|
|
# Self-explanatory line, no?
|
|
|
|
start_http_server(args.port, args.address, registry)
|
|
|
|
# Start updating the prices
|
|
|
|
while True:
|
|
for aks, name, currency in products:
|
|
try:
|
|
# Get all offers and filter out the ones that are not in stock
|
|
|
|
offers: List[dict] = aks.get_offers()
|
|
available_offers: List[dict] = filter(
|
|
lambda x: x["stock"] == "InStock", offers)
|
|
|
|
# If we have a store preference, filter out the offers that are
|
|
# not from that store
|
|
|
|
store: Optional[str]
|
|
|
|
if (store := settings.get("Store", fallback=defaults.get("Store", ""))):
|
|
available_offers: List[dict] = filter(
|
|
lambda x: x["platform"] == store, available_offers)
|
|
|
|
# Get the best offer and update the gauge
|
|
|
|
best_offer: dict = min(
|
|
available_offers, key=lambda x: x["price"][currency]["price"])
|
|
gauge.labels(product_name=name, currency=currency).set(
|
|
best_offer["price"][currency]["price"])
|
|
|
|
# If something goes wrong at this stage, we assume that there is just
|
|
# a problem with our connectivity or the website itself and continue
|
|
|
|
except Exception as e:
|
|
print(f"Error updating gauge value for {gauge._name}: {e}")
|
|
|
|
# Finally, wait for a minute before updating the prices again
|
|
|
|
time.sleep(60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|