# Repository file-viewer header (not part of the program):
#   neuroevolution/mathema/smoke_test_car.py
#   2025-12-13 14:12:35 +01:00
#   146 lines, 4.2 KiB, Python

import asyncio
import logging
import numpy as np
from mathema.actors.actor import Actor
from mathema.actors.sensor import Sensor
from mathema.actors.actuator import Actuator
from mathema.scape.car_racing import CarRacingScape
from mathema.envs.openai_car_racing import CarRacing
# Configure root logging at INFO level and create the logger used by this smoke test.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("smoke")
# Length of the sensor feature vector (passed as vector_length to the Sensor in main).
# NOTE(review): the meaning of the 10 + 6 split is not visible here — confirm against the scape.
FEATURE_LEN = 10 + 6
class DummyCortex(Actor):
    """Minimal cortex for the smoke test: accumulates fitness and counts episodes.

    Expects actuators to report with ``("sync", aid, fitness, halt_flag)``.
    Every report adds ``fitness`` to ``total_fitness``; a report with
    ``halt_flag == 1`` ends an episode. After ``stop_after_episodes`` episodes
    the cortex sends ``("terminate",)`` to all registered sensors, neurons and
    actuators and stops.
    """

    def __init__(self, stop_after_episodes: int = 3):
        super().__init__("DummyCortex")
        self.total_fitness = 0.0
        self.episodes = 0
        self.stop_after = int(stop_after_episodes)
        # Actor lists are assigned from outside after construction (see main).
        self.sensors = []
        self.neurons = []
        self.actuators = []

    async def _terminate_actors(self):
        """Send ``("terminate",)`` to every registered actor, best-effort."""
        for actor in (self.sensors + self.neurons + self.actuators):
            try:
                await actor.send(("terminate",))
            except Exception:
                # Keep shutting down the remaining actors, but do not hide the
                # failure: a smoke test should show which actor misbehaved.
                log.warning(
                    "[Cortex] failed to send terminate to %r", actor, exc_info=True
                )

    async def run(self):
        """Process inbox messages until the episode limit or a ``"terminate"``."""
        log.info("[Cortex] started. stop_after=%d", self.stop_after)
        try:
            while True:
                msg = await self.inbox.get()
                tag = msg[0]
                if tag == "sync":
                    _, _aid, fitness_delta, halt_flag = msg
                    self.total_fitness += float(fitness_delta)
                    if halt_flag == 1:
                        self.episodes += 1
                        log.info("[Cortex] EPISODE done: %d cum_fitness=%.3f",
                                 self.episodes, self.total_fitness)
                        if self.episodes >= self.stop_after:
                            log.info("[Cortex] stopping smoke test...")
                            await self._terminate_actors()
                            return
                elif tag == "reactivate":
                    # Accepted but deliberately ignored by this dummy cortex.
                    pass
                elif tag == "terminate":
                    return
        finally:
            log.info("[Cortex] terminated.")
class RelayNeuron(Actor):
    """Minimal neuron that answers every sensor input with a fixed action.

    For each ``("forward", sid, vec)`` message it sends a 3-element actuator
    action ``("forward", nid, [steer, gas, brake])`` to ``out_actuator``.
    The incoming feature vector itself is not used.
    """

    def __init__(self, nid: str, out_actuator: Actuator):
        super().__init__(f"RelayNeuron-{nid}")
        self.out = out_actuator
        self.nid = nid

    async def run(self):
        """Handle inbox messages until a ``"terminate"`` message arrives."""
        while True:
            message = await self.inbox.get()
            kind = message[0]
            if kind == "terminate":
                return
            if kind != "forward":
                # Unknown tags are ignored.
                continue
            # Unpacking enforces the 3-tuple message shape; the payload is unused.
            _tag, _sid, _features = message
            await self.out.send(("forward", self.nid, [0.0, 0.2, -1.0]))
async def main():
    """Run the car-racing smoke test end to end.

    Builds a ``CarRacing`` environment wrapped in a ``CarRacingScape``, wires a
    single sensor -> relay neuron -> actuator chain to a ``DummyCortex``,
    starts every actor as an asyncio task and keeps sending ``("sync",)`` to
    the sensor until the cortex task has finished.

    On exit the scape is asked to terminate, tasks that are still running are
    cancelled, and all tasks are awaited so that cancellation completes and
    any exception raised inside an actor task is logged instead of being lost.
    """
    env = CarRacing(seed_value=5, render_mode=None)
    scape = CarRacingScape(env)
    cx = DummyCortex(stop_after_episodes=3)
    actuator = Actuator(
        aid="A1",
        cx_pid=cx,
        name="car_ApplyAction",
        fanin_ids=["N1"],
        expect_count=1,
        scape=scape,
    )
    neuron = RelayNeuron("N1", actuator)
    sensor = Sensor(
        sid="S1",
        cx_pid=cx,
        name="car_GetFeatures",
        vector_length=FEATURE_LEN,
        fanout_pids=[neuron],
        scape=scape,
    )
    # The cortex needs the actor lists to broadcast "terminate" when it stops.
    cx.sensors = [sensor]
    cx.neurons = [neuron]
    cx.actuators = [actuator]

    cortex_task = asyncio.create_task(cx.run(), name="Cortex")
    tasks = [
        asyncio.create_task(scape.run(), name="CarScape"),
        cortex_task,
        asyncio.create_task(sensor.run(), name="Sensor"),
        asyncio.create_task(neuron.run(), name="Neuron"),
        asyncio.create_task(actuator.run(), name="Actuator"),
    ]

    steps = 0
    try:
        while not cortex_task.done():
            await sensor.send(("sync",))
            steps += 1
            # Yield to the event loop so the actor tasks get a chance to run.
            await asyncio.sleep(0.0)
        log.info("[SMOKE] finished after %d steps. ✅", steps)
    finally:
        try:
            await scape.send(("terminate",))
        except Exception:
            # Best-effort shutdown: report the problem but keep cleaning up.
            log.warning("[SMOKE] failed to send terminate to scape", exc_info=True)
        for task in tasks:
            if not task.done():
                task.cancel()
        # Await every task so cancellations actually complete and exceptions
        # raised inside actors are retrieved rather than silently dropped.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for task, result in zip(tasks, results):
            # CancelledError is a BaseException, so cancelled tasks are skipped.
            if isinstance(result, Exception):
                log.error("[SMOKE] task %s failed: %r", task.get_name(), result)
# Run the smoke test only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())