# matrix-rssbot/gptbot.py

import sqlite3
import os
import inspect
import openai
import asyncio
import markdown2
import tiktoken
from nio import AsyncClient, RoomMessageText, MatrixRoom, Event, InviteEvent
from nio.api import MessageDirection
from nio.responses import RoomMessagesError, SyncResponse
from configparser import ConfigParser
from datetime import datetime
config = ConfigParser()
config.read("config.ini")
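
# Expected config.ini layout, inferred from the keys read below
# (values are placeholders, not real credentials):
#
#   [OpenAI]
#   APIKey = sk-...
#   Model = gpt-3.5-turbo
#
#   [Matrix]
#   Homeserver = https://matrix.example.org
#   AccessToken = syt_...
#   UserID = @gptbot:example.org
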
# Set up GPT API
openai.api_key = config["OpenAI"]["APIKey"]
MODEL = config["OpenAI"].get("Model", "gpt-3.5-turbo")
# Set up Matrix client
MATRIX_HOMESERVER = config["Matrix"]["Homeserver"]
MATRIX_ACCESS_TOKEN = config["Matrix"]["AccessToken"]
BOT_USER_ID = config["Matrix"]["UserID"]
client = AsyncClient(MATRIX_HOMESERVER, BOT_USER_ID)
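# Latest sync token; updated by sync_cb() and used as the starting point
# when fetching room history in fetch_last_n_messages()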
SYNC_TOKEN = None
# Set up SQLite3 database
conn = sqlite3.connect("token_usage.db")
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS token_usage (
room_id TEXT NOT NULL,
tokens INTEGER NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
# Define the system message and max token limit
SYSTEM_MESSAGE = "You are a helpful assistant."
MAX_TOKENS = 3000


def logging(message, log_level="info"):
    # Simple stdout logger that prefixes each line with timestamp, caller and level
    caller = inspect.currentframe().f_back.f_code.co_name
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S:%f")
    print(f"[{timestamp} - {caller}] [{log_level.upper()}] {message}")


async def gpt_query(messages):
    logging(f"Querying GPT with {len(messages)} messages")

    try:
        response = openai.ChatCompletion.create(
            model=MODEL,
            messages=messages
        )

        result_text = response.choices[0].message['content']
        tokens_used = response.usage["total_tokens"]

        logging(f"Used {tokens_used} tokens")

        return result_text, tokens_used

    except Exception as e:
        logging(f"Error during GPT API call: {e}", "error")
        return None, 0


async def fetch_last_n_messages(room_id, n=20):
    # Fetch the last n messages from the room
    room = await client.join(room_id)

    messages = []

    logging(f"Fetching last {n} messages from room {room_id} (starting at {SYNC_TOKEN})...")

    response = await client.room_messages(
        room_id=room_id,
        start=SYNC_TOKEN,
        limit=n,
    )

    if isinstance(response, RoomMessagesError):
        logging(
            f"Error fetching messages: {response.message} (status code {response.status_code})", "error")
        return []

    for event in response.chunk:
        if isinstance(event, RoomMessageText):
            messages.append(event)

    logging(f"Found {len(messages)} messages")

    # Reverse the list so that messages are in chronological order
    return messages[::-1]


def truncate_messages_to_fit_tokens(messages, max_tokens=MAX_TOKENS):
    encoding = tiktoken.encoding_for_model(MODEL)
    total_tokens = 0

    system_message_tokens = len(encoding.encode(SYSTEM_MESSAGE)) + 1

    if system_message_tokens > max_tokens:
        logging(
            f"System message is too long to fit within token limit ({system_message_tokens} tokens) - cannot proceed", "error")
        return []

    total_tokens += system_message_tokens

    truncated_messages = []

    # Keep the system message, then add the most recent messages first
    # until the token budget is exhausted
    for message in [messages[0]] + list(reversed(messages[1:])):
        content = message["content"]
        tokens = len(encoding.encode(content)) + 1
        if total_tokens + tokens > max_tokens:
            break
        total_tokens += tokens
        truncated_messages.append(message)

    # Restore chronological order, keeping the system message first
    return [truncated_messages[0]] + list(reversed(truncated_messages[1:]))


async def message_callback(room: MatrixRoom, event: RoomMessageText):
    logging(f"Received message from {event.sender} in room {room.room_id}")

    if event.sender == BOT_USER_ID:
        logging("Message is from bot - ignoring")
        return

    await client.room_typing(room.room_id, True)

    await client.room_read_markers(room.room_id, event.event_id)

    last_messages = await fetch_last_n_messages(room.room_id, 20)

    if not last_messages or all(message.sender == BOT_USER_ID for message in last_messages):
        logging("No messages to respond to")
        await client.room_typing(room.room_id, False)
        return

    chat_messages = [{"role": "system", "content": SYSTEM_MESSAGE}]

    for message in last_messages:
        role = "assistant" if message.sender == BOT_USER_ID else "user"
        if message.event_id != event.event_id:
            chat_messages.append({"role": role, "content": message.body})

    chat_messages.append({"role": "user", "content": event.body})

    # Truncate messages to fit within the token limit
    truncated_messages = truncate_messages_to_fit_tokens(
        chat_messages, MAX_TOKENS - 1)

    response, tokens_used = await gpt_query(truncated_messages)

    if response:
        # Send the response to the room
        logging(f"Sending response to room {room.room_id}...")
        markdowner = markdown2.Markdown(extras=["fenced-code-blocks"])
        formatted_body = markdowner.convert(response)

        await client.room_send(
            room.room_id, "m.room.message", {"msgtype": "m.text", "body": response,
                                             "format": "org.matrix.custom.html", "formatted_body": formatted_body}
        )

        logging("Logging tokens used...")
        cursor.execute(
            "INSERT INTO token_usage (room_id, tokens) VALUES (?, ?)", (room.room_id, tokens_used))
        conn.commit()

    else:
        # Send a notice to the room if there was an error
        logging("Error during GPT API call - sending notice to room")
        await client.room_send(
            room.room_id, "m.room.message", {
                "msgtype": "m.notice", "body": "Sorry, I'm having trouble connecting to the GPT API right now. Please try again later."}
        )
        print("No response from GPT API")

    await client.room_typing(room.room_id, False)


async def room_invite_callback(room: MatrixRoom, event):
    logging(f"Received invite to room {room.room_id} - joining...")

    await client.join(room.room_id)
    await client.room_send(
        room.room_id,
        "m.room.message",
        {"msgtype": "m.text",
         "body": "Hello! I'm a helpful assistant. How can I help you today?"}
    )


async def accept_pending_invites():
    logging("Accepting pending invites...")

    for room_id in list(client.invited_rooms.keys()):
        logging(f"Joining room {room_id}...")

        await client.join(room_id)
        await client.room_send(
            room_id,
            "m.room.message",
            {"msgtype": "m.text",
             "body": "Hello! I'm a helpful assistant. How can I help you today?"}
        )


async def sync_cb(response):
    global SYNC_TOKEN

    logging(f"Sync response received (next batch: {response.next_batch})")

    SYNC_TOKEN = response.next_batch


async def main():
    logging("Starting bot...")

    client.access_token = MATRIX_ACCESS_TOKEN  # Set the access token directly
    client.user_id = BOT_USER_ID  # Set the user_id directly

    client.add_response_callback(sync_cb, SyncResponse)

    logging("Syncing...")
    await client.sync(timeout=30000)

    client.add_event_callback(message_callback, RoomMessageText)
    client.add_event_callback(room_invite_callback, InviteEvent)

    await accept_pending_invites()  # Accept pending invites

    logging("Bot started")

    try:
        await client.sync_forever(timeout=30000)  # Continue syncing events
    finally:
        await client.close()  # Properly close the aiohttp client session
        logging("Bot stopped")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    finally:
        conn.close()