"""
Reusable grid/discrete-step gameplay helpers.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Optional, TypeVar, Union
from mini_arcade_core.engine.entities import BaseEntity
from mini_arcade_core.scenes.systems.phases import SystemPhase
# pylint: disable=invalid-name
TCtx = TypeVar("TCtx")
SpawnResult = Optional[Union[BaseEntity, Iterable[BaseEntity]]]
# pylint: enable=invalid-name
def _default_enabled_when(_ctx: object) -> bool:
return True
def _normalize_spawned(spawned: SpawnResult) -> tuple[BaseEntity, ...]:
if spawned is None:
return ()
if isinstance(spawned, BaseEntity):
return (spawned,)
return tuple(entity for entity in spawned if entity is not None)
[docs]
@dataclass(frozen=True, order=True)
class GridCoord:
"""
One integer grid cell coordinate.
"""
col: int
row: int
[docs]
def translated(self, *, dcol: int = 0, drow: int = 0) -> "GridCoord":
"""
Return a new cell translated by integer deltas.
"""
return GridCoord(
col=int(self.col) + int(dcol), row=int(self.row) + int(drow)
)
[docs]
@dataclass(frozen=True)
class GridBounds:
"""
Rectangular grid bounds measured in cells.
"""
cols: int
rows: int
[docs]
def contains(self, coord: GridCoord) -> bool:
"""
Return whether a cell lies inside this grid.
"""
return 0 <= int(coord.col) < int(self.cols) and 0 <= int(
coord.row
) < int(self.rows)
[docs]
def iter_cells(self) -> tuple[GridCoord, ...]:
"""
Return every cell inside the bounds in row-major order.
"""
return tuple(
GridCoord(col=col, row=row)
for row in range(int(self.rows))
for col in range(int(self.cols))
)
[docs]
@dataclass(frozen=True)
class GridLayout:
"""
World-space layout for a rectangular cell grid.
"""
bounds: GridBounds
cell_width: float
cell_height: float
origin_x: float = 0.0
origin_y: float = 0.0
[docs]
def cell_origin(self, coord: GridCoord) -> tuple[float, float]:
"""
Return the top-left world coordinate for a grid cell.
"""
return (
float(self.origin_x) + (int(coord.col) * float(self.cell_width)),
float(self.origin_y) + (int(coord.row) * float(self.cell_height)),
)
[docs]
def cell_center(self, coord: GridCoord) -> tuple[float, float]:
"""
Return the center world coordinate for a grid cell.
"""
x, y = self.cell_origin(coord)
return (
x + (float(self.cell_width) * 0.5),
y + (float(self.cell_height) * 0.5),
)
[docs]
def cell_rect(self, coord: GridCoord) -> tuple[float, float, float, float]:
"""
Return a world-space rect tuple `(x, y, w, h)` for a grid cell.
"""
x, y = self.cell_origin(coord)
return (x, y, float(self.cell_width), float(self.cell_height))
[docs]
def contains(self, coord: GridCoord) -> bool:
"""
Delegate containment checks to bounds.
"""
return self.bounds.contains(coord)
[docs]
def occupied_grid_cells(
values: Iterable[object],
*,
coord_getter: Callable[[object], GridCoord | None],
include: Callable[[object], bool] | None = None,
) -> set[GridCoord]:
"""
Collect occupied cells from arbitrary values.
"""
out: set[GridCoord] = set()
include = include or (lambda _value: True)
for value in values:
if not include(value):
continue
coord = coord_getter(value)
if coord is None:
continue
out.add(coord)
return out
[docs]
def free_grid_cells(
bounds: GridBounds,
occupied: Iterable[GridCoord],
) -> tuple[GridCoord, ...]:
"""
Return free cells inside `bounds` after subtracting occupied cells.
"""
blocked = {coord for coord in occupied if bounds.contains(coord)}
return tuple(
coord for coord in bounds.iter_cells() if coord not in blocked
)
[docs]
def choose_first_grid_cell(
_ctx: object,
cells: tuple[GridCoord, ...],
) -> GridCoord | None:
"""
Deterministic default cell chooser.
"""
return cells[0] if cells else None
[docs]
@dataclass
class CadenceState:
"""
Mutable state for fixed-interval gameplay stepping.
"""
accumulator: float = 0.0
tick_count: int = 0
steps_this_frame: int = 0
[docs]
@dataclass(frozen=True)
class CadenceBinding(Generic[TCtx]):
"""
Declarative fixed-cadence simulation rule.
"""
state_getter: Callable[[TCtx], CadenceState]
interval_seconds: float
on_tick: Callable[[TCtx], None]
enabled_when: Callable[[TCtx], bool] = _default_enabled_when
max_steps_per_frame: int = 4
[docs]
@dataclass
class CadenceSystem(Generic[TCtx]):
"""
Execute one or more fixed-timestep callbacks from variable frame dt.
"""
name: str = "common_cadence"
phase: int = SystemPhase.SIMULATION
order: int = 20
enabled_when: Callable[[TCtx], bool] = _default_enabled_when
bindings: tuple[CadenceBinding[TCtx], ...] = ()
[docs]
def step(self, ctx: TCtx) -> None:
"""Advance cadence timers and fire callbacks when they elapse."""
if not self.enabled_when(ctx):
return
frame_dt = max(0.0, float(getattr(ctx, "dt", 0.0)))
if frame_dt <= 0.0:
return
for binding in self.bindings:
state = binding.state_getter(ctx)
state.steps_this_frame = 0
if not binding.enabled_when(ctx):
continue
interval = max(0.0001, float(binding.interval_seconds))
state.accumulator += frame_dt
while (
state.accumulator >= interval
and state.steps_this_frame < int(binding.max_steps_per_frame)
):
state.accumulator -= interval
state.steps_this_frame += 1
state.tick_count += 1
binding.on_tick(ctx)
[docs]
@dataclass(frozen=True)
class GridCellSpawnBinding(Generic[TCtx]):
"""
Declarative spawn rule that chooses one currently free cell in a grid.
"""
should_spawn: Callable[[TCtx], bool]
bounds_getter: Callable[[TCtx], GridBounds]
occupied_cells_getter: Callable[[TCtx], Iterable[GridCoord]]
spawn: Callable[[TCtx, GridCoord], SpawnResult]
choose_cell: Callable[[TCtx, tuple[GridCoord, ...]], GridCoord | None] = (
choose_first_grid_cell
)
on_spawned: (
Callable[[TCtx, tuple[BaseEntity, ...], GridCoord], None] | None
) = None
insert_into_world: bool = True
[docs]
@dataclass
class GridCellSpawnSystem(Generic[TCtx]):
"""
Spawn entities into currently free grid cells.
"""
name: str = "common_grid_spawn"
phase: int = SystemPhase.SIMULATION
order: int = 25
enabled_when: Callable[[TCtx], bool] = _default_enabled_when
bindings: tuple[GridCellSpawnBinding[TCtx], ...] = ()
[docs]
def step(self, ctx: TCtx) -> None:
"""Spawn entities into the first available grid cell per binding."""
if not self.enabled_when(ctx):
return
for binding in self.bindings:
if not binding.should_spawn(ctx):
continue
bounds = binding.bounds_getter(ctx)
free_cells = free_grid_cells(
bounds,
binding.occupied_cells_getter(ctx),
)
if not free_cells:
continue
cell = binding.choose_cell(ctx, free_cells)
if cell is None or cell not in free_cells:
continue
spawned = _normalize_spawned(binding.spawn(ctx, cell))
if not spawned:
continue
if binding.insert_into_world:
ctx.world.entities.extend(spawned)
if binding.on_spawned is not None:
binding.on_spawned(ctx, spawned, cell)
__all__ = [
"CadenceBinding",
"CadenceState",
"CadenceSystem",
"GridBounds",
"GridCellSpawnBinding",
"GridCellSpawnSystem",
"GridCoord",
"GridLayout",
"choose_first_grid_cell",
"free_grid_cells",
"occupied_grid_cells",
]