Source code for mini_arcade_core.scenes.systems.builtins.maze

"""
Reusable maze and lane-based grid gameplay helpers.
"""

from __future__ import annotations

import random
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Generic, Iterable, Mapping, TypeVar

from mini_arcade_core.scenes.systems.builtins.grid import GridBounds, GridCoord
from mini_arcade_core.scenes.systems.phases import SystemPhase

# pylint: disable=invalid-name
# Generic placeholders used throughout this module:
#   TCtx  - the context object handed to getters, predicates and ``step``.
#   TCell - the value type stored in one ``TileMap`` cell.
TCtx = TypeVar("TCtx")
TCell = TypeVar("TCell")
# pylint: enable=invalid-name


def _default_enabled_when(_ctx: object) -> bool:
    return True


class CardinalDirection(str, Enum):
    """
    Four-way grid direction.

    Members are also ``str`` instances, so each compares equal to its
    lowercase name (``"up"``, ``"down"``, ``"left"``, ``"right"``).
    """

    UP = "up"
    DOWN = "down"
    LEFT = "left"
    RIGHT = "right"

    @property
    def vector(self) -> tuple[int, int]:
        """Return the `(dcol, drow)` vector for this direction."""
        # Rows grow downward: UP is a negative row offset.
        offsets = {
            CardinalDirection.UP: (0, -1),
            CardinalDirection.DOWN: (0, 1),
            CardinalDirection.LEFT: (-1, 0),
            CardinalDirection.RIGHT: (1, 0),
        }
        return offsets[self]

    @property
    def opposite(self) -> "CardinalDirection":
        """Return the opposite cardinal direction."""
        reversed_of = {
            CardinalDirection.UP: CardinalDirection.DOWN,
            CardinalDirection.DOWN: CardinalDirection.UP,
            CardinalDirection.LEFT: CardinalDirection.RIGHT,
            CardinalDirection.RIGHT: CardinalDirection.LEFT,
        }
        return reversed_of[self]
def step_in_direction(
    coord: GridCoord,
    direction: CardinalDirection,
) -> GridCoord:
    """
    Return the cell one step away from ``coord`` along ``direction``.

    The direction's ``(dcol, drow)`` vector is applied through
    ``coord.translated``; no bounds checking happens here.
    """
    col_offset, row_offset = direction.vector
    return coord.translated(dcol=col_offset, drow=row_offset)
@dataclass
class TileMap(Generic[TCell]):
    """
    Dense, row-major grid of maze/tile values.

    Every cell inside ``bounds`` starts out holding ``default``. The same
    ``default`` object is stored in each cell (it is not copied).
    """

    bounds: GridBounds
    default: TCell | None = None
    # Row-major storage, indexed as ``_cells[row][col]``; built in
    # ``__post_init__`` and excluded from the constructor and repr.
    _cells: list[list[TCell | None]] = field(
        default_factory=list,
        init=False,
        repr=False,
    )

    def __post_init__(self) -> None:
        """Allocate the backing grid and fill it with ``default``."""
        col_count = int(self.bounds.cols)
        row_count = int(self.bounds.rows)
        self._cells = [[self.default] * col_count for _ in range(row_count)]

    def contains(self, coord: GridCoord) -> bool:
        """Return whether ``coord`` lies inside the tile-map bounds."""
        return self.bounds.contains(coord)

    def get(self, coord: GridCoord) -> TCell | None:
        """Return the value stored at ``coord``, or ``None`` if out of bounds."""
        if self.contains(coord):
            return self._cells[int(coord.row)][int(coord.col)]
        return None

    def set(self, coord: GridCoord, value: TCell | None) -> None:
        """Store ``value`` at ``coord``.

        Raises:
            IndexError: if ``coord`` is outside the tile-map bounds.
        """
        if self.contains(coord):
            self._cells[int(coord.row)][int(coord.col)] = value
            return
        raise IndexError(f"Cell out of bounds: {coord!r}")

    def iter_cells(self) -> tuple[tuple[GridCoord, TCell | None], ...]:
        """
        Return every ``(coordinate, value)`` pair in row-major order.
        """
        pairs: list[tuple[GridCoord, TCell | None]] = []
        for row_idx in range(int(self.bounds.rows)):
            for col_idx in range(int(self.bounds.cols)):
                pairs.append(
                    (
                        GridCoord(col=col_idx, row=row_idx),
                        self._cells[row_idx][col_idx],
                    )
                )
        return tuple(pairs)
def tile_map_from_strings(
    *rows: str,
    legend: Mapping[str, TCell],
    default: TCell | None = None,
) -> TileMap[TCell]:
    """
    Build a tile map from ASCII rows and a legend.

    The map is as wide as the longest row and as tall as the number of
    rows. Characters found in ``legend`` are translated to their mapped
    value; characters not in the legend, and cells past the end of a short
    row, keep ``default``.
    """
    widest = max(map(len, rows), default=0)
    grid = TileMap[TCell](
        bounds=GridBounds(cols=widest, rows=len(rows)),
        default=default,
    )
    for y, line in enumerate(rows):
        for x, symbol in enumerate(line):
            if symbol not in legend:
                continue
            grid.set(GridCoord(col=x, row=y), legend[symbol])
    return grid
def available_directions(
    tile_map: TileMap[TCell],
    coord: GridCoord,
    *,
    can_enter: Callable[[TCell | None], bool],
) -> tuple[CardinalDirection, ...]:
    """
    Return the cardinal exits available from one cell.

    A direction counts as an exit when the neighbouring cell is inside the
    tile map and ``can_enter`` accepts the value stored there. Results
    follow ``CardinalDirection`` declaration order.
    """

    def _is_open(direction: CardinalDirection) -> bool:
        neighbour = step_in_direction(coord, direction)
        # Out-of-bounds neighbours are rejected before can_enter is asked.
        return tile_map.contains(neighbour) and bool(
            can_enter(tile_map.get(neighbour))
        )

    return tuple(
        direction for direction in CardinalDirection if _is_open(direction)
    )
def _filtered_directions( directions: tuple[CardinalDirection, ...], *, current_direction: CardinalDirection | None, allow_reverse: bool, ) -> tuple[CardinalDirection, ...]: if allow_reverse or current_direction is None: return directions non_reverse = tuple( direction for direction in directions if direction is not current_direction.opposite ) return non_reverse or directions
def choose_direction_toward(
    tile_map: TileMap[TCell],
    coord: GridCoord,
    target: GridCoord,
    *,
    can_enter: Callable[[TCell | None], bool],
    current_direction: CardinalDirection | None = None,
    allow_reverse: bool = False,
) -> CardinalDirection | None:
    """
    Choose the exit that minimizes Manhattan distance to a target cell.

    Candidate exits are the enterable neighbours of ``coord``, with the
    reverse of ``current_direction`` excluded unless ``allow_reverse`` is
    set or it is the only way out. Equal distances are broken by the
    direction's string value. Returns ``None`` when there is no exit.
    """
    candidates = _filtered_directions(
        available_directions(tile_map, coord, can_enter=can_enter),
        current_direction=current_direction,
        allow_reverse=allow_reverse,
    )
    if not candidates:
        return None

    def _rank(direction: CardinalDirection) -> tuple[int, str]:
        neighbour = step_in_direction(coord, direction)
        distance = abs(neighbour.col - target.col) + abs(
            neighbour.row - target.row
        )
        return (distance, direction.value)

    return min(candidates, key=_rank)
def choose_direction_away(
    tile_map: TileMap[TCell],
    coord: GridCoord,
    target: GridCoord,
    *,
    can_enter: Callable[[TCell | None], bool],
    current_direction: CardinalDirection | None = None,
    allow_reverse: bool = False,
) -> CardinalDirection | None:
    """
    Choose the exit that maximizes Manhattan distance from a target cell.

    Candidate exits are the enterable neighbours of ``coord``, with the
    reverse of ``current_direction`` excluded unless ``allow_reverse`` is
    set or it is the only way out. Equal distances are broken by the
    direction's string value (the greatest wins). Returns ``None`` when
    there is no exit.
    """
    candidates = _filtered_directions(
        available_directions(tile_map, coord, can_enter=can_enter),
        current_direction=current_direction,
        allow_reverse=allow_reverse,
    )
    if not candidates:
        return None

    def _rank(direction: CardinalDirection) -> tuple[int, str]:
        neighbour = step_in_direction(coord, direction)
        distance = abs(neighbour.col - target.col) + abs(
            neighbour.row - target.row
        )
        return (distance, direction.value)

    return max(candidates, key=_rank)
# Shared fallback generator for callers that do not supply ``rng``. It is
# seeded so a run stays reproducible, but it is created once so successive
# calls advance the sequence instead of repeating the first draw forever.
_FALLBACK_RNG = random.Random(1)


def choose_random_direction(
    tile_map: TileMap[TCell],
    coord: GridCoord,
    *,
    can_enter: Callable[[TCell | None], bool],
    rng: random.Random | None = None,
    current_direction: CardinalDirection | None = None,
    allow_reverse: bool = False,
) -> CardinalDirection | None:
    """
    Choose one valid exit randomly.

    Candidate exits are the enterable neighbours of ``coord``, with the
    reverse of ``current_direction`` excluded unless ``allow_reverse`` is
    set or it is the only way out.

    Args:
        tile_map: Map queried for neighbouring cells.
        coord: Cell the agent currently occupies.
        can_enter: Predicate deciding whether a neighbouring value is
            walkable.
        rng: Generator used for the draw. When omitted, a module-level
            generator seeded with ``1`` is used; pass your own generator
            for independent or differently seeded streams.
        current_direction: Direction the agent is travelling in, if any.
        allow_reverse: Whether turning straight back is permitted.

    Returns:
        The chosen direction, or ``None`` when there is no exit.
    """
    exits = _filtered_directions(
        available_directions(tile_map, coord, can_enter=can_enter),
        current_direction=current_direction,
        allow_reverse=allow_reverse,
    )
    if not exits:
        return None
    # Previously a new ``random.Random(1)`` was built on every call, which
    # made the "random" pick identical each time for a given set of exits.
    chooser = _FALLBACK_RNG if rng is None else rng
    return chooser.choice(exits)
def is_junction(
    tile_map: TileMap[TCell],
    coord: GridCoord,
    *,
    can_enter: Callable[[TCell | None], bool],
) -> bool:
    """
    Return whether a cell has three or more valid exits.
    """
    exit_count = len(
        available_directions(tile_map, coord, can_enter=can_enter)
    )
    return exit_count > 2
@dataclass
class GridNavigatorState:
    """
    Mutable cell-based movement state for one maze agent.
    """

    # Cell the agent currently occupies.
    cell: GridCoord
    # Direction the agent is currently travelling in.
    direction: CardinalDirection
    # Buffered turn request, applied by ``GridNavigationSystem`` once that
    # direction becomes an available exit; ``None`` when nothing is queued.
    pending_direction: CardinalDirection | None = None
    # Number of cells advanced during the latest navigation step.
    moved_this_frame: int = 0
@dataclass(frozen=True)
class GridNavigationBinding(Generic[TCtx, TCell]):
    """
    Declarative lane/junction navigation rule.

    Each binding tells ``GridNavigationSystem`` how to find one navigator,
    the map it moves on, and the policy applied while stepping it.
    """

    # Returns the navigator state to advance.
    state_getter: Callable[[TCtx], GridNavigatorState]
    # Returns the tile map the navigator moves through.
    tile_map_getter: Callable[[TCtx], TileMap[TCell]]
    # Returns the direction requested this frame, or None for no request.
    desired_direction_getter: Callable[[TCtx], CardinalDirection | None] = (
        lambda _ctx: None
    )
    # Walkability predicate; by default any non-None tile value is enterable.
    can_enter: Callable[[TCell | None], bool] = lambda value: value is not None
    # Optional callback invoked with the new cell after every single step.
    on_cell_entered: Callable[[TCtx, GridCoord], None] | None = None
    # When False, requests to turn straight back are not buffered.
    allow_reverse: bool = True
    # Returns how many cells to attempt to advance this frame.
    steps_getter: Callable[[TCtx], int] = lambda _ctx: 1
@dataclass
class GridNavigationSystem(Generic[TCtx, TCell]):
    """
    Advance one or more maze agents through a tile map with turn buffering.

    A requested turn is remembered in ``pending_direction`` and taken at the
    first cell where that direction is an available exit; until then the
    agent keeps moving in its current direction and stops when blocked.
    """

    name: str = "common_grid_navigation"
    phase: int = SystemPhase.SIMULATION
    order: int = 30
    enabled_when: Callable[[TCtx], bool] = _default_enabled_when
    bindings: tuple[GridNavigationBinding[TCtx, TCell], ...] = ()

    def step(self, ctx: TCtx) -> None:
        """Advance bound navigators through the tile map with turn buffering."""
        if not self.enabled_when(ctx):
            return

        for rule in self.bindings:
            navigator = rule.state_getter(ctx)
            maze = rule.tile_map_getter(ctx)
            wanted = rule.desired_direction_getter(ctx)

            # Buffer the request unless it is a forbidden reversal.
            if wanted is not None and (
                rule.allow_reverse or wanted != navigator.direction.opposite
            ):
                navigator.pending_direction = wanted

            navigator.moved_this_frame = 0
            step_budget = max(0, int(rule.steps_getter(ctx)))
            for _ in range(step_budget):
                open_exits = available_directions(
                    maze,
                    navigator.cell,
                    can_enter=rule.can_enter,
                )
                if navigator.pending_direction in open_exits:
                    # The buffered turn is possible here: take and clear it.
                    heading = navigator.pending_direction
                    navigator.pending_direction = None
                elif navigator.direction in open_exits:
                    heading = navigator.direction
                else:
                    # Blocked ahead and no usable turn: stop for this frame.
                    break

                navigator.direction = heading
                navigator.cell = step_in_direction(navigator.cell, heading)
                navigator.moved_this_frame += 1
                if rule.on_cell_entered is not None:
                    rule.on_cell_entered(ctx, navigator.cell)
@dataclass(frozen=True)
class TunnelWrapBinding(Generic[TCtx]):
    """
    Declarative horizontal/vertical wrap rule for maze agents.
    """

    # Returns the navigator states whose cells should be wrapped.
    states_getter: Callable[[TCtx], Iterable[GridNavigatorState]]
    # Returns the grid bounds the cells are wrapped against.
    bounds_getter: Callable[[TCtx], GridBounds]
    # Wrap columns that fall off the left/right edges.
    wrap_horizontal: bool = True
    # Wrap rows that fall off the top/bottom edges.
    wrap_vertical: bool = False
@dataclass
class TunnelWrapSystem(Generic[TCtx]):
    """
    Wrap maze agents across configured grid edges.

    An agent whose cell lies past one edge of the bounds is moved to the
    first cell on the opposite edge; the other coordinate is kept.
    """

    name: str = "common_tunnel_wrap"
    phase: int = SystemPhase.SIMULATION
    order: int = 31
    enabled_when: Callable[[TCtx], bool] = _default_enabled_when
    bindings: tuple[TunnelWrapBinding[TCtx], ...] = ()

    def step(self, ctx: TCtx) -> None:
        """Wrap navigator states across configured map edges."""
        if not self.enabled_when(ctx):
            return

        for rule in self.bindings:
            limits = rule.bounds_getter(ctx)
            for agent in rule.states_getter(ctx):
                if rule.wrap_horizontal:
                    current_col = int(agent.cell.col)
                    wrapped_col = None
                    if current_col < 0:
                        wrapped_col = int(limits.cols) - 1
                    elif current_col >= int(limits.cols):
                        wrapped_col = 0
                    if wrapped_col is not None:
                        agent.cell = GridCoord(
                            col=wrapped_col,
                            row=int(agent.cell.row),
                        )

                # The vertical pass sees the cell produced by the
                # horizontal pass above.
                if rule.wrap_vertical:
                    current_row = int(agent.cell.row)
                    wrapped_row = None
                    if current_row < 0:
                        wrapped_row = int(limits.rows) - 1
                    elif current_row >= int(limits.rows):
                        wrapped_row = 0
                    if wrapped_row is not None:
                        agent.cell = GridCoord(
                            col=int(agent.cell.col),
                            row=wrapped_row,
                        )
class CollectibleKind(str, Enum):
    """
    Common collectible kinds for maze games.

    Members are also ``str`` instances equal to their lowercase value.
    """

    PELLET = "pellet"
    POWER = "power"
    BONUS = "bonus"
@dataclass
class CollectibleState:
    """
    Mutable collectible metadata stored inside a field.
    """

    # Which category of collectible this is.
    kind: CollectibleKind
    # Free-form data attached to the collectible; not interpreted here.
    payload: Any = None
@dataclass
class CollectibleField:
    """
    Collectible state keyed by grid cell.

    Only cells that currently hold a collectible have an entry in
    ``items``.
    """

    items: dict[GridCoord, CollectibleState] = field(default_factory=dict)

    def item_at(self, coord: GridCoord) -> CollectibleState | None:
        """Return the collectible stored at ``coord``, or ``None``."""
        found = self.items.get(coord)
        return found

    def occupied_cells(self) -> tuple[GridCoord, ...]:
        """Return a snapshot of the cells that currently hold collectibles."""
        cells = self.items.keys()
        return tuple(cells)

    def remove(self, coord: GridCoord) -> CollectibleState | None:
        """Remove and return the collectible at ``coord``, or ``None``."""
        taken = self.items.pop(coord, None)
        return taken
@dataclass(frozen=True)
class CollectibleCollisionBinding(Generic[TCtx]):
    """
    Declarative collectible pickup rule.
    """

    # Returns the cell currently occupied by the collector.
    collector_cell_getter: Callable[[TCtx], GridCoord]
    # Returns the field to collect from, or None to skip this binding.
    field_getter: Callable[[TCtx], CollectibleField | None]
    # Optional callback invoked with the cell and the removed collectible.
    on_collect: Callable[[TCtx, GridCoord, CollectibleState], None] | None = (
        None
    )
@dataclass
class CollectibleCollisionSystem(Generic[TCtx]):
    """
    Consume collectibles when a collector enters the same cell.
    """

    name: str = "common_collectible_collision"
    phase: int = SystemPhase.SIMULATION
    order: int = 35
    enabled_when: Callable[[TCtx], bool] = _default_enabled_when
    bindings: tuple[CollectibleCollisionBinding[TCtx], ...] = ()

    def step(self, ctx: TCtx) -> None:
        """Consume collectibles that share a cell with the collector."""
        if not self.enabled_when(ctx):
            return

        for rule in self.bindings:
            pickups = rule.field_getter(ctx)
            if pickups is None:
                continue
            cell = rule.collector_cell_getter(ctx)
            # The item is removed from the field even when no callback is
            # registered.
            collected = pickups.remove(cell)
            if collected is not None and rule.on_collect is not None:
                rule.on_collect(ctx, cell, collected)
@dataclass(frozen=True)
class TimedMode:
    """
    One timed gameplay mode segment.
    """

    # Identifier copied into ``ModeTimerState.current_mode`` while active.
    name: str
    # Length of the segment in seconds; ``None`` means it never expires.
    duration_seconds: float | None
    # Free-form data attached to the mode; not interpreted here.
    payload: Any = None
@dataclass
class ModeTimerState:
    """
    Mutable state for timed mode progression.
    """

    # Index into the binding's schedule of the active mode.
    mode_index: int = 0
    # Seconds accumulated inside the active mode.
    elapsed_in_mode: float = 0.0
    # Name of the active mode; empty until the timer has started.
    current_mode: str = ""
@dataclass(frozen=True)
class ModeTimerBinding(Generic[TCtx]):
    """
    Declarative timed mode schedule.
    """

    # Returns the mutable timer state to advance.
    state_getter: Callable[[TCtx], ModeTimerState]
    # Ordered mode segments; an empty schedule makes the binding a no-op.
    schedule: tuple[TimedMode, ...]
    # Optional callback invoked with each mode as it becomes active.
    on_mode_changed: Callable[[TCtx, TimedMode], None] | None = None
    # When True the schedule restarts from the first mode after the last.
    loop: bool = False
@dataclass
class ModeTimerSystem(Generic[TCtx]):
    """
    Advance timed mode schedules using the current frame dt.

    The frame delta is read from ``ctx.dt`` (missing or non-positive values
    make the step a no-op). Time left over when a mode expires carries into
    the next mode, so one large delta can cross several modes in one step.
    """

    name: str = "common_mode_timer"
    phase: int = SystemPhase.SIMULATION
    order: int = 15
    enabled_when: Callable[[TCtx], bool] = _default_enabled_when
    bindings: tuple[ModeTimerBinding[TCtx], ...] = ()

    def step(self, ctx: TCtx) -> None:
        """Advance scheduled timed modes using the current frame delta.

        For each binding with a non-empty schedule:

        * the first mode is announced the first time the timer runs
          (``current_mode`` is still empty);
        * ``dt`` is added to ``elapsed_in_mode``;
        * expired modes are advanced, calling ``on_mode_changed`` for each
          newly entered mode;
        * at the end of a non-looping schedule the state is pinned to the
          last mode with ``elapsed_in_mode`` reset to ``0.0``.

        Negative durations are treated as zero-length. A looping schedule
        whose modes are all zero-length advances through at most one full
        cycle per step instead of spinning forever.
        """
        if not self.enabled_when(ctx):
            return
        dt = max(0.0, float(getattr(ctx, "dt", 0.0)))
        if dt <= 0.0:
            return

        for binding in self.bindings:
            schedule = binding.schedule
            if not schedule:
                continue
            state = binding.state_getter(ctx)

            if not state.current_mode:
                first = schedule[0]
                state.current_mode = first.name
                if binding.on_mode_changed is not None:
                    binding.on_mode_changed(ctx, first)

            state.elapsed_in_mode += dt
            # Consecutive transitions that consumed no time; used to detect
            # a looping schedule that can never absorb the elapsed time.
            zero_length_run = 0
            while True:
                current = schedule[state.mode_index]
                if current.duration_seconds is None:
                    # Open-ended mode: stays active indefinitely.
                    break
                # Clamp so a negative duration cannot *add* elapsed time,
                # which would keep the loop from ever terminating.
                duration = max(0.0, float(current.duration_seconds))
                if state.elapsed_in_mode < duration:
                    break

                if duration > 0.0:
                    zero_length_run = 0
                else:
                    zero_length_run += 1
                    if binding.loop and zero_length_run > len(schedule):
                        # A whole cycle went by without consuming time.
                        break

                state.elapsed_in_mode -= duration
                next_index = state.mode_index + 1
                if next_index >= len(schedule):
                    if not binding.loop:
                        state.mode_index = len(schedule) - 1
                        state.current_mode = schedule[state.mode_index].name
                        state.elapsed_in_mode = 0.0
                        break
                    next_index = 0

                state.mode_index = next_index
                next_mode = schedule[state.mode_index]
                state.current_mode = next_mode.name
                if binding.on_mode_changed is not None:
                    binding.on_mode_changed(ctx, next_mode)
# Public API of this module.
__all__ = [
    # Classes
    "CardinalDirection",
    "CollectibleCollisionBinding",
    "CollectibleCollisionSystem",
    "CollectibleField",
    "CollectibleKind",
    "CollectibleState",
    "GridNavigationBinding",
    "GridNavigationSystem",
    "GridNavigatorState",
    "ModeTimerBinding",
    "ModeTimerState",
    "ModeTimerSystem",
    "TimedMode",
    "TileMap",
    "TunnelWrapBinding",
    "TunnelWrapSystem",
    # Functions
    "available_directions",
    "choose_direction_away",
    "choose_direction_toward",
    "choose_random_direction",
    "is_junction",
    "step_in_direction",
    "tile_map_from_strings",
]