Files
neuroevolution/experiments/stochastic_hillclimber/actors/exoself.py

313 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import math
import random
from collections import defaultdict
from typing import Any, Dict, List, Tuple, Optional
from actor import Actor
from cortex import Cortex
from sensor import Sensor
from neuron import Neuron
from actuator import Actuator
from scape import XorScape
class Exoself(Actor):
def __init__(self, genotype: Dict[str, Any], file_name: Optional[str] = None):
super().__init__("Exoself")
self.g = genotype
self.file_name = file_name
self.cx_actor: Optional[Cortex] = None
self.sensor_actors: List[Sensor] = []
self.neuron_actors: List[Neuron] = []
self.actuator_actors: List[Actuator] = []
self.tasks: List[asyncio.Task] = []
# Training-Stats
self.highest_fitness = float("-inf")
self.eval_acc = 0
self.cycle_acc = 0
self.time_acc = 0.0
self.attempt = 0
self.MAX_ATTEMPTS = 50
self.actuator_scape = None
self._perturbed: List[Neuron] = []
@staticmethod
def from_file(path: str) -> "Exoself":
with open(path, "r") as f:
g = json.load(f)
return Exoself(g, file_name=path)
async def run(self):
self._build_pid_map_and_spawn()
self._link_cortex()
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors + [self.actuator_scape]:
self.tasks.append(asyncio.create_task(a.run()))
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "evaluation_completed":
_, fitness, cycles, elapsed = msg
await self._on_evaluation_completed(fitness, cycles, elapsed)
elif tag == "terminate":
await self._terminate_all()
return
async def run_evaluation(self):
print("build network and link...")
self._build_pid_map_and_spawn()
print("link cortex...")
self._link_cortex()
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
self.tasks.append(asyncio.create_task(a.run()))
if self.actuator_scape:
self.tasks.append(asyncio.create_task(self.actuator_scape.run()))
print("network actors are running...")
while True:
msg = await self.inbox.get()
print("message in exsoself: ", msg)
tag = msg[0]
if tag == "evaluation_completed":
_, fitness, cycles, elapsed = msg
await self._terminate_all()
return float(fitness), 1, int(cycles), float(elapsed)
elif tag == "terminate":
await self._terminate_all()
return float("-inf"), 0, 0, 0.0
# ---------- Build ----------
def _build_pid_map_and_spawn(self):
"""
Baut Cortex, dann alle Neuronen (mit cx_pid=self.cx_actor), dann verlinkt Outputs schichtweise,
dann Sensoren/Aktuatoren (mit cx_pid=self.cx_actor). Achtung: Reihenfolge wichtig.
"""
cx = self.g["cortex"]
# Cortex zuerst (damit wir cx_pid an Kinder übergeben können)
self.cx_actor = Cortex(
cid=cx["id"],
exoself_pid=self,
sensor_pids=[],
neuron_pids=[],
actuator_pids=[]
)
self.actuator_scape = XorScape()
layers: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
for n in self.g["neurons"]:
layers[n["layer_index"]].append(n)
ordered_layers = [layers[i] for i in sorted(layers)]
id2neuron_actor: Dict[Any, Neuron] = {}
for layer in ordered_layers:
for n in layer:
input_idps = [(iw["input_id"], iw["weights"]) for iw in n["input_weights"]]
neuron = Neuron(
nid=n["id"],
cx_pid=self.cx_actor,
af_name=n.get("activation_function", "tanh"),
input_idps=input_idps,
output_pids=[] # füllen wir gleich
)
id2neuron_actor[n["id"]] = neuron
self.neuron_actors.append(neuron)
for li in range(len(ordered_layers) - 1):
next_pids = [id2neuron_actor[nx["id"]] for nx in ordered_layers[li + 1]]
for n in ordered_layers[li]:
id2neuron_actor[n["id"]].outputs = next_pids
actuators = self._get_actuators_block()
if not actuators:
raise ValueError("Genotype must include 'actuator' or 'actuators'.")
for a in actuators:
fanin_ids = a.get("fanin_ids", [])
expect = len(fanin_ids) if fanin_ids else 0
actuator = Actuator(
aid=a["id"],
cx_pid=self.cx_actor,
name=a["name"],
fanin_ids=fanin_ids,
expect_count=expect,
scape=self.actuator_scape
)
self.actuator_actors.append(actuator)
if ordered_layers:
last_layer = ordered_layers[-1]
out_targets = self.actuator_actors
for n in last_layer:
id2neuron_actor[n["id"]].outputs = out_targets
sensors = self._get_sensors_block()
if not sensors:
raise ValueError("Genotype must include 'sensor' or 'sensors'.")
first_layer = ordered_layers[0] if ordered_layers else []
first_layer_pids = [id2neuron_actor[n["id"]] for n in first_layer]
for s in sensors:
sensor = Sensor(
sid=s["id"],
cx_pid=self.cx_actor,
name=s["name"],
vector_length=s["vector_length"],
fanout_pids=first_layer_pids,
scape=self.actuator_scape
)
self.sensor_actors.append(sensor)
def _get_sensors_block(self) -> List[Dict[str, Any]]:
if "sensors" in self.g:
return list(self.g["sensors"])
if "sensor" in self.g:
return [self.g["sensor"]]
return []
def _get_actuators_block(self) -> List[Dict[str, Any]]:
if "actuators" in self.g:
return list(self.g["actuators"])
if "actuator" in self.g:
return [self.g["actuator"]]
return []
def _link_cortex(self):
self.cx_actor.sensors = [a for a in self.sensor_actors if a]
self.cx_actor.neurons = [a for a in self.neuron_actors if a]
self.cx_actor.actuators = [a for a in self.actuator_actors if a]
self.cx_actor.awaiting_sync = set(a.aid for a in self.cx_actor.actuators)
self.tasks.append(asyncio.create_task(self.cx_actor.run()))
async def train_until_stop(self):
self._build_pid_map_and_spawn()
self._link_cortex()
# 2) Start tasks
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
self.tasks.append(asyncio.create_task(a.run()))
if self.actuator_scape:
self.tasks.append(asyncio.create_task(self.actuator_scape.run()))
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "evaluation_completed":
_, fitness, cycles, elapsed = msg
maybe_stats = await self._on_evaluation_completed(fitness, cycles, elapsed)
# _on_evaluation_completed() ruft bei Stop bereits _backup_genotype() und _terminate_all()
if isinstance(maybe_stats, dict):
# Trainingsende Daten aus self.* zurückgeben (wie im Buch: Fitness/Evals/Cycles/Time)
return (
float(self.highest_fitness),
int(self.eval_acc),
int(self.cycle_acc),
float(self.time_acc),
)
elif tag == "terminate":
await self._terminate_all()
return float("-inf"), 0, 0, 0.0
async def _on_evaluation_completed(self, fitness: float, cycles: int, elapsed: float):
self.eval_acc += 1
self.cycle_acc += int(cycles)
self.time_acc += float(elapsed)
print(f"[Exoself] evaluation_completed: fitness={fitness:.6f} cycles={cycles} time={elapsed:.3f}s")
REL = 1e-6
if fitness > self.highest_fitness * (1.0 + REL):
self.highest_fitness = fitness
self.attempt = 0
for n in self.neuron_actors:
await n.send(("weight_backup",))
else:
self.attempt += 1
for n in self._perturbed:
await n.send(("weight_restore",))
if self.attempt >= self.MAX_ATTEMPTS:
print(
f"[Exoself] STOP. Best fitness={self.highest_fitness:.6f} evals={self.eval_acc} cycles={self.cycle_acc}")
await self._backup_genotype()
await self._terminate_all()
return {
"best_fitness": self.highest_fitness,
"eval_acc": self.eval_acc,
"cycle_acc": self.cycle_acc,
"time_acc": self.time_acc,
}
tot = len(self.neuron_actors)
mp = 1.0 / math.sqrt(max(1, tot))
self._perturbed = [n for n in self.neuron_actors if random.random() < mp]
for n in self._perturbed:
await n.send(("weight_perturb",))
await self.cx_actor.send(("reactivate",))
async def _backup_genotype(self):
remaining = len(self.neuron_actors)
for n in self.neuron_actors:
await n.send(("get_backup",))
backups: List[Tuple[Any, List[Tuple[Any, List[float]]]]] = []
while remaining > 0:
msg = await self.inbox.get()
if msg[0] == "backup_from_neuron":
_, nid, idps = msg
backups.append((nid, idps))
remaining -= 1
id2n = {n["id"]: n for n in self.g["neurons"]}
for nid, idps in backups:
if nid not in id2n:
continue
new_iw = []
bias_val = None
for item in idps:
if isinstance(item[0], str) and item[0] == "bias":
bias_val = float(item[1]) if not isinstance(item[1], list) else float(item[1][0])
else:
input_id, weights = item
new_iw.append({"input_id": input_id, "weights": list(weights)})
id2n[nid]["input_weights"] = new_iw
if bias_val is not None:
id2n[nid].setdefault("input_weights", []).append({"input_id": "bias", "weights": [bias_val]})
if self.file_name:
with open(self.file_name, "w") as f:
json.dump(self.g, f, indent=2)
print(f"[Exoself] Genotype updated → {self.file_name}")
async def _terminate_all(self):
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
await a.send(("terminate",))
if self.cx_actor:
await self.cx_actor.send(("terminate",))
for t in self.tasks:
if not t.done():
t.cancel()
self.tasks.clear()