from prometheus_client import start_http_server, Gauge import urllib.request import re import time import yaml # Create a metric to track the number of registered users per domain registered_users_gauge = Gauge( "piped_registered_users", "Number of registered users", ["domain"] ) headers = { "User-Agent": "Mozilla/5.0 (compatible; prometheus-piped/dev; +https://git.private.coffee/PrivateCoffee/prometheus-piped)", } def fetch_registered_users(domain): try: # Construct the full API URL api_url = f"https://{domain}/registered/badge" # Fetch the badge URL req = urllib.request.Request(api_url, headers=headers) with urllib.request.urlopen(req) as response: final_url = response.geturl() # Extract the number of registered users from the redirect URL match = re.search(r"Registered%20Users-(\d+)-blue", final_url) if match: return int(match.group(1)) except Exception as e: print(f"Error fetching registered users from {domain}: {e}") return False def update_registered_users(domains, update_interval): while True: for domain in domains: # Fetch the number of registered users and update the gauge registered_users = fetch_registered_users(domain) if registered_users: registered_users_gauge.labels(domain=domain).set(registered_users) # Sleep for a while before fetching the data again time.sleep(update_interval) if __name__ == "__main__": # Load configuration from YAML file with open("config.yaml", "r") as file: config = yaml.safe_load(file) domains = config["piped"] update_interval = config.get("update_interval", 60) # Start up the server to expose the metrics start_http_server(8098) # Update the registered users gauge periodically update_registered_users(domains, update_interval)