"""Exoself actor: spawns a network phenotype from a genotype and tunes its weights."""

import asyncio
import json
import math
import random
import logging
from collections import defaultdict
from typing import Any, Dict, List, Tuple, Optional

from mathema.genotype.neo4j.genotype import load_genotype_snapshot, persist_neuron_backups
from mathema.actors.actor import Actor
from mathema.actors.cortex import Cortex
from mathema.actors.sensor import Sensor
from mathema.actors.neuron import Neuron
from mathema.actors.actuator import Actuator
from mathema.scape.scape import XorScape
from mathema.scape.car_racing import CarRacingScape
from mathema.envs.openai_car_racing import CarRacing

log = logging.getLogger(__name__)


class Exoself(Actor):
    """Build sensor/neuron/actuator/cortex actors from a genotype and train them.

    Messages read from ``self.inbox``:

    * ``("evaluation_completed", fitness, cycles, elapsed)``
    * ``("terminate",)``
    * ``("backup_from_neuron", nid, idps)`` (only while persisting weights)

    Training is a perturb/restore loop: after every evaluation the weights are
    either backed up (fitness improved) or restored (it did not), a random
    subset of neurons is perturbed, and the cortex is reactivated.  Training
    stops after ``MAX_ATTEMPTS`` consecutive evaluations without improvement.
    """

    def __init__(self, genotype: Dict[str, Any], file_name: Optional[str] = None):
        """Store the genotype and reset all bookkeeping.

        Args:
            genotype: Genotype dict; ``build_pid_map_and_spawn`` reads the keys
                ``"cortex"``, ``"neurons"``, ``"sensor(s)"`` and
                ``"actuator(s)"`` from it.
            file_name: Path the genotype was loaded from, if any.
        """
        super().__init__("Exoself")
        self.monitor = None
        self.agent_id = None
        self.g = genotype
        self.file_name = file_name

        self.cx_actor: Optional[Cortex] = None
        self.sensor_actors: List[Sensor] = []
        self.neuron_actors: List[Neuron] = []
        self.actuator_actors: List[Actuator] = []
        self.tasks: List[asyncio.Task] = []

        self.highest_fitness = float("-inf")
        self.eval_acc = 0
        self.cycle_acc = 0
        self.time_acc = 0.0
        self.attempt = 0
        self.MAX_ATTEMPTS = 10

        self.actuator_scape = None
        self._perturbed: List[Neuron] = []
        # Set by start(); declared here so the attribute always exists.
        self._runner_task: Optional[asyncio.Task] = None

    @classmethod
    async def start(cls, agent_id: str, monitor) -> "Exoself":
        """Load the genotype of ``agent_id`` and start training in a background task.

        The monitor always receives
        ``("terminated", agent_id, fitness, evals, cycles, elapsed)`` when
        training ends, crashes, or the genotype cannot be loaded.

        Args:
            agent_id: Identifier passed to ``load_genotype_snapshot``.
            monitor: Object with an awaitable ``cast(msg)`` method.

        Returns:
            The running ``Exoself``.  If the genotype could not be loaded, a
            stand-in object with no-op ``cast``/``stop`` coroutines is returned
            instead.
        """

        class _Dummy:
            """No-op stand-in returned when the genotype cannot be loaded."""

            async def cast(self, *_a, **_k):
                pass

            async def stop(self):
                pass

        try:
            g = await load_genotype_snapshot(agent_id)
        except Exception as e:
            log.error("[Exoself %s] START FAILED: %r", agent_id, e)
            # Best-effort notification.  An explicit handler is used instead of
            # `return` inside `finally`, which would also swallow cancellation.
            try:
                await monitor.cast(("terminated", agent_id, float("-inf"), 0, 0, 0.0))
            except Exception as notify_err:
                log.error("[Exoself %s] FAILED to notify monitor: %r", agent_id, notify_err)
            return _Dummy()

        self = cls(g)
        self.agent_id = agent_id
        self.monitor = monitor

        async def _runner() -> None:
            fitness = float("-inf")
            evals = cycles = 0
            elapsed = 0.0
            try:
                fitness, evals, cycles, elapsed = await self.train_until_stop()
            except Exception as e:
                log.error("[Exoself %s] CRASH in train_until_stop(): %r", self.agent_id, e)
                fitness = float("-inf")
                evals = int(self.eval_acc)
                cycles = int(self.cycle_acc)
                elapsed = float(self.time_acc)
                # Do not leave spawned actor tasks running after a crash.
                try:
                    await self._terminate_all()
                except Exception as cleanup_err:
                    log.error("[Exoself %s] cleanup after crash failed: %r", self.agent_id, cleanup_err)
            finally:
                try:
                    await monitor.cast(("terminated", self.agent_id, fitness, evals, cycles, elapsed))
                except Exception as e:
                    log.error("[Exoself %s] FAILED to notify monitor: %r", self.agent_id, e)

        loop = asyncio.get_running_loop()
        self._runner_task = loop.create_task(_runner(), name=f"Exoself-{self.agent_id}")
        return self

    @staticmethod
    def from_file(path: str) -> "Exoself":
        """Create an ``Exoself`` from a JSON genotype file at ``path``."""
        with open(path, "r", encoding="utf-8") as f:
            g = json.load(f)
        return Exoself(g, file_name=path)

    def _start_actor_tasks(self) -> None:
        """Schedule ``run()`` of every sensor, neuron, actuator and the scape."""
        for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
            self.tasks.append(asyncio.create_task(a.run()))
        if self.actuator_scape:
            self.tasks.append(asyncio.create_task(self.actuator_scape.run()))

    async def run(self):
        """Spawn the network and process inbox messages until training ends.

        Returns when a ``"terminate"`` message arrives or when
        ``_on_evaluation_completed`` reports that training has stopped (at
        which point all actors have already been terminated).
        """
        self.build_pid_map_and_spawn()
        self._link_cortex()
        self._start_actor_tasks()

        while True:
            msg = await self.inbox.get()
            tag = msg[0]
            if tag == "evaluation_completed":
                _, fitness, cycles, elapsed = msg
                stats = await self._on_evaluation_completed(fitness, cycles, elapsed)
                if isinstance(stats, dict):
                    # Training stopped; nothing is left to send us messages.
                    return
            elif tag == "terminate":
                await self._terminate_all()
                return

    async def run_evaluation(self):
        """Spawn the network, wait for a single evaluation, then tear down.

        Returns:
            ``(fitness, 1, cycles, elapsed)`` after the first
            ``"evaluation_completed"`` message, or ``(-inf, 0, 0, 0.0)`` if a
            ``"terminate"`` message arrives first.
        """
        log.debug("exoself: build network and link...")
        self.build_pid_map_and_spawn()
        log.debug("exoself: link cortex...")
        self._link_cortex()
        self._start_actor_tasks()

        while True:
            msg = await self.inbox.get()
            log.debug("message in exsoself %r: ", msg)
            tag = msg[0]
            if tag == "evaluation_completed":
                _, fitness, cycles, elapsed = msg
                await self._terminate_all()
                return float(fitness), 1, int(cycles), float(elapsed)
            elif tag == "terminate":
                await self._terminate_all()
                return float("-inf"), 0, 0, 0.0

    def build_pid_map_and_spawn(self):
        """Instantiate cortex, scape, neurons, actuators and sensors and wire them.

        Populates ``cx_actor``, ``actuator_scape``, ``neuron_actors``,
        ``actuator_actors`` and ``sensor_actors``.  No tasks are started here.

        Raises:
            KeyError: If a required genotype key is missing.
            ValueError: If the genotype has no sensors or no actuators, or an
                actuator lists a fan-in id that is not a neuron id.
        """
        cx = self.g["cortex"]

        # Validate before constructing anything so that a bad genotype does not
        # leave a half-built network (or an environment) behind.
        actuators = self._get_actuators_block()
        if not actuators:
            log.error("genotype does not include 'actuator' or 'actuators' section")
            raise ValueError("Genotype must include 'actuator' or 'actuators'.")
        sensors = self._get_sensors_block()
        if not sensors:
            log.error("Genotype must include 'sensor' or 'sensors'.")
            raise ValueError("Genotype must include 'sensor' or 'sensors'.")

        self.cx_actor = Cortex(
            cid=cx["id"],
            exoself_pid=self,
            sensor_pids=[],
            neuron_pids=[],
            actuator_pids=[],
        )

        env = CarRacing(seed_value=5, render_mode=None)
        self.actuator_scape = CarRacingScape(env=env)

        # Group neuron specs by layer index and visit layers in ascending order.
        layers: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
        for n in self.g["neurons"]:
            layers[n["layer_index"]].append(n)
        ordered_layers = [layers[i] for i in sorted(layers)]

        id2neuron_actor: Dict[Any, Neuron] = {}
        for layer in ordered_layers:
            for n in layer:
                input_idps = [
                    (iw["input_id"], iw["weights"], bool(iw.get("recurrent", False)))
                    for iw in n["input_weights"]
                ]
                neuron = Neuron(
                    nid=n["id"],
                    cx_pid=self.cx_actor,
                    af_name=n.get("activation_function", "tanh"),
                    input_idps=input_idps,
                    output_pids=[],
                    bias=n.get("bias"),
                )
                id2neuron_actor[n["id"]] = neuron
                self.neuron_actors.append(neuron)

        # Derive each source's targets from the targets' input lists.  A dict is
        # used as an insertion-ordered set so the output order is deterministic.
        out_map: Dict[Any, Dict[Neuron, None]] = {nid: {} for nid in id2neuron_actor}
        sensor_targets: Dict[Any, List[Neuron]] = defaultdict(list)
        for layer in ordered_layers:
            for tgt in layer:
                tgt_pid = id2neuron_actor[tgt["id"]]
                for iw in tgt["input_weights"]:
                    src_id = iw["input_id"]
                    if src_id in id2neuron_actor:
                        out_map[src_id][tgt_pid] = None
                    if src_id != "bias":
                        # Indexed by every non-bias source; only sensor ids are
                        # looked up below.
                        sensor_targets[src_id].append(tgt_pid)

        for src_id, targets in out_map.items():
            id2neuron_actor[src_id].outputs = list(targets)

        for a in actuators:
            fanin_ids = a.get("fanin_ids") or []
            actuator = Actuator(
                aid=a["id"],
                cx_pid=self.cx_actor,
                name=a["name"],
                fanin_ids=fanin_ids,
                expect_count=len(fanin_ids),
                scape=self.actuator_scape,
            )
            self.actuator_actors.append(actuator)

        # Register each actuator as an output of the neurons feeding it.
        for a in self.actuator_actors:
            for src_id in a.fanin_ids:
                if src_id not in id2neuron_actor:
                    # Raised explicitly: an `assert` would vanish under `-O`.
                    raise ValueError(f"Actuator {a.aid}: fanin_id {src_id} ist kein Neuron")
                na = id2neuron_actor[src_id]
                if a not in na.outputs:
                    na.outputs.append(a)

        for s in sensors:
            fanout_pids = sensor_targets.get(s["id"], [])
            sensor = Sensor(
                sid=s["id"],
                cx_pid=self.cx_actor,
                name=s["name"],
                vector_length=s["vector_length"],
                fanout_pids=fanout_pids,
                scape=self.actuator_scape,
            )
            self.sensor_actors.append(sensor)

    def _get_sensors_block(self) -> List[Dict[str, Any]]:
        """Return the genotype's sensors as a list (``"sensors"`` or ``"sensor"``)."""
        if "sensors" in self.g:
            return list(self.g["sensors"])
        if "sensor" in self.g:
            return [self.g["sensor"]]
        return []

    def _get_actuators_block(self) -> List[Dict[str, Any]]:
        """Return the genotype's actuators as a list (``"actuators"`` or ``"actuator"``)."""
        if "actuators" in self.g:
            return list(self.g["actuators"])
        if "actuator" in self.g:
            return [self.g["actuator"]]
        return []

    def _link_cortex(self):
        """Hand the spawned actors to the cortex and schedule its ``run()``."""
        self.cx_actor.sensors = [a for a in self.sensor_actors if a]
        self.cx_actor.neurons = [a for a in self.neuron_actors if a]
        self.cx_actor.actuators = [a for a in self.actuator_actors if a]

        self.cx_actor.awaiting_sync = {a.aid for a in self.cx_actor.actuators}

        self.tasks.append(asyncio.create_task(self.cx_actor.run()))

    async def train_until_stop(self):
        """Spawn the network and train until the stop criterion or a terminate.

        Returns:
            ``(highest_fitness, eval_acc, cycle_acc, time_acc)`` once
            ``_on_evaluation_completed`` reports a stop, or
            ``(-inf, 0, 0, 0.0)`` when a ``"terminate"`` message arrives.
        """
        self.build_pid_map_and_spawn()
        self._link_cortex()
        self._start_actor_tasks()

        while True:
            msg = await self.inbox.get()
            tag = msg[0]

            if tag == "evaluation_completed":
                _, fitness, cycles, elapsed = msg
                maybe_stats = await self._on_evaluation_completed(fitness, cycles, elapsed)
                if isinstance(maybe_stats, dict):
                    return (
                        float(self.highest_fitness),
                        int(self.eval_acc),
                        int(self.cycle_acc),
                        float(self.time_acc),
                    )

            elif tag == "terminate":
                await self._terminate_all()
                return float("-inf"), 0, 0, 0.0

    async def _on_evaluation_completed(self, fitness: float, cycles: int, elapsed: float):
        """Account for one evaluation and decide how training continues.

        Args:
            fitness: Fitness of the finished evaluation.  Anything that is not
                a finite ``int``/``float`` is treated as ``-inf``.
            cycles: Cycle count of the evaluation (added to ``cycle_acc``).
            elapsed: Duration of the evaluation (added to ``time_acc``).

        Returns:
            A stats dict (``best_fitness``, ``eval_acc``, ``cycle_acc``,
            ``time_acc``) when ``MAX_ATTEMPTS`` is reached and training stops;
            otherwise ``None`` after perturbing weights and reactivating the
            cortex.
        """
        self.eval_acc += 1
        self.cycle_acc += int(cycles)
        self.time_acc += float(elapsed)

        # Format defensively: a non-numeric fitness must reach the validity
        # check below rather than raise inside the log call.
        fitness_repr = f"{fitness:.6f}" if isinstance(fitness, (int, float)) else repr(fitness)
        log.info(
            "[Exoself] evaluation_completed: fitness=%s cycles=%s time=%.3fs",
            fitness_repr, cycles, float(elapsed),
        )
        log.info("[Exoself] attempt %s", self.attempt)

        REL = 1e-4
        ABS_EPS = 1e-2

        valid_fitness = isinstance(fitness, (int, float)) and math.isfinite(fitness)
        if not valid_fitness:
            fitness = float("-inf")

        # Per-episode report to the monitor (finite fitness only).  The monitor
        # is unset when the instance was not created through start().
        if valid_fitness and self.monitor is not None:
            try:
                # fitness is the episodic return from Cortex.fitness_acc
                await self.monitor.cast(
                    ("episode_done", str(self.agent_id), float(fitness), int(self.eval_acc))
                )
            except Exception as e:
                # Best effort: a failed report must not abort training.
                log.warning("[Exoself %s] episode_done notification failed: %r", self.agent_id, e)

        # An improvement must exceed the best fitness by a relative margin,
        # with an absolute floor.
        best_abs = abs(self.highest_fitness) if math.isfinite(self.highest_fitness) else 0.0
        thresh = max(ABS_EPS, REL * best_abs)
        improved = fitness > self.highest_fitness + thresh

        if improved:
            self.highest_fitness = fitness
            self.attempt = 0
            for n in self.neuron_actors:
                await n.send(("weight_backup",))
        else:
            self.attempt += 1
            for n in self._perturbed:
                await n.send(("weight_restore",))

        if self.attempt >= self.MAX_ATTEMPTS:
            log.info(
                "[Exoself] STOP. Best fitness=%.6f evals=%s cycles=%s",
                self.highest_fitness, self.eval_acc, self.cycle_acc,
            )
            await self._backup_genotype()
            await self._terminate_all()
            return {
                "best_fitness": self.highest_fitness,
                "eval_acc": self.eval_acc,
                "cycle_acc": self.cycle_acc,
                "time_acc": self.time_acc,
            }

        # Each neuron is perturbed with probability 1/sqrt(neuron count).
        tot = len(self.neuron_actors)
        mp = 1.0 / math.sqrt(max(1, tot))
        self._perturbed = [n for n in self.neuron_actors if random.random() < mp]
        if not self._perturbed and self.neuron_actors:
            # An empty selection would re-evaluate identical weights and burn
            # an attempt; always perturb at least one neuron.
            self._perturbed = [random.choice(self.neuron_actors)]

        for n in self._perturbed:
            await n.send(("weight_perturb",))

        await self.cx_actor.send(("reactivate",))

    async def _backup_genotype(self):
        """Collect every neuron's weights and persist them.

        Sends ``("get_backup",)`` to each neuron, waits for one
        ``("backup_from_neuron", nid, idps)`` reply per neuron, splits the
        replies into bias rows and edge rows, and passes both to
        ``persist_neuron_backups``.
        """
        remaining = len(self.neuron_actors)
        for n in self.neuron_actors:
            await n.send(("get_backup",))

        backups: List[Tuple[Any, List[Tuple[Any, List[float]]]]] = []
        while remaining > 0:
            # NOTE(review): messages with any other tag are discarded here, and
            # there is no timeout if a neuron never replies.
            msg = await self.inbox.get()
            if msg[0] == "backup_from_neuron":
                _, nid, idps = msg
                backups.append((str(nid), idps))
                remaining -= 1

        bias_rows = []
        edge_rows = []
        for nid, idps in backups:
            for inp_id, weights in idps:
                if inp_id == "bias":
                    bias_rows.append({"nid": nid, "bias": float(weights[0])})
                else:
                    edge_rows.append({
                        "from_id": str(inp_id),
                        "to_id": nid,
                        "weights": [float(x) for x in weights],
                    })

        await persist_neuron_backups(bias_rows, edge_rows)

        if self.file_name:
            log.debug("[Exoself] Hinweis: file_name gesetzt, aber Persistenz läuft über Genotyp-API (Neo4j).")

    async def _terminate_all(self):
        """Send ``("terminate",)`` to all actors and cancel every spawned task."""
        for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
            await a.send(("terminate",))
        if self.cx_actor:
            await self.cx_actor.send(("terminate",))

        pending = [t for t in self.tasks if not t.done()]
        for t in pending:
            t.cancel()
        if pending:
            # cancel() only requests cancellation; await the tasks so they have
            # actually finished before they are dropped.
            await asyncio.gather(*pending, return_exceptions=True)
        self.tasks.clear()