diff --git a/commands/__init__.py b/commands/__init__.py new file mode 100644 index 0000000..df29381 --- /dev/null +++ b/commands/__init__.py @@ -0,0 +1,17 @@ +from .help import command_help +from .newroom import command_newroom +from .stats import command_stats +from .botinfo import command_botinfo +from .unknown import command_unknown +from .coin import command_coin +from .ignoreolder import command_ignoreolder + +COMMANDS = { + "help": command_help, + "newroom": command_newroom, + "stats": command_stats, + "botinfo": command_botinfo, + "coin": command_coin, + "ignoreolder": command_ignoreolder, + None: command_unknown, +} \ No newline at end of file diff --git a/commands/botinfo.py b/commands/botinfo.py new file mode 100644 index 0000000..b7e9bb2 --- /dev/null +++ b/commands/botinfo.py @@ -0,0 +1,22 @@ +from nio.events.room_events import RoomMessageText +from nio.rooms import MatrixRoom + +async def command_botinfo(room: MatrixRoom, event: RoomMessageText, context: dict): + logging("Showing bot info...") + + await context["client"].room_send( + room.room_id, "m.room.message", {"msgtype": "m.notice", + "body": f"""GPT Info: + +Model: {context["model"]} +Maximum context tokens: {context["max_tokens"]} +Maximum context messages: {context["max_messages"]} +System message: {context["system_message"]} + +Room info: + +Bot user ID: {context["client"].user_id} +Current room ID: {room.room_id} + +For usage statistics, run !gptbot stats +"""}) \ No newline at end of file diff --git a/commands/coin.py b/commands/coin.py new file mode 100644 index 0000000..f792718 --- /dev/null +++ b/commands/coin.py @@ -0,0 +1,14 @@ +from nio.events.room_events import RoomMessageText +from nio.rooms import MatrixRoom + +from random import SystemRandom + +async def command_coin(room: MatrixRoom, event: RoomMessageText, context: dict): + context["logger"]("Flipping a coin...") + + heads = SystemRandom().choice([True, False]) + + await context["client"].room_send( + room.room_id, "m.room.message", {"msgtype": "m.notice", + "body": "Heads!" if heads else "Tails!"} + ) \ No newline at end of file diff --git a/commands/help.py b/commands/help.py new file mode 100644 index 0000000..6554d23 --- /dev/null +++ b/commands/help.py @@ -0,0 +1,16 @@ +from nio.events.room_events import RoomMessageText +from nio.rooms import MatrixRoom + +async def command_help(room: MatrixRoom, event: RoomMessageText, context: dict): + await context["client"].room_send( + room.room_id, "m.room.message", {"msgtype": "m.notice", + "body": """Available commands: + +!gptbot help - Show this message +!gptbot newroom - Create a new room and invite yourself to it +!gptbot stats - Show usage statistics for this room +!gptbot botinfo - Show information about the bot +!gptbot coin - Flip a coin (heads or tails) +!gptbot ignoreolder - Ignore messages before this point as context +"""} + ) \ No newline at end of file diff --git a/commands/ignoreolder.py b/commands/ignoreolder.py new file mode 100644 index 0000000..8da3d7c --- /dev/null +++ b/commands/ignoreolder.py @@ -0,0 +1,10 @@ +from nio.events.room_events import RoomMessageText +from nio.rooms import MatrixRoom + +async def command_ignoreolder(room: MatrixRoom, event: RoomMessageText, context: dict): + await context["client"].room_send( + room.room_id, "m.room.message", {"msgtype": "m.notice", + "body": """Alright, messages before this point will not be processed as context anymore. + +If you ever reconsider, you can simply delete your message and I will start processing messages before it again."""} + ) \ No newline at end of file diff --git a/commands/newroom.py b/commands/newroom.py new file mode 100644 index 0000000..1395545 --- /dev/null +++ b/commands/newroom.py @@ -0,0 +1,16 @@ +from nio.events.room_events import RoomMessageText +from nio.rooms import MatrixRoom + +async def command_newroom(room: MatrixRoom, event: RoomMessageText, context: dict): + room_name = " ".join(event.body.split()[2:]) or context["default_room_name"] + + context["logger"]("Creating new room...") + new_room = await context["client"].room_create(name=room_name) + + context["logger"](f"Inviting {event.sender} to new room...") + await context["client"].room_invite(new_room.room_id, event.sender) + await context["client"].room_put_state( + new_room.room_id, "m.room.power_levels", {"users": {event.sender: 100}}) + + await context["client"].room_send( + new_room.room_id, "m.room.message", {"msgtype": "m.text", "body": "Welcome to the new room!"}) diff --git a/commands/stats.py b/commands/stats.py new file mode 100644 index 0000000..e7ae307 --- /dev/null +++ b/commands/stats.py @@ -0,0 +1,23 @@ +from nio.events.room_events import RoomMessageText +from nio.rooms import MatrixRoom + +async def command_stats(room: MatrixRoom, event: RoomMessageText, context: dict): + context["logger"]("Showing stats...") + + if not (database := context.get("database")): + context["logger"]("No database connection - cannot show stats") + context["client"].room_send( + room.room_id, "m.room.message", {"msgtype": "m.notice", + "body": "Sorry, I'm not connected to a database, so I don't have any statistics on your usage."} + ) + return + + with database.cursor() as cursor: + cursor.execute( + "SELECT SUM(tokens) FROM token_usage WHERE room_id = ?", (room.room_id,)) + total_tokens = cursor.fetchone()[0] or 0 + + await context["client"].room_send( + room.room_id, "m.room.message", {"msgtype": "m.notice", + "body": f"Total tokens used: {total_tokens}"} + ) \ No newline at end of file diff --git a/commands/unknown.py b/commands/unknown.py new file mode 100644 index 0000000..4b97c95 --- /dev/null +++ b/commands/unknown.py @@ -0,0 +1,10 @@ +from nio.events.room_events import RoomMessageText +from nio.rooms import MatrixRoom + +async def command_unknown(room: MatrixRoom, event: RoomMessageText, context: dict): + context["logger"]("Unknown command") + + await context["client"].room_send( + room.room_id, "m.room.message", {"msgtype": "m.notice", + "body": "Unknown command - try !gptbot help"} + ) \ No newline at end of file diff --git a/gptbot.py b/gptbot.py index 6bef7fb..ae310cf 100644 --- a/gptbot.py +++ b/gptbot.py @@ -2,6 +2,7 @@ import os import inspect import logging import signal +import random import openai import asyncio @@ -16,28 +17,33 @@ from nio.responses import RoomMessagesError, SyncResponse, RoomRedactError from configparser import ConfigParser from datetime import datetime from argparse import ArgumentParser +from typing import List, Dict, Union, Optional -# Globals - -DATABASE = False -DEFAULT_ROOM_NAME = "GPTBot" -SYSTEM_MESSAGE = "You are a helpful assistant. " -MAX_TOKENS = 3000 -MAX_MESSAGES = 20 -DEFAULT_MODEL = "gpt-3.5-turbo" - -# Set up Matrix client -MATRIX_CLIENT = None -SYNC_TOKEN = None +from commands import COMMANDS -def logging(message, log_level="info"): +def logging(message: str, log_level: str = "info"): caller = inspect.currentframe().f_back.f_code.co_name timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S:%f") print(f"[{timestamp}] - {caller} - [{log_level.upper()}] {message}") -async def gpt_query(messages, model=DEFAULT_MODEL): +CONTEXT = { + "database": False, + "default_room_name": "GPTBot", + "system_message": "You are a helpful assistant.", + "max_tokens": 3000, + "max_messages": 20, + "model": "gpt-3.5-turbo", + "client": None, + "sync_token": None, + "logger": logging +} + + +async def gpt_query(messages: list, model: Optional[str] = None): + model = model or CONTEXT["model"] + logging(f"Querying GPT with {len(messages)} messages") try: response = openai.ChatCompletion.create( @@ -54,17 +60,20 @@ async def gpt_query(messages, model=DEFAULT_MODEL): return None, 0 -async def fetch_last_n_messages(room_id, n=MAX_MESSAGES): - global SYNC_TOKEN, MATRIX_CLIENT - +async def fetch_last_n_messages(room_id: str, n: Optional[int] = None, + client: Optional[AsyncClient] = None, sync_token: Optional[str] = None): messages = [] - logging( - f"Fetching last {2*n} messages from room {room_id} (starting at {SYNC_TOKEN})...") + n = n or CONTEXT["max_messages"] + client = client or CONTEXT["client"] + sync_token = sync_token or CONTEXT["sync_token"] - response = await MATRIX_CLIENT.room_messages( + logging( + f"Fetching last {2*n} messages from room {room_id} (starting at {sync_token})...") + + response = await client.room_messages( room_id=room_id, - start=SYNC_TOKEN, + start=sync_token, limit=2*n, ) @@ -77,6 +86,8 @@ async def fetch_last_n_messages(room_id, n=MAX_MESSAGES): if len(messages) >= n: break if isinstance(event, RoomMessageText): + if event.body.startswith("!gptbot ignoreolder"): + break if not event.body.startswith("!"): messages.append(event) @@ -86,13 +97,16 @@ async def fetch_last_n_messages(room_id, n=MAX_MESSAGES): return messages[::-1] -def truncate_messages_to_fit_tokens(messages, max_tokens=MAX_TOKENS, model=DEFAULT_MODEL): - global SYSTEM_MESSAGE +def truncate_messages_to_fit_tokens(messages: list, max_tokens: Optional[int] = None, + model: Optional[str] = None, system_message: Optional[str] = None): + max_tokens = max_tokens or CONTEXT["max_tokens"] + model = model or CONTEXT["model"] + system_message = system_message or CONTEXT["system_message"] encoding = tiktoken.encoding_for_model(model) total_tokens = 0 - system_message_tokens = len(encoding.encode(SYSTEM_MESSAGE)) + 1 + system_message_tokens = len(encoding.encode(system_message)) + 1 if system_message_tokens > max_tokens: logging( @@ -101,7 +115,7 @@ def truncate_messages_to_fit_tokens(messages, max_tokens=MAX_TOKENS, model=DEFAU total_tokens += system_message_tokens - total_tokens = len(SYSTEM_MESSAGE) + 1 + total_tokens = len(system_message) + 1 truncated_messages = [] for message in [messages[0]] + list(reversed(messages[1:])): @@ -115,19 +129,23 @@ def truncate_messages_to_fit_tokens(messages, max_tokens=MAX_TOKENS, model=DEFAU return [truncated_messages[0]] + list(reversed(truncated_messages[1:])) -async def process_query(room: MatrixRoom, event: RoomMessageText): - global MATRIX_CLIENT, DATABASE, SYSTEM_MESSAGE +async def process_query(room: MatrixRoom, event: RoomMessageText, **kwargs): - await MATRIX_CLIENT.room_typing(room.room_id, True) + client = kwargs.get("client") or CONTEXT["client"] + database = kwargs.get("database") or CONTEXT["database"] + system_message = kwargs.get("system_message") or CONTEXT["system_message"] + max_tokens = kwargs.get("max_tokens") or CONTEXT["max_tokens"] - await MATRIX_CLIENT.room_read_markers(room.room_id, event.event_id) + await client.room_typing(room.room_id, True) + + await client.room_read_markers(room.room_id, event.event_id) last_messages = await fetch_last_n_messages(room.room_id, 20) - chat_messages = [{"role": "system", "content": SYSTEM_MESSAGE}] + chat_messages = [{"role": "system", "content": system_message}] for message in last_messages: - role = "assistant" if message.sender == MATRIX_CLIENT.user_id else "user" + role = "assistant" if message.sender == client.user_id else "user" if not message.event_id == event.event_id: chat_messages.append({"role": role, "content": message.body}) @@ -135,7 +153,7 @@ async def process_query(room: MatrixRoom, event: RoomMessageText): # Truncate messages to fit within the token limit truncated_messages = truncate_messages_to_fit_tokens( - chat_messages, MAX_TOKENS - 1) + chat_messages, max_tokens - 1) response, tokens_used = await gpt_query(truncated_messages) @@ -147,138 +165,49 @@ async def process_query(room: MatrixRoom, event: RoomMessageText): markdowner = markdown2.Markdown(extras=["fenced-code-blocks"]) formatted_body = markdowner.convert(response) - message = await MATRIX_CLIENT.room_send( + message = await client.room_send( room.room_id, "m.room.message", {"msgtype": "m.text", "body": response, "format": "org.matrix.custom.html", "formatted_body": formatted_body} ) - if DATABASE: + if database: logging("Logging tokens used...") - with DATABASE.cursor() as cursor: + with database.cursor() as cursor: cursor.execute( - "INSERT INTO token_usage (message_id, room_id, tokens, timestamp) VALUES (?, ?, ?, ?)", + "INSERT INTO token_usage (message_id, room_id, tokens, timestamp) VALUES (?, ?, ?, ?)", (message.event_id, room.room_id, tokens_used, datetime.now())) - DATABASE.commit() + database.commit() else: # Send a notice to the room if there was an error logging("Error during GPT API call - sending notice to room") - await MATRIX_CLIENT.room_send( + await client.room_send( room.room_id, "m.room.message", { "msgtype": "m.notice", "body": "Sorry, I'm having trouble connecting to the GPT API right now. Please try again later."} ) print("No response from GPT API") - await MATRIX_CLIENT.room_typing(room.room_id, False) + await client.room_typing(room.room_id, False) -async def command_newroom(room: MatrixRoom, event: RoomMessageText): - room_name = " ".join(event.body.split()[2:]) or DEFAULT_ROOM_NAME - - logging("Creating new room...") - new_room = await MATRIX_CLIENT.room_create(name=room_name) - - logging(f"Inviting {event.sender} to new room...") - await MATRIX_CLIENT.room_invite(new_room.room_id, event.sender) - await MATRIX_CLIENT.room_put_state( - new_room.room_id, "m.room.power_levels", {"users": {event.sender: 100}}) - - await MATRIX_CLIENT.room_send( - new_room.room_id, "m.room.message", {"msgtype": "m.text", "body": "Welcome to the new room!"}) - - -async def command_help(room: MatrixRoom, event: RoomMessageText): - await MATRIX_CLIENT.room_send( - room.room_id, "m.room.message", {"msgtype": "m.notice", - "body": """Available commands: - -!gptbot help - Show this message -!gptbot newroom - Create a new room and invite yourself to it -!gptbot stats - Show usage statistics for this room -!gptbot botinfo - Show information about the bot -"""} - ) - - -async def command_stats(room: MatrixRoom, event: RoomMessageText): - global DATABASE, MATRIX_CLIENT - - logging("Showing stats...") - - if not DATABASE: - logging("No database connection - cannot show stats") - return - - with DATABASE.cursor() as cursor: - cursor.execute( - "SELECT SUM(tokens) FROM token_usage WHERE room_id = ?", (room.room_id,)) - total_tokens = cursor.fetchone()[0] or 0 - - await MATRIX_CLIENT.room_send( - room.room_id, "m.room.message", {"msgtype": "m.notice", - "body": f"Total tokens used: {total_tokens}"} - ) - - -async def command_unknown(room: MatrixRoom, event: RoomMessageText): - global MATRIX_CLIENT - - logging("Unknown command") - - await MATRIX_CLIENT.room_send( - room.room_id, "m.room.message", {"msgtype": "m.notice", - "body": "Unknown command - try !gptbot help"} - ) - - -async def command_botinfo(room: MatrixRoom, event: RoomMessageText): - global MATRIX_CLIENT - - logging("Showing bot info...") - - await MATRIX_CLIENT.room_send( - room.room_id, "m.room.message", {"msgtype": "m.notice", - "body": f"""GPT Info: - -Model: {DEFAULT_MODEL} -Maximum context tokens: {MAX_TOKENS} -Maximum context messages: {MAX_MESSAGES} -System message: {SYSTEM_MESSAGE} - -Room info: - -Bot user ID: {MATRIX_CLIENT.user_id} -Current room ID: {room.room_id} - -For usage statistics, run !gptbot stats -"""}) - -COMMANDS = { - "help": command_help, - "newroom": command_newroom, - "stats": command_stats, - "botinfo": command_botinfo -} - - -async def process_command(room: MatrixRoom, event: RoomMessageText): - global COMMANDS +async def process_command(room: MatrixRoom, event: RoomMessageText, context: Optional[dict] = None): + context = context or CONTEXT logging( f"Received command {event.body} from {event.sender} in room {room.room_id}") command = event.body.split()[1] if event.body.split()[1:] else None - await COMMANDS.get(command, command_unknown)(room, event) + await COMMANDS.get(command, COMMANDS[None])(room, event, context) -async def message_callback(room: MatrixRoom, event: RoomMessageText): - global DEFAULT_ROOM_NAME, MATRIX_CLIENT, SYSTEM_MESSAGE, DATABASE, MAX_TOKENS - +async def message_callback(room: MatrixRoom, event: RoomMessageText, **kwargs): + context = kwargs.get("context") or CONTEXT + logging(f"Received message from {event.sender} in room {room.room_id}") - if event.sender == MATRIX_CLIENT.user_id: + if event.sender == context["client"].user_id: logging("Message is from bot itself - ignoring") elif event.body.startswith("!gptbot"): @@ -288,16 +217,16 @@ async def message_callback(room: MatrixRoom, event: RoomMessageText): logging("Might be a command, but not for this bot - ignoring") else: - await process_query(room, event) + await process_query(room, event, context=context) -async def room_invite_callback(room: MatrixRoom, event): - global MATRIX_CLIENT +async def room_invite_callback(room: MatrixRoom, event: InviteEvent, **kwargs): + client = kwargs.get("client") or CONTEXT["client"] logging(f"Received invite to room {room.room_id} - joining...") - await MATRIX_CLIENT.join(room.room_id) - await MATRIX_CLIENT.room_send( + await client.join(room.room_id) + await client.room_send( room.room_id, "m.room.message", {"msgtype": "m.text", @@ -305,16 +234,16 @@ async def room_invite_callback(room: MatrixRoom, event): ) -async def accept_pending_invites(): - global MATRIX_CLIENT +async def accept_pending_invites(client: Optional[AsyncClient] = None): + client = client or CONTEXT["client"] logging("Accepting pending invites...") - for room_id in list(MATRIX_CLIENT.invited_rooms.keys()): + for room_id in list(client.invited_rooms.keys()): logging(f"Joining room {room_id}...") - await MATRIX_CLIENT.join(room_id) - await MATRIX_CLIENT.room_send( + await client.join(room_id) + await client.room_send( room_id, "m.room.message", {"msgtype": "m.text", @@ -322,39 +251,41 @@ async def accept_pending_invites(): ) -async def sync_cb(response): - global SYNC_TOKEN - +async def sync_cb(response, write_global: bool = True): logging( f"Sync response received (next batch: {response.next_batch})", "debug") SYNC_TOKEN = response.next_batch + if write_global: + global CONTEXT + CONTEXT["sync_token"] = SYNC_TOKEN -async def main(): - global MATRIX_CLIENT - if not MATRIX_CLIENT.user_id: - whoami = await MATRIX_CLIENT.whoami() - MATRIX_CLIENT.user_id = whoami.user_id +async def main(client: Optional[AsyncClient] = None): + client = client or CONTEXT["client"] + + if not client.user_id: + whoami = await client.whoami() + client.user_id = whoami.user_id try: - assert MATRIX_CLIENT.user_id + assert client.user_id except AssertionError: logging( "Failed to get user ID - check your access token or try setting it manually", "critical") - await MATRIX_CLIENT.close() + await client.close() return logging("Starting bot...") - MATRIX_CLIENT.add_response_callback(sync_cb, SyncResponse) + client.add_response_callback(sync_cb, SyncResponse) logging("Syncing...") - await MATRIX_CLIENT.sync(timeout=30000) + await client.sync(timeout=30000) - MATRIX_CLIENT.add_event_callback(message_callback, RoomMessageText) - MATRIX_CLIENT.add_event_callback(room_invite_callback, InviteEvent) + client.add_event_callback(message_callback, RoomMessageText) + client.add_event_callback(room_invite_callback, InviteEvent) await accept_pending_invites() # Accept pending invites @@ -362,21 +293,19 @@ async def main(): try: # Continue syncing events - await MATRIX_CLIENT.sync_forever(timeout=30000) + await client.sync_forever(timeout=30000) finally: logging("Syncing one last time...") - await MATRIX_CLIENT.sync(timeout=30000) - await MATRIX_CLIENT.close() # Properly close the aiohttp client session + await client.sync(timeout=30000) + await client.close() # Properly close the aiohttp client session logging("Bot stopped") -def initialize_database(path): - global DATABASE - +def initialize_database(path: os.PathLike): logging("Initializing database...") - DATABASE = duckdb.connect(path) + database = duckdb.connect(path) - with DATABASE.cursor() as cursor: + with database.cursor() as cursor: # Get the latest migration ID if the migrations table exists try: cursor.execute( @@ -417,7 +346,9 @@ def initialize_database(path): (datetime.now(),) ) - DATABASE.commit() + database.commit() + + return database if __name__ == "__main__": @@ -440,10 +371,10 @@ if __name__ == "__main__": logging("Matrix config not found or incomplete", "critical") exit(1) - MATRIX_CLIENT = AsyncClient(config["Matrix"]["Homeserver"]) + CONTEXT["client"] = AsyncClient(config["Matrix"]["Homeserver"]) - MATRIX_CLIENT.access_token = config["Matrix"]["AccessToken"] - MATRIX_CLIENT.user_id = config["Matrix"].get("UserID") + CONTEXT["client"].access_token = config["Matrix"]["AccessToken"] + CONTEXT["client"].user_id = config["Matrix"].get("UserID") # Set up GPT API try: @@ -456,25 +387,33 @@ if __name__ == "__main__": openai.api_key = config["OpenAI"]["APIKey"] if "Model" in config["OpenAI"]: - DEFAULT_MODEL = config["OpenAI"]["Model"] + CONTEXT["model"] = config["OpenAI"]["Model"] if "MaxTokens" in config["OpenAI"]: - MAX_TOKENS = int(config["OpenAI"]["MaxTokens"]) + CONTEXT["max_tokens"] = int(config["OpenAI"]["MaxTokens"]) if "MaxMessages" in config["OpenAI"]: - MAX_MESSAGES = int(config["OpenAI"]["MaxMessages"]) + CONTEXT["max_messages"] = int(config["OpenAI"]["MaxMessages"]) # Set up database if "Database" in config and config["Database"].get("Path"): - initialize_database(config["Database"]["Path"]) + CONTEXT["database"] = initialize_database(config["Database"]["Path"]) + + # Listen for SIGTERM + + def sigterm_handler(_signo, _stack_frame): + logging("Received SIGTERM - exiting...") + exit() + + signal.signal(signal.SIGTERM, sigterm_handler) # Start bot loop try: asyncio.run(main()) except KeyboardInterrupt: logging("Received KeyboardInterrupt - exiting...") - except signal.SIGTERM: + except SystemExit: logging("Received SIGTERM - exiting...") finally: - if DATABASE: - DATABASE.close() + if CONTEXT["database"]: + CONTEXT["database"].close()