Kumi
b48d55a36c
Introduced CoffeeBot for handling coffee order notifications. - Configures with settings.ini - Uses matrix-nio for Matrix client interaction - Joins specified rooms, logs in with access token - Notifies a room when a coffee order is detected .gitignore updated to exclude venv/ and settings.ini. Includes a sample settings.dist.ini for configuration.
73 lines
No EOL
2.6 KiB
Python
73 lines
No EOL
2.6 KiB
Python
import asyncio
|
|
import configparser
|
|
from nio import AsyncClient, RoomMessageText, MatrixRoom, LoginError
|
|
|
|
CONFIG_FILE = "settings.ini"
|
|
|
|
class CoffeeBot:
|
|
def __init__(self):
|
|
self.config = configparser.ConfigParser()
|
|
self.config.read(CONFIG_FILE)
|
|
|
|
self.homeserver = self.config["matrix"]["homeserver_url"]
|
|
self.username = self.config["matrix"]["username"]
|
|
self.password = self.config["matrix"].get("password", "")
|
|
self.room_a_id = self.config["matrix"]["room_a_id"]
|
|
self.room_b_id = self.config["matrix"]["room_b_id"]
|
|
self.access_token = self.config["matrix"].get("access_token", "")
|
|
|
|
self.client = AsyncClient(self.homeserver, user=self.username)
|
|
|
|
if self.access_token:
|
|
self.client.access_token = self.access_token
|
|
self.client.user_id = self.username
|
|
|
|
async def login(self):
|
|
if not self.access_token:
|
|
try:
|
|
response = await self.client.login(self.password)
|
|
self.config["matrix"]["access_token"] = response.access_token
|
|
# Unset the password after successful login
|
|
self.config["matrix"]["password"] = ""
|
|
with open(CONFIG_FILE, "w") as configfile:
|
|
self.config.write(configfile)
|
|
print("Logged in and access token saved. Password unset.")
|
|
except LoginError as e:
|
|
print(f"Failed to login: {e}")
|
|
else:
|
|
print("Using stored access token.")
|
|
|
|
async def join_rooms(self):
|
|
await self.client.join(self.room_a_id)
|
|
await self.client.join(self.room_b_id)
|
|
print(f"Joined rooms: {self.room_a_id}, {self.room_b_id}")
|
|
|
|
async def message_callback(self, room: MatrixRoom, event: RoomMessageText):
|
|
if event.sender != self.client.user:
|
|
if room.room_id == self.room_a_id and "!coffee" in event.body.lower():
|
|
user = event.sender
|
|
await self.notify_coffee_order(user)
|
|
|
|
async def notify_coffee_order(self, user):
|
|
message = f"{user} ordered coffee!"
|
|
await self.client.room_send(
|
|
room_id=self.room_b_id,
|
|
message_type="m.room.message",
|
|
content={
|
|
"msgtype": "m.text",
|
|
"body": message,
|
|
}
|
|
)
|
|
print(f"Notified room B: {message}")
|
|
|
|
async def run(self):
|
|
await self.login()
|
|
await self.join_rooms()
|
|
|
|
self.client.add_event_callback(self.message_callback, RoomMessageText)
|
|
|
|
await self.client.sync_forever(timeout=30000)
|
|
|
|
if __name__ == "__main__":
|
|
bot = CoffeeBot()
|
|
asyncio.run(bot.run()) |