Source code for mini_arcade_core.runtime.capture.base_worker

"""
Base worker thread for capture tasks.
"""

from __future__ import annotations

from dataclasses import dataclass
from queue import Empty, Queue
from threading import Event, Thread


[docs] @dataclass(frozen=True) class BaseJob: """ Base job for worker threads. :ivar job_id (str): Unique identifier for the job. """ job_id: str
[docs] class BaseWorker: """Base worker thread for capture tasks.""" _thread: Thread _stop: Event _q: Queue[BaseJob]
[docs] def start(self): """Start the capture worker thread.""" if self._thread.is_alive(): return self._stop.clear() self._thread.start()
[docs] def stop(self): """Stop the capture worker thread.""" self._stop.set() if self._thread.is_alive(): self._thread.join(timeout=2.0)
[docs] def enqueue(self, job: BaseJob) -> bool: """ Enqueue a capture job. :param job: CaptureJob to enqueue. :type job: CaptureJob :return: True if the job was enqueued successfully, False otherwise. :rtype: bool """ if self._stop.is_set(): return False try: self._q.put_nowait(job) return True # Justification: Queue.put_nowait can raise a broad exception # pylint: disable=broad-exception-caught except Exception: return False
# pylint: enable=broad-exception-caught
[docs] def qsize(self) -> int: """Query the current size of the job queue.""" return self._q.qsize()
def _run(self): while not self._stop.is_set(): try: job = self._q.get(timeout=0.1) self._process_job(job) except Empty: continue def _process_job(self, job: BaseJob): """Process a single job. To be implemented by subclasses.""" raise NotImplementedError("Subclasses must implement _process_job()")