diff --git a/.gitignore b/.gitignore index 1c8f647..410a434 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ venv/ *.pyc __pycache__/ -settings.ini \ No newline at end of file +config.yaml \ No newline at end of file diff --git a/LICENSE b/LICENSE index 088243d..28a555a 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2022-2023 Kumi Mitterer +Copyright (c) 2022-2024 Kumi Mitterer, Private.coffee Team Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 3daf6b2..8b100da 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,34 @@ -# Prometheus Exporter for allkeyshop.com +# Prometheus Exporter for Piped instances -This is a simple exporter for allkeyshop.com. It exports the lowest price for a -given game. +This is a simple exporter for Prometheus that gets statistics from Piped +instances. Currently, it only supports getting the number of registered users. ## Prerequisites - Python >= 3.8 -- prometheus-client (pip install prometheus-client) +- prometheus-client, pyyaml (pip install -r requirements.txt) ## Configuration -The exporter is configured using settings.ini. The provided settings.dist.ini +The exporter is configured using config.yaml. The provided config.dist.yaml is a template for the configuration file. -To add a new game/product, add a new section to the configuration file. The -section name can be either the product ID from allkeyshop.com or the product -name. +To add a new instance, simply add its API URL to the `piped` list. ## Usage -To run the exporter, simply execute the allkeyshop.py script. The exporter will -listen on port 8090 by default. - -To get a list of all available command line options, run the following command: - -```bash -./allkeyshop.py --help -``` +To run the exporter, simply execute the prometheus_piped.py script. The +exporter will listen on port 8098. A sample output of the exporter looks like this: ``` -# HELP allkeyshop_best_price Best price for a product on allkeyshop.com -# TYPE allkeyshop_best_price gauge -allkeyshop_best_price{currency="eur",product_name="Persona 5 Royal"} 49.99 -allkeyshop_best_price{currency="eur",product_name="Cyberpunk 2077"} 49.48 +# HELP registered_users Number of registered users +# TYPE registered_users gauge +registered_users{domain="api.piped.private.coffee"} 54.0 ``` ## License -This project is licensed under the MIT License - see the [LICENSE](LICENSE) file \ No newline at end of file +This project is licensed under the MIT License - see the [LICENSE](LICENSE) +file. diff --git a/allkeyshop.py b/allkeyshop.py deleted file mode 100755 index 12bc59a..0000000 --- a/allkeyshop.py +++ /dev/null @@ -1,334 +0,0 @@ -#!/usr/bin/env python3 - -################################################################### -# allkeyshop.py - Prometheus exporter for allkeyshop.com -# 2022 - 2023 kumitterer (https://kumig.it/kumitterer) -# -# This program is free software under the terms of the MIT License, -# except where otherwise noted. -################################################################### - -from prometheus_client import start_http_server, Gauge, CollectorRegistry - -from configparser import ConfigParser -from argparse import ArgumentParser -from pathlib import Path -from urllib.request import urlopen, Request -from urllib.error import HTTPError -from html.parser import HTMLParser -from typing import List, Tuple, Optional - -import re -import json -import time - - -class AllKeyShop: - """A class abstracting interaction with allkeyshop.com - """ - - # Define AllKeyShop's internal names for platforms as they are used in URLs - - PLATFORM_PC = "cd-key" - PLATFORM_PS5 = "ps5" - PLATFORM_PS4 = "ps4" - PLATFORM_XB1 = "xbox-one" - PLATFORM_XBSX = "xbox-series-x" - PLATFORM_SWITCH = "nintendo-switch" - - class ProductParser(HTMLParser): - """A parser for the product page of allkeyshop.com - Yields the product ID of the product in its result attribute - """ - - def __init__(self): - super().__init__() - self.reset() - self.result: int - - def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]): - # Basically, we're looking for a tag with the "data-product-id" - # attribute and parse the value of that attribute as an integer - - for attr in attrs: - if attr[0] == "data-product-id": - try: - self.result = int(attr[1]) - except (ValueError, IndexError): - # Not sure if this can even happen, - # but better safe than sorry - - pass - except Exception as e: - # If this happens, something is seriously wrong - - print(f"Error while parsing product ID: {e}") - - class HTTPRequest(Request): - """Custom HTTP request class with a custom user agent - """ - - def __init__(self, url: str, *args, **kwargs): - super().__init__(url, *args, **kwargs) - self.headers["user-agent"] = "allkeyshop.com prometheus exporter (https://kumig.it/kumitterer/prometheus-allkeyshop)" - - class ProductPageRequest(HTTPRequest): - """Class for generating requests to the product page of allkeyshop.com - """ - @staticmethod - def to_slug(string: str) -> str: - """Helper function for generating slugs from strings - - Shamelessly stolen from https://www.30secondsofcode.org/python/s/slugify - Website, name & logo © 2017-2022 30 seconds of code (https://github.com/30-seconds) - Individual snippets licensed under CC-BY-4.0 (https://creativecommons.org/licenses/by/4.0/) - - Args: - string (str): The string to generate a slug from - - Returns: - str: The generated slug - """ - - string = string.lower().strip() - string = re.sub(r'[^\w\s-]', '', string) - string = re.sub(r'[\s_-]+', '-', string) - string = re.sub(r'^-+|-+$', '', string) - - return string - - def __init__(self, product: str, platform: str, *args, **kwargs): - - # Get slug to use in URL - - slug = self.__class__.to_slug(product) - - # Allow "pc" as shorthand platform name for PCs - - if platform == "pc": - platform = AllKeyShop.PLATFORM_PC - - # Set request URL - - url = f"https://www.allkeyshop.com/blog/buy-{slug}-{platform}-compare-prices/" - super().__init__(url, *args, **kwargs) - - class OffersRequest(HTTPRequest): - """Class for generating requests to the offers API of allkeyshop.com - """ - - def __init__(self, product: int, *args, currency: str, **kwargs): - """Initializes the request - - Args: - product (int): Product ID of the product to get offers for - currency (str): Currency to get offers in (e.g. "eur") - region (str, optional): Region to get offers for (e.g. "eu"). Defaults to "". - edition (str, optional): Edition to get offers for (e.g. "standard"). Defaults to "". - moreq (str, optional): Additional query parameters. Defaults to "". - """ - region: str = kwargs.pop("region", "") - edition: str = kwargs.pop("edition", "") - moreq: str = kwargs.pop("moreq", "") - - url = f"https://www.allkeyshop.com/blog/wp-admin/admin-ajax.php?action=get_offers&product={product}¤cy={currency}®ion={region}&edition={edition}&moreq={moreq}&use_beta_offers_display=1" - - super().__init__(url, *args, **kwargs) - - def __init__(self, product: int | str, platform: Optional[str] = None, **kwargs): - """Initializes the AllKeyShop object - - Args: - product (int | str): Product ID or name of the product to get offers for - platform (Optional[str], optional): Platform to get offers for, if a product name is passed. Defaults to None. - """ - self.product: int - self.kwargs: dict = kwargs - - if isinstance(product, int): - # Product ID is already known - no need to resolve it - self.product = product - else: - # Resolve product ID from product name and platform - assert platform, "Platform must be specified if product name is passed" - self.product = self.__class__.resolve_product(product, platform) - - @classmethod - def resolve_product(cls, product: str, platform: str) -> int: - """Resolves a product ID from a product name and platform - - Args: - product (str): Name of the product to resolve - platform (str): Platform to get the product ID for - - Returns: - int: Product ID matching the given product name and platform - """ - - # Get product page - - content = urlopen(cls.ProductPageRequest(product, platform)) - html = content.read().decode() - - # Pass the content to the custom HTML parser - - parser = cls.ProductParser() - parser.feed(html) - - # Return the result, or raise an exception if no result was found - - assert parser.result, f"Could not resolve product ID for product {product} on platform {platform}" - return parser.result - - def get_offers(self) -> dict: - """Gets all offers for the product - - Returns: - dict: Offers for the product - """ - - # Get offers - - content = urlopen(self.__class__.OffersRequest( - self.product, **self.kwargs)) - raw = content.read() - - content = json.loads(raw) - - # Return the offers, or raise an exception if the request failed - - assert content["success"], "Something went wrong while getting offers" - return content["offers"] - - -def main(): - # Parse command line arguments - - parser = ArgumentParser( - description="Prometheus exporter for allkeyshop.com") - parser.add_argument("-c", "--config", type=Path, default=Path(__file__).parent / - "settings.ini", help="Path to config file (default: settings.ini in script directory)") - parser.add_argument("-p", "--port", type=int, default=8090, - help="Port to listen on (default: 8090)") - parser.add_argument("-a", "--address", type=str, default="0.0.0.0", - help="Address to listen on (default: 0.0.0.0)") - args = parser.parse_args() - - # Read configuration file - - config = ConfigParser() - config.read(args.config) - - if config.has_section("DEFAULT"): - defaults = config["DEFAULT"] - else: - defaults = dict() - - # Initialize a custom CollectorRegistry so we don't get the default metrics - - registry = CollectorRegistry() - - # Initialize Gauge - - gauge = Gauge( - f"allkeyshop_best_price", f"Best price for a product on allkeyshop.com", - ["product_name", "currency"], registry=registry) - - # Initialize products - - products: List[Tuple[AllKeyShop, str, str]] = list() - - for section, settings in filter(lambda x: x[0] != "DEFAULT", config.items()): - try: - # Assert that we know the currency we want to use - - currency: str - assert (currency := settings.get( - "Currency", fallback=defaults.get("Currency"))), "Currency not set for section {section}" - currency = currency.lower() - - # Check if we need a specific region or edition - - region: str = settings.get( - "Region", fallback=defaults.get("Region", "")) - edition: str = settings.get("Edition", fallback="") - - product: str | int - platform: Optional[str] - name: str - - # Initialize AllKeyShop object - - try: - product = int(section) - name = settings.get("Name", fallback=section) - aks: AllKeyShop = AllKeyShop( - product, currency=currency, region=region, edition=edition) - - except ValueError: - product = name = section - platform = settings.get( - "Platform", fallback=defaults.get("Platform")) - assert platform - aks: AllKeyShop = AllKeyShop( - product, platform, currency=currency, - region=region, edition=edition) - - # Finally, add the product to the list - - products.append((aks, name, currency)) - - # If something goes wrong at this point, we assume that there is a - # problem with the configuration file and exit - - except HTTPError as e: - print(f"Error calling URL {e.url} for section {section}: {e}") - exit(1) - except Exception as e: - print(f"Error setting up gauge for section {section}: {e}") - exit(1) - - # Self-explanatory line, no? - - start_http_server(args.port, args.address, registry) - - # Start updating the prices - - while True: - for aks, name, currency in products: - try: - # Get all offers and filter out the ones that are not in stock - - offers: List[dict] = aks.get_offers() - available_offers: List[dict] = filter( - lambda x: x["stock"] == "InStock", offers) - - # If we have a store preference, filter out the offers that are - # not from that store - - store: Optional[str] - - if (store := settings.get("Store", fallback=defaults.get("Store", ""))): - available_offers: List[dict] = filter( - lambda x: x["platform"] == store, available_offers) - - # Get the best offer and update the gauge - - best_offer: dict = min( - available_offers, key=lambda x: x["price"][currency]["price"]) - gauge.labels(product_name=name, currency=currency).set( - best_offer["price"][currency]["price"]) - - # If something goes wrong at this stage, we assume that there is just - # a problem with our connectivity or the website itself and continue - - except Exception as e: - print(f"Error updating gauge value for {gauge._name}: {e}") - - # Finally, wait for a minute before updating the prices again - - time.sleep(60) - - -if __name__ == "__main__": - main() diff --git a/config.dist.yaml b/config.dist.yaml new file mode 100644 index 0000000..62b6114 --- /dev/null +++ b/config.dist.yaml @@ -0,0 +1,5 @@ +piped: + - api-url-1 + - api-url-2 + - api-url-3 +update_interval: 60 \ No newline at end of file diff --git a/prometheus_piped.py b/prometheus_piped.py new file mode 100755 index 0000000..b68e7fc --- /dev/null +++ b/prometheus_piped.py @@ -0,0 +1,55 @@ +from prometheus_client import start_http_server, Gauge +import urllib.request +import re +import time +import yaml + +# Create a metric to track the number of registered users per domain +registered_users_gauge = Gauge( + "registered_users", "Number of registered users", ["domain"] +) + +headers = { + "User-Agent": "Mozilla/5.0 (compatible; prometheus-piped/dev; +https://git.private.coffee/PrivateCoffee/prometheus-piped)", +} + + +def fetch_registered_users(domain): + try: + # Construct the full API URL + api_url = f"https://{domain}/registered/badge" + # Fetch the badge URL + req = urllib.request.Request(api_url, headers=headers) + with urllib.request.urlopen(req) as response: + final_url = response.geturl() + # Extract the number of registered users from the redirect URL + match = re.search(r"Registered%20Users-(\d+)-blue", final_url) + if match: + return int(match.group(1)) + except Exception as e: + print(f"Error fetching registered users from {domain}: {e}") + return 0 + + +def update_registered_users(domains, update_interval): + while True: + for domain in domains: + # Fetch the number of registered users and update the gauge + registered_users = fetch_registered_users(domain) + registered_users_gauge.labels(domain=domain).set(registered_users) + # Sleep for a while before fetching the data again + time.sleep(update_interval) + + +if __name__ == "__main__": + # Load configuration from YAML file + with open("config.yaml", "r") as file: + config = yaml.safe_load(file) + + domains = config["piped"] + update_interval = config.get("update_interval", 60) + + # Start up the server to expose the metrics + start_http_server(8098) + # Update the registered users gauge periodically + update_registered_users(domains, update_interval) diff --git a/requirements.txt b/requirements.txt index 9ca76ff..1b67103 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -prometheus-client \ No newline at end of file +prometheus-client +pyyaml \ No newline at end of file diff --git a/settings.dist.ini b/settings.dist.ini deleted file mode 100644 index 2f6bd4c..0000000 --- a/settings.dist.ini +++ /dev/null @@ -1,9 +0,0 @@ -[DEFAULT] -Currency = eur -Platform = pc -Store = steam - -[Persona 5 Royal] - -[10539] -Name = Cyberpunk 2077 \ No newline at end of file