Files
neuroevolution/mathema/scape/car_racing.py
2025-12-13 14:12:35 +01:00

48 lines
1.4 KiB
Python

import numpy as np
import logging
from mathema.actors.actor import Actor
log = logging.getLogger(__name__)
class CarRacingScape(Actor):
def __init__(self, env, name: str = "CarRacingScape"):
super().__init__(name)
self.env = env
self._stepped = False
def _get_features(self) -> list[float]:
if not self._stepped:
_, _, term, trunc, _ = self.env.step(np.array([0.0, 0.0, 0.0], dtype=np.float32))
self._stepped = True
return self.env.get_feature_vector()
async def run(self):
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "sense":
_, sid, sensor_pid = msg
vec = self._get_features()
await sensor_pid.send(("percept", vec))
elif tag == "action":
_, action, actuator_pid = msg
_, step_reward, terminated, truncated, _ = self.env.step(np.asarray(action, dtype=np.float32))
self._stepped = True
halt_flag = 1 if (terminated or truncated) else 0
await actuator_pid.send(("result", float(step_reward), halt_flag))
if halt_flag == 1:
self.env.fast_reset()
self._stepped = False
elif tag == "terminate":
try:
self.env.close()
except Exception:
pass