"""
Reusable movement helpers for scene pipelines.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Any, Callable, Generic, Iterable, Literal, Mapping, TypeVar
from mini_arcade_core.engine.entities import BaseEntity
from mini_arcade_core.scenes.systems.phases import SystemPhase
MoveAxis = Literal["x", "y"]
ViewportPolicy = Literal["clamp", "wrap", "cull"]
# pylint: disable=invalid-name
TCtx = TypeVar("TCtx")
# pylint: enable=invalid-name
def _default_enabled_when(_ctx: object) -> bool:
return True
def _default_speed(entity: BaseEntity) -> float:
if entity.kinematic is None:
return 0.0
return float(entity.kinematic.max_speed)
def _default_dt(ctx: object, _entity: BaseEntity) -> float:
return float(getattr(ctx, "dt", 0.0))
def _default_predicate(_ctx: object, _entity: BaseEntity) -> bool:
return True
def _default_margin(_ctx: object, _entity: BaseEntity) -> float:
return 0.0
def _entity_position(entity: BaseEntity) -> tuple[float, float]:
return (
float(entity.transform.center.x),
float(entity.transform.center.y),
)
def _entity_size(entity: BaseEntity) -> tuple[float, float]:
return (
float(entity.transform.size.width),
float(entity.transform.size.height),
)
[docs]
@dataclass(frozen=True)
class AxisIntentBinding(Generic[TCtx]):
"""
Bind one intent value to one entity velocity axis.
"""
entity_getter: Callable[[TCtx], BaseEntity | None]
value_getter: Callable[[TCtx], float]
axis: MoveAxis
speed_getter: Callable[[BaseEntity], float] = _default_speed
zero_other_axis: bool = False
predicate: Callable[[TCtx, BaseEntity], bool] = _default_predicate
[docs]
@dataclass
class IntentAxisVelocitySystem(Generic[TCtx]):
"""
Convert intent values into entity velocity on one axis.
"""
name: str = "common_axis_velocity"
phase: int = SystemPhase.CONTROL
order: int = 20
enabled_when: Callable[[TCtx], bool] = _default_enabled_when
bindings: tuple[AxisIntentBinding[TCtx], ...] = ()
[docs]
def step(self, ctx: TCtx) -> None:
"""Translate intent axes into kinematic velocity for bound entities."""
if not self.enabled_when(ctx):
return
for binding in self.bindings:
entity = binding.entity_getter(ctx)
if entity is None or entity.kinematic is None:
continue
if not binding.predicate(ctx, entity):
continue
value = float(binding.value_getter(ctx))
velocity = value * float(binding.speed_getter(entity))
if binding.axis == "x":
entity.kinematic.velocity.x = velocity
if binding.zero_other_axis:
entity.kinematic.velocity.y = 0.0
else:
entity.kinematic.velocity.y = velocity
if binding.zero_other_axis:
entity.kinematic.velocity.x = 0.0
[docs]
@dataclass(frozen=True)
class MotionBinding(Generic[TCtx]):
"""
Integrate a group of entities with shared motion options.
"""
entities_getter: Callable[[TCtx], Iterable[BaseEntity]]
predicate: Callable[[TCtx, BaseEntity], bool] = _default_predicate
dt_getter: Callable[[TCtx, BaseEntity], float] = _default_dt
drag: float | None = None
drag_getter: Callable[[TCtx, BaseEntity], float | None] | None = None
spin_attr: str | None = None
ttl_step: bool = False
[docs]
@dataclass
class KinematicMotionSystem(Generic[TCtx]):
"""
Integrate kinematic entities, with optional drag, spin, and TTL ticking.
"""
name: str = "common_kinematic_motion"
phase: int = SystemPhase.SIMULATION
order: int = 30
enabled_when: Callable[[TCtx], bool] = _default_enabled_when
bindings: tuple[MotionBinding[TCtx], ...] = ()
[docs]
def step(self, ctx: TCtx) -> None:
"""Integrate velocity and acceleration for bound entities."""
if not self.enabled_when(ctx):
return
for binding in self.bindings:
for entity in binding.entities_getter(ctx):
if entity.kinematic is None:
continue
if not binding.predicate(ctx, entity):
continue
dt = float(binding.dt_getter(ctx, entity))
entity.kinematic.step(entity.transform, dt)
if binding.ttl_step and entity.life is not None:
entity.life.step(dt)
drag = binding.drag
if binding.drag_getter is not None:
drag = binding.drag_getter(ctx, entity)
if drag is not None:
entity.kinematic.velocity.x *= float(drag)
entity.kinematic.velocity.y *= float(drag)
if binding.spin_attr is not None:
entity.rotation_deg = (
float(getattr(entity, "rotation_deg", 0.0))
+ float(getattr(entity, binding.spin_attr, 0.0)) * dt
) % 360.0
[docs]
@dataclass(frozen=True)
class ViewportConstraintBinding(Generic[TCtx]):
"""
Apply one viewport policy to a group of entities.
"""
entities_getter: Callable[[TCtx], Iterable[BaseEntity]]
policy: ViewportPolicy
predicate: Callable[[TCtx, BaseEntity], bool] = _default_predicate
axes: tuple[MoveAxis, ...] = ("x", "y")
margin: float = 0.0
margin_getter: Callable[[TCtx, BaseEntity], float] = _default_margin
on_cull: Callable[[TCtx, BaseEntity], None] | None = None
[docs]
@dataclass
class ViewportConstraintSystem(Generic[TCtx]):
"""
Clamp, wrap, or cull entities against the current viewport.
"""
name: str = "common_viewport_constraints"
phase: int = SystemPhase.SIMULATION
order: int = 40
enabled_when: Callable[[TCtx], bool] = _default_enabled_when
viewport_getter: Callable[[TCtx], tuple[float, float]] = lambda ctx: tuple(
getattr(ctx.world, "viewport", (0.0, 0.0))
)
bindings: tuple[ViewportConstraintBinding[TCtx], ...] = ()
def _clamp(
self,
entity: BaseEntity,
*,
viewport: tuple[float, float],
axes: tuple[MoveAxis, ...],
) -> None:
vw, vh = viewport
x, y = _entity_position(entity)
width, height = _entity_size(entity)
if "x" in axes:
x = max(0.0, min(float(vw) - width, x))
if "y" in axes:
y = max(0.0, min(float(vh) - height, y))
entity.transform.center.x = x
entity.transform.center.y = y
def _wrap(
self,
entity: BaseEntity,
*,
viewport: tuple[float, float],
axes: tuple[MoveAxis, ...],
margin: float,
) -> None:
vw, vh = viewport
x, y = _entity_position(entity)
if "x" in axes:
if x < -margin:
x = float(vw) + margin
elif x >= float(vw) + margin:
x = -margin
if "y" in axes:
if y < -margin:
y = float(vh) + margin
elif y >= float(vh) + margin:
y = -margin
entity.transform.center.x = x
entity.transform.center.y = y
def _is_outside(
self,
entity: BaseEntity,
*,
viewport: tuple[float, float],
margin: float,
) -> bool:
vw, vh = viewport
x, y = _entity_position(entity)
width, height = _entity_size(entity)
return (
y > (float(vh) + margin)
or (y + height) < -margin
or x > (float(vw) + margin)
or (x + width) < -margin
)
[docs]
def step(self, ctx: TCtx) -> None:
"""Apply clamp, wrap, or cull viewport policies to bound entities."""
if not self.enabled_when(ctx):
return
viewport = self.viewport_getter(ctx)
for binding in self.bindings:
for entity in binding.entities_getter(ctx):
if not binding.predicate(ctx, entity):
continue
margin = float(binding.margin) + float(
binding.margin_getter(ctx, entity)
)
if binding.policy == "clamp":
self._clamp(entity, viewport=viewport, axes=binding.axes)
continue
if binding.policy == "wrap":
self._wrap(
entity,
viewport=viewport,
axes=binding.axes,
margin=margin,
)
continue
if binding.policy == "cull" and self._is_outside(
entity,
viewport=viewport,
margin=margin,
):
if binding.on_cull is not None:
binding.on_cull(ctx, entity)
# ---------------------------------------------------------------------------
# Turn / thrust system (Asteroids-style angular control)
# ---------------------------------------------------------------------------
def _dir_from_angle(
angle_deg: float, forward_offset_deg: float = -90.0
) -> tuple[float, float]:
"""Unit vector for *angle_deg* with an optional mesh-forward offset."""
rad = math.radians(angle_deg + forward_offset_deg)
return (math.cos(rad), math.sin(rad))
[docs]
@dataclass(frozen=True)
class TurnThrustBinding(Generic[TCtx]):
"""
Bind turn/thrust intent values to one entity.
*turn_getter* should return a signed float (negative=left, positive=right).
*thrust_getter* should return a float (0 = no thrust, positive = forward).
"""
entity_getter: Callable[[TCtx], BaseEntity | None]
turn_getter: Callable[[TCtx], float]
thrust_getter: Callable[[TCtx], float]
turn_speed_deg: float = 240.0
thrust_accel: float = 280.0
max_speed: float = 330.0
forward_offset_deg: float = -90.0
predicate: Callable[[TCtx, BaseEntity], bool] = _default_predicate
[docs]
@dataclass
class TurnThrustSystem(Generic[TCtx]):
"""
Rotate an entity with turn input and apply forward thrust along
its heading. Typical for Asteroids-style ship controls.
Phase: CONTROL (20), order 20.
"""
name: str = "common_turn_thrust"
phase: int = SystemPhase.CONTROL
order: int = 20
enabled_when: Callable[[TCtx], bool] = _default_enabled_when
bindings: tuple[TurnThrustBinding[TCtx], ...] = ()
[docs]
def step(self, ctx: TCtx) -> None:
"""Apply turn and thrust for bound entities."""
if not self.enabled_when(ctx):
return
dt = float(getattr(ctx, "dt", 0.0))
for binding in self.bindings:
entity = binding.entity_getter(ctx)
if entity is None or entity.kinematic is None:
continue
if not binding.predicate(ctx, entity):
continue
turn = float(binding.turn_getter(ctx))
thrust = float(binding.thrust_getter(ctx))
# Rotation
if abs(turn) > 0.0001:
entity.rotation_deg = (
float(entity.rotation_deg)
+ turn * binding.turn_speed_deg * dt
) % 360.0
# Thrust along heading
if thrust > 0.0:
dx, dy = _dir_from_angle(
float(entity.rotation_deg),
binding.forward_offset_deg,
)
entity.kinematic.velocity.x += (
dx * binding.thrust_accel * thrust * dt
)
entity.kinematic.velocity.y += (
dy * binding.thrust_accel * thrust * dt
)
# Speed cap
if binding.max_speed > 0.0:
vx = entity.kinematic.velocity.x
vy = entity.kinematic.velocity.y
speed2 = vx * vx + vy * vy
if speed2 > binding.max_speed * binding.max_speed:
speed = math.sqrt(speed2)
scale = binding.max_speed / speed
entity.kinematic.velocity.x *= scale
entity.kinematic.velocity.y *= scale
# ---------------------------------------------------------------------------
# Steering / seek system (homing missiles and simple CPU agents)
# ---------------------------------------------------------------------------
[docs]
@dataclass(frozen=True)
class SteerSeekBinding(Generic[TCtx]):
"""
Bind one pursuer entity to a target position.
*target_getter* returns the position the entity should steer towards,
or ``None`` when no target is available (entity keeps current heading).
"""
entity_getter: Callable[[TCtx], BaseEntity | None]
target_getter: Callable[[TCtx], tuple[float, float] | None]
max_steer_deg: float = 180.0
thrust_accel: float = 200.0
max_speed: float = 300.0
forward_offset_deg: float = -90.0
predicate: Callable[[TCtx, BaseEntity], bool] = _default_predicate
[docs]
@dataclass(frozen=True)
class SteerSeekGroupBinding(Generic[TCtx]):
"""
Bind a group of pursuing entities to a shared target callback.
"""
entities_getter: Callable[[TCtx], Iterable[BaseEntity]]
target_getter: Callable[[TCtx, BaseEntity], tuple[float, float] | None]
max_steer_deg: float = 180.0
thrust_accel: float = 200.0
max_speed: float = 300.0
forward_offset_deg: float = -90.0
predicate: Callable[[TCtx, BaseEntity], bool] = _default_predicate
def _angle_toward(
src_x: float,
src_y: float,
dst_x: float,
dst_y: float,
forward_offset_deg: float,
) -> float | None:
"""Desired heading (degrees) from *src* to *dst*, or None if coincident."""
dx = dst_x - src_x
dy = dst_y - src_y
if abs(dx) < 0.0001 and abs(dy) < 0.0001:
return None
return (math.degrees(math.atan2(dy, dx)) - forward_offset_deg) % 360.0
def _shortest_angular_delta(current_deg: float, target_deg: float) -> float:
"""Shortest signed rotation from *current_deg* to *target_deg*."""
diff = (target_deg - current_deg) % 360.0
if diff > 180.0:
diff -= 360.0
return diff
[docs]
@dataclass
class SteerSeekSystem(Generic[TCtx]):
"""
Steer entities toward a target position each frame.
The entity rotates toward the target (capped by *max_steer_deg*/s)
and accelerates along its heading by *thrust_accel*.
Phase: CONTROL (20), order 22.
"""
name: str = "common_steer_seek"
phase: int = SystemPhase.CONTROL
order: int = 22
enabled_when: Callable[[TCtx], bool] = _default_enabled_when
bindings: tuple[SteerSeekBinding[TCtx], ...] = ()
group_bindings: tuple[SteerSeekGroupBinding[TCtx], ...] = ()
def _apply(
self,
entity: BaseEntity,
target: tuple[float, float] | None,
*,
max_steer_deg: float,
thrust_accel: float,
max_speed: float,
forward_offset_deg: float,
dt: float,
) -> None:
if entity.kinematic is None:
return
# Steer toward target if present
if target is not None:
desired = _angle_toward(
float(entity.transform.center.x),
float(entity.transform.center.y),
target[0],
target[1],
forward_offset_deg,
)
if desired is not None:
delta = _shortest_angular_delta(
float(entity.rotation_deg), desired
)
max_turn = max_steer_deg * dt
clamped = max(-max_turn, min(max_turn, delta))
entity.rotation_deg = (
float(entity.rotation_deg) + clamped
) % 360.0
# Thrust along current heading
if thrust_accel > 0.0:
dx, dy = _dir_from_angle(
float(entity.rotation_deg), forward_offset_deg
)
entity.kinematic.velocity.x += dx * thrust_accel * dt
entity.kinematic.velocity.y += dy * thrust_accel * dt
# Speed cap
if max_speed > 0.0:
vx = entity.kinematic.velocity.x
vy = entity.kinematic.velocity.y
speed2 = vx * vx + vy * vy
if speed2 > max_speed * max_speed:
speed = math.sqrt(speed2)
scale = max_speed / speed
entity.kinematic.velocity.x *= scale
entity.kinematic.velocity.y *= scale
[docs]
def step(self, ctx: TCtx) -> None:
"""Steer and thrust toward targets for bound entities."""
if not self.enabled_when(ctx):
return
dt = float(getattr(ctx, "dt", 0.0))
for binding in self.bindings:
entity = binding.entity_getter(ctx)
if entity is None:
continue
if not binding.predicate(ctx, entity):
continue
target = binding.target_getter(ctx)
self._apply(
entity,
target,
max_steer_deg=binding.max_steer_deg,
thrust_accel=binding.thrust_accel,
max_speed=binding.max_speed,
forward_offset_deg=binding.forward_offset_deg,
dt=dt,
)
for grp in self.group_bindings:
for entity in grp.entities_getter(ctx):
if not grp.predicate(ctx, entity):
continue
target = grp.target_getter(ctx, entity)
self._apply(
entity,
target,
max_steer_deg=grp.max_steer_deg,
thrust_accel=grp.thrust_accel,
max_speed=grp.max_speed,
forward_offset_deg=grp.forward_offset_deg,
dt=dt,
)
# ---------------------------------------------------------------------------
# YAML-backed movement profile loading
# ---------------------------------------------------------------------------
[docs]
@dataclass(frozen=True)
class MovementProfile:
"""
Reusable set of movement parameters loadable from a YAML mapping.
A profile can feed *TurnThrustBinding* or *SteerSeekBinding* parameters
so games define tuning in data files instead of Python code.
"""
turn_speed_deg: float = 240.0
thrust_accel: float = 280.0
max_speed: float = 330.0
drag: float | None = None
forward_offset_deg: float = -90.0
max_steer_deg: float = 180.0
[docs]
def movement_profile_from_dict(
raw: Mapping[str, Any] | None,
) -> MovementProfile:
"""
Build a :class:`MovementProfile` from a YAML-friendly dictionary.
Missing keys fall back to ``MovementProfile`` defaults.
Unrecognised keys are silently ignored.
Example YAML::
movement:
turn_speed_deg: 200
thrust_accel: 320
max_speed: 400
drag: 0.98
"""
if not isinstance(raw, Mapping):
return MovementProfile()
def _float(key: str, default: float) -> float:
val = raw.get(key)
if val is None:
return default
return float(val)
def _opt_float(key: str, default: float | None) -> float | None:
val = raw.get(key)
if val is None:
return default
return float(val)
return MovementProfile(
turn_speed_deg=_float("turn_speed_deg", 240.0),
thrust_accel=_float("thrust_accel", 280.0),
max_speed=_float("max_speed", 330.0),
drag=_opt_float("drag", None),
forward_offset_deg=_float("forward_offset_deg", -90.0),
max_steer_deg=_float("max_steer_deg", 180.0),
)