feat: add lock and unlock user commands

Introduced `!roomba lock <user_id>` and `!roomba unlock <user_id>` commands to manage user access. These commands enable locking and unlocking user accounts via Matrix bot, enhancing moderation capabilities. Updated help message to reflect new commands.
This commit is contained in:
Kumi 2024-08-27 15:38:46 +02:00
parent 3c08c748b4
commit e557e20cbe
Signed by: kumi
GPG key ID: ECBCC9082395383F

View file

@ -90,10 +90,14 @@ class RoombaBot:
room_id = parts[2]
await self.shutdown_room(room_id, purge)
elif event.body.startswith("!roomba lock"):
await self.lock_user(event.body.split()[2])
elif event.body.startswith("!roomba unlock"):
await self.unlock_user(event.body.split()[2])
elif event.body and event.body.split()[0] == "!roomba":
await self.send_message(
self.moderation_room_id,
"Unknown command. Use '!roomba block <room_id>', '!roomba unblock <room_id>', or '!roomba shutdown <room_id> [--purge]'.",
"Unknown command. Use '!roomba block <room_id>', '!roomba unblock <room_id>', '!roomba shutdown <room_id> [--purge]', '!roomba lock <user_id>', or '!roomba unlock <user_id>'.",
)
await self.client.room_read_markers(
@ -207,6 +211,59 @@ class RoombaBot:
f"Failed to shutdown room {room_id}.",
)
async def lock_user(self, user_id):
"""Lock a user.
Args:
user_id (str): The user ID to lock.
"""
url = f"{self.homeserver}/_synapse/admin/v2/users/{user_id}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}
payload = {"locked": True}
async with aiohttp.ClientSession() as session:
async with session.put(url, headers=headers, json=payload) as resp:
if resp.status == 200:
self.logger.debug(f"User {user_id} locked successfully")
await self.send_message(
self.moderation_room_id, f"User {user_id} locked successfully."
)
else:
self.logger.error(f"Failed to lock user {user_id}: {resp.status}")
await self.send_message(
self.moderation_room_id, f"Failed to lock user {user_id}."
)
async def unlock_user(self, user_id):
"""Unlock a user.
Args:
user_id (str): The user ID to unlock.
"""
url = f"{self.homeserver}/_synapse/admin/v2/users/{user_id}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}
payload = {"locked": False}
async with aiohttp.ClientSession() as session:
async with session.put(url, headers=headers, json=payload) as resp:
if resp.status == 200:
self.logger.debug(f"User {user_id} unlocked successfully")
await self.send_message(
self.moderation_room_id,
f"User {user_id} unlocked successfully.",
)
else:
self.logger.error(f"Failed to unlock user {user_id}: {resp.status}")
await self.send_message(
self.moderation_room_id, f"Failed to unlock user {user_id}."
)
async def send_message(self, room_id, message):
"""Send a message to a room.