matrix-coffeebot/bot.py
Kumi b48d55a36c
feat: add CoffeeBot for automated coffee orders
Introduced CoffeeBot for handling coffee order notifications.
- Configures with settings.ini
- Uses matrix-nio for Matrix client interaction
- Joins specified rooms, logs in with access token
- Notifies a room when a coffee order is detected

.gitignore updated to exclude venv/ and settings.ini.
Includes a sample settings.dist.ini for configuration.
2024-07-07 11:08:25 +02:00

73 lines
No EOL
2.6 KiB
Python

import asyncio
import configparser
from nio import AsyncClient, RoomMessageText, MatrixRoom, LoginError
CONFIG_FILE = "settings.ini"
class CoffeeBot:
def __init__(self):
self.config = configparser.ConfigParser()
self.config.read(CONFIG_FILE)
self.homeserver = self.config["matrix"]["homeserver_url"]
self.username = self.config["matrix"]["username"]
self.password = self.config["matrix"].get("password", "")
self.room_a_id = self.config["matrix"]["room_a_id"]
self.room_b_id = self.config["matrix"]["room_b_id"]
self.access_token = self.config["matrix"].get("access_token", "")
self.client = AsyncClient(self.homeserver, user=self.username)
if self.access_token:
self.client.access_token = self.access_token
self.client.user_id = self.username
async def login(self):
if not self.access_token:
try:
response = await self.client.login(self.password)
self.config["matrix"]["access_token"] = response.access_token
# Unset the password after successful login
self.config["matrix"]["password"] = ""
with open(CONFIG_FILE, "w") as configfile:
self.config.write(configfile)
print("Logged in and access token saved. Password unset.")
except LoginError as e:
print(f"Failed to login: {e}")
else:
print("Using stored access token.")
async def join_rooms(self):
await self.client.join(self.room_a_id)
await self.client.join(self.room_b_id)
print(f"Joined rooms: {self.room_a_id}, {self.room_b_id}")
async def message_callback(self, room: MatrixRoom, event: RoomMessageText):
if event.sender != self.client.user:
if room.room_id == self.room_a_id and "!coffee" in event.body.lower():
user = event.sender
await self.notify_coffee_order(user)
async def notify_coffee_order(self, user):
message = f"{user} ordered coffee!"
await self.client.room_send(
room_id=self.room_b_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message,
}
)
print(f"Notified room B: {message}")
async def run(self):
await self.login()
await self.join_rooms()
self.client.add_event_callback(self.message_callback, RoomMessageText)
await self.client.sync_forever(timeout=30000)
if __name__ == "__main__":
bot = CoffeeBot()
asyncio.run(bot.run())