Source code for mini_arcade_core.runtime.capture.encode_worker

"""
Encode worker thread for processing video encoding jobs.
"""

from __future__ import annotations

import traceback
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from threading import Event, Thread
from typing import Callable, Optional

from mini_arcade_core.runtime.capture.base_worker import BaseJob, BaseWorker
from mini_arcade_core.runtime.capture.video_encoder import (
    encode_png_sequence_to_mp4,
)
from mini_arcade_core.utils import logger


# pylint: disable=too-many-instance-attributes
@dataclass(frozen=True)
class EncodeJob(BaseJob):
    """
    Immutable description of one "PNG frames -> video file" encoding request.

    :ivar job_id (str): Unique identifier for the encoding job. Not declared
        in this class; presumably inherited from ``BaseJob`` (confirm there).
    :ivar ffmpeg_path (str): Path to the ffmpeg executable.
    :ivar frames_dir (Path): Directory holding the PNG frames to encode.
    :ivar output_path (Path): Where the encoded video file should be written.
    :ivar input_fps (int): Frame rate of the input PNG sequence.
    :ivar output_fps (int | None): Frame rate requested for the output video.
    :ivar codec (str): Video codec to encode with.
    :ivar crf (int): Constant Rate Factor controlling video quality.
    :ivar preset (str): Encoder preset (speed/quality tradeoff).
    :ivar keep_frames (bool): Keep the raw frames after encoding when True.
    :ivar video_interpolate (bool): Video interpolation flag, True by default.
        Its exact effect is decided by whoever consumes the job
        (NOTE(review): confirm against the encoder).
    """

    # Field order is part of the positional constructor interface.
    ffmpeg_path: str
    frames_dir: Path
    output_path: Path
    input_fps: int
    output_fps: int | None
    codec: str
    crf: int
    preset: str
    keep_frames: bool
    video_interpolate: bool = True
@dataclass(frozen=True)
class EncodeResult:
    """
    Immutable outcome record for a single encoding job.

    :ivar job_id (str): Identifier of the job this result belongs to.
    :ivar ok (bool): True when the encoding succeeded.
    :ivar output_path (Path | None): Location of the encoded video on
        success; defaults to None.
    :ivar error (str | None): Failure description when the encoding did not
        succeed; defaults to None.
    """

    job_id: str
    ok: bool
    # Optional details: one or the other is filled in depending on ``ok``.
    output_path: Path | None = None
    error: str | None = None
@dataclass
class EncodeWorkerConfig:
    """
    Tunable settings consumed by ``EncodeWorker`` at construction time.

    :ivar queue_size (int): Upper bound on the number of queued jobs
        (default 4).
    :ivar on_done (Optional[Callable[[EncodeResult], None]]): Callback that
        receives the result of each finished job; None disables it.
    :ivar name (str): Name given to the worker thread
        (default "encode-worker").
    :ivar daemon (bool): Whether the worker thread is created as a daemon
        thread (default True).
    """

    queue_size: int = 4
    on_done: Optional[Callable[[EncodeResult], None]] = None
    name: str = "encode-worker"
    daemon: bool = True
class EncodeWorker(BaseWorker):
    """
    Encode worker thread for processing video encoding jobs asynchronously.

    The worker owns a bounded job queue, a stop event and a thread whose
    target is ``self._run``. ``_run`` is not defined in this class, so it is
    presumably supplied by ``BaseWorker`` (confirm there), as is whatever
    starts the thread and feeds jobs to :meth:`_process_job`.
    """

    def __init__(self, cfg: EncodeWorkerConfig | None = None):
        """
        :param cfg: Optional configuration for the EncodeWorker; a default
            ``EncodeWorkerConfig`` is used when omitted.
        :type cfg: Optional[EncodeWorkerConfig]
        """
        cfg = cfg or EncodeWorkerConfig()
        self._q: Queue[EncodeJob] = Queue(maxsize=cfg.queue_size)
        self._stop = Event()
        self._on_done = cfg.on_done
        # The thread is only created here; it is not started by __init__.
        self._thread = Thread(
            target=self._run, name=cfg.name, daemon=cfg.daemon
        )

    def set_on_done(
        self, on_done: Optional[Callable[[EncodeResult], None]]
    ) -> None:
        """
        Replace the completion callback invoked after each processed job.

        :param on_done: New callback receiving the job's ``EncodeResult``,
            or None to disable notifications.
        """
        self._on_done = on_done

    def _process_job(self, job: EncodeJob) -> None:
        """
        Encode one job, report its result, and mark the queue item done.

        No ``Exception`` escapes this method: encoder failures become an
        ``EncodeResult`` with ``ok=False`` and callback failures are logged.

        :param job: The encoding job to process.
        """
        try:
            res = self._run_encode(job)

            # Snapshot the callback: set_on_done() may swap it (possibly to
            # None) between the check and the call.
            on_done = self._on_done
            if on_done:
                try:
                    on_done(res)
                except Exception as exc:  # pylint: disable=broad-exception-caught
                    logger.warning(
                        f"[encode] on_done callback failed: {exc}\n"
                        f"{traceback.format_exc()}"
                    )
        finally:
            # Always balance the queue accounting, even if something
            # unexpected escapes above; otherwise Queue.join() never returns.
            self._q.task_done()

    def _run_encode(self, job: EncodeJob) -> EncodeResult:
        """Run the encoder for ``job`` and translate the outcome to a result."""
        try:
            logger.info(f"[encode] job={job.job_id} start {job.output_path}")
            # NOTE(review): job.video_interpolate is not forwarded here;
            # confirm whether encode_png_sequence_to_mp4 should receive it.
            result = encode_png_sequence_to_mp4(
                ffmpeg_path=job.ffmpeg_path,
                frames_dir=job.frames_dir,
                output_path=job.output_path,
                input_fps=job.input_fps,
                output_fps=job.output_fps,
                codec=job.codec,
                crf=job.crf,
                preset=job.preset,
            )
            if not result.ok:
                return EncodeResult(
                    job_id=job.job_id, ok=False, error=result.error
                )

            # optionally delete frames after successful encode
            if not job.keep_frames:
                self._delete_frames(job)
            return EncodeResult(
                job_id=job.job_id,
                ok=True,
                output_path=result.output_path,
            )
        except Exception as exc:  # pylint: disable=broad-exception-caught
            logger.error(
                f"[encode] exception: {exc}\n{traceback.format_exc()}"
            )
            return EncodeResult(job_id=job.job_id, ok=False, error=str(exc))

    @staticmethod
    def _delete_frames(job: EncodeJob) -> None:
        """
        Best-effort removal of the job's ``frame_*.png`` files.

        The video already exists when this runs, so a filesystem error here
        is logged instead of turning the job into a failure. Removal stops at
        the first error.
        """
        try:
            for frame in job.frames_dir.glob("frame_*.png"):
                frame.unlink(missing_ok=True)
        except OSError as exc:
            logger.warning(
                f"[encode] job={job.job_id} could not remove frames: {exc}"
            )