import asyncio
import json
import math
import random
from collections import defaultdict
from typing import Any, Dict, List, Tuple, Optional

from actor import Actor
from cortex import Cortex
from sensor import Sensor
from neuron import Neuron
from actuator import Actuator
from scape import XorScape


class Exoself(Actor):
    """Supervisor actor that builds a network from a genotype and trains it.

    The genotype is a dict with a ``"cortex"`` entry, a ``"neurons"`` list and
    sensor/actuator entries (``"sensors"``/``"sensor"`` and
    ``"actuators"``/``"actuator"``).  The exoself instantiates one actor per
    element, starts each actor's ``run()`` coroutine as an asyncio task and
    then reacts to the messages arriving in ``self.inbox``.

    NOTE(review): ``self.inbox`` and ``send()`` are not defined in this file;
    they are presumably provided by the ``Actor`` base class.
    """

    def __init__(self, genotype: Dict[str, Any], file_name: Optional[str] = None):
        """Store the genotype and initialise empty actor lists and statistics.

        Args:
            genotype: Parsed genotype dict describing the network.
            file_name: Path the genotype is written back to after training;
                when falsy, no file is written.
        """
        super().__init__("Exoself")
        self.g = genotype
        self.file_name = file_name

        self.cx_actor: Optional[Cortex] = None
        self.sensor_actors: List[Sensor] = []
        self.neuron_actors: List[Neuron] = []
        self.actuator_actors: List[Actuator] = []
        self.tasks: List[asyncio.Task] = []

        # Training statistics.
        self.highest_fitness = float("-inf")
        self.eval_acc = 0
        self.cycle_acc = 0
        self.time_acc = 0.0
        self.attempt = 0
        # Training stops after this many consecutive non-improving evaluations.
        self.MAX_ATTEMPTS = 50

        self.actuator_scape: Optional[XorScape] = None
        # Neurons that received "weight_perturb" for the evaluation in flight.
        self._perturbed: List[Neuron] = []

    @staticmethod
    def from_file(path: str) -> "Exoself":
        """Load a JSON genotype from *path* and wrap it in an Exoself.

        The same path is remembered as ``file_name`` so that the trained
        genotype is written back to it.
        """
        with open(path, "r", encoding="utf-8") as f:
            g = json.load(f)
        return Exoself(g, file_name=path)

    # ---------- Entry points ----------

    async def run(self):
        """Build and start the network, then train until stopped.

        Returns ``None`` either when a ``("terminate",)`` message arrives or
        when ``_on_evaluation_completed`` reports that training has stopped.
        """
        self._build_pid_map_and_spawn()
        self._link_cortex()
        self._spawn_actor_tasks()

        while True:
            msg = await self.inbox.get()
            tag = msg[0]

            if tag == "evaluation_completed":
                _, fitness, cycles, elapsed = msg
                stats = await self._on_evaluation_completed(fitness, cycles, elapsed)
                # A dict means training stopped and every actor was already
                # terminated; waiting for further messages would block forever.
                if isinstance(stats, dict):
                    return
            elif tag == "terminate":
                await self._terminate_all()
                return

    async def run_evaluation(self) -> Tuple[float, int, int, float]:
        """Build and start the network and wait for a single evaluation.

        Returns:
            ``(fitness, 1, cycles, elapsed)`` for the first
            ``"evaluation_completed"`` message, or ``(-inf, 0, 0, 0.0)`` if a
            ``"terminate"`` message arrives first.  The network is torn down
            in both cases.
        """
        print("build network and link...")
        self._build_pid_map_and_spawn()
        print("link cortex...")
        self._link_cortex()
        self._spawn_actor_tasks()
        print("network actors are running...")

        while True:
            msg = await self.inbox.get()
            print("message in exsoself: ", msg)
            tag = msg[0]

            if tag == "evaluation_completed":
                _, fitness, cycles, elapsed = msg
                await self._terminate_all()
                return float(fitness), 1, int(cycles), float(elapsed)
            elif tag == "terminate":
                await self._terminate_all()
                return float("-inf"), 0, 0, 0.0

    async def train_until_stop(self) -> Tuple[float, int, int, float]:
        """Build and start the network and train until the stop criterion hits.

        Returns:
            ``(highest_fitness, eval_acc, cycle_acc, time_acc)`` once training
            stops, or ``(-inf, 0, 0, 0.0)`` if a ``"terminate"`` message
            arrives first.
        """
        self._build_pid_map_and_spawn()
        self._link_cortex()
        self._spawn_actor_tasks()

        while True:
            msg = await self.inbox.get()
            tag = msg[0]

            if tag == "evaluation_completed":
                _, fitness, cycles, elapsed = msg
                maybe_stats = await self._on_evaluation_completed(fitness, cycles, elapsed)
                # On stop, _on_evaluation_completed() has already called
                # _backup_genotype() and _terminate_all().
                if isinstance(maybe_stats, dict):
                    # End of training: report the accumulated statistics
                    # (fitness / evaluations / cycles / time).
                    return (
                        float(self.highest_fitness),
                        int(self.eval_acc),
                        int(self.cycle_acc),
                        float(self.time_acc),
                    )
            elif tag == "terminate":
                await self._terminate_all()
                return float("-inf"), 0, 0, 0.0

    # ---------- Build ----------

    def _build_pid_map_and_spawn(self):
        """Instantiate every actor described by the genotype and wire them up.

        Order matters: the cortex is created first so it can be handed to the
        children as ``cx_pid``; then the scape, all neurons, the layer-to-layer
        output links, the actuators (targets of the last layer) and finally
        the sensors (which fan out to the first layer).

        Raises:
            ValueError: If the genotype has no sensor or no actuator entry.
        """
        # Drop actors left over from an earlier build so that a second call on
        # the same instance does not duplicate them.
        self.sensor_actors = []
        self.neuron_actors = []
        self.actuator_actors = []
        self._perturbed = []

        cx = self.g["cortex"]
        # Cortex first, so that it can be passed to the children as cx_pid.
        self.cx_actor = Cortex(
            cid=cx["id"],
            exoself_pid=self,
            sensor_pids=[],
            neuron_pids=[],
            actuator_pids=[]
        )

        self.actuator_scape = XorScape()

        # Group the neuron specs by layer and order the layers by index.
        layers: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
        for n in self.g["neurons"]:
            layers[n["layer_index"]].append(n)
        ordered_layers = [layers[i] for i in sorted(layers)]

        id2neuron_actor: Dict[Any, Neuron] = {}
        for layer in ordered_layers:
            for n in layer:
                input_idps = [(iw["input_id"], iw["weights"]) for iw in n["input_weights"]]
                neuron = Neuron(
                    nid=n["id"],
                    cx_pid=self.cx_actor,
                    af_name=n.get("activation_function", "tanh"),
                    input_idps=input_idps,
                    output_pids=[]  # filled in below
                )
                id2neuron_actor[n["id"]] = neuron
                self.neuron_actors.append(neuron)

        # Every neuron of a layer outputs to all neurons of the next layer.
        # All neurons of one layer share the same target list object.
        for li in range(len(ordered_layers) - 1):
            next_pids = [id2neuron_actor[nx["id"]] for nx in ordered_layers[li + 1]]
            for n in ordered_layers[li]:
                id2neuron_actor[n["id"]].outputs = next_pids

        actuators = self._get_actuators_block()
        if not actuators:
            raise ValueError("Genotype must include 'actuator' or 'actuators'.")

        for a in actuators:
            fanin_ids = a.get("fanin_ids", [])
            expect = len(fanin_ids) if fanin_ids else 0
            actuator = Actuator(
                aid=a["id"],
                cx_pid=self.cx_actor,
                name=a["name"],
                fanin_ids=fanin_ids,
                expect_count=expect,
                scape=self.actuator_scape
            )
            self.actuator_actors.append(actuator)

        # The last layer outputs to the actuators.
        if ordered_layers:
            last_layer = ordered_layers[-1]
            out_targets = self.actuator_actors
            for n in last_layer:
                id2neuron_actor[n["id"]].outputs = out_targets

        sensors = self._get_sensors_block()
        if not sensors:
            raise ValueError("Genotype must include 'sensor' or 'sensors'.")

        first_layer = ordered_layers[0] if ordered_layers else []
        first_layer_pids = [id2neuron_actor[n["id"]] for n in first_layer]

        for s in sensors:
            sensor = Sensor(
                sid=s["id"],
                cx_pid=self.cx_actor,
                name=s["name"],
                vector_length=s["vector_length"],
                fanout_pids=first_layer_pids,
                scape=self.actuator_scape
            )
            self.sensor_actors.append(sensor)

    def _get_sensors_block(self) -> List[Dict[str, Any]]:
        """Return the sensor specs as a list (``"sensors"`` wins over ``"sensor"``)."""
        if "sensors" in self.g:
            return list(self.g["sensors"])
        if "sensor" in self.g:
            return [self.g["sensor"]]
        return []

    def _get_actuators_block(self) -> List[Dict[str, Any]]:
        """Return the actuator specs as a list (``"actuators"`` wins over ``"actuator"``)."""
        if "actuators" in self.g:
            return list(self.g["actuators"])
        if "actuator" in self.g:
            return [self.g["actuator"]]
        return []

    def _link_cortex(self):
        """Hand the built actors to the cortex and start the cortex task."""
        self.cx_actor.sensors = [a for a in self.sensor_actors if a]
        self.cx_actor.neurons = [a for a in self.neuron_actors if a]
        self.cx_actor.actuators = [a for a in self.actuator_actors if a]

        self.cx_actor.awaiting_sync = {a.aid for a in self.cx_actor.actuators}

        self.tasks.append(asyncio.create_task(self.cx_actor.run()))

    def _spawn_actor_tasks(self):
        """Start ``run()`` of all sensors, neurons, actuators and the scape as tasks."""
        for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
            self.tasks.append(asyncio.create_task(a.run()))

        if self.actuator_scape:
            self.tasks.append(asyncio.create_task(self.actuator_scape.run()))

    # ---------- Training ----------

    def _is_improvement(self, fitness: float) -> bool:
        """Return True if *fitness* beats the best so far by a relative margin.

        The margin is applied away from the current best in the direction of
        "better": for a negative best the threshold moves towards zero, so a
        slightly worse (more negative) fitness is never counted as progress.
        """
        REL = 1e-6
        best = self.highest_fitness
        margin = REL if best >= 0 else -REL
        return fitness > best * (1.0 + margin)

    async def _on_evaluation_completed(self, fitness: float, cycles: int, elapsed: float):
        """Update statistics and either continue or stop training.

        On improvement all neurons are told to back up their weights;
        otherwise the previously perturbed neurons restore theirs.  After
        ``MAX_ATTEMPTS`` consecutive non-improving evaluations the genotype is
        backed up, all actors are terminated and a statistics dict is
        returned.  Otherwise a random subset of neurons is perturbed, the
        cortex is reactivated and ``None`` is returned.
        """
        self.eval_acc += 1
        self.cycle_acc += int(cycles)
        self.time_acc += float(elapsed)

        print(f"[Exoself] evaluation_completed: fitness={fitness:.6f} cycles={cycles} time={elapsed:.3f}s")

        if self._is_improvement(fitness):
            self.highest_fitness = fitness
            self.attempt = 0
            for n in self.neuron_actors:
                await n.send(("weight_backup",))
        else:
            self.attempt += 1
            for n in self._perturbed:
                await n.send(("weight_restore",))

        if self.attempt >= self.MAX_ATTEMPTS:
            print(
                f"[Exoself] STOP. Best fitness={self.highest_fitness:.6f} evals={self.eval_acc} cycles={self.cycle_acc}")
            await self._backup_genotype()
            await self._terminate_all()
            return {
                "best_fitness": self.highest_fitness,
                "eval_acc": self.eval_acc,
                "cycle_acc": self.cycle_acc,
                "time_acc": self.time_acc,
            }

        # Each neuron is perturbed with probability 1/sqrt(number of neurons);
        # the selection may be empty.
        tot = len(self.neuron_actors)
        mp = 1.0 / math.sqrt(max(1, tot))
        self._perturbed = [n for n in self.neuron_actors if random.random() < mp]

        for n in self._perturbed:
            await n.send(("weight_perturb",))

        await self.cx_actor.send(("reactivate",))

    async def _backup_genotype(self):
        """Collect the neurons' backed-up weights and write them to the genotype.

        Sends ``("get_backup",)`` to every neuron and waits for one
        ``("backup_from_neuron", nid, idps)`` reply per neuron.  Each neuron's
        ``"input_weights"`` is replaced by the reported weights, with a
        ``"bias"`` entry (if reported) placed last.  If ``file_name`` is set,
        the genotype is dumped there as JSON.

        NOTE(review): any other message received while waiting is discarded,
        and the wait has no timeout.
        """
        remaining = len(self.neuron_actors)
        for n in self.neuron_actors:
            await n.send(("get_backup",))

        backups: List[Tuple[Any, List[Tuple[Any, List[float]]]]] = []

        while remaining > 0:
            msg = await self.inbox.get()
            if msg[0] == "backup_from_neuron":
                _, nid, idps = msg
                backups.append((nid, idps))
                remaining -= 1

        id2n = {n["id"]: n for n in self.g["neurons"]}
        for nid, idps in backups:
            if nid not in id2n:
                continue
            new_iw = []
            bias_val = None
            for item in idps:
                if isinstance(item[0], str) and item[0] == "bias":
                    # The bias may arrive as a scalar or as a one-element list.
                    raw = item[1]
                    bias_val = float(raw[0]) if isinstance(raw, list) else float(raw)
                else:
                    input_id, weights = item
                    new_iw.append({"input_id": input_id, "weights": list(weights)})
            if bias_val is not None:
                new_iw.append({"input_id": "bias", "weights": [bias_val]})
            id2n[nid]["input_weights"] = new_iw

        if self.file_name:
            with open(self.file_name, "w", encoding="utf-8") as f:
                json.dump(self.g, f, indent=2)
            print(f"[Exoself] Genotype updated → {self.file_name}")

    async def _terminate_all(self):
        """Send ``("terminate",)`` to all actors, then cancel and reap their tasks."""
        for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
            await a.send(("terminate",))
        if self.cx_actor:
            await self.cx_actor.send(("terminate",))

        pending = [t for t in self.tasks if not t.done()]
        for t in pending:
            t.cancel()
        self.tasks.clear()

        # cancel() only requests cancellation; await the tasks so they have
        # actually finished when this coroutine returns.  The current task is
        # skipped because a task cannot await itself.
        current = asyncio.current_task()
        to_reap = [t for t in pending if t is not current]
        if to_reap:
            await asyncio.gather(*to_reap, return_exceptions=True)