"""Scape actor that exposes a car-racing environment over actor messages."""

import logging

import numpy as np

from mathema.actors.actor import Actor

log = logging.getLogger(__name__)


class CarRacingScape(Actor):
    """Actor that mediates between sensor/actuator actors and an environment.

    Messages handled by :meth:`run` (tuples whose first element is a tag):

    * ``("sense", sid, sensor_pid)`` -- replies to ``sensor_pid`` with
      ``("percept", vec)`` where ``vec`` comes from
      ``env.get_feature_vector()``.
    * ``("action", action, actuator_pid)`` -- steps the environment with
      ``action`` and replies to ``actuator_pid`` with
      ``("result", reward, halt_flag)``.
    * ``("terminate", ...)`` -- closes the environment and stops the loop.

    Messages with any other tag are ignored.
    """

    def __init__(self, env, name: str = "CarRacingScape"):
        """Store the environment and mark it as not yet stepped.

        Args:
            env: Environment object; this class calls its ``step``,
                ``get_feature_vector``, ``fast_reset`` and ``close`` methods.
            name: Actor name forwarded to the ``Actor`` base class.
        """
        super().__init__(name)
        self.env = env
        # True once the env has been stepped since construction or since the
        # last fast_reset().
        self._stepped = False

    def _get_features(self) -> list[float]:
        """Return the environment's current feature vector.

        If the environment has not been stepped since construction or the
        last reset, a single all-zero action is applied first.
        """
        if not self._stepped:
            # NOTE(review): presumably the env needs at least one step before
            # its feature vector is meaningful -- confirm against the env.
            # The step result (including any terminated/truncated flag) is
            # intentionally discarded, as before.
            self.env.step(np.array([0.0, 0.0, 0.0], dtype=np.float32))
            self._stepped = True
        return self.env.get_feature_vector()

    async def run(self):
        """Process inbox messages until a ``"terminate"`` message arrives."""
        while True:
            msg = await self.inbox.get()
            tag = msg[0]

            if tag == "sense":
                _, _sid, sensor_pid = msg
                vec = self._get_features()
                await sensor_pid.send(("percept", vec))

            elif tag == "action":
                _, action, actuator_pid = msg
                _, step_reward, terminated, truncated, _ = self.env.step(
                    np.asarray(action, dtype=np.float32)
                )
                self._stepped = True
                halt_flag = 1 if (terminated or truncated) else 0
                await actuator_pid.send(
                    ("result", float(step_reward), halt_flag)
                )
                if halt_flag == 1:
                    # Episode ended: reset so the next "sense" starts fresh.
                    self.env.fast_reset()
                    self._stepped = False

            elif tag == "terminate":
                try:
                    self.env.close()
                except Exception:
                    # Closing is best-effort, but the failure should be
                    # visible rather than silently dropped.
                    log.warning(
                        "%s: error while closing environment",
                        type(self).__name__,
                        exc_info=True,
                    )
                # Stop processing: the environment is closed, so handling any
                # further "sense"/"action" message would operate on a dead env.
                return