contentmonster/classes/doghandler.py

71 lines
2.3 KiB
Python
Raw Normal View History

2021-11-25 12:42:00 +00:00
from watchdog.events import (FileSystemEventHandler, FileSystemEvent,
FileCreatedEvent, FileDeletedEvent,
FileModifiedEvent, FileMovedEvent)
from multiprocessing import Queue
2021-11-22 10:14:38 +00:00
import os.path
2021-11-22 10:14:38 +00:00
class DogHandler(FileSystemEventHandler):
    """Class implementing a watchdog event handler

    For every file-level event (created, modified, moved, deleted) a
    ``(directory, filename)`` tuple is put on the queue, where ``directory``
    is the object passed to the constructor and ``filename`` is the base
    name of the affected path. Events that refer to directories are dropped
    in :meth:`dispatch` and never reach the ``on_*`` handlers.
    """

    def __init__(self, directory, queue: Queue, *args, **kwargs) -> None:
        """Initialize a new DogHandler object

        Args:
            directory (classes.directory.Directory): Directory to watch
            queue (multiprocessing.Queue): Queue to put detected events on
            *args: Positional arguments passed on to the parent initializer
            **kwargs: Keyword arguments passed on to the parent initializer
        """
        # Imported locally so this block does not depend on a file-level
        # import that may not be present.
        import logging

        super().__init__(*args, **kwargs)
        self._directory = directory
        self._queue = queue

        # Debug-level log instead of an unconditional print to stdout, so
        # the message is only emitted when the application asks for it.
        logging.getLogger(__name__).debug(
            "DogHandler initialized for directory %r", directory
        )

    def _enqueue(self, path) -> None:
        """Put the watched directory and the base name of a path on the queue

        Args:
            path: Path of the affected file as reported by the event
        """
        self._queue.put((self._directory, os.path.basename(path)))

    def dispatch(self, event: FileSystemEvent) -> None:
        """Dispatch events to the appropriate event handlers

        Directory events are ignored; only events for files are passed on
        to the parent implementation.

        Args:
            event (watchdog.events.FileSystemEvent): Event to handle
        """
        if not event.is_directory:
            super().dispatch(event)

    def on_created(self, event: FileCreatedEvent) -> None:
        """Put file creation events on the queue

        Args:
            event (watchdog.events.FileCreatedEvent): Event describing the
              created file
        """
        self._enqueue(event.src_path)

    def on_modified(self, event: FileModifiedEvent) -> None:
        """Put file modification events on the queue

        Args:
            event (watchdog.events.FileModifiedEvent): Event describing the
              modified file
        """
        self._enqueue(event.src_path)

    def on_moved(self, event: FileMovedEvent) -> None:
        """Put file move events on the queue

        Two entries are queued, in this order: one for the source path and
        one for the destination path.

        Args:
            event (watchdog.events.FileMovedEvent): Event describing the moved
              file (source and destination)
        """
        self._enqueue(event.src_path)
        self._enqueue(event.dest_path)

    def on_deleted(self, event: FileDeletedEvent) -> None:
        """Put file deletion events on the queue

        Args:
            event (watchdog.events.FileDeletedEvent): Event describing the
              deleted file
        """
        self._enqueue(event.src_path)