Files
neuroevolution/mathema/scape/car_racing.py
2026-02-21 10:58:05 +01:00

71 lines
2.5 KiB
Python

import numpy as np
import logging
from mathema.actors.actor import Actor
log = logging.getLogger(__name__)
class CarRacingScape(Actor):
    """Scape (environment) actor wrapping a CarRacing-like Gymnasium environment.

    The scape exposes an asynchronous message interface to the sensors and
    actuators of the actor-based cortex architecture:

    - ``("sense", sid, sensor_pid)``: a sensor requests the current feature
      vector. The scape replies to ``sensor_pid`` with ``("percept", vec)``.
    - ``("action", action, actuator_pid)``: an actuator applies an action.
      The scape performs ``env.step(action)`` and replies to ``actuator_pid``
      with ``("result", step_reward, halt_flag)``, where ``halt_flag`` is 1
      if the episode terminated or was truncated and 0 otherwise.
    - ``("terminate", ...)``: the scape closes the environment (best effort)
      and leaves its message loop.

    When an episode ends (``halt_flag == 1``) the scape resets the
    environment with ``env.fast_reset()`` right after the result was sent.

    Notes about ``_stepped``:

    - Some environments do not provide a meaningful feature vector
      immediately after reset until at least one ``step()`` was executed.
    - ``_get_features()`` ensures that the environment has been stepped once
      (with a zero action) before calling ``env.get_feature_vector()``.
    """

    def __init__(self, env, name: str = "CarRacingScape"):
        """Store the wrapped environment and mark it as not yet stepped.

        Args:
            env: Environment object; this class calls ``step``,
                ``get_feature_vector``, ``fast_reset`` and ``close`` on it.
            name: Actor name forwarded to the ``Actor`` base class.
        """
        super().__init__(name)
        self.env = env
        # False until the env has been stepped at least once since the last
        # reset; see _get_features().
        self._stepped = False

    def _get_features(self) -> list[float]:
        """Return ``env.get_feature_vector()``, stepping once first if needed.

        If the environment has not been stepped since the last reset, a
        single zero action (three float32 zeros) is applied before the
        features are read.
        """
        if not self._stepped:
            # Warm-up step; its return value is intentionally ignored.
            # NOTE(review): an episode ending on this warm-up step goes
            # unnoticed here, exactly as before - confirm this is acceptable.
            self.env.step(np.zeros(3, dtype=np.float32))
            self._stepped = True
        return self.env.get_feature_vector()

    async def run(self):
        """Process inbox messages until a ``"terminate"`` message arrives."""
        while True:
            msg = await self.inbox.get()
            tag = msg[0]

            if tag == "sense":
                # The sensor id is part of the protocol but not needed here.
                _, _sid, sensor_pid = msg
                await sensor_pid.send(("percept", self._get_features()))

            elif tag == "action":
                _, action, actuator_pid = msg
                _, step_reward, terminated, truncated, _ = self.env.step(
                    np.asarray(action, dtype=np.float32)
                )
                self._stepped = True
                halt_flag = 1 if (terminated or truncated) else 0
                # Reply first, then reset, so the actuator is informed of the
                # episode end before the environment is reset.
                await actuator_pid.send(
                    ("result", float(step_reward), halt_flag)
                )
                if halt_flag:
                    self.env.fast_reset()
                    # A fresh episode needs a warm-up step before sensing.
                    self._stepped = False

            elif tag == "terminate":
                try:
                    self.env.close()
                except Exception:
                    # Closing is best effort: never crash on shutdown, but do
                    # not hide the failure either.
                    log.warning(
                        "CarRacingScape: env.close() failed during terminate",
                        exc_info=True,
                    )
                # Leave the message loop; without this the actor would keep
                # waiting on its inbox and could step a closed environment.
                return

            else:
                log.debug("CarRacingScape: ignoring unknown message tag %r", tag)