WIP: stochastic hill climber

This commit is contained in:
2025-09-26 10:48:28 +02:00
parent f4a8f22fc6
commit 0ace1cec3c
37 changed files with 1862 additions and 0 deletions

Binary file not shown.

View File

@@ -0,0 +1,13 @@
import asyncio
class Actor:
def __init__(self, name: str):
self.name = name
self.inbox = asyncio.Queue()
async def send(self, msg):
await self.inbox.put(msg)
async def run(self):
raise NotImplementedError

View File

@@ -0,0 +1,55 @@
# actors/actuator.py
import asyncio
from actor import Actor
class Actuator(Actor):
def __init__(self, aid, cx_pid, name, fanin_ids, expect_count, scape=None):
super().__init__(f"Actuator-{aid}")
self.aid = aid
self.cx_pid = cx_pid
self.aname = name
self.fanin_ids = fanin_ids
self.expect = expect_count
self.received = {}
self.scape = scape
self.scape_inbox = asyncio.Queue()
async def run(self):
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "forward":
_, from_id, vec = msg
self.received[from_id] = vec
if len(self.received) == self.expect:
print("ACTUATOR: collected all signals...")
output = []
for fid in self.fanin_ids:
output.extend(self.received[fid])
if self.aname == "pts":
print(f"Actuator output: {output}")
fitness, halt_flag = 1.0, 0
elif self.aname == "xor_SendOutput" and self.scape:
print("ACTUATOR: sending action to scape...")
await self.scape.send(("action", output, self))
while True:
resp = await self.inbox.get()
if resp[0] == "result":
print("ACTUATOR: got scape response: ", resp)
fitness, halt_flag = resp[1], resp[2]
break
else:
fitness, halt_flag = 0.0, 0
await self.cx_pid.send(("sync", self.aid, fitness, halt_flag))
print("ACTUATOR: sent sync message to cortex.")
self.received.clear()
elif tag == "terminate":
return

View File

@@ -0,0 +1,96 @@
{
"cortex": {
"id": 0.5107239887106383,
"sensor_ids": [
5.685637947817566e-10
],
"actuator_ids": [
5.685637947817563e-10
],
"neuron_ids": [
0.3776999353275148,
0.7030052650887313,
0.9936497204289092
]
},
"sensor": {
"id": 5.685637947817566e-10,
"name": "xor_GetInput",
"vector_length": 2,
"cx_id": 0.5107239887106383,
"fanout_ids": [
0.3776999353275148,
0.7030052650887313
]
},
"actuator": {
"id": 5.685637947817563e-10,
"name": "xor_SendOutput",
"vector_length": 1,
"cx_id": 0.5107239887106383,
"fanin_ids": [
0.9936497204289092
]
},
"neurons": [
{
"id": 0.3776999353275148,
"layer_index": 0,
"cx_id": 0.5107239887106383,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.685637947817566e-10,
"weights": [
-1.7328567115118854,
0.31546591460152307
]
}
],
"output_ids": [
0.24149710385676537
]
},
{
"id": 0.7030052650887313,
"layer_index": 0,
"cx_id": 0.5107239887106383,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.685637947817566e-10,
"weights": [
1.507492385500833,
-1.5181033637128052
]
}
],
"output_ids": [
0.24149710385676537
]
},
{
"id": 0.9936497204289092,
"layer_index": 1,
"cx_id": 0.5107239887106383,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 0.3776999353275148,
"weights": [
0.9998252528454215
]
},
{
"input_id": 0.7030052650887313,
"weights": [
-1.7243886895741118
]
}
],
"output_ids": [
5.685637947817563e-10
]
}
]
}

View File

@@ -0,0 +1,119 @@
# actors/cortex.py
import time
from actor import Actor
class Cortex(Actor):
def __init__(self, cid, exoself_pid, sensor_pids, neuron_pids, actuator_pids):
super().__init__(f"Cortex-{cid}")
self.cid = cid
self.sensors = sensor_pids
self.neurons = neuron_pids
self.actuators = actuator_pids
self.exoself_pid = exoself_pid
self.awaiting_sync = set() # AIDs, von denen wir im aktuellen Schritt noch Sync erwarten
self.fitness_acc = 0.0 # akkumulierte Fitness über die Episode
self.ef_acc = 0 # akkumulierte EndFlags im aktuellen Schritt/Episode
self.cycle_acc = 0 # Anzahl abgeschlossener Sense-Think-Act Zyklen
self.active = False # aktiv/inaktiv (wartet auf reactivate)
self._t0 = None # Startzeit der Episode
async def _kick_sensors(self):
for s in self.sensors:
await s.send(("sync",))
def _reset_for_new_cycle(self):
self.awaiting_sync = set(a.aid for a in self.actuators)
def _reset_for_new_episode(self):
self.fitness_acc = 0.0
self.ef_acc = 0
self.cycle_acc = 0
self._reset_for_new_cycle()
self._t0 = time.perf_counter()
self.active = True
async def run(self):
# Initialisierung: falls Actuatoren schon gesetzt sind, Episode starten
if self.actuators:
self._reset_for_new_episode()
await self._kick_sensors()
while True:
msg = await self.inbox.get()
tag = msg[0]
# Optional: Exoself kann AIDs registrieren, bevor wir starten
if tag == "register_actuators":
_, aids = msg
# Map aids auf echte Actuatoren falls nötig; hier übernehmen wir sie direkt
self.awaiting_sync = set(aids)
if not self.active:
self._reset_for_new_episode()
await self._kick_sensors()
continue
if tag == "sync" and self.active:
print("CORTEX: got sync message: ", msg)
# Aktuator meldet Schrittabschluss
_t, aid, fitness, halt_flag = msg
print("----------------")
print("_t:", _t)
print("aid:", aid)
print("fitness:", fitness)
print("halt_flag:", halt_flag)
print("----------------")
# akkumulieren
self.fitness_acc += float(fitness)
self.ef_acc += int(halt_flag)
# diesen Aktuator als "eingetroffen" markieren
if aid in self.awaiting_sync:
self.awaiting_sync.remove(aid)
print("CORTEX: awaiting sync: ", self.awaiting_sync)
# Wenn alle Aktuatoren gemeldet haben, ist ein Zyklus fertig
if not self.awaiting_sync:
print("CORTEX: cycle completed.")
self.cycle_acc += 1
# Episodenende, wenn irgendein Aktuator EndFlag setzt
if self.ef_acc > 0:
elapsed = time.perf_counter() - self._t0
# An Exoself berichten
await self.exoself_pid.send((
"evaluation_completed",
self.fitness_acc,
self.cycle_acc,
elapsed
))
# inaktiv werden warten auf reactivate
self.active = False
# Flags für nächste Episode werden erst bei reactivate gesetzt
else:
# Nächsten Zyklus starten
self.ef_acc = 0
self._reset_for_new_cycle()
await self._kick_sensors()
continue
if tag == "reactivate":
# neue Episode starten
self._reset_for_new_episode()
await self._kick_sensors()
continue
if tag == "terminate":
# geordnet alle Kinder beenden
for a in (self.sensors + self.actuators + self.neurons):
await a.send(("terminate",))
return
elif tag == "backup_from_neuron":
# vom Neuron an Cortex -> an Exoself weiterreichen
await self.exoself_pid.send(msg)

View File

@@ -0,0 +1,346 @@
# exoself.py
import asyncio
import json
import math
import random
from collections import defaultdict
from typing import Any, Dict, List, Tuple, Optional
from actor import Actor
from cortex import Cortex
from sensor import Sensor
from neuron import Neuron
from actuator import Actuator
from scape import XorScape
class Exoself(Actor):
"""
Exoself übersetzt den Genotyp (JSON) in einen laufenden Phenotyp (Actors) und
steuert das simple Neuroevolution-Training (Backup/Restore/Perturb + Reactivate).
"""
def __init__(self, genotype: Dict[str, Any], file_name: Optional[str] = None):
super().__init__("Exoself")
self.g = genotype
self.file_name = file_name
self.cx_actor: Optional[Cortex] = None
self.sensor_actors: List[Sensor] = []
self.neuron_actors: List[Neuron] = []
self.actuator_actors: List[Actuator] = []
self.tasks: List[asyncio.Task] = []
# Training-Stats
self.highest_fitness = float("-inf")
self.eval_acc = 0
self.cycle_acc = 0
self.time_acc = 0.0
self.attempt = 0
self.MAX_ATTEMPTS = 50
self.actuator_scape = None
# zuletzt perturbierte Neuronen (für Restore)
self._perturbed: List[Neuron] = []
# ---------- Convenience ----------
@staticmethod
def from_file(path: str) -> "Exoself":
with open(path, "r") as f:
g = json.load(f)
return Exoself(g, file_name=path)
# ---------- Public API ----------
async def run(self):
# 1) Netzwerk bauen
self._build_pid_map_and_spawn()
# 2) Cortex verlinken + starten
self._link_cortex()
# 3) Actors starten (Sensoren/Neuronen/Aktuatoren)
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors + [self.actuator_scape]:
self.tasks.append(asyncio.create_task(a.run()))
# 4) Hauptloop: auf Cortex-Events hören
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "evaluation_completed":
_, fitness, cycles, elapsed = msg
await self._on_evaluation_completed(fitness, cycles, elapsed)
elif tag == "terminate":
await self._terminate_all()
return
# in exoself.py, innerhalb der Klasse Exoself
async def run_evaluation(self):
"""
Eine einzelne Episode/Evaluation:
- baut & verlinkt das Netz
- startet alle Actors
- wartet auf 'evaluation_completed' vom Cortex
- beendet alles und liefert (fitness, evals, cycles, elapsed)
"""
# 1) Netzwerk bauen & Cortex verlinken
print("build network and link...")
self._build_pid_map_and_spawn()
print("link cortex...")
self._link_cortex()
# 2) Sensor/Neuron/Aktuator-Tasks starten (Cortex startete _link_cortex bereits)
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
self.tasks.append(asyncio.create_task(a.run()))
if self.actuator_scape:
self.tasks.append(asyncio.create_task(self.actuator_scape.run()))
print("network actors are running...")
# 3) Auf Abschluss warten
while True:
msg = await self.inbox.get()
print("message in exsoself: ", msg)
tag = msg[0]
if tag == "evaluation_completed":
_, fitness, cycles, elapsed = msg
# 4) Sauber terminieren
await self._terminate_all()
# Evals = 1 (eine Episode)
return float(fitness), 1, int(cycles), float(elapsed)
elif tag == "terminate":
await self._terminate_all()
# Falls vorzeitig terminiert wurde
return float("-inf"), 0, 0, 0.0
# ---------- Build ----------
def _build_pid_map_and_spawn(self):
"""
Baut Cortex, dann alle Neuronen (mit cx_pid=self.cx_actor), dann verlinkt Outputs schichtweise,
dann Sensoren/Aktuatoren (mit cx_pid=self.cx_actor). Achtung: Reihenfolge wichtig.
"""
cx = self.g["cortex"]
# Cortex zuerst (damit wir cx_pid an Kinder übergeben können)
self.cx_actor = Cortex(
cid=cx["id"],
exoself_pid=self,
sensor_pids=[], # werden gleich gesetzt
neuron_pids=[],
actuator_pids=[]
)
self.actuator_scape = XorScape()
# Neuronen nach Layer gruppieren (damit outputs korrekt gesetzt werden)
layers: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
for n in self.g["neurons"]:
layers[n["layer_index"]].append(n)
ordered_layers = [layers[i] for i in sorted(layers)]
# Platzhalter: wir benötigen später Referenzen nach ID
id2neuron_actor: Dict[Any, Neuron] = {}
# Zuerst alle Neuronen erzeugen (ohne Outputs), damit wir Referenzen haben
for layer in ordered_layers:
for n in layer:
input_idps = [(iw["input_id"], iw["weights"]) for iw in n["input_weights"]]
neuron = Neuron(
nid=n["id"],
cx_pid=self.cx_actor,
af_name=n.get("activation_function", "tanh"),
input_idps=input_idps,
output_pids=[] # füllen wir gleich
)
id2neuron_actor[n["id"]] = neuron
self.neuron_actors.append(neuron)
# Jetzt Outputs pro Layer setzen:
# - für Nicht-Output-Layer: Outputs = Neuronen der nächsten Schicht
# - für Output-Layer: Outputs = Aktuator(en) (setzen wir nachdem Aktuatoren erzeugt sind)
for li in range(len(ordered_layers) - 1):
next_pids = [id2neuron_actor[nx["id"]] for nx in ordered_layers[li + 1]]
for n in ordered_layers[li]:
id2neuron_actor[n["id"]].outputs = next_pids
# Aktuatoren anlegen (brauchen cx_pid und fanin_ids)
# Genotyp kann "actuator" (ein Objekt) oder "actuators" (Liste) haben wir unterstützen beides.
actuators = self._get_actuators_block()
if not actuators:
raise ValueError("Genotype must include 'actuator' or 'actuators'.")
for a in actuators:
fanin_ids = a.get("fanin_ids", [])
expect = len(fanin_ids) if fanin_ids else 0
actuator = Actuator(
aid=a["id"],
cx_pid=self.cx_actor,
name=a["name"],
fanin_ids=fanin_ids,
expect_count=expect,
scape=self.actuator_scape
)
self.actuator_actors.append(actuator)
# Output-Layer Neuronen → Outputs = Aktuatoren
if ordered_layers:
last_layer = ordered_layers[-1]
out_targets = self.actuator_actors # Liste
for n in last_layer:
id2neuron_actor[n["id"]].outputs = out_targets
# Sensor(en) anlegen (brauchen cx_pid und fanout auf erste Schicht)
sensors = self._get_sensors_block()
if not sensors:
raise ValueError("Genotype must include 'sensor' or 'sensors'.")
first_layer = ordered_layers[0] if ordered_layers else []
first_layer_pids = [id2neuron_actor[n["id"]] for n in first_layer]
for s in sensors:
sensor = Sensor(
sid=s["id"],
cx_pid=self.cx_actor,
name=s["name"],
vector_length=s["vector_length"],
fanout_pids=first_layer_pids,
scape=self.actuator_scape
)
self.sensor_actors.append(sensor)
def _get_sensors_block(self) -> List[Dict[str, Any]]:
if "sensors" in self.g:
return list(self.g["sensors"])
if "sensor" in self.g:
return [self.g["sensor"]]
return []
def _get_actuators_block(self) -> List[Dict[str, Any]]:
if "actuators" in self.g:
return list(self.g["actuators"])
if "actuator" in self.g:
return [self.g["actuator"]]
return []
# ---------- Link ----------
def _link_cortex(self):
"""
Übergibt dem Cortex die fertigen Listen und setzt awaiting_sync.
Startet dann den Cortex-Task.
"""
self.cx_actor.sensors = [a for a in self.sensor_actors if a]
self.cx_actor.neurons = [a for a in self.neuron_actors if a]
self.cx_actor.actuators = [a for a in self.actuator_actors if a]
# Wichtig: vor Start die erwarteten AIDs setzen,
# damit der erste Sensor-Trigger nicht in eine leere awaiting_sync läuft.
self.cx_actor.awaiting_sync = set(a.aid for a in self.cx_actor.actuators)
# Cortex starten
self.tasks.append(asyncio.create_task(self.cx_actor.run()))
# ---------- Training-Loop Reaction ----------
async def _on_evaluation_completed(self, fitness: float, cycles: int, elapsed: float):
self.eval_acc += 1
self.cycle_acc += int(cycles)
self.time_acc += float(elapsed)
print(f"[Exoself] evaluation_completed: fitness={fitness:.6f} cycles={cycles} time={elapsed:.3f}s")
if fitness > self.highest_fitness:
self.highest_fitness = fitness
self.attempt = 0
# Backup aller Neuronen
for n in self.neuron_actors:
await n.send(("weight_backup",))
else:
self.attempt += 1
# Restore nur der zuletzt perturbierten Neuronen
for n in self._perturbed:
await n.send(("weight_restore",))
# Stop-Kriterium?
if self.attempt >= self.MAX_ATTEMPTS:
print(f"[Exoself] STOP. Best fitness={self.highest_fitness:.6f} evals={self.eval_acc} cycles={self.cycle_acc}")
await self._backup_genotype()
await self._terminate_all()
return {
"best_fitness": self.highest_fitness,
"eval_acc": self.eval_acc,
"cycle_acc": self.cycle_acc,
"time_acc": self.time_acc,
}
# Perturbiere Teilmenge der Neuronen
tot = len(self.neuron_actors)
mp = 1.0 / math.sqrt(max(1, tot))
self._perturbed = [n for n in self.neuron_actors if random.random() < mp]
for n in self._perturbed:
await n.send(("weight_perturb",))
# Nächste Episode starten
await self.cx_actor.send(("reactivate",))
# ---------- Backup Genotype ----------
async def _backup_genotype(self):
"""
Holt von allen Neuronen die aktuellen Weights und schreibt sie in self.g.
Speichert optional in self.file_name.
"""
# 1) Request
remaining = len(self.neuron_actors)
for n in self.neuron_actors:
await n.send(("get_backup",))
# 2) Collect vom Cortex-Postfach (Neuronen senden an cx_pid → cx leitet an Exoself weiter
# oder du hast sie direkt an Exoself schicken lassen; falls direkt an Cortex, dann
# lausche hier stattdessen auf self.cx_actor.inbox. In deinem Neuron-Code geht es an cx_pid,
# und in deiner bisherigen Implementierung hast du aus dem Cortex-Postfach gelesen.
# Hier vereinfachen wir: Neuronen senden direkt an EXOSELF (passe Neuron ggf. an).
backups: List[Tuple[Any, List[Tuple[Any, List[float]]]]] = []
while remaining > 0:
msg = await self.inbox.get()
if msg[0] == "backup_from_neuron":
_, nid, idps = msg
backups.append((nid, idps))
remaining -= 1
# 3) Update JSON
# exoself.py -> in _backup_genotype()
id2n = {n["id"]: n for n in self.g["neurons"]}
for nid, idps in backups:
if nid not in id2n:
continue
new_iw = []
bias_val = None
for item in idps:
if isinstance(item[0], str) and item[0] == "bias":
bias_val = float(item[1]) if not isinstance(item[1], list) else float(item[1][0])
else:
input_id, weights = item
new_iw.append({"input_id": input_id, "weights": list(weights)})
id2n[nid]["input_weights"] = new_iw
# Bias mit abspeichern (Variante B):
if bias_val is not None:
id2n[nid].setdefault("input_weights", []).append({"input_id": "bias", "weights": [bias_val]})
# 4) Save
if self.file_name:
with open(self.file_name, "w") as f:
json.dump(self.g, f, indent=2)
print(f"[Exoself] Genotype updated → {self.file_name}")
# ---------- Termination ----------
async def _terminate_all(self):
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
await a.send(("terminate",))
if self.cx_actor:
await self.cx_actor.send(("terminate",))
for t in self.tasks:
if not t.done():
t.cancel()
self.tasks.clear()

View File

@@ -0,0 +1,96 @@
{
"cortex": {
"id": 0.0873421806271496,
"sensor_ids": [
5.6856379409509e-10
],
"actuator_ids": [
5.685637940950896e-10
],
"neuron_ids": [
0.6384794610574114,
0.6023131059017351,
0.39277072802910173
]
},
"sensor": {
"id": 5.6856379409509e-10,
"name": "xor_GetInput",
"vector_length": 2,
"cx_id": 0.0873421806271496,
"fanout_ids": [
0.6384794610574114,
0.6023131059017351
]
},
"actuator": {
"id": 5.685637940950896e-10,
"name": "xor_SendOutput",
"vector_length": 1,
"cx_id": 0.0873421806271496,
"fanin_ids": [
0.39277072802910173
]
},
"neurons": [
{
"id": 0.6384794610574114,
"layer_index": 0,
"cx_id": 0.0873421806271496,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.6856379409509e-10,
"weights": [
-1.241701053140722,
-0.9643293453192259
]
}
],
"output_ids": [
0.20086494757397733
]
},
{
"id": 0.6023131059017351,
"layer_index": 0,
"cx_id": 0.0873421806271496,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.6856379409509e-10,
"weights": [
-0.38795737506820904,
-0.5125123920689245
]
}
],
"output_ids": [
0.20086494757397733
]
},
{
"id": 0.39277072802910173,
"layer_index": 1,
"cx_id": 0.0873421806271496,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 0.6384794610574114,
"weights": [
-0.7426574958298033
]
},
{
"input_id": 0.6023131059017351,
"weights": [
-0.9823211499227558
]
}
],
"output_ids": [
5.685637940950896e-10
]
}
]
}

View File

@@ -0,0 +1,266 @@
# genotype.py
import json
import math
import random
import time
from typing import Dict, List, Tuple, Any, Optional
# -----------------------------
# Utilities / ID & Weights
# -----------------------------
def generate_id() -> float:
return random.random()
def create_neural_weights(vector_length: int) -> List[float]:
return [random.uniform(-2.0, 2.0) for _ in range(vector_length)]
# -----------------------------
# Morphology "Interface"
# -----------------------------
"""
Erwartete Morphologie-API (duck-typed):
- get_InitSensor(morphology) -> dict mit Feldern:
{
"id": <beliebig>,
"name": <sensor-funktionsname, z.B. "rng" oder "xor_GetInput">,
"vector_length": <int>,
# optional: "scape": <frei nutzbar>,
}
- get_InitActuator(morphology) -> dict mit Feldern:
{
"id": <beliebig>,
"name": <aktor-funktionsname, z.B. "pts" oder "xor_SendOutput">,
"vector_length": <int>,
# optional: "scape": <frei nutzbar>,
}
Du kannst z.B. ein kleines Modul/Objekt übergeben, das diese Funktionen bereitstellt.
"""
# -----------------------------
# Genotype-Erzeugung
# -----------------------------
def construct(
morphology_module,
hidden_layer_densities: List[int],
file_name: Optional[str] = None,
*,
add_bias: bool = False,
) -> Dict[str, Any]:
"""
Baut einen Genotyp (JSON-kompatibles Dict) analog zur Erlang-Version:
- wählt Initial-Sensor & -Aktuator aus Morphologie
- bestimmt Output-Layer-Dichte = actuator.vector_length
- erzeugt alle Neuronen-Schichten & Gewichte
- setzt fanout_ids/fanin_ids
- erstellt Cortex mit Listen der IDs
- speichert optional in file_name
add_bias: Wenn True, hängt pro Neuron einen Bias als ('bias', b) an die Input-Liste an.
Dein bisheriges JSON nutzt KEIN Bias-Element. Standard=False für Kompatibilität.
"""
# Seed wie im Buch
rnd_seed = time.time_ns() & 0xFFFFFFFF
random.seed(rnd_seed)
# 1) Sensor & Aktuator aus Morphologie
S = morphology_module.get_InitSensor(morphology_module)
A = morphology_module.get_InitActuator(morphology_module)
# Pflichtfelder normalisieren
sensor = {
"id": S.get("id", generate_id()),
"name": S["name"],
"vector_length": int(S["vector_length"]),
"cx_id": None, # wird später gesetzt
"fanout_ids": [], # wird später gesetzt
# optional:
# "scape": S.get("scape")
}
actuator = {
"id": A.get("id", generate_id()),
"name": A["name"],
"vector_length": int(A["vector_length"]),
"cx_id": None, # wird später gesetzt
"fanin_ids": [], # wird später gesetzt
# optional:
# "scape": A.get("scape")
}
# 2) Output-Layer-Dichte = actuator.vector_length (wie im Buch)
output_vl = actuator["vector_length"]
layer_densities = list(hidden_layer_densities) + [output_vl]
# 3) Cortex-ID festlegen
cortex_id = generate_id()
# 4) Neuronen-Schichten erzeugen
layers = _create_neuro_layers(
cx_id=cortex_id,
sensor=sensor,
actuator=actuator,
layer_densities=layer_densities,
add_bias=add_bias,
)
# 5) Sensor/Aktuator mit Cortex/Fanout/Fanin verbinden
input_layer = layers[0]
output_layer = layers[-1]
sensor["cx_id"] = cortex_id
sensor["fanout_ids"] = [n["id"] for n in input_layer]
actuator["cx_id"] = cortex_id
actuator["fanin_ids"] = [n["id"] for n in output_layer]
# 6) Cortex-Objekt
neuron_ids = [n["id"] for layer in layers for n in layer]
cortex = {
"id": cortex_id,
"sensor_ids": [sensor["id"]],
"actuator_ids": [actuator["id"]],
"neuron_ids": neuron_ids,
}
# 7) Genotyp zusammensetzen
genotype = {
"cortex": cortex,
"sensor": sensor,
"actuator": actuator,
"neurons": [n for layer in layers for n in layer],
}
# 8) Optional speichern
if file_name:
save_genotype(file_name, genotype)
return genotype
def _create_neuro_layers(
cx_id: float,
sensor: Dict[str, Any],
actuator: Dict[str, Any],
layer_densities: List[int],
*,
add_bias: bool,
) -> List[List[Dict[str, Any]]]:
"""
Baut alle Neuronen-Schichten.
- Erste Schicht erhält als Input den Sensor (Vektor).
- Mittlere Schichten erhalten skalare Inputs von voriger Schicht.
- Letzte Schicht verbindet zum Aktuator (output_ids = [actuator.id]).
"""
layers: List[List[Dict[str, Any]]] = []
# Platzhalter-Input: (input_id, vector_length)
input_idps: List[Tuple[float, int]] = [(sensor["id"], sensor["vector_length"])]
for layer_index, layer_density in enumerate(layer_densities):
# Neuron-IDs generieren
neuron_ids = [generate_id() for _ in range(layer_density)]
# output_ids: Standardmäßig IDs der nächsten Schicht (werden nach dem Layer bekannt)
if layer_index < len(layer_densities) - 1:
# IDs der nächste Schicht vorab erzeugen (für output_ids)
next_ids = [generate_id() for _ in range(layer_densities[layer_index + 1])]
# Aber: wir erzeugen hier *nur* die IDs für output_ids, nicht die Neuronen der nächsten Schicht
output_ids = next_ids
else:
# letzte Schicht → zum Aktuator
output_ids = [actuator["id"]]
# Layer-Neuronen erzeugen
this_layer: List[Dict[str, Any]] = []
for _nid in neuron_ids:
proper_input = _create_neural_input(input_idps, add_bias=add_bias)
neuron = {
"id": _nid,
"layer_index": layer_index,
"cx_id": cx_id,
"activation_function": "tanh",
# JSON-Konvention: [{"input_id": id, "weights": [...]}, ...]
"input_weights": [{"input_id": i, "weights": w} for (i, w) in proper_input if not _is_bias_tuple((i, w))],
"output_ids": output_ids[:], # Kopie
}
this_layer.append(neuron)
layers.append(this_layer)
# Inputs für nächste Schicht: jetzt sind es skalare Outputs der aktuellen Neuronen
input_idps = [(n["id"], 1) for n in this_layer]
# WICHTIG: oben haben wir für mittlere Layer „künstliche“ next_ids erzeugt, damit output_ids befüllt waren.
# In deiner Laufzeit-Topologie ignorieren wir diese Platzhalter und verdrahten später im Exoself die
# tatsächlichen Actor-Referenzen schichtweise. Für den Genotyp sind die output_ids der Zwischenlayer
# nicht kritisch (du überschreibst sie ohnehin beim Phenotype-Mapping). Die letzte Schicht zeigt korrekt auf den Aktuator.
return layers
def _is_bias_tuple(t: Tuple[Any, Any]) -> bool:
"""Hilfsfunktion falls du später Bias im JSON aufnehmen willst."""
key, _ = t
return isinstance(key, str) and key == "bias"
def _create_neural_input(
input_idps: List[Tuple[float, int]],
*,
add_bias: bool,
) -> List[Tuple[Any, List[float]]]:
"""
Wandelt Platzhalter (input_id, vector_length) in (input_id, weights[]) um.
Optional einen Bias als ('bias', [b]) anhängen (Erlang-Variante).
"""
proper: List[Tuple[Any, List[float]]] = []
for input_id, vl in input_idps:
proper.append((input_id, create_neural_weights(vl)))
if add_bias:
proper.append(("bias", [random.random() - 0.5]))
return proper
# -----------------------------
# File I/O & Print
# -----------------------------
def save_genotype(file_name: str, genotype: Dict[str, Any]) -> None:
with open(file_name, "w") as f:
json.dump(genotype, f, indent=2)
def load_from_file(file_name: str) -> Dict[str, Any]:
with open(file_name, "r") as f:
return json.load(f)
def print_genotype(file_name: str) -> None:
g = load_from_file(file_name)
cx = g["cortex"]
print("[CORTEX]", cx)
sids = cx.get("sensor_ids", [])
nids = cx.get("neuron_ids", [])
aids = cx.get("actuator_ids", [])
# Indexe für schnellen Zugriff
nid2n = {n["id"]: n for n in g.get("neurons", [])}
sid2s = {g["sensor"]["id"]: g["sensor"]} if "sensor" in g else {s["id"]: s for s in g.get("sensors", [])}
aid2a = {g["actuator"]["id"]: g["actuator"]} if "actuator" in g else {a["id"]: a for a in g.get("actuators", [])}
for sid in sids:
print("[SENSOR]", sid2s.get(sid))
for nid in nids:
print("[NEURON]", nid2n.get(nid))
for aid in aids:
print("[ACTUATOR]", aid2a.get(aid))

View File

@@ -0,0 +1,104 @@
# morphology.py
import time
from typing import Any, Callable, Dict, List, Union
MorphologyType = Union[str, Callable[[str], List[Dict[str, Any]]]]
def generate_id() -> float:
"""
Zeitbasierte float-ID (ähnlich wie im Buch).
"""
now = time.time()
return 1.0 / now
# ------------------------------------------------------------
# Morphologie-API (duck-typed, wie im Erlang-Original)
# ------------------------------------------------------------
def get_InitSensor(morphology: MorphologyType) -> Dict[str, Any]:
sensors = get_Sensors(morphology)
if not sensors:
raise ValueError("Morphology has no sensors.")
return sensors[0]
def get_InitActuator(morphology: MorphologyType) -> Dict[str, Any]:
actuators = get_Actuators(morphology)
if not actuators:
raise ValueError("Morphology has no actuators.")
return actuators[0]
def get_Sensors(morphology: MorphologyType) -> List[Dict[str, Any]]:
fn = _resolve_morphology(morphology)
return fn("sensors")
def get_Actuators(morphology: MorphologyType) -> List[Dict[str, Any]]:
fn = _resolve_morphology(morphology)
return fn("actuators")
def _resolve_morphology(morphology: MorphologyType) -> Callable[[str], List[Dict[str, Any]]]:
"""
Akzeptiert:
- eine Callable Morphologie-Funktion (z.B. xor_mimic)
- einen String (z.B. "xor_mimic") -> per Registry auflösen
- ein Modul-Objekt (z.B. import morphology) -> Standard 'xor_mimic' aus dem Modul
"""
# 1) Bereits eine Callable-Funktion? (z.B. xor_mimic)
if callable(morphology):
return morphology
# 2) String -> Registry
if isinstance(morphology, str):
reg = {
"xor_mimic": xor_mimic,
# weitere Morphologien hier registrieren...
}
if morphology in reg:
return reg[morphology]
raise ValueError(f"Unknown morphology name: {morphology}")
# 3) Modul-Objekt: versuche eine Standard-Morphologie-Funktion daraus zu nehmen
# Hier nehmen wir 'xor_mimic' als Default (du kannst das generalisieren).
try:
# Ist es ein Modul mit einer Funktion 'xor_mimic'?
if hasattr(morphology, "xor_mimic") and callable(getattr(morphology, "xor_mimic")):
return getattr(morphology, "xor_mimic")
except Exception:
pass
raise TypeError("morphology must be a callable, a module with 'xor_mimic', or a registered string key")
# ------------------------------------------------------------
# Beispiel-Morphologie: XOR (wie im Buch)
# ------------------------------------------------------------
def xor_mimic(kind: str) -> List[Dict[str, Any]]:
"""
Liefert je nach 'kind' ('sensors' | 'actuators') die Definitionen.
Felder sind so benannt, dass sie direkt mit unserem Genotype/Phenotype-Code
kompatibel sind (vector_length statt 'vl', etc.).
"""
if kind == "sensors":
return [
{
"id": generate_id(),
"name": "xor_GetInput", # Sensorfunktion (muss in deinem Sensor-Actor implementiert sein)
"vector_length": 2,
"scape": {"private": "xor_sim"} # optional, falls du Scapes nutzt
}
]
elif kind == "actuators":
return [
{
"id": generate_id(),
"name": "xor_SendOutput", # Aktuatorfunktion (muss in deinem Actuator-Actor implementiert sein)
"vector_length": 1,
"scape": {"private": "xor_sim"} # optional
}
]
else:
raise ValueError(f"xor_mimic: unsupported kind '{kind}', expected 'sensors' or 'actuators'")

View File

@@ -0,0 +1,39 @@
import asyncio
import morphology
from experiments.stochastic_hillclimber.actors.trainer import Trainer
from genotype import construct, save_genotype
def test_genotype_construction():
"""
genotype_data = construct(morphology, hidden_layer_densities=[3, 2])
# Prüfen, ob Cortex, Sensor, Actuator und Neuronen existieren
assert "cortex" in genotype_data
assert len(genotype_data["neurons"]) == 3 + 2 + 1 # 3 in 1. HL, 2 in 2. HL, 1 Output
print("Genotype construction OK")
print("Cortex:", genotype_data["cortex"])
print("---------------------------------")
print("Neurons:", genotype_data["neurons"])
print("---------------------------------")
print("Actuators:", genotype_data["actuator"])
save_genotype("test.json", genotype_data)
"""
trainer = Trainer(
morphology_spec=morphology, # <— wichtig! callable oder "xor_mimic"
hidden_layer_densities=[2], # wie im Buchbeispiel
max_attempts=float("inf"), # MA=inf
eval_limit=float("inf"), # EL=inf
fitness_target=99.9, # FT=99.9
experimental_file="experimental.json",
best_file="best.json",
exoself_steps_per_eval=0, # 0 = Scape/Cortex entscheiden über Halt
)
asyncio.run(trainer.go())
test_genotype_construction()

View File

@@ -0,0 +1,105 @@
# actors/neuron.py
import math
import random
from actor import Actor
def tanh(x): return math.tanh(x)
class Neuron(Actor):
def __init__(self, nid, cx_pid, af_name, input_idps, output_pids):
super().__init__(f"Neuron-{nid}")
self.nid = nid
self.cx_pid = cx_pid
self.af = tanh if af_name == "tanh" else tanh
# input_idps: [(input_id, [w1, w2, ...])]
self.inputs = {}
self.order = []
for (inp_id, weights) in input_idps:
self.order.append(inp_id)
self.inputs[inp_id] = {"weights": list(weights), "got": False, "val": None}
# WICHTIG: Bias lernbar machen
self.bias = random.uniform(-2.0, 2.0)
# Für Backup/Restore
self._backup_inputs = None
self._backup_bias = None
self.outputs = output_pids
print(f"Neuron {nid}: inputs={list(self.inputs.keys())}, bias={self.bias}")
async def run(self):
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "forward":
_, from_id, data = msg
if from_id not in self.inputs:
continue
slot = self.inputs[from_id]
slot["got"] = True
slot["val"] = data
if all(self.inputs[i]["got"] for i in self.order):
acc = 0.0
for i in self.order:
w = self.inputs[i]["weights"]
v = self.inputs[i]["val"]
if isinstance(v, list):
acc += sum(wj * vj for wj, vj in zip(w, v))
else:
acc += w[0] * float(v)
out = self.af(acc + self.bias)
for pid in self.outputs:
await pid.send(("forward", self.nid, [out]))
for i in self.order:
self.inputs[i]["got"] = False
self.inputs[i]["val"] = None
print(f"Neuron {self.nid}: input_sum={acc + self.bias:.3f}, output={out:.3f}")
elif tag == "get_backup":
# Packe Gewichte + Bias zurück
idps = [(i, self.inputs[i]["weights"]) for i in self.order]
idps.append(("bias", self.bias))
await self.cx_pid.send(("backup_from_neuron", self.nid, idps))
elif tag == "weight_backup":
# Tiefkopie der Gewichte + Bias
print(f"Neuron {self.nid}: backing up weights")
self._backup_inputs = {k: {"weights": v["weights"][:]} for k, v in self.inputs.items()}
self._backup_bias = self.bias
elif tag == "weight_restore":
if self._backup_inputs is not None:
for k in self.inputs:
self.inputs[k]["weights"] = self._backup_inputs[k]["weights"][:]
self.bias = self._backup_bias
elif tag == "weight_perturb":
# In neuron.py weight_perturb, add:
print(f"Neuron {self.nid}: perturbing {len([w for i in self.order for w in self.inputs[i]['weights']])} weights")
# MP wie im Buch: 1/sqrt(TotalWeightsIncludingBias)
tot_w = sum(len(self.inputs[i]["weights"]) for i in self.order) + 1
mp = 1 / math.sqrt(tot_w)
delta_mag = 2.0 * math.pi
sat_lim = 2.0 * math.pi
# Gewichte perturbieren
for i in self.order:
ws = self.inputs[i]["weights"]
for j in range(len(ws)):
if random.random() < mp:
ws[j] = _sat(ws[j] + (random.random() - 0.5) * delta_mag, -sat_lim, sat_lim)
# Bias perturbieren
if random.random() < mp:
self.bias = _sat(self.bias + (random.random() - 0.5) * delta_mag, -sat_lim, sat_lim)
elif tag == "terminate":
return
def _sat(val, lo, hi):
return lo if val < lo else (hi if val > hi else val)

View File

@@ -0,0 +1,58 @@
# actors/scape.py
from actor import Actor
import math
class XorScape(Actor):
def __init__(self):
super().__init__("XorScape")
self.data = [
([-1, -1], [-1]),
([1, -1], [1]),
([-1, 1], [1]),
([1, 1], [-1])
]
self.index = 0
self.error_acc = 0.0
self.last_actuator = None # <-- wer uns zuletzt "action" geschickt hat
async def run(self):
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "sense":
print("SCAPE: got sensed by sensor...")
# Sensor fragt nach Input
_, sid, from_pid = msg
self.last_actuator = from_pid # merken, wer beteiligt ist
inputs, correct = self.data[self.index]
print("SENSOR input /correct: ", inputs, correct)
await from_pid.send(("percept", inputs))
elif tag == "action":
_, output, from_pid = msg
_, correct = self.data[self.index]
# Calculate RMSE like the Erlang version
step_error = sum((o - c) ** 2 for o, c in zip(output, correct))
step_rmse = math.sqrt(step_error) # This is the key change!
print(f"XOR PATTERN: Input={self.data[self.index][0]} → Network={output} → Expected={correct}")
self.index += 1
if self.index >= len(self.data):
# Episode finished - calculate final fitness
total_rmse = math.sqrt(self.error_acc + step_rmse) # Not step_error!
fitness = 1.0 / (total_rmse + 1e-5)
await from_pid.send(("result", fitness, 1))
self.index = 0
self.error_acc = 0.0
else:
# Continue episode
self.error_acc += step_rmse # Add RMSE, not raw error
await from_pid.send(("result", 0.0, 0))
elif tag == "terminate":
return

View File

@@ -0,0 +1,45 @@
# actors/sensor.py
from actor import Actor
import random
class Sensor(Actor):
def __init__(self, sid, cx_pid, name, vector_length, fanout_pids, scape=None):
super().__init__(f"Sensor-{sid}")
self.sid = sid
self.cx_pid = cx_pid
self.sname = name
self.vl = vector_length
self.fanout = fanout_pids
self.scape = scape
async def run(self):
while True:
print("sensor running...")
msg = await self.inbox.get()
tag = msg[0]
print("got sensor message: ", msg)
if tag == "sync":
vec = await self._sense()
print("sensed vec: ", vec)
for pid in self.fanout:
await pid.send(("forward", self.sid, vec))
elif tag == "terminate":
return
async def _sense(self):
if self.sname == "rng":
return [random.random() for _ in range(self.vl)]
elif self.sname == "xor_GetInput" and self.scape:
print("TODO")
await self.scape.send(("sense", self.sid, self))
msg = await self.inbox.get()
if msg[0] == "percept":
return msg[1]
else:
return [0.0] * self.vl
else:
return [0.0] * self.vl

View File

@@ -0,0 +1,179 @@
{
"cortex": {
"id": 5.685689537782505e-10,
"sensor_ids": [
5.685689537782522e-10
],
"actuator_ids": [
5.685689537782516e-10
],
"neuron_ids": [
5.685689537782498e-10,
5.685689537782496e-10,
5.685689537782496e-10,
5.685689537782457e-10,
5.685689537782457e-10,
5.685689537782437e-10
]
},
"sensor": {
"id": 5.685689537782522e-10,
"name": "xor_GetInput",
"vector_length": 2,
"cx_id": 5.685689537782505e-10,
"fanout_ids": [
5.685689537782498e-10,
5.685689537782496e-10,
5.685689537782496e-10
]
},
"actuator": {
"id": 5.685689537782516e-10,
"name": "xor_SendOutput",
"vector_length": 1,
"cx_id": 5.685689537782505e-10,
"fanin_ids": [
5.685689537782437e-10
]
},
"neurons": [
{
"id": 5.685689537782498e-10,
"layer_index": 0,
"cx_id": 5.685689537782505e-10,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.685689537782522e-10,
"weights": [
-0.16508088565795254,
-0.2129110085532152
]
}
],
"output_ids": [
5.685689537782493e-10,
5.685689537782493e-10
]
},
{
"id": 5.685689537782496e-10,
"layer_index": 0,
"cx_id": 5.685689537782505e-10,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.685689537782522e-10,
"weights": [
-0.2097398020485577,
0.39878180348846304
]
}
],
"output_ids": [
5.685689537782493e-10,
5.685689537782493e-10
]
},
{
"id": 5.685689537782496e-10,
"layer_index": 0,
"cx_id": 5.685689537782505e-10,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.685689537782522e-10,
"weights": [
-0.29051024087142796,
0.4799280128038572
]
}
],
"output_ids": [
5.685689537782493e-10,
5.685689537782493e-10
]
},
{
"id": 5.685689537782457e-10,
"layer_index": 1,
"cx_id": 5.685689537782505e-10,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.685689537782498e-10,
"weights": [
-0.13139529896484625
]
},
{
"input_id": 5.685689537782496e-10,
"weights": [
0.0922098630405015
]
},
{
"input_id": 5.685689537782496e-10,
"weights": [
0.07506465172430998
]
}
],
"output_ids": [
5.685689537782454e-10
]
},
{
"id": 5.685689537782457e-10,
"layer_index": 1,
"cx_id": 5.685689537782505e-10,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.685689537782498e-10,
"weights": [
0.01874150935077279
]
},
{
"input_id": 5.685689537782496e-10,
"weights": [
0.1722685772992305
]
},
{
"input_id": 5.685689537782496e-10,
"weights": [
-0.07333424218267892
]
}
],
"output_ids": [
5.685689537782454e-10
]
},
{
"id": 5.685689537782437e-10,
"layer_index": 2,
"cx_id": 5.685689537782505e-10,
"activation_function": "tanh",
"input_weights": [
{
"input_id": 5.685689537782457e-10,
"weights": [
0.47060152728694127
]
},
{
"input_id": 5.685689537782457e-10,
"weights": [
0.33179877773351285
]
}
],
"output_ids": [
5.685689537782516e-10
]
}
]
}

View File

@@ -0,0 +1,151 @@
# trainer.py
import asyncio
import time
from typing import Any, Dict, List, Tuple, Optional
import morphology # dein morphology.py
from genotype import construct, save_genotype, print_genotype
from exoself import Exoself # deine Actor-basierte Exoself-Implementierung
class Trainer:
"""
Stochastischer Hillclimber wie im Erlang-Original.
- morphology_spec: entweder das morphology-Modul, ein String-Key, oder eine Callable-Morphologie
- hidden_layer_densities: z.B. [4, 3]
- max_attempts / eval_limit / fitness_target: Stoppbedingungen
- experimental_file / best_file: Pfade, falls du wie im Buch speichern willst
"""
def __init__(
self,
morphology_spec=morphology,
hidden_layer_densities: List[int] = None,
*,
max_attempts: int = 5,
eval_limit: float = float("inf"),
fitness_target: float = float("inf"),
experimental_file: Optional[str] = "experimental.json",
best_file: Optional[str] = "best.json",
exoself_steps_per_eval: int = 0,
):
self.morphology_spec = morphology_spec
self.hds = hidden_layer_densities or []
self.max_attempts = max_attempts
self.eval_limit = eval_limit
self.fitness_target = fitness_target
self.experimental_file = experimental_file
self.best_file = best_file
# Wenn deine Exoself/Cortex eine feste Anzahl Zyklen pro „Evaluation“ braucht,
# kannst du hier default 0 lassen (Cortex entscheidet über Halt-Flag),
# oder einen Wert setzen.
self.exoself_steps_per_eval = exoself_steps_per_eval
# Laufende Akkus (wie im Erlang-Code)
self.best_fitness = float("-inf")
self.best_genotype: Optional[Dict[str, Any]] = None
self.eval_acc = 0
self.cycle_acc = 0
self.time_acc = 0.0
async def _run_one_attempt(self) -> Tuple[float, int, int, float, Dict[str, Any]]:
"""
Ein Trainingsversuch:
- Genotyp konstruieren
- Exoself laufen lassen
- Fitness/Evals/Cycles/Time zurückgeben + den verwendeten Genotyp
"""
print("constructing genotype...")
geno = construct(self.morphology_spec, self.hds, file_name=self.experimental_file, add_bias=True)
# Exoself starten und bis zum evaluation_completed warten
fitness, evals, cycles, elapsed = await self._evaluate_with_exoself(geno)
return fitness, evals, cycles, elapsed, geno
async def _evaluate_with_exoself(self, genotype: Dict[str, Any]) -> Tuple[float, int, int, float]:
"""
Startet Exoself (deine Actor-basierte Variante) und wartet,
bis der Cortex die Evaluation abgeschlossen hat.
Erwartete Rückgabe: fitness, evals, cycles, elapsed
"""
print("creating exoself...")
ex = Exoself(genotype)
# Exoself.run() sollte idealerweise einen Tuple (fitness, evals, cycles, time)
# liefern. Falls deine Version aktuell „backup“-Listen liefert, ersetze das hier
# mit der passenden Logik oder benutze das „evaluation_completed“-Signal aus dem Cortex.
fitness, evals, cycles, elapsed = await ex.run_evaluation()
return fitness, evals, cycles, elapsed
async def go(self):
"""
Entspricht dem Erlang loop/…:
Wiederholt Versuche, bis Stoppbedingung erfüllt.
"""
attempt = 1
while True:
print(".........")
print("current attempt: ", attempt)
print(".........")
# Stoppbedingung vor Versuch?
if attempt > self.max_attempts or self.eval_acc >= self.eval_limit or self.best_fitness >= self.fitness_target:
# Ausgabe wie im Buch
if self.best_genotype and self.best_file:
# bestes Genotypfile ausgeben/„drucken“
save_genotype(self.best_file, self.best_genotype)
print_genotype(self.best_file)
print(
f" Morphology: {getattr(self.morphology_spec, '__name__', str(self.morphology_spec))} | "
f"Best Fitness: {self.best_fitness} | EvalAcc: {self.eval_acc}"
)
# Optional: an „Benchmarker“ melden bei dir vermutlich nicht nötig
return {
"best_fitness": self.best_fitness,
"eval_acc": self.eval_acc,
"cycle_acc": self.cycle_acc,
"time_acc": self.time_acc,
"best_file": self.best_file,
}
print("RUN ONE ATTEMPT!")
# --- Ein Versuch ---
fitness, evals, cycles, elapsed, geno = await self._run_one_attempt()
print("update akkus...")
# Akkus updaten
self.eval_acc += evals
self.cycle_acc += cycles
self.time_acc += elapsed
# Besser als bisher?
if fitness > self.best_fitness:
# „experimental.json“ → „best.json“ (semantisch wie file:rename(...))
self.best_fitness = fitness
self.best_genotype = geno
if self.best_file:
save_genotype(self.best_file, geno)
# Reset Attempt-Zähler (wie Erlang: Attempt=0 nach Verbesserung)
attempt = 1
else:
attempt += 1
# --------------------------
# Beispiel: ausführen
# --------------------------
if __name__ == "__main__":
# Beispielkonfiguration (XOR-Morphologie, kleine Topologie)
trainer = Trainer(
morphology_spec=morphology, # oder morphology.xor_mimic
hidden_layer_densities=[2], # wie im Buch-Beispiel
max_attempts=1000,
eval_limit=float("inf"),
fitness_target=float("inf"),
experimental_file="experimental.json",
best_file="best.json",
exoself_steps_per_eval=0, # 0 => Cortex/Scape steuern Halt-Flag
)
asyncio.run(trainer.go())