Compare commits

..

No commits in common. "main" and "justin-russell-bugfixes" have entirely different histories.

92 changed files with 2161 additions and 3942 deletions

View file

@ -1,33 +0,0 @@
name: Docker CI/CD
on:
push:
tags:
- "*"
jobs:
docker:
name: Docker Build and Push to Docker Hub
container:
image: node:20-bookworm
steps:
- name: Install dependencies
run: |
apt update
apt install -y docker.io
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and push to Docker Hub
uses: docker/build-push-action@v5
with:
push: true
tags: |
kumitterer/matrix-gptbot:latest
kumitterer/matrix-gptbot:${{ env.GITHUB_REF_NAME }}

View file

@ -1,54 +0,0 @@
name: Python Package CI/CD
on:
workflow_dispatch:
push:
tags:
- "*"
jobs:
setup:
name: Setup and Test
container:
image: node:20-bookworm
steps:
- name: Check out code
uses: actions/checkout@v4
- name: Install dependencies
run: |
apt update
apt install -y python3 python3-venv
- name: Set up Python environment
run: |
python3 -V
python3 -m venv venv
. ./venv/bin/activate
pip install -U pip
pip install .[all]
publish:
name: Publish to PyPI
container:
image: node:20-bookworm
steps:
- name: Check out code
uses: actions/checkout@v4
- name: Install dependencies
run: |
apt update
apt install -y python3 python3-venv
- name: Publish to PyPI
run: |
python3 -m venv venv
. ./venv/bin/activate
pip install -U hatchling twine build
python -m build .
python -m twine upload --username __token__ --password ${PYPI_TOKEN} dist/*
env:
PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}

View file

@ -1,6 +0,0 @@
version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "daily"

7
.gitignore vendored
View file

@ -1,11 +1,6 @@
*.db
*.db.wal
*.db-journal
config.ini
venv/
*.pyc
__pycache__/
*.bak
dist/
pantalaimon.conf
.ruff_cache/
__pycache__/

15
.vscode/launch.json vendored
View file

@ -1,15 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Module",
"type": "python",
"request": "launch",
"module": "gptbot",
"justMyCode": true
}
]
}

View file

@ -1,3 +0,0 @@
{
"python.formatting.provider": "black"
}

View file

@ -1,79 +0,0 @@
# Changelog
### 0.3.14 (2024-05-21)
- Fixed issue in handling of login credentials, added error handling for login failures
### 0.3.13 (2024-05-20)
- **Breaking Change**: The `ForceTools` configuration option behavior has changed. Instead of using a separate model for tools, the bot will now try to use the default chat model for tool requests, even if that model is not known to support tools.
- Added `ToolModel` to OpenAI configuration to allow specifying a separate model for tool requests
- Automatically resize context images to a default maximum of 2000x768 pixels before sending them to the AI model
### 0.3.12 (2024-05-17)
- Added `ForceVision` to OpenAI configuration to allow third-party models to be used for image recognition
- Added some missing properties to `OpenAI` class
### 0.3.11 (2024-05-17)
- Refactoring of AI provider handling in preparation for multiple AI providers: Introduced a `BaseAI` class that all AI providers must inherit from
- Added support for temperature, top_p, frequency_penalty, and presence_penalty in `AllowedUsers`
- Introduced ruff as a development dependency for linting and applied some linting fixes
- Fixed `gptbot` command line tool
- Changed default chat model to `gpt-4o`
- Changed default image generation model to `dall-e-3`
- Removed currently unused sections from `config.dist.ini`
- Changed provided Pantalaimon config file to not use a key ring by default
- Prevent bot from crashing when an unneeded dependency is missing
### 0.3.10 (2024-05-16)
- Add support for specifying room IDs in `AllowedUsers`
- Minor fixes
### 0.3.9 (2024-04-23)
- Add Docker support for running the bot in a container
- Add TrackingMore dependency to pyproject.toml
- Replace deprecated `pkg_resources` with `importlib.metadata`
- Allow password-based login on first login
### 0.3.7 / 0.3.8 (2024-04-15)
- Changes to URLs in pyproject.toml
- Migrated build pipeline to Forgejo Actions
### 0.3.6 (2024-04-11)
- Fix issue where message type detection would fail for some messages (cece8cfb24e6f2e98d80d233b688c3e2c0ff05ae)
### 0.3.5
- Only set room avatar if it is not already set (a9c23ee9c42d0a741a7eb485315e3e2d0a526725)
### 0.3.4 (2024-02-18)
- Optimize chat model and message handling (10b74187eb43bca516e2a469b69be1dbc9496408)
- Fix parameter passing in chat response calls (2d564afd979e7bc9eee8204450254c9f86b663b5)
- Refine message filtering in bot event processing (c47f947f80f79a443bbd622833662e3122b121ef)
### 0.3.3 (2024-01-26)
- Implement recursion check in response generation (e6bc23e564e51aa149432fc67ce381a9260ee5f5)
- Implement tool emulation for models without tool support (0acc1456f9e4efa09e799f6ce2ec9a31f439fe4a)
- Allow selection of chat model by room (87173ae284957f66594e66166508e4e3bd60c26b)
### 0.3.2 (2023-12-14)
- Removed key upload from room event handler
- Fixed output of `python -m gptbot -v` to display currently installed version
- Workaround for bug preventing bot from responding when files are uploaded to an encrypted room
#### Known Issues
- When using Pantalaimon: Bot is unable to download/use files uploaded to unencrypted rooms
### 0.3.1 (2023-12-07)
- Fixed issue in newroom task causing it to be called over and over again

View file

@ -1,14 +0,0 @@
FROM python:3.12-slim
WORKDIR /app
COPY src/ /app/src
COPY pyproject.toml /app
COPY README.md /app
COPY LICENSE /app
RUN apt update
RUN apt install -y build-essential libpython3-dev ffmpeg
RUN pip install .[all]
RUN pip install 'future==1.0.0'
CMD ["python", "-m", "gptbot"]

View file

@ -1,5 +1,4 @@
Copyright (c) 2023-2024 Kumi Mitterer <gptbot@kumi.email>, Private.coffee Team
<support@private.coffee>
Copyright (c) 2023 Kumi Mitterer <gptbot@kumi.email>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

220
README.md
View file

@ -1,151 +1,46 @@
# GPTbot
[![Support Private.coffee!](https://shields.private.coffee/badge/private.coffee-support%20us!-pink?logo=coffeescript)](https://private.coffee)
[![Matrix](https://shields.private.coffee/badge/Matrix-join%20us!-blue?logo=matrix)](https://matrix.to/#/#matrix-gptbot:private.coffee)
[![PyPI](https://shields.private.coffee/pypi/v/matrix-gptbot)](https://pypi.org/project/matrix-gptbot/)
[![PyPI - Python Version](https://shields.private.coffee/pypi/pyversions/matrix-gptbot)](https://pypi.org/project/matrix-gptbot/)
[![PyPI - License](https://shields.private.coffee/pypi/l/matrix-gptbot)](https://pypi.org/project/matrix-gptbot/)
[![Latest Git Commit](https://shields.private.coffee/gitea/last-commit/privatecoffee/matrix-gptbot?gitea_url=https://git.private.coffee)](https://git.private.coffee/privatecoffee/matrix-gptbot)
GPTbot is a simple bot that uses different APIs to generate responses to
GPTbot is a simple bot that uses different APIs to generate responses to
messages in a Matrix room.
It is called GPTbot because it was originally intended to only use GPT-3 to
generate responses. However, it supports other services/APIs, and I will
probably add more in the future, so the name is a bit misleading.
## Features
- AI-generated responses to text, image and voice messages in a Matrix room
(chatbot)
- Currently supports OpenAI (`gpt-3.5-turbo` and `gpt-4`, `gpt-4o`, `whisper`
and `tts`) and compatible APIs (e.g. `ollama`)
- Able to generate pictures using OpenAI `dall-e-2`/`dall-e-3` models
- Able to browse the web to find information
- Able to use OpenWeatherMap to get weather information (requires separate
API key)
- Even able to roll dice!
- AI-generated responses to messages in a Matrix room (chatbot)
- Currently supports OpenAI (tested with `gpt-3.5-turbo` and `gpt-4`)
- AI-generated pictures via the `!gptbot imagine` command
- Currently supports OpenAI (DALL-E)
- Mathematical calculations via the `!gptbot calculate` command
- Currently supports WolframAlpha (requires separate API key)
- Currently supports WolframAlpha
- Automatic classification of messages (for `imagine`, `calculate`, etc.)
- Beta feature, see Usage section for details
- Really useful commands like `!gptbot help` and `!gptbot coin`
- sqlite3 database to store room settings
- DuckDB database to store room context
## Planned features
- End-to-end encryption support (partly implemented, but not yet working)
## Installation
To run the bot, you will need Python 3.10 or newer.
Simply clone this repository and install the requirements.
The bot has been tested with Python 3.12 on Arch, but should work with any
current version, and should not require any special dependencies or operating
system features.
### Requirements
### Production
- Python 3.10 or later
- Requirements from `requirements.txt` (install with `pip install -r requirements.txt` in a venv)
#### PyPI
### Configuration
The recommended way to install the bot is to use pip to install it from PyPI.
```shell
# Recommended: activate a venv first
python -m venv venv
. venv/bin/activate
# Install the bot
pip install matrix-gptbot[all]
```
This will install the latest release of the bot and all required dependencies
for all available features.
You can also use `pip install git+https://git.private.coffee/privatecoffee/matrix-gptbot.git`
to install the latest version from the Git repository.
#### Docker
A `docker-compose.yml` file is provided that you can use to run the bot with
Docker Compose. You will need to create a `config.ini` file as described in the
`Running` section.
```shell
# Clone the repository
git clone https://git.private.coffee/privatecoffee/matrix-gptbot.git
cd matrix-gptbot
# Create a config file
cp config.dist.ini config.ini
# Edit the config file to your needs
# Initialize the database file
sqlite3 database.db "SELECT 1"
# Optionally, create Pantalaimon config
cp contrib/pantalaimon.example.conf pantalaimon.conf
# Edit the Pantalaimon config file to your needs
# Update your homeserver URL in the bot's config.ini to point to Pantalaimon (probably http://pantalaimon:8009 if you used the provided example config)
# You can use `fetch_access_token.py` to get an access token for the bot
# Start the bot
docker-compose up -d
```
#### End-to-end encryption
WARNING: Using end-to-end encryption seems to sometimes cause problems with
file attachments, especially in rooms that are not encrypted, if the same
user also uses the bot in encrypted rooms.
The bot itself does not implement end-to-end encryption. However, it can be
used in conjunction with [pantalaimon](https://github.com/matrix-org/pantalaimon).
You first have to log in to your homeserver using `python fetch_access_token.py`,
and can then use the returned access token in your bot's `config.ini` file.
Make sure to also point the bot to your pantalaimon instance by setting
`homeserver` to your pantalaimon instance instead of directly to your
homeserver in your `config.ini`.
Note: If you don't use pantalaimon, the bot will still work, but it will not
be able to decrypt or encrypt messages. This means that you cannot use it in
rooms with end-to-end encryption enabled.
### Development
Clone the repository and install the requirements to a virtual environment.
```shell
# Clone the repository
git clone https://git.private.coffee/privatecoffee/matrix-gptbot.git
cd matrix-gptbot
# If desired, activate a venv first
python -m venv venv
. venv/bin/activate
# Install the bot in editable mode
pip install -e .[dev]
# Go to the bot directory and start working
cd src/gptbot
```
Of course, you can also fork the repository on [GitHub](https://github.com/kumitterer/matrix-gptbot/)
and work on your own copy.
#### Repository policy
Generally, the `main` branch is considered unstable and should not be used in
production. Instead, use the latest release tag. The `main` branch is used for
development and may contain breaking changes at any time.
For development, a feature branch should be created from `main` and merged back
into `main` with a pull request. The pull request will be reviewed and tested
before merging.
The bot requires a configuration file to be present in the working directory.
Copy the provided `config.dist.ini` to `config.ini` and edit it to your needs.
## Running
The bot requires a configuration file to be present in the working directory.
Copy the provided `config.dist.ini` to `config.ini` and edit it to your needs.
The bot can then be run with `python -m gptbot`. If required, activate a venv
first.
The bot can be run with `python -m gptbot`. If required, activate a venv first.
You may want to run the bot in a screen or tmux session, or use a process
manager like systemd. The repository contains a sample systemd service file
@ -155,9 +50,6 @@ adjust the paths in the file to match your setup, then copy it to
`systemctl start gptbot` and enable it to start automatically on boot with
`systemctl enable gptbot`.
Analogously, you can use the provided `gptbot-pantalaimon.service` file to run
pantalaimon as a systemd service.
## Usage
Once it is running, just invite the bot to a room and it will start responding
@ -178,42 +70,35 @@ With this setting, the bot will only be triggered if a message begins with
bot to generate a response to the message `Hello, how are you?`. The bot will
still get previous messages in the room as context for generating the response.
### Tools
The bot has a selection of tools at its disposal that it will automatically use
to generate responses. For example, if you send a message like "Draw me a
picture of a cat", the bot will automatically use DALL-E to generate an image
of a cat.
Note that this only works if the bot is configured to use a model that supports
tools. This currently is only the case for OpenAI's `gpt-3.5-turbo` model. If
you wish to use `gpt-4` instead, you can set the `ForceTools` option in the
`[OpenAI]` section of the config file to `1`. This will cause the bot to use
`gpt-3.5-turbo` for tool generation and `gpt-4` for generating the final text
response.
Similarly, it will attempt to use the `gpt-4-vision-preview` model to "read"
the contents of images if a non-vision model is used.
### Commands
There are a few commands that you can use to explicitly call a certain feature
of the bot. For example, if you want to generate an image from a text prompt,
you can use the `!gptbot imagine` command. For example, `!gptbot imagine a cat`
will cause the bot to generate an image of a cat.
There are a few commands that you can use to interact with the bot. For example,
if you want to generate an image from a text prompt, you can use the
`!gptbot imagine` command. For example, `!gptbot imagine a cat` will cause the
bot to generate an image of a cat.
To learn more about the available commands, `!gptbot help` will print a list of
available commands.
### Voice input and output
### Automatic classification
The bot supports voice input and output, but it is disabled by default. To
enable it, use the `!gptbot roomsettings` command to change the settings for
the current room. `!gptbot roomsettings stt true` will enable voice input using
OpenAI's `whisper` model, and `!gptbot roomsettings tts true` will enable voice
output using the `tts` model.
As a beta feature, the bot can automatically classify messages and use the
appropriate API to generate a response. For example, if you send a message
like "Draw me a picture of a cat", the bot will automatically use the
`imagine` command to generate an image of a cat.
Note that this currently only works for audio messages and .mp3 file uploads.
This feature is disabled by default. To enable it, use the `!gptbot roomsettings`
command to change the settings for the current room. `!gptbot roomsettings classification true`
will enable automatic classification, and `!gptbot roomsettings classification false`
will disable it again.
Note that this feature is still in beta and may not work as expected. You can
always use the commands manually if the automatic classification doesn't work
for you (including `!gptbot chat` for a regular chat message).
Also note that this feature conflicts with the `always_reply false` setting -
or rather, it doesn't make sense then because you already have to explicitly
specify the command to use.
## Troubleshooting
@ -222,12 +107,10 @@ Note that this currently only works for audio messages and .mp3 file uploads.
First of all, make sure that the bot is actually running. (Okay, that's not
really troubleshooting, but it's a good start.)
If the bot is running, check the logs, these should tell you what is going on.
For example, if the bot is showing an error message like "Timed out, retrying",
it is unable to reach your homeserver. In this case, check your homeserver URL
and make sure that the bot can reach it. If you are using Pantalaimon, make
sure that the bot is pointed to Pantalaimon and not directly to your
homeserver, and that Pantalaimon is running and reachable.
If the bot is running, check the logs. The first few lines should contain
"Starting bot...", "Syncing..." and "Bot started". If you don't see these
lines, something went wrong during startup. Fortunately, the logs should
contain more information about what went wrong.
If you need help figuring out what went wrong, feel free to open an issue.
@ -253,5 +136,4 @@ please check the logs and open an issue if you can't figure out what's going on.
## License
This project is licensed under the terms of the MIT license. See the [LICENSE](LICENSE)
file for details.
This project is licensed under the terms of the MIT license. See the [LICENSE](LICENSE) file for details.

View file

Before

Width:  |  Height:  |  Size: 186 KiB

After

Width:  |  Height:  |  Size: 186 KiB

View file

@ -1,24 +1,35 @@
from nio import (
RoomMessageText,
MegolmEvent,
InviteEvent,
Event,
SyncResponse,
JoinResponse,
InviteEvent,
OlmEvent,
MegolmEvent,
RoomMemberEvent,
Response,
)
from .test import test_callback
from .sync import sync_callback
from .invite import room_invite_callback
from .join import join_callback
from .message import message_callback
from .roommember import roommember_callback
from .test_response import test_response_callback
RESPONSE_CALLBACKS = {
Response: test_response_callback,
SyncResponse: sync_callback,
JoinResponse: join_callback,
}
EVENT_CALLBACKS = {
Event: test_callback,
InviteEvent: room_invite_callback,
RoomMessageText: message_callback,
MegolmEvent: message_callback,
RoomMemberEvent: roommember_callback,
}
}

View file

@ -2,9 +2,9 @@ from nio import InviteEvent, MatrixRoom
async def room_invite_callback(room: MatrixRoom, event: InviteEvent, bot):
if room.room_id in bot.matrix_client.rooms:
bot.logger.log(f"Already in room {room.room_id} - ignoring invite")
logging(f"Already in room {room.room_id} - ignoring invite")
return
bot.logger.log(f"Received invite to room {room.room_id} - joining...")
await bot.matrix_client.join(room.room_id)
response = await bot.matrix_client.join(room.room_id)

View file

@ -1,20 +1,16 @@
from contextlib import closing
async def join_callback(response, bot):
bot.logger.log(
f"Join response received for room {response.room_id}", "debug")
bot.matrix_client.joined_rooms()
with closing(bot.database.cursor()) as cursor:
with bot.database.cursor() as cursor:
cursor.execute(
"SELECT space_id FROM user_spaces WHERE user_id = ? AND active = TRUE", (response.sender,))
"SELECT space_id FROM user_spaces WHERE user_id = ? AND active = TRUE", (event.sender,))
space = cursor.fetchone()
if space:
bot.logger.log(f"Adding new room to space {space[0]}...")
await bot.add_rooms_to_space(space[0], [response.room_id])
bot.matrix_client.keys_upload()
await bot.add_rooms_to_space(space[0], [new_room.room_id])
await bot.send_message(bot.matrix_client.rooms[response.room_id], "Hello! Thanks for inviting me! How can I help you today?")

51
callbacks/message.py Normal file
View file

@ -0,0 +1,51 @@
from nio import MatrixRoom, RoomMessageText, MegolmEvent, RoomKeyRequestError, RoomKeyRequestResponse
from datetime import datetime
async def message_callback(room: MatrixRoom | str, event: RoomMessageText | MegolmEvent, bot):
bot.logger.log(f"Received message from {event.sender} in room {room.room_id}")
sent = datetime.fromtimestamp(event.server_timestamp / 1000)
received = datetime.now()
latency = received - sent
if isinstance(event, MegolmEvent):
try:
event = await bot.matrix_client.decrypt_event(event)
except Exception as e:
try:
bot.logger.log("Requesting new encryption keys...")
response = await bot.matrix_client.request_room_key(event)
if isinstance(response, RoomKeyRequestError):
bot.logger.log(f"Error requesting encryption keys: {response}", "error")
elif isinstance(response, RoomKeyRequestResponse):
bot.logger.log(f"Encryption keys received: {response}", "debug")
bot.matrix_bot.olm.handle_response(response)
event = await bot.matrix_client.decrypt_event(event)
except:
pass
bot.logger.log(f"Error decrypting message: {e}", "error")
await bot.send_message(room, "Sorry, I couldn't decrypt that message. Please try again later or switch to a room without encryption.", True)
return
if event.sender == bot.matrix_client.user_id:
bot.logger.log("Message is from bot itself - ignoring")
elif event.body.startswith("!gptbot"):
await bot.process_command(room, event)
elif event.body.startswith("!"):
bot.logger.log(f"Received {event.body} - might be a command, but not for this bot - ignoring")
else:
await bot.process_query(room, event)
processed = datetime.now()
processing_time = processed - received
bot.logger.log(f"Message processing took {processing_time.total_seconds()} seconds (latency: {latency.total_seconds()} seconds)")
if bot.room_uses_timing(room):
await bot.send_message(room, f"Message processing took {processing_time.total_seconds()} seconds (latency: {latency.total_seconds()} seconds)", True)

11
callbacks/test.py Normal file
View file

@ -0,0 +1,11 @@
from nio import MatrixRoom, Event
async def test_callback(room: MatrixRoom, event: Event, bot):
"""Test callback for debugging purposes.
Args:
room (MatrixRoom): The room the event was sent in.
event (Event): The event that was sent.
"""
bot.logger.log(f"Test callback called: {room.room_id} {event.event_id} {event.sender} {event.__class__}")

View file

@ -0,0 +1,4 @@
async def test_response_callback(response, bot):
bot.logger.log(
f"{response.__class__} response received", "debug")

1
classes/__init__.py Normal file
View file

@ -0,0 +1 @@
from .store import DuckDBStore

852
classes/bot.py Normal file
View file

@ -0,0 +1,852 @@
import markdown2
import duckdb
import tiktoken
import asyncio
import functools
from PIL import Image
from nio import (
AsyncClient,
AsyncClientConfig,
WhoamiResponse,
DevicesResponse,
Event,
Response,
MatrixRoom,
Api,
RoomMessagesError,
MegolmEvent,
GroupEncryptionError,
EncryptionError,
RoomMessageText,
RoomSendResponse,
SyncResponse,
RoomMessageNotice,
JoinError,
RoomLeaveError,
RoomSendError,
RoomVisibility,
RoomCreateError,
)
from nio.crypto import Olm
from typing import Optional, List
from configparser import ConfigParser
from datetime import datetime
from io import BytesIO
from pathlib import Path
import uuid
import traceback
import json
from .logging import Logger
from migrations import migrate
from callbacks import RESPONSE_CALLBACKS, EVENT_CALLBACKS
from commands import COMMANDS
from .store import DuckDBStore
from .openai import OpenAI
from .wolframalpha import WolframAlpha
from .trackingmore import TrackingMore
class GPTBot:
# Default values
database: Optional[duckdb.DuckDBPyConnection] = None
# Default name of rooms created by the bot
display_name = default_room_name = "GPTBot"
default_system_message: str = "You are a helpful assistant."
# Force default system message to be included even if a custom room message is set
force_system_message: bool = False
max_tokens: int = 3000 # Maximum number of input tokens
max_messages: int = 30 # Maximum number of messages to consider as input
matrix_client: Optional[AsyncClient] = None
sync_token: Optional[str] = None
logger: Optional[Logger] = Logger()
chat_api: Optional[OpenAI] = None
image_api: Optional[OpenAI] = None
classification_api: Optional[OpenAI] = None
parcel_api: Optional[TrackingMore] = None
operator: Optional[str] = None
room_ignore_list: List[str] = [] # List of rooms to ignore invites from
debug: bool = False
logo: Optional[Image.Image] = None
logo_uri: Optional[str] = None
allowed_users: List[str] = []
@classmethod
def from_config(cls, config: ConfigParser):
"""Create a new GPTBot instance from a config file.
Args:
config (ConfigParser): ConfigParser instance with the bot's config.
Returns:
GPTBot: The new GPTBot instance.
"""
# Create a new GPTBot instance
bot = cls()
# Set the database connection
bot.database = duckdb.connect(
config["Database"]["Path"]) if "Database" in config and "Path" in config["Database"] else None
# Override default values
if "GPTBot" in config:
bot.operator = config["GPTBot"].get("Operator", bot.operator)
bot.default_room_name = config["GPTBot"].get(
"DefaultRoomName", bot.default_room_name)
bot.default_system_message = config["GPTBot"].get(
"SystemMessage", bot.default_system_message)
bot.force_system_message = config["GPTBot"].getboolean(
"ForceSystemMessage", bot.force_system_message)
bot.debug = config["GPTBot"].getboolean("Debug", bot.debug)
logo_path = config["GPTBot"].get("Logo", str(
Path(__file__).parent.parent / "assets/logo.png"))
bot.logger.log(f"Loading logo from {logo_path}")
if Path(logo_path).exists() and Path(logo_path).is_file():
bot.logo = Image.open(logo_path)
bot.display_name = config["GPTBot"].get(
"DisplayName", bot.display_name)
if "AllowedUsers" in config["GPTBot"]:
bot.allowed_users = json.loads(config["GPTBot"]["AllowedUsers"])
bot.chat_api = bot.image_api = bot.classification_api = OpenAI(
config["OpenAI"]["APIKey"], config["OpenAI"].get("Model"), bot.logger)
bot.max_tokens = config["OpenAI"].getint("MaxTokens", bot.max_tokens)
bot.max_messages = config["OpenAI"].getint(
"MaxMessages", bot.max_messages)
# Set up WolframAlpha
if "WolframAlpha" in config:
bot.calculation_api = WolframAlpha(
config["WolframAlpha"]["APIKey"], bot.logger)
# Set up TrackingMore
if "TrackingMore" in config:
bot.parcel_api = TrackingMore(
config["TrackingMore"]["APIKey"], bot.logger)
# Set up the Matrix client
assert "Matrix" in config, "Matrix config not found"
homeserver = config["Matrix"]["Homeserver"]
bot.matrix_client = AsyncClient(homeserver)
bot.matrix_client.access_token = config["Matrix"]["AccessToken"]
bot.matrix_client.user_id = config["Matrix"].get("UserID")
bot.matrix_client.device_id = config["Matrix"].get("DeviceID")
# Return the new GPTBot instance
return bot
async def _get_user_id(self) -> str:
"""Get the user ID of the bot from the whoami endpoint.
Requires an access token to be set up.
Returns:
str: The user ID of the bot.
"""
assert self.matrix_client, "Matrix client not set up"
user_id = self.matrix_client.user_id
if not user_id:
assert self.matrix_client.access_token, "Access token not set up"
response = await self.matrix_client.whoami()
if isinstance(response, WhoamiResponse):
user_id = response.user_id
else:
raise Exception(f"Could not get user ID: {response}")
return user_id
async def _last_n_messages(self, room: str | MatrixRoom, n: Optional[int]):
messages = []
n = n or self.max_messages
room_id = room.room_id if isinstance(room, MatrixRoom) else room
self.logger.log(
f"Fetching last {2*n} messages from room {room_id} (starting at {self.sync_token})...")
response = await self.matrix_client.room_messages(
room_id=room_id,
start=self.sync_token,
limit=2*n,
)
if isinstance(response, RoomMessagesError):
raise Exception(
f"Error fetching messages: {response.message} (status code {response.status_code})", "error")
for event in response.chunk:
if len(messages) >= n:
break
if isinstance(event, MegolmEvent):
try:
event = await self.matrix_client.decrypt_event(event)
except (GroupEncryptionError, EncryptionError):
self.logger.log(
f"Could not decrypt message {event.event_id} in room {room_id}", "error")
continue
if isinstance(event, (RoomMessageText, RoomMessageNotice)):
if event.body.startswith("!gptbot ignoreolder"):
break
if (not event.body.startswith("!")) or (event.body.startswith("!gptbot")):
messages.append(event)
self.logger.log(f"Found {len(messages)} messages (limit: {n})")
# Reverse the list so that messages are in chronological order
return messages[::-1]
def _truncate(self, messages: list, max_tokens: Optional[int] = None,
model: Optional[str] = None, system_message: Optional[str] = None):
max_tokens = max_tokens or self.max_tokens
model = model or self.chat_api.chat_model
system_message = self.default_system_message if system_message is None else system_message
encoding = tiktoken.encoding_for_model(model)
total_tokens = 0
system_message_tokens = 0 if not system_message else (
len(encoding.encode(system_message)) + 1)
if system_message_tokens > max_tokens:
self.logger.log(
f"System message is too long to fit within token limit ({system_message_tokens} tokens) - cannot proceed", "error")
return []
total_tokens += system_message_tokens
total_tokens = len(system_message) + 1
truncated_messages = []
for message in [messages[0]] + list(reversed(messages[1:])):
content = message["content"]
tokens = len(encoding.encode(content)) + 1
if total_tokens + tokens > max_tokens:
break
total_tokens += tokens
truncated_messages.append(message)
return [truncated_messages[0]] + list(reversed(truncated_messages[1:]))
async def _get_device_id(self) -> str:
"""Guess the device ID of the bot.
Requires an access token to be set up.
Returns:
str: The guessed device ID.
"""
assert self.matrix_client, "Matrix client not set up"
device_id = self.matrix_client.device_id
if not device_id:
assert self.matrix_client.access_token, "Access token not set up"
devices = await self.matrix_client.devices()
if isinstance(devices, DevicesResponse):
device_id = devices.devices[0].id
return device_id
async def process_command(self, room: MatrixRoom, event: RoomMessageText):
"""Process a command. Called from the event_callback() method.
Delegates to the appropriate command handler.
Args:
room (MatrixRoom): The room the command was sent in.
event (RoomMessageText): The event containing the command.
"""
self.logger.log(
f"Received command {event.body} from {event.sender} in room {room.room_id}")
command = event.body.split()[1] if event.body.split()[1:] else None
await COMMANDS.get(command, COMMANDS[None])(room, event, self)
def room_uses_classification(self, room: MatrixRoom | str) -> bool:
"""Check if a room uses classification.
Args:
room (MatrixRoom | str): The room to check.
Returns:
bool: Whether the room uses classification.
"""
room_id = room.room_id if isinstance(room, MatrixRoom) else room
with self.database.cursor() as cursor:
cursor.execute(
"SELECT value FROM room_settings WHERE room_id = ? AND setting = ?", (room_id, "use_classification"))
result = cursor.fetchone()
return False if not result else bool(int(result[0]))
async def _event_callback(self, room: MatrixRoom, event: Event):
self.logger.log("Received event: " + str(event.event_id), "debug")
try:
for eventtype, callback in EVENT_CALLBACKS.items():
if isinstance(event, eventtype):
await callback(room, event, self)
except Exception as e:
self.logger.log(
f"Error in event callback for {event.__class__}: {e}", "error")
if self.debug:
await self.send_message(room, f"Error: {e}\n\n```\n{traceback.format_exc()}\n```", True)
def user_is_allowed(self, user_id: str) -> bool:
"""Check if a user is allowed to use the bot.
Args:
user_id (str): The user ID to check.
Returns:
bool: Whether the user is allowed to use the bot.
"""
return (
user_id in self.allowed_users or
f"*:{user_id.split(':')[1]}" in self.allowed_users or
f"@*:{user_id.split(':')[1]}" in self.allowed_users
) if self.allowed_users else True
async def event_callback(self, room: MatrixRoom, event: Event):
"""Callback for events.
Args:
room (MatrixRoom): The room the event was sent in.
event (Event): The event.
"""
if event.sender == self.matrix_client.user_id:
return
if not self.user_is_allowed(event.sender):
if len(room.users) == 2:
await self.matrix_client.room_send(
room.room_id,
"m.room.message",
{
"msgtype": "m.notice",
"body": f"You are not allowed to use this bot. Please contact {self.operator} for more information."
}
)
return
task = asyncio.create_task(self._event_callback(room, event))
def room_uses_timing(self, room: MatrixRoom):
"""Check if a room uses timing.
Args:
room (MatrixRoom): The room to check.
Returns:
bool: Whether the room uses timing.
"""
room_id = room.room_id
with self.database.cursor() as cursor:
cursor.execute(
"SELECT value FROM room_settings WHERE room_id = ? AND setting = ?", (room_id, "use_timing"))
result = cursor.fetchone()
return False if not result else bool(int(result[0]))
async def _response_callback(self, response: Response):
for response_type, callback in RESPONSE_CALLBACKS.items():
if isinstance(response, response_type):
await callback(response, self)
async def response_callback(self, response: Response):
task = asyncio.create_task(self._response_callback(response))
async def accept_pending_invites(self):
"""Accept all pending invites."""
assert self.matrix_client, "Matrix client not set up"
invites = self.matrix_client.invited_rooms
for invite in invites.keys():
if invite in self.room_ignore_list:
self.logger.log(
f"Ignoring invite to room {invite} (room is in ignore list)")
continue
self.logger.log(f"Accepting invite to room {invite}")
response = await self.matrix_client.join(invite)
if isinstance(response, JoinError):
self.logger.log(
f"Error joining room {invite}: {response.message}. Not trying again.", "error")
leave_response = await self.matrix_client.room_leave(invite)
if isinstance(leave_response, RoomLeaveError):
self.logger.log(
f"Error leaving room {invite}: {leave_response.message}", "error")
self.room_ignore_list.append(invite)
async def upload_file(self, file: bytes, filename: str = "file", mime: str = "application/octet-stream") -> str:
"""Upload a file to the homeserver.
Args:
file (bytes): The file to upload.
filename (str, optional): The name of the file. Defaults to "file".
mime (str, optional): The MIME type of the file. Defaults to "application/octet-stream".
Returns:
str: The MXC URI of the uploaded file.
"""
bio = BytesIO(file)
bio.seek(0)
response, _ = await self.matrix_client.upload(
bio,
content_type=mime,
filename=filename,
filesize=len(file)
)
return response.content_uri
async def send_image(self, room: MatrixRoom, image: bytes, message: Optional[str] = None):
"""Send an image to a room.
Args:
room (MatrixRoom): The room to send the image to.
image (bytes): The image to send.
message (str, optional): The message to send with the image. Defaults to None.
"""
self.logger.log(
f"Sending image of size {len(image)} bytes to room {room.room_id}")
bio = BytesIO(image)
img = Image.open(bio)
mime = Image.MIME[img.format]
(width, height) = img.size
self.logger.log(
f"Uploading - Image size: {width}x{height} pixels, MIME type: {mime}")
content_uri = await self.upload_file(image, "image", mime)
self.logger.log("Uploaded image - sending message...")
content = {
"body": message or "",
"info": {
"mimetype": mime,
"size": len(image),
"w": width,
"h": height,
},
"msgtype": "m.image",
"url": content_uri
}
status = await self.matrix_client.room_send(
room.room_id,
"m.room.message",
content
)
self.logger.log(str(status), "debug")
self.logger.log("Sent image")
async def send_message(self, room: MatrixRoom | str, message: str, notice: bool = False):
"""Send a message to a room.
Args:
room (MatrixRoom): The room to send the message to.
message (str): The message to send.
notice (bool): Whether to send the message as a notice. Defaults to False.
"""
if isinstance(room, str):
room = self.matrix_client.rooms[room]
markdowner = markdown2.Markdown(extras=["fenced-code-blocks"])
formatted_body = markdowner.convert(message)
msgtype = "m.notice" if notice else "m.text"
msgcontent = {"msgtype": msgtype, "body": message,
"format": "org.matrix.custom.html", "formatted_body": formatted_body}
content = None
if self.matrix_client.olm and room.encrypted:
try:
if not room.members_synced:
responses = []
responses.append(await self.matrix_client.joined_members(room.room_id))
if self.matrix_client.olm.should_share_group_session(room.room_id):
try:
event = self.matrix_client.sharing_session[room.room_id]
await event.wait()
except KeyError:
await self.matrix_client.share_group_session(
room.room_id,
ignore_unverified_devices=True,
)
if msgtype != "m.reaction":
response = self.matrix_client.encrypt(
room.room_id, "m.room.message", msgcontent)
msgtype, content = response
except Exception as e:
self.logger.log(
f"Error encrypting message: {e} - sending unencrypted", "error")
raise
if not content:
msgtype = "m.room.message"
content = msgcontent
method, path, data = Api.room_send(
self.matrix_client.access_token, room.room_id, msgtype, content, uuid.uuid4()
)
response = await self.matrix_client._send(RoomSendResponse, method, path, data, (room.room_id,))
if isinstance(response, RoomSendError):
self.logger.log(
f"Error sending message: {response.message}", "error")
return
def log_api_usage(self, message: Event | str, room: MatrixRoom | str, api: str, tokens: int):
"""Log API usage to the database.
Args:
message (Event): The event that triggered the API usage.
room (MatrixRoom | str): The room the event was sent in.
api (str): The API that was used.
tokens (int): The number of tokens used.
"""
if not self.database:
return
if isinstance(message, Event):
message = message.event_id
if isinstance(room, MatrixRoom):
room = room.room_id
self.database.execute(
"INSERT INTO token_usage (message_id, room_id, tokens, api, timestamp) VALUES (?, ?, ?, ?, ?)",
(message, room, tokens, api, datetime.now())
)
async def run(self):
"""Start the bot."""
# Set up the Matrix client
assert self.matrix_client, "Matrix client not set up"
assert self.matrix_client.access_token, "Access token not set up"
if not self.matrix_client.user_id:
self.matrix_client.user_id = await self._get_user_id()
if not self.matrix_client.device_id:
self.matrix_client.device_id = await self._get_device_id()
# Set up database
IN_MEMORY = False
if not self.database:
self.logger.log(
"No database connection set up, using in-memory database. Data will be lost on bot shutdown.")
IN_MEMORY = True
self.database = duckdb.DuckDBPyConnection(":memory:")
self.logger.log("Running migrations...")
before, after = migrate(self.database)
if before != after:
self.logger.log(f"Migrated from version {before} to {after}.")
else:
self.logger.log(f"Already at latest version {after}.")
if IN_MEMORY:
client_config = AsyncClientConfig(
store_sync_tokens=True, encryption_enabled=False)
else:
matrix_store = DuckDBStore
client_config = AsyncClientConfig(
store_sync_tokens=True, encryption_enabled=True, store=matrix_store)
self.matrix_client.config = client_config
self.matrix_client.store = matrix_store(
self.matrix_client.user_id,
self.matrix_client.device_id,
self.database
)
self.matrix_client.olm = Olm(
self.matrix_client.user_id,
self.matrix_client.device_id,
self.matrix_client.store
)
self.matrix_client.encrypted_rooms = self.matrix_client.store.load_encrypted_rooms()
# Run initial sync (now includes joining rooms)
sync = await self.matrix_client.sync(timeout=30000)
if isinstance(sync, SyncResponse):
await self.response_callback(sync)
else:
self.logger.log(f"Initial sync failed, aborting: {sync}", "error")
return
# Set up callbacks
self.matrix_client.add_event_callback(self.event_callback, Event)
self.matrix_client.add_response_callback(
self.response_callback, Response)
# Set custom name / logo
if self.display_name:
self.logger.log(f"Setting display name to {self.display_name}")
await self.matrix_client.set_displayname(self.display_name)
if self.logo:
self.logger.log("Setting avatar...")
logo_bio = BytesIO()
self.logo.save(logo_bio, format=self.logo.format)
uri = await self.upload_file(logo_bio.getvalue(), "logo", Image.MIME[self.logo.format])
self.logo_uri = uri
asyncio.create_task(self.matrix_client.set_avatar(uri))
for room in self.matrix_client.rooms.keys():
self.logger.log(f"Setting avatar for {room}...", "debug")
asyncio.create_task(self.matrix_client.room_put_state(room, "m.room.avatar", {
"url": uri
}, ""))
# Start syncing events
self.logger.log("Starting sync loop...")
try:
await self.matrix_client.sync_forever(timeout=30000)
finally:
self.logger.log("Syncing one last time...")
await self.matrix_client.sync(timeout=30000)
async def create_space(self, name, visibility=RoomVisibility.private) -> str:
"""Create a space.
Args:
name (str): The name of the space.
visibility (RoomVisibility, optional): The visibility of the space. Defaults to RoomVisibility.private.
Returns:
MatrixRoom: The created space.
"""
response = await self.matrix_client.room_create(
name=name, visibility=visibility, space=True)
if isinstance(response, RoomCreateError):
self.logger.log(
f"Error creating space: {response.message}", "error")
return
return response.room_id
async def add_rooms_to_space(self, space: MatrixRoom | str, rooms: List[MatrixRoom | str]):
"""Add rooms to a space.
Args:
space (MatrixRoom | str): The space to add the rooms to.
rooms (List[MatrixRoom | str]): The rooms to add to the space.
"""
if isinstance(space, MatrixRoom):
space = space.room_id
for room in rooms:
if isinstance(room, MatrixRoom):
room = room.room_id
if space == room:
self.logger.log(
f"Refusing to add {room} to itself", "warning")
continue
self.logger.log(f"Adding {room} to {space}...")
await self.matrix_client.room_put_state(space, "m.space.child", {
"via": [room.split(":")[1], space.split(":")[1]],
}, room)
await self.matrix_client.room_put_state(room, "m.room.parent", {
"via": [space.split(":")[1], room.split(":")[1]],
"canonical": True
}, space)
def respond_to_room_messages(self, room: MatrixRoom | str) -> bool:
"""Check whether the bot should respond to all messages sent in a room.
Args:
room (MatrixRoom | str): The room to check.
Returns:
bool: Whether the bot should respond to all messages sent in the room.
"""
if isinstance(room, MatrixRoom):
room = room.room_id
with self.database.cursor() as cursor:
cursor.execute(
"SELECT value FROM room_settings WHERE room_id = ? AND setting = ?", (room, "always_reply"))
result = cursor.fetchone()
return True if not result else bool(int(result[0]))
async def process_query(self, room: MatrixRoom, event: RoomMessageText, from_chat_command: bool = False):
"""Process a query message. Generates a response and sends it to the room.
Args:
room (MatrixRoom): The room the message was sent in.
event (RoomMessageText): The event that triggered the query.
from_chat_command (bool, optional): Whether the query was sent via the `!gptbot chat` command. Defaults to False.
"""
if not (from_chat_command or self.respond_to_room_messages(room) or self.matrix_client.user_id in event.body):
return
await self.matrix_client.room_typing(room.room_id, True)
await self.matrix_client.room_read_markers(room.room_id, event.event_id)
if (not from_chat_command) and self.room_uses_classification(room):
try:
classification, tokens = await self.classification_api.classify_message(
event.body, room.room_id)
except Exception as e:
self.logger.log(f"Error classifying message: {e}", "error")
await self.send_message(
room, "Something went wrong. Please try again.", True)
return
self.log_api_usage(
event, room, f"{self.classification_api.api_code}-{self.classification_api.classification_api}", tokens)
if not classification["type"] == "chat":
event.body = f"!gptbot {classification['type']} {classification['prompt']}"
await self.process_command(room, event)
return
try:
last_messages = await self._last_n_messages(room.room_id, 20)
except Exception as e:
self.logger.log(f"Error getting last messages: {e}", "error")
await self.send_message(
room, "Something went wrong. Please try again.", True)
return
system_message = self.get_system_message(room)
chat_messages = [{"role": "system", "content": system_message}]
for message in last_messages:
role = "assistant" if message.sender == self.matrix_client.user_id else "user"
if not message.event_id == event.event_id:
chat_messages.append({"role": role, "content": message.body})
chat_messages.append({"role": "user", "content": event.body})
# Truncate messages to fit within the token limit
truncated_messages = self._truncate(
chat_messages, self.max_tokens - 1, system_message=system_message)
try:
response, tokens_used = await self.chat_api.generate_chat_response(
chat_messages, user=room.room_id)
except Exception as e:
self.logger.log(f"Error generating response: {e}", "error")
await self.send_message(
room, "Something went wrong. Please try again.", True)
return
if response:
self.log_api_usage(
event, room, f"{self.chat_api.api_code}-{self.chat_api.chat_api}", tokens_used)
self.logger.log(f"Sending response to room {room.room_id}...")
# Convert markdown to HTML
message = await self.send_message(room, response)
else:
# Send a notice to the room if there was an error
self.logger.log("Didn't get a response from GPT API", "error")
await self.send_message(
room, "Something went wrong. Please try again.", True)
await self.matrix_client.room_typing(room.room_id, False)
def get_system_message(self, room: MatrixRoom | str) -> str:
"""Get the system message for a room.
Args:
room (MatrixRoom | str): The room to get the system message for.
Returns:
str: The system message.
"""
default = self.default_system_message
if isinstance(room, str):
room_id = room
else:
room_id = room.room_id
with self.database.cursor() as cur:
cur.execute(
"SELECT value FROM room_settings WHERE room_id = ? AND setting = ?",
(room_id, "system_message")
)
system_message = cur.fetchone()
complete = ((default if ((not system_message) or self.force_system_message) else "") + (
"\n\n" + system_message[0] if system_message else "")).strip()
return complete
def __del__(self):
"""Close the bot."""
if self.matrix_client:
asyncio.run(self.matrix_client.close())
if self.database:
self.database.close()

10
classes/logging.py Normal file
View file

@ -0,0 +1,10 @@
import inspect
from datetime import datetime
class Logger:
def log(self, message: str, log_level: str = "info"):
caller = inspect.currentframe().f_back.f_code.co_name
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S:%f")
print(f"[{timestamp}] - {caller} - [{log_level.upper()}] {message}")

160
classes/openai.py Normal file
View file

@ -0,0 +1,160 @@
import openai
import requests
import asyncio
import json
from functools import partial
from .logging import Logger
from typing import Dict, List, Tuple, Generator, AsyncGenerator, Optional, Any
class OpenAI:
api_key: str
chat_model: str = "gpt-3.5-turbo"
logger: Logger
api_code: str = "openai"
@property
def chat_api(self) -> str:
return self.chat_model
classification_api = chat_api
image_api: str = "dalle"
operator: str = "OpenAI ([https://openai.com](https://openai.com))"
def __init__(self, api_key, chat_model=None, logger=None):
self.api_key = api_key
self.chat_model = chat_model or self.chat_model
self.logger = logger or Logger()
async def _request_with_retries(self, request: partial, attempts: int = 5, retry_interval: int = 2) -> AsyncGenerator[Any | list | Dict, None]:
"""Retry a request a set number of times if it fails.
Args:
request (partial): The request to make with retries.
attempts (int, optional): The number of attempts to make. Defaults to 5.
retry_interval (int, optional): The interval in seconds between attempts. Defaults to 2 seconds.
Returns:
AsyncGenerator[Any | list | Dict, None]: The OpenAI response for the request.
"""
# call the request function and return the response if it succeeds, else retry
current_attempt = 1
while current_attempt <= attempts:
try:
response = await request()
return response
except Exception as e:
self.logger.log(f"Request failed: {e}", "error")
self.logger.log(f"Retrying in {retry_interval} seconds...")
await asyncio.sleep(retry_interval)
current_attempt += 1
# if all attempts failed, raise an exception
raise Exception("Request failed after all attempts.")
async def generate_chat_response(self, messages: List[Dict[str, str]], user: Optional[str] = None) -> Tuple[str, int]:
"""Generate a response to a chat message.
Args:
messages (List[Dict[str, str]]): A list of messages to use as context.
Returns:
Tuple[str, int]: The response text and the number of tokens used.
"""
self.logger.log(f"Generating response to {len(messages)} messages using {self.chat_model}...")
chat_partial = partial(
openai.ChatCompletion.acreate,
model=self.chat_model,
messages=messages,
api_key=self.api_key,
user=user
)
response = await self._request_with_retries(chat_partial)
result_text = response.choices[0].message['content']
tokens_used = response.usage["total_tokens"]
self.logger.log(f"Generated response with {tokens_used} tokens.")
return result_text, tokens_used
async def classify_message(self, query: str, user: Optional[str] = None) -> Tuple[Dict[str, str], int]:
system_message = """You are a classifier for different types of messages. You decide whether an incoming message is meant to be a prompt for an AI chat model, or meant for a different API. You respond with a JSON object like this:
{ "type": event_type, "prompt": prompt }
- If the message you received is meant for the AI chat model, the event_type is "chat", and the prompt is the literal content of the message you received. This is also the default if none of the other options apply.
- If it is a prompt for a calculation that can be answered better by WolframAlpha than an AI chat bot, the event_type is "calculate". Optimize the message you received for input to WolframAlpha, and return it as the prompt attribute.
- If it is a prompt for an AI image generation, the event_type is "imagine". Optimize the message you received for use with DALL-E, and return it as the prompt attribute.
- If the user is asking you to create a new room, the event_type is "newroom", and the prompt is the name of the room, if one is given, else an empty string.
- If the user is asking you to throw a coin, the event_type is "coin". The prompt is an empty string.
- If the user is asking you to roll a dice, the event_type is "dice". The prompt is an string containing an optional number of sides, if one is given, else an empty string.
- If for any reason you are unable to classify the message (for example, if it infringes on your terms of service), the event_type is "error", and the prompt is a message explaining why you are unable to process the message.
Only the event_types mentioned above are allowed, you must not respond in any other way."""
messages = [
{
"role": "system",
"content": system_message
},
{
"role": "user",
"content": query
}
]
self.logger.log(f"Classifying message '{query}'...")
chat_partial = partial(
openai.ChatCompletion.acreate,
model=self.chat_model,
messages=messages,
api_key=self.api_key,
user=user
)
response = await self._request_with_retries(chat_partial)
try:
result = json.loads(response.choices[0].message['content'])
except:
result = {"type": "chat", "prompt": query}
tokens_used = response.usage["total_tokens"]
self.logger.log(f"Classified message as {result['type']} with {tokens_used} tokens.")
return result, tokens_used
async def generate_image(self, prompt: str, user: Optional[str] = None) -> Generator[bytes, None, None]:
"""Generate an image from a prompt.
Args:
prompt (str): The prompt to use.
Yields:
bytes: The image data.
"""
self.logger.log(f"Generating image from prompt '{prompt}'...")
image_partial = partial(
openai.Image.acreate,
prompt=prompt,
n=1,
api_key=self.api_key,
size="1024x1024",
user=user
)
response = await self._request_with_retries(image_partial)
images = []
for image in response.data:
image = requests.get(image.url).content
images.append(image)
return images, len(images)

637
classes/store.py Normal file
View file

@ -0,0 +1,637 @@
import duckdb
from nio.store.database import MatrixStore, DeviceTrustState, OlmDevice, TrustState, InboundGroupSession, SessionStore, OlmSessions, GroupSessionStore, OutgoingKeyRequest, DeviceStore, Session
from nio.crypto import OlmAccount, OlmDevice
from random import SystemRandom
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
from .dict import AttrDict
import json
class DuckDBStore(MatrixStore):
@property
def account_id(self):
id = self._get_account()[0] if self._get_account() else None
if id is None:
id = SystemRandom().randint(0, 2**16)
return id
def __init__(self, user_id, device_id, duckdb_conn):
self.conn = duckdb_conn
self.user_id = user_id
self.device_id = device_id
def _get_account(self):
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM accounts WHERE user_id = ? AND device_id = ?",
(self.user_id, self.device_id),
)
account = cursor.fetchone()
cursor.close()
return account
def _get_device(self, device):
acc = self._get_account()
if not acc:
return None
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM device_keys WHERE user_id = ? AND device_id = ? AND account_id = ?",
(device.user_id, device.id, acc[0]),
)
device_entry = cursor.fetchone()
cursor.close()
return device_entry
# Implementing methods with DuckDB equivalents
def verify_device(self, device):
if self.is_device_verified(device):
return False
d = self._get_device(device)
assert d
cursor = self.conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
(d[0], TrustState.verified),
)
self.conn.commit()
cursor.close()
device.trust_state = TrustState.verified
return True
def unverify_device(self, device):
if not self.is_device_verified(device):
return False
d = self._get_device(device)
assert d
cursor = self.conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
(d[0], TrustState.unset),
)
self.conn.commit()
cursor.close()
device.trust_state = TrustState.unset
return True
def is_device_verified(self, device):
d = self._get_device(device)
if not d:
return False
cursor = self.conn.cursor()
cursor.execute(
"SELECT state FROM device_trust_state WHERE device_id = ?", (d[0],)
)
trust_state = cursor.fetchone()
cursor.close()
if not trust_state:
return False
return trust_state[0] == TrustState.verified
def blacklist_device(self, device):
if self.is_device_blacklisted(device):
return False
d = self._get_device(device)
assert d
cursor = self.conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
(d[0], TrustState.blacklisted),
)
self.conn.commit()
cursor.close()
device.trust_state = TrustState.blacklisted
return True
def unblacklist_device(self, device):
if not self.is_device_blacklisted(device):
return False
d = self._get_device(device)
assert d
cursor = self.conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
(d[0], TrustState.unset),
)
self.conn.commit()
cursor.close()
device.trust_state = TrustState.unset
return True
def is_device_blacklisted(self, device):
d = self._get_device(device)
if not d:
return False
cursor = self.conn.cursor()
cursor.execute(
"SELECT state FROM device_trust_state WHERE device_id = ?", (d[0],)
)
trust_state = cursor.fetchone()
cursor.close()
if not trust_state:
return False
return trust_state[0] == TrustState.blacklisted
def ignore_device(self, device):
if self.is_device_ignored(device):
return False
d = self._get_device(device)
assert d
cursor = self.conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
(d[0], int(TrustState.ignored.value)),
)
self.conn.commit()
cursor.close()
return True
def ignore_devices(self, devices):
for device in devices:
self.ignore_device(device)
def unignore_device(self, device):
if not self.is_device_ignored(device):
return False
d = self._get_device(device)
assert d
cursor = self.conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
(d[0], TrustState.unset),
)
self.conn.commit()
cursor.close()
device.trust_state = TrustState.unset
return True
def is_device_ignored(self, device):
d = self._get_device(device)
if not d:
return False
cursor = self.conn.cursor()
cursor.execute(
"SELECT state FROM device_trust_state WHERE device_id = ?", (d[0],)
)
trust_state = cursor.fetchone()
cursor.close()
if not trust_state:
return False
return trust_state[0] == TrustState.ignored
def load_device_keys(self):
"""Load all the device keys from the database.
Returns DeviceStore containing the OlmDevices with the device keys.
"""
store = DeviceStore()
account = self.account_id
if not account:
return store
with self.conn.cursor() as cur:
cur.execute(
"SELECT * FROM device_keys WHERE account_id = ?",
(account,)
)
device_keys = cur.fetchall()
for d in device_keys:
cur.execute(
"SELECT * FROM keys WHERE device_id = ?",
(d[0],)
)
keys = cur.fetchall()
key_dict = {k[0]: k[1] for k in keys}
store.add(
OlmDevice(
d[2],
d[0],
key_dict,
display_name=d[3],
deleted=d[4],
)
)
return store
def save_device_keys(self, device_keys):
"""Save the provided device keys to the database."""
account = self.account_id
assert account
rows = []
for user_id, devices_dict in device_keys.items():
for device_id, device in devices_dict.items():
rows.append(
{
"account_id": account,
"user_id": user_id,
"device_id": device_id,
"display_name": device.display_name,
"deleted": device.deleted,
}
)
if not rows:
return
with self.conn.cursor() as cur:
for idx in range(0, len(rows), 100):
data = rows[idx: idx + 100]
cur.executemany(
"INSERT OR IGNORE INTO device_keys (account_id, user_id, device_id, display_name, deleted) VALUES (?, ?, ?, ?, ?)",
[(r["account_id"], r["user_id"], r["device_id"],
r["display_name"], r["deleted"]) for r in data]
)
for user_id, devices_dict in device_keys.items():
for device_id, device in devices_dict.items():
cur.execute(
"UPDATE device_keys SET deleted = ? WHERE device_id = ?",
(device.deleted, device_id)
)
for key_type, key in device.keys.items():
cur.execute("""
INSERT INTO keys (key_type, key, device_id) VALUES (?, ?, ?)
ON CONFLICT (key_type, device_id) DO UPDATE SET key = ?
""",
(key_type, key, device_id, key)
)
self.conn.commit()
def save_group_sessions(self, sessions):
with self.conn.cursor() as cur:
for session in sessions:
cur.execute("""
INSERT OR REPLACE INTO inbound_group_sessions (
session_id, sender_key, signing_key, room_id, pickle, account_id
) VALUES (?, ?, ?, ?, ?, ?)
""", (
session.id,
session.sender_key,
session.signing_key,
session.room_id,
session.pickle,
self.account_id
))
self.conn.commit()
def save_olm_sessions(self, sessions):
with self.conn.cursor() as cur:
for session in sessions:
cur.execute("""
INSERT OR REPLACE INTO olm_sessions (
session_id, sender_key, pickle, account_id
) VALUES (?, ?, ?, ?)
""", (
session.id,
session.sender_key,
session.pickle,
self.account_id
))
self.conn.commit()
def save_outbound_group_sessions(self, sessions):
with self.conn.cursor() as cur:
for session in sessions:
cur.execute("""
INSERT OR REPLACE INTO outbound_group_sessions (
room_id, session_id, pickle, account_id
) VALUES (?, ?, ?, ?)
""", (
session.room_id,
session.id,
session.pickle,
self.account_id
))
self.conn.commit()
def save_account(self, account: OlmAccount):
with self.conn.cursor() as cur:
cur.execute("""
INSERT OR REPLACE INTO accounts (
id, user_id, device_id, shared_account, pickle
) VALUES (?, ?, ?, ?, ?)
""", (
self.account_id,
self.user_id,
self.device_id,
account.shared,
account.pickle(self.pickle_key),
))
self.conn.commit()
def load_sessions(self):
session_store = SessionStore()
with self.conn.cursor() as cur:
cur.execute("""
SELECT
os.sender_key, os.session, os.creation_time
FROM
olm_sessions os
INNER JOIN
accounts a ON os.account_id = a.id
WHERE
a.id = ?
""", (self.account_id,))
for row in cur.fetchall():
sender_key, session_pickle, creation_time = row
session = Session.from_pickle(
session_pickle, creation_time, self.pickle_key)
session_store.add(sender_key, session)
return session_store
def load_inbound_group_sessions(self):
# type: () -> GroupSessionStore
"""Load all Olm sessions from the database.
Returns:
``GroupSessionStore`` object, containing all the loaded sessions.
"""
store = GroupSessionStore()
account = self.account_id
if not account:
return store
with self.conn.cursor() as cursor:
cursor.execute(
"SELECT * FROM inbound_group_sessions WHERE account_id = ?", (
account,)
)
for row in cursor.fetchall():
cursor.execute(
"SELECT sender_key FROM forwarded_chains WHERE session_id = ?",
(row[1],),
)
chains = cursor.fetchall()
session = InboundGroupSession.from_pickle(
row[2].encode(),
row[3],
row[4],
row[5],
self.pickle_key,
[
chain[0]
for chain in chains
],
)
store.add(session)
return store
def load_outgoing_key_requests(self):
# type: () -> dict
"""Load all outgoing key requests from the database.
Returns:
``OutgoingKeyRequestStore`` object, containing all the loaded key requests.
"""
account = self.account_id
if not account:
return store
with self.conn.cursor() as cur:
cur.execute(
"SELECT * FROM outgoing_key_requests WHERE account_id = ?",
(account,)
)
rows = cur.fetchall()
return {
row[1]: OutgoingKeyRequest.from_response(AttrDict({
"id": row[0],
"account_id": row[1],
"request_id": row[2],
"session_id": row[3],
"room_id": row[4],
"algorithm": row[5],
})) for row in rows
}
def load_encrypted_rooms(self):
"""Load the set of encrypted rooms for this account.
Returns:
``Set`` containing room ids of encrypted rooms.
"""
account = self.account_id
if not account:
return set()
with self.conn.cursor() as cur:
cur.execute(
"SELECT room_id FROM encrypted_rooms WHERE account_id = ?",
(account,)
)
rows = cur.fetchall()
return {row[0] for row in rows}
def save_sync_token(self, token):
"""Save the given token"""
account = self.account_id
assert account
with self.conn.cursor() as cur:
cur.execute(
"INSERT OR REPLACE INTO sync_tokens (account_id, token) VALUES (?, ?)",
(account, token)
)
self.conn.commit()
def save_encrypted_rooms(self, rooms):
"""Save the set of room ids for this account."""
account = self.account_id
assert account
data = [(room_id, account) for room_id in rooms]
with self.conn.cursor() as cur:
for idx in range(0, len(data), 400):
rows = data[idx: idx + 400]
cur.executemany(
"INSERT OR IGNORE INTO encrypted_rooms (room_id, account_id) VALUES (?, ?)",
rows
)
self.conn.commit()
def save_session(self, sender_key, session):
"""Save the provided Olm session to the database.
Args:
sender_key (str): The curve key that owns the Olm session.
session (Session): The Olm session that will be pickled and
saved in the database.
"""
account = self.account_id
assert account
pickled_session = session.pickle(self.pickle_key)
with self.conn.cursor() as cur:
cur.execute(
"INSERT OR REPLACE INTO olm_sessions (account_id, sender_key, session, session_id, creation_time, last_usage_date) VALUES (?, ?, ?, ?, ?, ?)",
(account, sender_key, pickled_session, session.id,
session.creation_time, session.use_time)
)
self.conn.commit()
def save_inbound_group_session(self, session):
"""Save the provided Megolm inbound group session to the database.
Args:
session (InboundGroupSession): The session to save.
"""
account = self.account_id
assert account
with self.conn.cursor() as cur:
# Insert a new session or update the existing one
query = """
INSERT INTO inbound_group_sessions (account_id, sender_key, fp_key, room_id, session)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (account_id, sender_key, fp_key, room_id)
DO UPDATE SET session = excluded.session
"""
cur.execute(query, (account, session.sender_key,
session.ed25519, session.room_id, session.pickle(self.pickle_key)))
# Delete existing forwarded chains for the session
delete_query = """
DELETE FROM forwarded_chains WHERE session_id = (SELECT id FROM inbound_group_sessions WHERE account_id = ? AND sender_key = ? AND fp_key = ? AND room_id = ?)
"""
cur.execute(
delete_query, (account, session.sender_key, session.ed25519, session.room_id))
# Insert new forwarded chains for the session
insert_query = """
INSERT INTO forwarded_chains (session_id, sender_key)
VALUES ((SELECT id FROM inbound_group_sessions WHERE account_id = ? AND sender_key = ? AND fp_key = ? AND room_id = ?), ?)
"""
for chain in session.forwarding_chain:
cur.execute(
insert_query, (account, session.sender_key, session.ed25519, session.room_id, chain))
def add_outgoing_key_request(self, key_request):
"""Add a new outgoing key request to the database.
Args:
key_request (OutgoingKeyRequest): The key request to add.
"""
account_id = self.account_id
with self.conn.cursor() as cursor:
cursor.execute(
"""
SELECT MAX(id) FROM outgoing_key_requests
"""
)
row = cursor.fetchone()
request_id = row[0] + 1 if row[0] else 1
cursor.execute(
"""
INSERT INTO outgoing_key_requests (id, account_id, request_id, session_id, room_id, algorithm)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (account_id, request_id) DO NOTHING
""",
(
request_id,
account_id,
key_request.request_id,
key_request.session_id,
key_request.room_id,
key_request.algorithm,
)
)
def load_account(self):
# type: () -> Optional[OlmAccount]
"""Load the Olm account from the database.
Returns:
``OlmAccount`` object, or ``None`` if it wasn't found for the
current device_id.
"""
cursor = self.conn.cursor()
query = """
SELECT pickle, shared_account
FROM accounts
WHERE device_id = ?;
"""
cursor.execute(query, (self.device_id,))
result = cursor.fetchone()
if not result:
return None
account_pickle, shared = result
return OlmAccount.from_pickle(account_pickle.encode(), self.pickle_key, shared)

View file

@ -1,8 +1,9 @@
import trackingmore
import requests
from .logging import Logger
from typing import Tuple, Optional
from typing import Dict, List, Tuple, Generator, Optional
class TrackingMore:
api_key: str

View file

@ -3,7 +3,7 @@ import requests
from .logging import Logger
from typing import Generator, Optional
from typing import Dict, List, Tuple, Generator, Optional
class WolframAlpha:
api_key: str

View file

@ -22,10 +22,9 @@ for command in [
"dice",
"parcel",
"space",
"tts",
]:
function = getattr(import_module(
"." + command, "gptbot.commands"), "command_" + command)
"commands." + command), "command_" + command)
COMMANDS[command] = function
COMMANDS[None] = command_unknown

23
commands/botinfo.py Normal file
View file

@ -0,0 +1,23 @@
from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
async def command_botinfo(room: MatrixRoom, event: RoomMessageText, bot):
logging("Showing bot info...")
body = f"""GPT Info:
Model: {bot.model}
Maximum context tokens: {bot.max_tokens}
Maximum context messages: {bot.max_messages}
Room info:
Bot user ID: {bot.matrix_client.user_id}
Current room ID: {room.room_id}
System message: {bot.get_system_message(room)}
For usage statistics, run !gptbot stats
"""
await bot.send_message(room, body, True)

View file

@ -23,12 +23,14 @@ async def command_calculate(room: MatrixRoom, event: RoomMessageText, bot):
bot.logger.log("Querying calculation API...")
for subpod in bot.calculation_api.generate_calculation_response(prompt, text, results_only, user=room.room_id):
bot.logger.log("Sending subpod...")
bot.logger.log(f"Sending subpod...")
if isinstance(subpod, bytes):
await bot.send_image(room, subpod)
else:
await bot.send_message(room, subpod, True)
bot.log_api_usage(event, room, f"{bot.calculation_api.api_code}-{bot.calculation_api.calculation_api}", tokens_used)
return
await bot.send_message(room, "You need to provide a prompt.", True)

View file

@ -9,7 +9,7 @@ async def command_dice(room: MatrixRoom, event: RoomMessageText, bot):
try:
sides = int(event.body.split()[2])
except (ValueError, IndexError):
except ValueError:
sides = 6
if sides < 2:

25
commands/help.py Normal file
View file

@ -0,0 +1,25 @@
from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
async def command_help(room: MatrixRoom, event: RoomMessageText, bot):
body = """Available commands:
- !gptbot help - Show this message
- !gptbot botinfo - Show information about the bot
- !gptbot privacy - Show privacy information
- !gptbot newroom \<room name\> - Create a new room and invite yourself to it
- !gptbot stats - Show usage statistics for this room
- !gptbot systemmessage \<message\> - Get or set the system message for this room
- !gptbot space [enable|disable|update|invite] - Enable, disable, force update, or invite yourself to your space
- !gptbot coin - Flip a coin (heads or tails)
- !gptbot dice [number] - Roll a dice with the specified number of sides (default: 6)
- !gptbot imagine \<prompt\> - Generate an image from a prompt
- !gptbot calculate [--text] [--details] \<query\> - Calculate a result to a calculation, optionally forcing text output instead of an image, and optionally showing additional details like the input interpretation
- !gptbot chat \<message\> - Send a message to the chat API
- !gptbot classify \<message\> - Classify a message using the classification API
- !gptbot custom \<message\> - Used for custom commands handled by the chat model and defined through the room's system message
- !gptbot ignoreolder - Ignore messages before this point as context
"""
await bot.send_message(room, body, True)

View file

@ -16,10 +16,10 @@ async def command_imagine(room: MatrixRoom, event: RoomMessageText, bot):
return
for image in images:
bot.logger.log("Sending image...")
bot.logger.log(f"Sending image...")
await bot.send_image(room, image)
bot.log_api_usage(event, room, f"{bot.image_api.api_code}-{bot.image_api.image_model}", tokens_used)
bot.log_api_usage(event, room, f"{bot.image_api.api_code}-{bot.image_api.image_api}", tokens_used)
return

View file

@ -2,7 +2,6 @@ from nio.events.room_events import RoomMessageText
from nio import RoomCreateError, RoomInviteError
from nio.rooms import MatrixRoom
from contextlib import closing
async def command_newroom(room: MatrixRoom, event: RoomMessageText, bot):
room_name = " ".join(event.body.split()[
@ -13,7 +12,7 @@ async def command_newroom(room: MatrixRoom, event: RoomMessageText, bot):
if isinstance(new_room, RoomCreateError):
bot.logger.log(f"Failed to create room: {new_room.message}")
await bot.send_message(room, "Sorry, I was unable to create a new room. Please try again later, or create a room manually.", True)
await bot.send_message(room, f"Sorry, I was unable to create a new room. Please try again later, or create a room manually.", True)
return
bot.logger.log(f"Inviting {event.sender} to new room...")
@ -21,10 +20,10 @@ async def command_newroom(room: MatrixRoom, event: RoomMessageText, bot):
if isinstance(invite, RoomInviteError):
bot.logger.log(f"Failed to invite user: {invite.message}")
await bot.send_message(room, "Sorry, I was unable to invite you to the new room. Please try again later, or create a room manually.", True)
await bot.send_message(room, f"Sorry, I was unable to invite you to the new room. Please try again later, or create a room manually.", True)
return
with closing(bot.database.cursor()) as cursor:
with bot.database.cursor() as cursor:
cursor.execute(
"SELECT space_id FROM user_spaces WHERE user_id = ? AND active = TRUE", (event.sender,))
space = cursor.fetchone()
@ -43,4 +42,4 @@ async def command_newroom(room: MatrixRoom, event: RoomMessageText, bot):
await bot.matrix_client.joined_rooms()
await bot.send_message(room, f"Alright, I've created a new room called '{room_name}' and invited you to it. You can find it at {new_room.room_id}", True)
await bot.send_message(bot.matrix_client.rooms[new_room.room_id], "Welcome to the new room! What can I do for you?")
await bot.send_message(bot.rooms[new_room.room_id], f"Welcome to the new room! What can I do for you?")

View file

@ -11,7 +11,7 @@ async def command_privacy(room: MatrixRoom, event: RoomMessageText, bot):
body += "- For chat requests: " + f"{bot.chat_api.operator}" + "\n"
if bot.image_api:
body += "- For image generation requests (!gptbot imagine): " + f"{bot.image_api.operator}" + "\n"
if bot.calculation_api:
body += "- For calculation requests (!gptbot calculate): " + f"{bot.calculation_api.operator}" + "\n"
if bot.calculate_api:
body += "- For calculation requests (!gptbot calculate): " + f"{bot.calculate_api.operator}" + "\n"
await bot.send_message(room, body, True)

View file

@ -1,8 +1,6 @@
from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
from contextlib import closing
async def command_roomsettings(room: MatrixRoom, event: RoomMessageText, bot):
setting = event.body.split()[2] if len(event.body.split()) > 2 else None
@ -18,15 +16,13 @@ async def command_roomsettings(room: MatrixRoom, event: RoomMessageText, bot):
if value:
bot.logger.log("Adding system message...")
with closing(bot.database.cursor()) as cur:
with bot.database.cursor() as cur:
cur.execute(
"""INSERT INTO room_settings (room_id, setting, value) VALUES (?, ?, ?)
ON CONFLICT (room_id, setting) DO UPDATE SET value = ?;""",
(room.room_id, "system_message", value, value)
)
bot.database.commit()
await bot.send_message(room, f"Alright, I've stored the system message: '{value}'.", True)
return
@ -37,22 +33,20 @@ async def command_roomsettings(room: MatrixRoom, event: RoomMessageText, bot):
await bot.send_message(room, f"The current system message is: '{system_message}'.", True)
return
if setting in ("use_classification", "always_reply", "use_timing", "tts", "stt"):
if setting in ("use_classification", "always_reply", "use_timing"):
if value:
if value.lower() in ["true", "false"]:
value = value.lower() == "true"
bot.logger.log(f"Setting {setting} status for {room.room_id} to {value}...")
with closing(bot.database.cursor()) as cur:
with bot.database.cursor() as cur:
cur.execute(
"""INSERT INTO room_settings (room_id, setting, value) VALUES (?, ?, ?)
ON CONFLICT (room_id, setting) DO UPDATE SET value = ?;""",
(room.room_id, setting, "1" if value else "0", "1" if value else "0")
)
bot.database.commit()
await bot.send_message(room, f"Alright, I've set {setting} to: '{value}'.", True)
return
@ -61,7 +55,7 @@ async def command_roomsettings(room: MatrixRoom, event: RoomMessageText, bot):
bot.logger.log(f"Retrieving {setting} status for {room.room_id}...")
with closing(bot.database.cursor()) as cur:
with bot.database.cursor() as cur:
cur.execute(
"""SELECT value FROM room_settings WHERE room_id = ? AND setting = ?;""",
(room.room_id, setting)
@ -80,51 +74,11 @@ async def command_roomsettings(room: MatrixRoom, event: RoomMessageText, bot):
await bot.send_message(room, f"The current {setting} status is: '{value}'.", True)
return
if bot.allow_model_override and setting == "model":
if value:
bot.logger.log(f"Setting chat model for {room.room_id} to {value}...")
with closing(bot.database.cursor()) as cur:
cur.execute(
"""INSERT INTO room_settings (room_id, setting, value) VALUES (?, ?, ?)
ON CONFLICT (room_id, setting) DO UPDATE SET value = ?;""",
(room.room_id, "model", value, value)
)
bot.database.commit()
await bot.send_message(room, f"Alright, I've set the chat model to: '{value}'.", True)
return
bot.logger.log(f"Retrieving chat model for {room.room_id}...")
with closing(bot.database.cursor()) as cur:
cur.execute(
"""SELECT value FROM room_settings WHERE room_id = ? AND setting = ?;""",
(room.room_id, "model")
)
value = cur.fetchone()[0]
if not value:
value = bot.chat_api.chat_model
else:
value = str(value)
await bot.send_message(room, f"The current chat model is: '{value}'.", True)
return
message = """The following settings are available:
message = f"""The following settings are available:
- system_message [message]: Get or set the system message to be sent to the chat model
- classification [true/false]: Get or set whether the room uses classification
- always_reply [true/false]: Get or set whether the bot should reply to all messages (if false, only reply to mentions and commands)
- tts [true/false]: Get or set whether the bot should generate audio files instead of sending text
- stt [true/false]: Get or set whether the bot should attempt to process information from audio files
- timing [true/false]: Get or set whether the bot should return information about the time it took to generate a response
"""
if bot.allow_model_override:
message += "- model [model]: Get or set the chat model to be used for this room"
await bot.send_message(room, message, True)

View file

@ -2,8 +2,6 @@ from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
from nio.responses import RoomInviteError
from contextlib import closing
async def command_space(room: MatrixRoom, event: RoomMessageText, bot):
if len(event.body.split()) == 3:
@ -12,7 +10,7 @@ async def command_space(room: MatrixRoom, event: RoomMessageText, bot):
if request.lower() == "enable":
bot.logger.log("Enabling space...")
with closing(bot.database.cursor()) as cursor:
with bot.database.cursor() as cursor:
cursor.execute(
"SELECT space_id FROM user_spaces WHERE user_id = ? AND active = TRUE", (event.sender,))
space = cursor.fetchone()
@ -27,7 +25,7 @@ async def command_space(room: MatrixRoom, event: RoomMessageText, bot):
"url": bot.logo_uri
}, "")
with closing(bot.database.cursor()) as cursor:
with bot.database.cursor() as cursor:
cursor.execute(
"INSERT INTO user_spaces (space_id, user_id) VALUES (?, ?)", (space, event.sender))
@ -50,7 +48,7 @@ async def command_space(room: MatrixRoom, event: RoomMessageText, bot):
elif request.lower() == "disable":
bot.logger.log("Disabling space...")
with closing(bot.database.cursor()) as cursor:
with bot.database.cursor() as cursor:
cursor.execute(
"SELECT space_id FROM user_spaces WHERE user_id = ? AND active = TRUE", (event.sender,))
space = cursor.fetchone()[0]
@ -60,7 +58,7 @@ async def command_space(room: MatrixRoom, event: RoomMessageText, bot):
await bot.send_message(room, "You don't have a space enabled.", True)
return
with closing(bot.database.cursor()) as cursor:
with bot.database.cursor() as cursor:
cursor.execute(
"UPDATE user_spaces SET active = FALSE WHERE user_id = ?", (event.sender,))
@ -71,7 +69,7 @@ async def command_space(room: MatrixRoom, event: RoomMessageText, bot):
if request.lower() == "update":
bot.logger.log("Updating space...")
with closing(bot.database.cursor()) as cursor:
with bot.database.cursor() as cursor:
cursor.execute(
"SELECT space_id FROM user_spaces WHERE user_id = ? AND active = TRUE", (event.sender,))
space = cursor.fetchone()[0]
@ -105,7 +103,7 @@ async def command_space(room: MatrixRoom, event: RoomMessageText, bot):
if request.lower() == "invite":
bot.logger.log("Inviting user to space...")
with closing(bot.database.cursor()) as cursor:
with bot.database.cursor() as cursor:
cursor.execute(
"SELECT space_id FROM user_spaces WHERE user_id = ?", (event.sender,))
space = cursor.fetchone()[0]
@ -120,7 +118,7 @@ async def command_space(room: MatrixRoom, event: RoomMessageText, bot):
if isinstance(response, RoomInviteError):
bot.logger.log(
f"Failed to invite user {event.sender} to space {space}", "error")
f"Failed to invite user {user} to space {space}", "error")
await bot.send_message(
room, "Sorry, I couldn't invite you to the space. Please try again later.", True)
return
@ -128,7 +126,7 @@ async def command_space(room: MatrixRoom, event: RoomMessageText, bot):
await bot.send_message(room, "Invited you to the space.", True)
return
with closing(bot.database.cursor()) as cursor:
with bot.database.cursor() as cursor:
cursor.execute(
"SELECT active FROM user_spaces WHERE user_id = ?", (event.sender,))
status = cursor.fetchone()

18
commands/stats.py Normal file
View file

@ -0,0 +1,18 @@
from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
async def command_stats(room: MatrixRoom, event: RoomMessageText, bot):
bot.logger.log("Showing stats...")
if not bot.database:
bot.logger.log("No database connection - cannot show stats")
bot.send_message(room, "Sorry, I'm not connected to a database, so I don't have any statistics on your usage.", True)
return
with bot.database.cursor() as cursor:
cursor.execute(
"SELECT SUM(tokens) FROM token_usage WHERE room_id = ?", (room.room_id,))
total_tokens = cursor.fetchone()[0] or 0
bot.send_message(room, f"Total tokens used: {total_tokens}", True)

View file

@ -1,8 +1,6 @@
from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
from contextlib import closing
async def command_systemmessage(room: MatrixRoom, event: RoomMessageText, bot):
system_message = " ".join(event.body.split()[2:])
@ -10,7 +8,7 @@ async def command_systemmessage(room: MatrixRoom, event: RoomMessageText, bot):
if system_message:
bot.logger.log("Adding system message...")
with closing(bot.database.cursor()) as cur:
with bot.database.cursor() as cur:
cur.execute(
"""
INSERT INTO room_settings (room_id, setting, value) VALUES (?, ?, ?)

View file

@ -3,22 +3,76 @@
# The values that are not commented have to be set, everything else comes with
# sensible defaults.
###############################################################################
[OpenAI]
# The Chat Completion model you want to use.
#
# Unless you are in the GPT-4 beta (if you don't know - you aren't),
# leave this as the default value (gpt-3.5-turbo)
#
# Model = gpt-3.5-turbo
# Your OpenAI API key
#
# Find this in your OpenAI account:
# https://platform.openai.com/account/api-keys
#
APIKey = sk-yoursecretkey
# The maximum amount of input sent to the API
#
# In conjunction with MaxMessage, this determines how much context (= previous
# messages) you can send with your query.
#
# If you set this too high, the responses you receive will become shorter the
# longer the conversation gets.
#
# https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-them
#
# MaxTokens = 3000
# The maximum number of messages in the room that will be considered as context
#
# By default, the last (up to) 20 messages will be sent as context, in addition
# to the system message and the current query itself.
#
# MaxMessages = 20
[WolframAlpha]
# An API key for Wolfram|Alpha
# Request one at https://developer.wolframalpha.com
#
# Leave unset to disable Wolfram|Alpha integration (`!gptbot calculate`)
#
#APIKey = YOUR-APIKEY
[Matrix]
# The URL to your Matrix homeserver
#
Homeserver = https://matrix.local
# An Access Token for the user your bot runs as
# Can be obtained using a request like this:
#
# See https://www.matrix.org/docs/guides/client-server-api#login
# for information on how to obtain this value
#
AccessToken = syt_yoursynapsetoken
# The Matrix user ID of the bot (@local:domain.tld)
# Only specify this if the bot fails to figure it out by itself
#
# UserID = @gptbot:matrix.local
[GPTBot]
# Some way for the user to contact you.
# Ideally, either your personal user ID or a support room
# If this is your user ID and Debug is 1, any errors that occur when using the script will be reported to you in detail
#
Operator = Contact details not set
# Enable debug mode
# Will send error tracebacks to you (= Operator above) if an error occurs processing a message from you
# Defaults to 0 (= off)
#
# Debug = 1
# The default room name used by the !newroom command
# Defaults to GPTBot if not set
#
@ -45,201 +99,22 @@ Operator = Contact details not set
# DisplayName = GPTBot
# A list of allowed users
# If not defined, everyone is allowed to use the bot (so you should really define this)
# If not defined, everyone is allowed to use the bot
# Use the "*:homeserver.matrix" syntax to allow everyone on a given homeserver
# Alternatively, you can also specify a room ID to allow everyone in the room to use the bot within that room
#
# AllowedUsers = ["*:matrix.local", "!roomid:matrix.local"]
# Minimum level of log messages that should be printed
# Available log levels in ascending order: trace, debug, info, warning, error, critical
# Defaults to info
#
LogLevel = info
###############################################################################
[OpenAI]
# The Chat Completion model you want to use.
#
# Model = gpt-4o
# The Image Generation model you want to use.
#
# ImageModel = dall-e-3
# Your OpenAI API key
#
# Find this in your OpenAI account:
# https://platform.openai.com/account/api-keys
#
# This may not be required for self-hosted models in that case, just leave it
# as it is.
#
APIKey = sk-yoursecretkey
# The maximum amount of input sent to the API
#
# In conjunction with MaxMessage, this determines how much context (= previous
# messages) you can send with your query.
#
# If you set this too high, the responses you receive will become shorter the
# longer the conversation gets.
#
# https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-them
#
# MaxTokens = 3000
# The maximum number of messages in the room that will be considered as context
#
# By default, the last (up to) 20 messages will be sent as context, in addition
# to the system message and the current query itself.
#
# MaxMessages = 20
# The base URL of the OpenAI API
#
# Setting this allows you to use a self-hosted AI model for chat completions
# using something like llama-cpp-python or ollama
#
# BaseURL = https://api.openai.com/v1/
# Whether to force the use of tools in the chat completion model
#
# This will make the bot allow the use of tools in the chat completion model,
# even if the model you are using isn't known to support tools. This is useful
# if you are using a self-hosted model that supports tools, but the bot doesn't
# know about it.
#
# ForceTools = 1
# Whether a dedicated model should be used for tools
#
# This will make the bot use a dedicated model for tools. This is useful if you
# want to use a model that doesn't support tools, but still want to be able to
# use tools.
#
# ToolModel = gpt-4o
# Whether to emulate tools in the chat completion model
#
# This will make the bot use the default model to *emulate* tools. This is
# useful if you want to use a model that doesn't support tools, but still want
# to be able to use tools. However, this may cause all kinds of weird results.
#
# EmulateTools = 0
# Force vision in the chat completion model
#
# By default, the bot only supports image recognition in known vision models.
# If you set this to 1, the bot will assume that the model you're using supports
# vision, and will send images to the model as well. This may be required for
# some self-hosted models.
#
# ForceVision = 0
# Maximum width and height of images sent to the API if vision is enabled
#
# The OpenAI API has a limit of 2000 pixels for the long side of an image, and
# 768 pixels for the short side. You may have to adjust these values if you're
# using a self-hosted model that has different limits. You can also set these
# to 0 to disable image resizing.
#
# MaxImageLongSide = 2000
# MaxImageShortSide = 768
# Whether the used model supports video files as input
#
# If you are using a model that supports video files as input, set this to 1.
# This will make the bot send video files to the model as well as images.
# This may be possible with some self-hosted models, but is not supported by
# the OpenAI API at this time.
#
# ForceVideoInput = 0
# Advanced settings for the OpenAI API
#
# These settings are not required for normal operation, but can be used to
# tweak the behavior of the bot.
#
# Note: These settings are not validated by the bot, so make sure they are
# correct before setting them, or the bot may not work as expected.
#
# For more information, see the OpenAI documentation:
# https://platform.openai.com/docs/api-reference/chat/create
#
# Temperature = 1
# TopP = 1
# FrequencyPenalty = 0
# PresencePenalty = 0
###############################################################################
[WolframAlpha]
# An API key for Wolfram|Alpha
# Request one at https://developer.wolframalpha.com
#
# Leave unset to disable Wolfram|Alpha integration (`!gptbot calculate`)
#
#APIKey = YOUR-APIKEY
###############################################################################
[Matrix]
# The URL to your Matrix homeserver
#
# If you are using Pantalaimon, this should be the URL of your Pantalaimon
# instance, not the Matrix homeserver itself.
#
Homeserver = https://matrix.local
# An Access Token for the user your bot runs as
#
# See https://www.matrix.org/docs/guides/client-server-api#login
# for information on how to obtain this value
#
AccessToken = syt_yoursynapsetoken
# Instead of an Access Token, you can also use a User ID and password
# to log in. Upon first run, the bot will automatically turn this into
# an Access Token and store it in the config file, and remove the
# password from the config file.
#
# This is particularly useful if you are using Pantalaimon, where this
# is the only (easy) way to generate an Access Token.
#
# UserID = @gptbot:matrix.local
# Password = yourpassword
###############################################################################
# AllowedUsers = ["*:matrix.local"]
[Database]
# Path of the main database
# Used to "remember" settings, etc.
#
Path = database.db
# Settings for the DuckDB database.
# If not defined, the bot will not be able to remember anything, and will not support encryption
# N.B.: Encryption doesn't work as it is supposed to anyway.
###############################################################################
Path = database.db
[TrackingMore]
# API key for TrackingMore
# If not defined, the bot will not be able to provide parcel tracking
#
# APIKey = abcde-fghij-klmnop
###############################################################################
[OpenWeatherMap]
# API key for OpenWeatherMap
# If not defined, the bot will be unable to provide weather information
#
# APIKey = __________________________
###############################################################################
# APIKey = abcde-fghij-klmnop

View file

@ -1,7 +0,0 @@
[Homeserver]
Homeserver = https://example.com
ListenAddress = localhost
ListenPort = 8009
IgnoreVerification = True
LogLevel = debug
UseKeyring = no

View file

@ -1,15 +0,0 @@
version: '3.8'
services:
gptbot:
image: kumitterer/matrix-gptbot
volumes:
- ./config.ini:/app/config.ini
- ./database.db:/app/database.db
pantalaimon:
image: matrixdotorg/pantalaimon
volumes:
- ./pantalaimon.conf:/etc/pantalaimon/pantalaimon.conf
ports:
- "8009:8009"

View file

@ -1,22 +0,0 @@
from nio import AsyncClient
from configparser import ConfigParser
async def main():
config = ConfigParser()
config.read("config.ini")
user_id = input("User ID: ")
password = input("Password: ")
client = AsyncClient(config["Matrix"]["Homeserver"])
client.user = user_id
await client.login(password)
print("Access token: " + client.access_token)
await client.close()
if __name__ == "__main__":
import asyncio
asyncio.get_event_loop().run_until_complete(main())

View file

@ -1,15 +0,0 @@
[Unit]
Description=Pantalaimon for GPTbot
Requires=network.target
[Service]
Type=simple
User=gptbot
Group=gptbot
WorkingDirectory=/opt/gptbot
ExecStart=/opt/gptbot/venv/bin/python3 -um pantalaimon.main -c pantalaimon.conf
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target

37
gptbot.py Normal file
View file

@ -0,0 +1,37 @@
from classes.bot import GPTBot
from argparse import ArgumentParser
from configparser import ConfigParser
import signal
import asyncio
def sigterm_handler(_signo, _stack_frame):
exit()
if __name__ == "__main__":
# Parse command line arguments
parser = ArgumentParser()
parser.add_argument(
"--config", help="Path to config file (default: config.ini in working directory)", default="config.ini")
args = parser.parse_args()
# Read config file
config = ConfigParser()
config.read(args.config)
# Create bot
bot = GPTBot.from_config(config)
# Listen for SIGTERM
signal.signal(signal.SIGTERM, sigterm_handler)
# Start bot
try:
asyncio.run(bot.run())
except KeyboardInterrupt:
print("Received KeyboardInterrupt - exiting...")
except SystemExit:
print("Received SIGTERM - exiting...")

View file

@ -1,5 +1,5 @@
[Unit]
Description=GPTbot - A multifunctional Chatbot for Matrix
Description=GPTbot - A GPT bot for Matrix
Requires=network.target
[Service]

View file

@ -1,7 +1,8 @@
from collections import OrderedDict
from typing import Optional
from importlib import import_module
from sqlite3 import Connection as SQLiteConnection
from duckdb import DuckDBPyConnection
MAX_MIGRATION = 8
@ -10,11 +11,11 @@ MIGRATIONS = OrderedDict()
for i in range(1, MAX_MIGRATION + 1):
MIGRATIONS[i] = import_module(f".migration_{i}", __package__).migration
def get_version(db: SQLiteConnection) -> int:
def get_version(db: DuckDBPyConnection) -> int:
"""Get the current database version.
Args:
db (SQLiteConnection): Database connection.
db (DuckDBPyConnection): Database connection.
Returns:
int: Current database version.
@ -22,14 +23,14 @@ def get_version(db: SQLiteConnection) -> int:
try:
return int(db.execute("SELECT MAX(id) FROM migrations").fetchone()[0])
except Exception:
except:
return 0
def migrate(db: SQLiteConnection, from_version: Optional[int] = None, to_version: Optional[int] = None) -> None:
def migrate(db: DuckDBPyConnection, from_version: Optional[int] = None, to_version: Optional[int] = None) -> None:
"""Migrate the database to a specific version.
Args:
db (SQLiteConnection): Database connection.
db (DuckDBPyConnection): Database connection.
from_version (Optional[int]): Version to migrate from. If None, the current version is used.
to_version (Optional[int]): Version to migrate to. If None, the latest version is used.
"""

View file

@ -1,10 +1,9 @@
# Initial migration, token usage logging
from datetime import datetime
from contextlib import closing
def migration(conn):
with closing(conn.cursor()) as cursor:
with conn.cursor() as cursor:
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS token_usage (

138
migrations/migration_2.py Normal file
View file

@ -0,0 +1,138 @@
# Migration for Matrix Store
from datetime import datetime
def migration(conn):
with conn.cursor() as cursor:
# Create accounts table
cursor.execute("""
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY,
user_id VARCHAR NOT NULL,
device_id VARCHAR NOT NULL,
shared_account INTEGER NOT NULL,
pickle VARCHAR NOT NULL
);
""")
# Create device_keys table
cursor.execute("""
CREATE TABLE IF NOT EXISTS device_keys (
device_id TEXT PRIMARY KEY,
account_id INTEGER NOT NULL,
user_id TEXT NOT NULL,
display_name TEXT,
deleted BOOLEAN NOT NULL DEFAULT 0,
UNIQUE (account_id, user_id, device_id),
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS keys (
key_type TEXT NOT NULL,
key TEXT NOT NULL,
device_id VARCHAR NOT NULL,
UNIQUE (key_type, device_id),
FOREIGN KEY (device_id) REFERENCES device_keys(device_id) ON DELETE CASCADE
);
""")
# Create device_trust_state table
cursor.execute("""
CREATE TABLE IF NOT EXISTS device_trust_state (
device_id VARCHAR PRIMARY KEY,
state INTEGER NOT NULL,
FOREIGN KEY(device_id) REFERENCES device_keys(device_id) ON DELETE CASCADE
);
""")
# Create olm_sessions table
cursor.execute("""
CREATE SEQUENCE IF NOT EXISTS olm_sessions_id_seq START 1;
CREATE TABLE IF NOT EXISTS olm_sessions (
id INTEGER PRIMARY KEY DEFAULT nextval('olm_sessions_id_seq'),
account_id INTEGER NOT NULL,
sender_key TEXT NOT NULL,
session BLOB NOT NULL,
session_id VARCHAR NOT NULL,
creation_time TIMESTAMP NOT NULL,
last_usage_date TIMESTAMP NOT NULL,
FOREIGN KEY (account_id) REFERENCES accounts (id) ON DELETE CASCADE
);
""")
# Create inbound_group_sessions table
cursor.execute("""
CREATE SEQUENCE IF NOT EXISTS inbound_group_sessions_id_seq START 1;
CREATE TABLE IF NOT EXISTS inbound_group_sessions (
id INTEGER PRIMARY KEY DEFAULT nextval('inbound_group_sessions_id_seq'),
account_id INTEGER NOT NULL,
session TEXT NOT NULL,
fp_key TEXT NOT NULL,
sender_key TEXT NOT NULL,
room_id TEXT NOT NULL,
UNIQUE (account_id, sender_key, fp_key, room_id),
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS forwarded_chains (
id INTEGER PRIMARY KEY,
session_id INTEGER NOT NULL,
sender_key TEXT NOT NULL,
FOREIGN KEY (session_id) REFERENCES inbound_group_sessions(id) ON DELETE CASCADE
);
""")
# Create outbound_group_sessions table
cursor.execute("""
CREATE TABLE IF NOT EXISTS outbound_group_sessions (
id INTEGER PRIMARY KEY,
account_id INTEGER NOT NULL,
room_id VARCHAR NOT NULL,
session_id VARCHAR NOT NULL UNIQUE,
session BLOB NOT NULL,
FOREIGN KEY(account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
""")
# Create outgoing_key_requests table
cursor.execute("""
CREATE TABLE IF NOT EXISTS outgoing_key_requests (
id INTEGER PRIMARY KEY,
account_id INTEGER NOT NULL,
request_id TEXT NOT NULL,
session_id TEXT NOT NULL,
room_id TEXT NOT NULL,
algorithm TEXT NOT NULL,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE,
UNIQUE (account_id, request_id)
);
""")
# Create encrypted_rooms table
cursor.execute("""
CREATE TABLE IF NOT EXISTS encrypted_rooms (
room_id TEXT NOT NULL,
account_id INTEGER NOT NULL,
PRIMARY KEY (room_id, account_id),
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
""")
# Create sync_tokens table
cursor.execute("""
CREATE TABLE IF NOT EXISTS sync_tokens (
account_id INTEGER PRIMARY KEY,
token TEXT NOT NULL,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
""")
cursor.execute(
"INSERT INTO migrations (id, timestamp) VALUES (2, ?)",
(datetime.now(),)
)
conn.commit()

View file

@ -1,10 +1,9 @@
# Migration for custom system messages
from datetime import datetime
from contextlib import closing
def migration(conn):
with closing(conn.cursor()) as cursor:
with conn.cursor() as cursor:
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS system_messages (
@ -12,7 +11,7 @@ def migration(conn):
message_id TEXT NOT NULL,
user_id TEXT NOT NULL,
body TEXT NOT NULL,
timestamp BIGINT NOT NULL
timestamp BIGINT NOT NULL,
)
"""
)

View file

@ -1,10 +1,9 @@
# Migration to add API column to token usage table
from datetime import datetime
from contextlib import closing
def migration(conn):
with closing(conn.cursor()) as cursor:
with conn.cursor() as cursor:
cursor.execute(
"""
ALTER TABLE token_usage ADD COLUMN api TEXT DEFAULT 'openai'

View file

@ -1,10 +1,9 @@
# Migration to add room settings table
from datetime import datetime
from contextlib import closing
def migration(conn):
with closing(conn.cursor()) as cursor:
with conn.cursor() as cursor:
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS room_settings (

View file

@ -1,10 +1,9 @@
# Migration to drop primary key constraint from token_usage table
from datetime import datetime
from contextlib import closing
def migration(conn):
with closing(conn.cursor()) as cursor:
with conn.cursor() as cursor:
cursor.execute(
"""
CREATE TABLE token_usage_temp (

View file

@ -1,10 +1,9 @@
# Migration to add user_spaces table
from datetime import datetime
from contextlib import closing
def migration(conn):
with closing(conn.cursor()) as cursor:
with conn.cursor() as cursor:
cursor.execute(
"""
CREATE TABLE user_spaces (

View file

@ -1,10 +1,9 @@
# Migration to add settings table
from datetime import datetime
from contextlib import closing
def migration(conn):
with closing(conn.cursor()) as cursor:
with conn.cursor() as cursor:
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS settings (

View file

@ -1,65 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.metadata]
allow-direct-references = true
[project]
name = "matrix-gptbot"
version = "0.3.20"
authors = [
{ name = "Kumi", email = "gptbot@kumi.email" },
{ name = "Private.coffee Team", email = "support@private.coffee" },
]
description = "Multifunctional Chatbot for Matrix"
readme = "README.md"
license = { file = "LICENSE" }
requires-python = ">=3.10"
packages = ["src/gptbot"]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"matrix-nio[e2e]>=0.24.0",
"markdown2[all]",
"tiktoken",
"python-magic",
"pillow",
"future>=1.0.0",
]
[project.optional-dependencies]
openai = ["openai>=1.2", "pydub"]
google = ["google-generativeai"]
wolframalpha = ["wolframalpha"]
trackingmore = ["trackingmore-api-tool"]
all = [
"matrix-gptbot[openai,wolframalpha,trackingmore,google]",
"geopy",
"beautifulsoup4",
]
dev = ["matrix-gptbot[all]", "black", "hatchling", "twine", "build", "ruff"]
[project.urls]
"Homepage" = "https://git.private.coffee/privatecoffee/matrix-gptbot"
"Bug Tracker" = "https://git.private.coffee/privatecoffee/matrix-gptbot/issues"
"Source Code" = "https://git.private.coffee/privatecoffee/matrix-gptbot"
[project.scripts]
gptbot = "gptbot.__main__:main_sync"
[tool.hatch.build.targets.wheel]
packages = ["src/gptbot"]

9
requirements.txt Normal file
View file

@ -0,0 +1,9 @@
openai
matrix-nio[e2e]
markdown2[all]
tiktoken
duckdb
python-magic
pillow
wolframalpha
git+https://kumig.it/kumitterer/trackingmore-api-tool.git

View file

@ -1,70 +0,0 @@
from .classes.bot import GPTBot
from argparse import ArgumentParser
from configparser import ConfigParser
import signal
import asyncio
import importlib.metadata
def sigterm_handler(_signo, _stack_frame):
exit()
def get_version():
try:
package_version = importlib.metadata.version("matrix_gptbot")
except Exception:
return None
return package_version
async def main():
# Parse command line arguments
parser = ArgumentParser()
parser.add_argument(
"--config",
"-c",
help="Path to config file (default: config.ini in working directory)",
default="config.ini",
)
parser.add_argument(
"--version",
"-v",
help="Print version and exit",
action="version",
version=f"GPTBot {get_version() or '- version unknown'}",
)
args = parser.parse_args()
# Read config file
config = ConfigParser()
config.read(args.config)
# Create bot
bot, new_config = await GPTBot.from_config(config)
# Update config with new values
if new_config:
with open(args.config, "w") as configfile:
new_config.write(configfile)
# Listen for SIGTERM
signal.signal(signal.SIGTERM, sigterm_handler)
# Start bot
try:
await bot.run()
except KeyboardInterrupt:
print("Received KeyboardInterrupt - exiting...")
except SystemExit:
print("Received SIGTERM - exiting...")
def main_sync():
asyncio.run(main())
if __name__ == "__main__":
main_sync()

View file

@ -1,30 +0,0 @@
from nio import MatrixRoom, RoomMessageText
from datetime import datetime
async def message_callback(room: MatrixRoom | str, event: RoomMessageText, bot):
bot.logger.log(f"Received message from {event.sender} in room {room.room_id}")
sent = datetime.fromtimestamp(event.server_timestamp / 1000)
received = datetime.now()
latency = received - sent
if event.sender == bot.matrix_client.user_id:
bot.logger.log("Message is from bot itself - ignoring")
elif event.body.startswith("!gptbot") or event.body.startswith("* !gptbot"):
await bot.process_command(room, event)
elif event.body.startswith("!"):
bot.logger.log(f"Received {event.body} - might be a command, but not for this bot - ignoring")
else:
await bot.process_query(room, event)
processed = datetime.now()
processing_time = processed - received
bot.logger.log(f"Message processing took {processing_time.total_seconds()} seconds (latency: {latency.total_seconds()} seconds)")
if bot.room_uses_timing(room):
await bot.send_message(room, f"Message processing took {processing_time.total_seconds()} seconds (latency: {latency.total_seconds()} seconds)", True)

View file

@ -1,76 +0,0 @@
from ...classes.logging import Logger
import asyncio
from functools import partial
from typing import Any, AsyncGenerator, Dict, Optional, Mapping
from nio import Event
class AttributeDictionary(dict):
def __init__(self, *args, **kwargs):
super(AttributeDictionary, self).__init__(*args, **kwargs)
self.__dict__ = self
class BaseAI:
bot: Any
logger: Logger
def __init__(self, bot, config: Mapping, logger: Optional[Logger] = None):
self.bot = bot
self.logger = logger or bot.logger or Logger()
self._config = config
@property
def chat_api(self) -> str:
return self.chat_model
async def prepare_messages(
self, event: Event, messages: list[Any], system_message: Optional[str] = None
) -> list[Any]:
"""A helper method to prepare messages for the AI.
This converts a list of Matrix messages into whatever format the AI requires.
Args:
event (Event): The event that triggered the message generation. Generally a text message from a user.
messages (list[Dict[str, str]]): The messages to prepare. Generally of type RoomMessage*.
system_message (Optional[str], optional): A system message to include. Defaults to None.
Returns:
list[Any]: The prepared messages in the format the AI requires.
Raises:
NotImplementedError: If the method is not implemented in the subclass.
"""
raise NotImplementedError(
"Implementations of BaseAI must implement prepare_messages."
)
async def _request_with_retries(
self, request: partial, attempts: int = 5, retry_interval: int = 2
) -> AsyncGenerator[Any | list | Dict, None]:
"""Retry a request a set number of times if it fails.
Args:
request (partial): The request to make with retries.
attempts (int, optional): The number of attempts to make. Defaults to 5.
retry_interval (int, optional): The interval in seconds between attempts. Defaults to 2 seconds.
Returns:
AsyncGenerator[Any | list | Dict, None]: The response for the request.
"""
current_attempt = 1
while current_attempt <= attempts:
try:
response = await request()
return response
except Exception as e:
self.logger.log(f"Request failed: {e}", "error")
self.logger.log(f"Retrying in {retry_interval} seconds...")
await asyncio.sleep(retry_interval)
current_attempt += 1
raise Exception("Request failed after all attempts.")

View file

@ -1,73 +0,0 @@
from .base import BaseAI
from ..logging import Logger
from typing import Optional, Mapping, List, Dict, Tuple
import google.generativeai as genai
class GeminiAI(BaseAI):
api_code: str = "google"
@property
def chat_api(self) -> str:
return self.chat_model
google_api: genai.GenerativeModel
operator: str = "Google (https://ai.google)"
def __init__(
self,
bot,
config: Mapping,
logger: Optional[Logger] = None,
):
super().__init__(bot, config, logger)
genai.configure(api_key=self.api_key)
self.gemini_api = genai.GenerativeModel(self.chat_model)
@property
def api_key(self):
return self._config["APIKey"]
@property
def chat_model(self):
return self._config.get("Model", fallback="gemini-pro")
def prepare_messages(event, messages: List[Dict[str, str]], ) -> List[str]:
return [message["content"] for message in messages]
async def generate_chat_response(
self,
messages: List[Dict[str, str]],
user: Optional[str] = None,
room: Optional[str] = None,
use_tools: bool = True,
model: Optional[str] = None,
) -> Tuple[str, int]:
"""Generate a response to a chat message.
Args:
messages (List[Dict[str, str]]): A list of messages to use as context.
user (Optional[str], optional): The user to use the assistant for. Defaults to None.
room (Optional[str], optional): The room to use the assistant for. Defaults to None.
use_tools (bool, optional): Whether to use tools. Defaults to True.
model (Optional[str], optional): The model to use. Defaults to None, which uses the default chat model.
Returns:
Tuple[str, int]: The response text and the number of tokens used.
"""
self.logger.log(
f"Generating response to {len(messages)} messages for user {user} in room {room}..."
)
messages = self.prepare_messages(messages)
return self.gemini_api.generate_content(
messages=messages,
user=user,
room=room,
use_tools=use_tools,
model=model,
)

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,2 +0,0 @@
class DownloadException(Exception):
pass

View file

@ -1,26 +0,0 @@
import inspect
from datetime import datetime
class Logger:
LOG_LEVELS = ["trace", "debug", "info", "warning", "error", "critical"]
def __init__(self, log_level: str = "warning"):
if log_level not in self.LOG_LEVELS:
raise ValueError(
f"Invalid log level {log_level}. Valid levels are {', '.join(self.LOG_LEVELS)}")
self.log_level = log_level
def log(self, message: str, log_level: str = "info"):
if log_level not in self.LOG_LEVELS:
raise ValueError(
f"Invalid log level {log_level}. Valid levels are {', '.join(self.LOG_LEVELS)}")
if self.LOG_LEVELS.index(log_level) < self.LOG_LEVELS.index(self.log_level):
return
caller = inspect.currentframe().f_back.f_code.co_name
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S:%f")
print(f"[{timestamp}] - {caller} - [{log_level.upper()}] {message}")

View file

@ -1,18 +0,0 @@
from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
async def command_botinfo(room: MatrixRoom, event: RoomMessageText, bot):
bot.logger.log("Showing bot info...")
body = f"""GPT Room info:
Model: {await bot.get_room_model(room)}\n
Maximum context tokens: {bot.chat_api.max_tokens}\n
Maximum context messages: {bot.chat_api.max_messages}\n
Bot user ID: {bot.matrix_client.user_id}\n
Current room ID: {room.room_id}\n
System message: {bot.get_system_message(room)}
"""
await bot.send_message(room, body, True)

View file

@ -1,25 +0,0 @@
from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
async def command_help(room: MatrixRoom, event: RoomMessageText, bot):
body = """Available commands:
- !gptbot help - Show this message
- !gptbot botinfo - Show information about the bot
- !gptbot privacy - Show privacy information
- !gptbot newroom <room name> - Create a new room and invite yourself to it
- !gptbot systemmessage <message> - Get or set the system message for this room
- !gptbot space [enable|disable|update|invite] - Enable, disable, force update, or invite yourself to your space
- !gptbot coin - Flip a coin (heads or tails)
- !gptbot dice [number] - Roll a dice with the specified number of sides (default: 6)
- !gptbot imagine <prompt> - Generate an image from a prompt
- !gptbot calculate [--text] [--details] <query> - Calculate a result to a calculation, optionally forcing text output instead of an image, and optionally showing additional details like the input interpretation
- !gptbot chat <message> - Send a message to the chat API
- !gptbot classify <message> - Classify a message using the classification API
- !gptbot custom <message> - Used for custom commands handled by the chat model and defined through the room's system message
- !gptbot roomsettings [use_classification|use_timing|always_reply|system_message|tts] [true|false|<message>] - Get or set room settings
- !gptbot ignoreolder - Ignore messages before this point as context
"""
await bot.send_message(room, body, True)

View file

@ -1,34 +0,0 @@
from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
from contextlib import closing
async def command_stats(room: MatrixRoom, event: RoomMessageText, bot):
await bot.send_message(
room,
"The `stats` command is no longer supported. Sorry for the inconvenience.",
True,
)
return
# Yes, the code below is unreachable, but it's kept here for reference.
bot.logger.log("Showing stats...")
if not bot.database:
bot.logger.log("No database connection - cannot show stats")
await bot.send_message(
room,
"Sorry, I'm not connected to a database, so I don't have any statistics on your usage.",
True,
)
return
with closing(bot.database.cursor()) as cursor:
cursor.execute(
"SELECT SUM(tokens) FROM token_usage WHERE room_id = ?", (room.room_id,)
)
total_tokens = cursor.fetchone()[0] or 0
await bot.send_message(room, f"Total tokens used: {total_tokens}", True)

View file

@ -1,23 +0,0 @@
from nio.events.room_events import RoomMessageText
from nio.rooms import MatrixRoom
async def command_tts(room: MatrixRoom, event: RoomMessageText, bot):
prompt = " ".join(event.body.split()[2:])
if prompt:
bot.logger.log("Generating speech...")
try:
content = await bot.tts_api.text_to_speech(prompt, user=room.room_id)
except Exception as e:
bot.logger.log(f"Error generating speech: {e}", "error")
await bot.send_message(room, "Sorry, I couldn't generate an audio file. Please try again later.", True)
return
bot.logger.log("Sending audio file...")
await bot.send_file(room, content, "audio.mp3", "audio/mpeg", "m.audio")
return
await bot.send_message(room, "You need to provide a prompt.", True)

View file

@ -1,5 +0,0 @@
# Migration for Matrix Store - No longer used
def migration(conn):
pass

View file

@ -1,21 +0,0 @@
from importlib import import_module
from .base import BaseTool, StopProcessing, Handover # noqa: F401
TOOLS = {}
for tool in [
"weather",
"geocode",
"dice",
"websearch",
"webrequest",
"imagine",
"imagedescription",
"wikipedia",
"datetime",
"newroom",
]:
tool_class = getattr(import_module(
"." + tool, "gptbot.tools"), tool.capitalize())
TOOLS[tool] = tool_class

View file

@ -1,21 +0,0 @@
class BaseTool:
DESCRIPTION: str
PARAMETERS: dict
def __init__(self, **kwargs):
self.kwargs = kwargs
self.bot = kwargs.get("bot")
self.room = kwargs.get("room")
self.user = kwargs.get("user")
self.messages = kwargs.get("messages", [])
async def run(self):
raise NotImplementedError()
class StopProcessing(Exception):
"""Stop processing the message."""
pass
class Handover(Exception):
"""Handover to the original model, if applicable. Stop using tools."""
pass

View file

@ -1,16 +0,0 @@
from .base import BaseTool
from datetime import datetime
class Datetime(BaseTool):
DESCRIPTION = "Get the current date and time."
PARAMETERS = {
"type": "object",
"properties": {
},
}
async def run(self):
"""Get the current date and time."""
return f"""**Current date and time (UTC)**
{datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")}"""

View file

@ -1,26 +0,0 @@
from .base import BaseTool
from random import SystemRandom
class Dice(BaseTool):
DESCRIPTION = "Roll dice."
PARAMETERS = {
"type": "object",
"properties": {
"dice": {
"type": "string",
"description": "The number of sides on the dice.",
"default": "6",
},
},
"required": [],
}
async def run(self):
"""Roll dice."""
dice = int(self.kwargs.get("dice", 6))
return f"""**Dice roll**
Used dice: {dice}
Result: {SystemRandom().randint(1, dice)}
"""

View file

@ -1,34 +0,0 @@
from geopy.geocoders import Nominatim
from .base import BaseTool
class Geocode(BaseTool):
DESCRIPTION = "Get location information (latitude, longitude) for a given location name."
PARAMETERS = {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The location name.",
},
},
"required": ["location"],
}
async def run(self):
"""Get location information for a given location."""
if not (location := self.kwargs.get("location")):
raise Exception('No location provided.')
geolocator = Nominatim(user_agent=self.bot.USER_AGENT)
location = geolocator.geocode(location)
if location:
return f"""**Location information for {location.address}**
Latitude: {location.latitude}
Longitude: {location.longitude}
"""
raise Exception('Could not find location data for that location.')

View file

@ -1,15 +0,0 @@
from .base import BaseTool
class Imagedescription(BaseTool):
DESCRIPTION = "Describe the content of the images in the conversation."
PARAMETERS = {
"type": "object",
"properties": {
},
}
async def run(self):
"""Describe images in the conversation."""
image_api = self.bot.image_api
return (await image_api.describe_images(self.messages, self.user))[0]

View file

@ -1,34 +0,0 @@
from .base import BaseTool, StopProcessing
class Imagine(BaseTool):
DESCRIPTION = "Use generative AI to create images from text prompts."
PARAMETERS = {
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "The prompt to use.",
},
"orientation": {
"type": "string",
"description": "The orientation of the image.",
"enum": ["square", "landscape", "portrait"],
"default": "square",
},
},
"required": ["prompt"],
}
async def run(self):
"""Use generative AI to create images from text prompts."""
if not (prompt := self.kwargs.get("prompt")):
raise Exception('No prompt provided.')
api = self.bot.image_api
orientation = self.kwargs.get("orientation", "square")
images, tokens = await api.generate_image(prompt, self.room, orientation=orientation)
for image in images:
await self.bot.send_image(self.room, image, prompt)
raise StopProcessing()

View file

@ -1,57 +0,0 @@
from .base import BaseTool, StopProcessing
from nio import RoomCreateError, RoomInviteError
from contextlib import closing
class Newroom(BaseTool):
DESCRIPTION = "Create a new Matrix room"
PARAMETERS = {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the room to create.",
"default": "GPTBot"
}
},
}
async def run(self):
"""Create a new Matrix room"""
name = self.kwargs.get("name", "GPTBot")
self.bot.logger.log("Creating new room...")
new_room = await self.bot.matrix_client.room_create(name=name)
if isinstance(new_room, RoomCreateError):
self.bot.logger.log(f"Failed to create room: {new_room.message}")
raise
self.bot.logger.log(f"Inviting {self.user} to new room...")
invite = await self.bot.matrix_client.room_invite(new_room.room_id, self.user)
if isinstance(invite, RoomInviteError):
self.bot.logger.log(f"Failed to invite user: {invite.message}")
raise
await self.bot.send_message(new_room.room_id, "Welcome to your new room! What can I do for you?")
with closing(self.bot.database.cursor()) as cursor:
cursor.execute(
"SELECT space_id FROM user_spaces WHERE user_id = ? AND active = TRUE", (self.user,))
space = cursor.fetchone()
if space:
self.bot.logger.log(f"Adding new room to space {space[0]}...")
await self.bot.add_rooms_to_space(space[0], [new_room.room_id])
if self.bot.logo_uri:
await self.bot.matrix_client.room_put_state(new_room, "m.room.avatar", {
"url": self.bot.logo_uri
}, "")
await self.bot.matrix_client.room_put_state(
new_room.room_id, "m.room.power_levels", {"users": {self.user: 100, self.bot.matrix_client.user_id: 100}})
raise StopProcessing("Created new Matrix room with ID " + new_room.room_id + " and invited user.")

View file

@ -1,58 +0,0 @@
import aiohttp
from datetime import datetime
from .base import BaseTool
class Weather(BaseTool):
DESCRIPTION = "Get weather information for a given location."
PARAMETERS = {
"type": "object",
"properties": {
"latitude": {
"type": "string",
"description": "The latitude of the location.",
},
"longitude": {
"type": "string",
"description": "The longitude of the location.",
},
"name": {
"type": "string",
"description": "A location name to include in the report. This is optional, latitude and longitude are always required."
}
},
"required": ["latitude", "longitude"],
}
async def run(self):
"""Get weather information for a given location."""
if not (latitude := self.kwargs.get("latitude")) or not (longitude := self.kwargs.get("longitude")):
raise Exception('No location provided.')
name = self.kwargs.get("name")
weather_api_key = self.bot.config.get("OpenWeatherMap", "APIKey")
if not weather_api_key:
raise Exception('Weather API key not found.')
url = f'https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&appid={weather_api_key}&units=metric'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
return f"""**Weather report{f" for {name}" if name else ""}**
Current: {data['current']['temp']}°C, {data['current']['weather'][0]['description']}
Feels like: {data['current']['feels_like']}°C
Humidity: {data['current']['humidity']}%
Wind: {data['current']['wind_speed']}m/s
Sunrise: {datetime.fromtimestamp(data['current']['sunrise']).strftime('%H:%M')}
Sunset: {datetime.fromtimestamp(data['current']['sunset']).strftime('%H:%M')}
Today: {data['daily'][0]['temp']['day']}°C, {data['daily'][0]['weather'][0]['description']}, {data['daily'][0]['summary']}
Tomorrow: {data['daily'][1]['temp']['day']}°C, {data['daily'][1]['weather'][0]['description']}, {data['daily'][1]['summary']}
"""
else:
raise Exception(f'Could not connect to weather API: {response.status} {response.reason}')

View file

@ -1,59 +0,0 @@
from .base import BaseTool
import aiohttp
from bs4 import BeautifulSoup
import re
class Webrequest(BaseTool):
DESCRIPTION = "Browse an external website by URL."
PARAMETERS = {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL to request.",
},
},
"required": ["url"],
}
async def html_to_text(self, html):
# Parse the HTML content of the response
soup = BeautifulSoup(html, 'html.parser')
# Format the links within the text
for link in soup.find_all('a'):
link_text = link.get_text()
link_href = link.get('href')
new_link_text = f"{link_text} ({link_href})"
link.replace_with(new_link_text)
# Extract the plain text content of the website
plain_text_content = soup.get_text()
# Remove extra whitespace
plain_text_content = re.sub('\s+', ' ', plain_text_content).strip()
# Return the formatted text content of the website
return plain_text_content
async def run(self):
"""Make a web request to a given URL."""
if not (url := self.kwargs.get("url")):
raise Exception('No URL provided.')
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.text()
output = await self.html_to_text(data)
return f"""**Web request**
URL: {url}
Status: {response.status} {response.reason}
{output}
"""

View file

@ -1,37 +0,0 @@
from .base import BaseTool
import aiohttp
from urllib.parse import quote_plus
class Websearch(BaseTool):
DESCRIPTION = "Search the web for a given query."
PARAMETERS = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query to search for.",
},
},
"required": ["query"],
}
async def run(self):
"""Search the web for a given query."""
if not (query := self.kwargs.get("query")):
raise Exception('No query provided.')
query = quote_plus(query)
url = f'https://librey.private.coffee/api.php?q={query}'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
response_text = "**Search results for {query}**"
for result in data:
response_text += f"\n{result['title']}\n{result['url']}\n{result['description']}\n"
return response_text

View file

@ -1,79 +0,0 @@
from .base import BaseTool
from urllib.parse import urlencode
import aiohttp
class Wikipedia(BaseTool):
DESCRIPTION = "Get information from Wikipedia."
PARAMETERS = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query to search for.",
},
"language": {
"type": "string",
"description": "The language to search in.",
"default": "en",
},
"extract": {
"type": "string",
"description": "What information to extract from the page. If not provided, the full page will be returned."
},
"summarize": {
"type": "boolean",
"description": "Whether to summarize the page or not.",
"default": False,
}
},
"required": ["query"],
}
async def run(self):
"""Get information from Wikipedia."""
if not (query := self.kwargs.get("query")):
raise Exception('No query provided.')
language = self.kwargs.get("language", "en")
extract = self.kwargs.get("extract")
summarize = self.kwargs.get("summarize", False)
args = {
"action": "query",
"format": "json",
"titles": query,
}
args["prop"] = "revisions"
args["rvprop"] = "content"
url = f'https://{language}.wikipedia.org/w/api.php?{urlencode(args)}'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
try:
pages = data['query']['pages']
page = list(pages.values())[0]
content = page['revisions'][0]['*']
except KeyError:
raise Exception(f'No results for {query} found in Wikipedia.')
if extract:
chat_messages = [{"role": "system", "content": f"Extract the following from the provided content: {extract}"}]
chat_messages.append({"role": "user", "content": content})
content, _ = await self.bot.chat_api.generate_chat_response(chat_messages, room=self.room, user=self.user, allow_override=False, use_tools=False)
if summarize:
chat_messages = [{"role": "system", "content": "Summarize the following content:"}]
chat_messages.append({"role": "user", "content": content})
content, _ = await self.bot.chat_api.generate_chat_response(chat_messages, room=self.room, user=self.user, allow_override=False, use_tools=False)
return f"**Wikipedia: {page['title']}**\n{content}"
else:
raise Exception(f'Could not connect to Wikipedia API: {response.status} {response.reason}')