Splitting feed processing off into async task

This commit is contained in:
Kumi 2024-03-02 17:13:04 +01:00
parent 8ea8c9208a
commit 3b95dc0d3e
Signed by: kumi
GPG key ID: ECBCC9082395383F
3 changed files with 82 additions and 40 deletions

View file

@ -13,6 +13,11 @@
# #
Operator = Contact details not set Operator = Contact details not set
# Minimum duration of a single loop
# By default, this is set to 300, meaning that rooms get updated approximately every 300 seconds, unless the total loop takes longer
#
# LoopDuration = 300
# Enable debug mode # Enable debug mode
# Will send error tracebacks to you (= Operator above) if an error occurs processing a message from you # Will send error tracebacks to you (= Operator above) if an error occurs processing a message from you
# Defaults to 0 (= off) # Defaults to 0 (= off)

View file

@ -50,6 +50,7 @@ import sys
import traceback import traceback
import markdown2 import markdown2
import feedparser
from .logging import Logger from .logging import Logger
from .callbacks import RESPONSE_CALLBACKS, EVENT_CALLBACKS from .callbacks import RESPONSE_CALLBACKS, EVENT_CALLBACKS
@ -72,6 +73,10 @@ class RSSBot:
if self.sync_response: if self.sync_response:
return self.sync_response.next_batch return self.sync_response.next_batch
@property
def loop_duration(self) -> int:
return self.config["RSSBot"].getint("LoopDuration", 300)
@property @property
def allowed_users(self) -> List[str]: def allowed_users(self) -> List[str]:
"""List of users allowed to use the bot. """List of users allowed to use the bot.
@ -320,7 +325,9 @@ class RSSBot:
self.room_ignore_list.append(invite) self.room_ignore_list.append(invite)
else: else:
await self.send_message(invite, "Thank you for inviting me to your room!") await self.send_message(
invite, "Thank you for inviting me to your room!"
)
async def upload_file( async def upload_file(
self, self,
@ -521,6 +528,67 @@ class RSSBot:
if state_key is None or event["state_key"] == state_key: if state_key is None or event["state_key"] == state_key:
return event return event
async def process_room(self, room):
state = await self.get_state_event(room, "rssbot.feeds")
if not state:
feeds = []
else:
feeds = state["content"]["feeds"]
for feed in feeds:
feed_state = await self.get_state_event(room, "rssbot.feed_state", feed)
if feed_state:
self.logger.log(
f"Identified timestamp as {feed_state['content']['timestamp']}",
"debug"
)
timestamp = int(feed_state["content"]["timestamp"])
else:
timestamp = 0
try:
feed_content = feedparser.parse(feed)
new_timestamp = timestamp
for entry in feed_content.entries:
entry_timestamp = int(
datetime(*entry.published_parsed[:6]).timestamp()
)
if entry_timestamp > timestamp:
entry_message = f"__{feed_content.feed.title}: {entry.title}__\n\n{entry.description}\n\n{entry.link}"
await self.send_message(room, entry_message)
new_timestamp = max(entry_timestamp, new_timestamp)
await self.send_state_event(
room, "rssbot.feed_state", {"timestamp": new_timestamp}, feed
)
except:
await self.send_message(
room,
f"Could not access or parse RSS feed at {feed}. Please ensure that you got the URL right, and that it is actually an RSS feed.",
True,
)
async def process_rooms(self):
while True:
self.logger.log("Starting to process rooms", "debug")
start_timestamp = datetime.now()
for room in self.matrix_client.rooms.values():
try:
await self.process_room(room)
except Exception as e:
self.logger.log(f"Something went wrong processing room {room.room_id}: {e}", "error")
end_timestamp = datetime.now()
self.logger.log("Done processing rooms", "debug")
if (time_taken := (end_timestamp - start_timestamp).seconds) < self.loop_duration:
await asyncio.sleep(self.loop_duration - time_taken)
async def run(self): async def run(self):
"""Start the bot.""" """Start the bot."""
@ -566,9 +634,15 @@ class RSSBot:
# Start syncing events # Start syncing events
self.logger.log("Starting sync loop...", "warning") self.logger.log("Starting sync loop...", "warning")
sync_task = self.matrix_client.sync_forever(timeout=30000, full_state=True)
feed_task = self.process_rooms()
tasks = asyncio.gather(sync_task, feed_task)
try: try:
await self.matrix_client.sync_forever(timeout=30000, full_state=True) await tasks
finally: finally:
tasks.cancel()
self.logger.log("Syncing one last time...", "warning") self.logger.log("Syncing one last time...", "warning")
await self.matrix_client.sync(timeout=30000, full_state=True) await self.matrix_client.sync(timeout=30000, full_state=True)

View file

@ -2,44 +2,7 @@ from nio.events.room_events import RoomMessageText
from nio import RoomPutStateError from nio import RoomPutStateError
from nio.rooms import MatrixRoom from nio.rooms import MatrixRoom
from datetime import datetime
import feedparser
async def command_processfeeds(room: MatrixRoom, event: RoomMessageText, bot): async def command_processfeeds(room: MatrixRoom, event: RoomMessageText, bot):
bot.logger.log(f"Processing feeds for room {room.room_id}") bot.logger.log(f"Processing feeds for room {room.room_id}")
state = await bot.get_state_event(room, "rssbot.feeds") await bot.process_room(room)
if not state:
feeds = []
else:
feeds = state["content"]["feeds"]
for feed in feeds:
feed_state = await bot.get_state_event(room, "rssbot.feed_state", feed)
if feed_state:
bot.logger.log(f"Identified timestamp as {feed_state['content']['timestamp']}")
timestamp = int(feed_state["content"]["timestamp"])
else:
timestamp = 0
try:
feed_content = feedparser.parse(feed)
new_timestamp = timestamp
for entry in feed_content.entries:
entry_timestamp = int(datetime(*entry.published_parsed[:6]).timestamp())
if entry_timestamp > timestamp:
entry_message = f"__{feed_content.feed.title}: {entry.title}__\n\n{entry.description}\n\n{entry.link}"
await bot.send_message(room, entry_message)
new_timestamp = max(entry_timestamp, new_timestamp)
await bot.send_state_event(room, "rssbot.feed_state", {"timestamp": new_timestamp}, feed)
except:
raise
await bot.send_message(
room,
f"Could not access or parse RSS feed at {feed}. Please ensure that you got the URL right, and that it is actually an RSS feed.",
True
)