Files
neuroevolution/mathema/core/exoself.py
2026-02-21 10:58:05 +01:00

487 lines
18 KiB
Python

import asyncio
import json
import math
import random
import logging
from collections import defaultdict
from typing import Any, Dict, List, Tuple, Optional
from mathema.genotype.neo4j.genotype import load_genotype_snapshot, persist_neuron_backups
from mathema.actors.actor import Actor
from mathema.actors.cortex import Cortex
from mathema.actors.sensor import Sensor
from mathema.actors.neuron import Neuron
from mathema.actors.actuator import Actuator
from mathema.scape.car_racing import CarRacingScape
from mathema.envs.openai_car_racing import CarRacing
log = logging.getLogger(__name__)
class Exoself(Actor):
    """
    Outer control loop of an agent in the mathema framework.

    Responsibilities of the Exoself:
    - load a genotype snapshot from persistent storage (Neo4j),
    - build the executable phenotype from it (Sensors, Neurons, Actuators,
      Cortex and Scape),
    - run repeated evaluation episodes,
    - apply evolutionary weight perturbations,
    - track and report fitness statistics,
    - persist improved parameters back to the genotype store.

    Conceptually the Exoself is the "body/executive self" wrapped around a
    cortex: the Cortex handles step-by-step execution and fitness
    accumulation, while the Exoself handles episode-level control, learning
    and persistence.
    """

    def __init__(self, genotype: Dict[str, Any], file_name: Optional[str] = None):
        """
        Create an Exoself for an already loaded genotype.

        Parameters:
        - genotype: genotype dictionary the phenotype is built from.
        - file_name: optional path the genotype was read from.
        """
        super().__init__("Exoself")

        # Identity and genotype source.
        self.g = genotype
        self.file_name = file_name
        self.agent_id = None
        self.monitor = None

        # Phenotype: populated by build_pid_map_and_spawn().
        self.cx_actor: Optional[Cortex] = None
        self.sensor_actors: List[Sensor] = []
        self.neuron_actors: List[Neuron] = []
        self.actuator_actors: List[Actuator] = []
        self.actuator_scape = None
        self.tasks: List[asyncio.Task] = []

        # Accumulated statistics over all evaluations.
        self.highest_fitness = float("-inf")
        self.eval_acc = 0
        self.cycle_acc = 0
        self.time_acc = 0.0

        # Stop criterion: consecutive evaluations without improvement.
        self.attempt = 0
        self.MAX_ATTEMPTS = 10

        # Neurons perturbed for the evaluation currently in flight.
        self._perturbed: List[Neuron] = []
@classmethod
async def start(cls, agent_id: str, monitor):
    """
    Load the genotype of ``agent_id`` and start training in a background task.

    Parameters:
    - agent_id: identifier passed to ``load_genotype_snapshot``.
    - monitor: object with an awaitable ``cast``; it receives a
      ``("terminated", agent_id, fitness, evals, cycles, elapsed)`` message
      when training ends or fails.

    Returns:
    - the new Exoself instance, whose training runs in ``_runner_task``;
    - a placeholder ``_Dummy`` object (no-op ``cast``/``stop``) if the
      genotype could not be loaded. In that case the monitor has already
      been notified with a fitness of ``-inf``.
    """
    try:
        g = await load_genotype_snapshot(agent_id)
    except Exception as e:
        log.error(f"[Exoself {agent_id}] START FAILED: {e!r}")
        # Notifying the monitor is best effort. Do NOT return from a
        # ``finally`` here: that would also swallow CancelledError and
        # KeyboardInterrupt raised while awaiting the monitor.
        try:
            await monitor.cast(("terminated", agent_id, float("-inf"), 0, 0, 0.0))
        except Exception as err:
            log.error(f"[Exoself {agent_id}] FAILED to notify monitor: {err!r}")

        class _Dummy:
            async def cast(self, *_a, **_k): pass

            async def stop(self): pass

        return _Dummy()

    exoself = cls(g)
    exoself.agent_id = agent_id
    exoself.monitor = monitor

    async def _runner():
        # Defaults reported when training is interrupted before it returns.
        fitness = float("-inf")
        evals = cycles = 0
        elapsed = 0.0
        try:
            fitness, evals, cycles, elapsed = await exoself.train_until_stop()
        except Exception as err:
            log.error(f"[Exoself {exoself.agent_id}] CRASH in train_until_stop(): {err!r}")
            # Report what was accumulated up to the crash.
            fitness = float("-inf")
            evals = int(exoself.eval_acc)
            cycles = int(exoself.cycle_acc)
            elapsed = float(exoself.time_acc)
        finally:
            try:
                await monitor.cast(("terminated", exoself.agent_id, fitness, evals, cycles, elapsed))
            except Exception as err:
                log.error(f"[Exoself {exoself.agent_id}] FAILED to notify monitor: {err!r}")

    loop = asyncio.get_running_loop()
    # Keep a reference on the instance so the task is not garbage collected.
    exoself._runner_task = loop.create_task(_runner(), name=f"Exoself-{exoself.agent_id}")
    return exoself
@staticmethod
def from_file(path: str) -> "Exoself":
    """
    Build an Exoself from a genotype stored as a JSON file.

    Parameters:
    - path: location of the JSON file; it is also kept as ``file_name``.

    Returns:
    - a new Exoself wrapping the decoded genotype.
    """
    # Decode explicitly as UTF-8 instead of the platform's locale encoding,
    # so the same file loads identically on every machine.
    with open(path, "r", encoding="utf-8") as f:
        g = json.load(f)
    return Exoself(g, file_name=path)
async def run(self):
    """
    Run loop of the exoself.

    Builds the network (mapping from genotype to phenotype), starts every
    actor as an asyncio task and then processes messages from the inbox:

    - ``("evaluation_completed", fitness, cycles, elapsed)`` is forwarded to
      ``_on_evaluation_completed``. When that call returns its final stats
      dict, training is over and everything has been terminated, so the
      loop ends as well instead of waiting for messages that cannot arrive.
    - ``("terminate",)`` terminates all actors and ends the loop.

    Other message tags are ignored.
    """
    self.build_pid_map_and_spawn()
    self._link_cortex()
    for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
        self.tasks.append(asyncio.create_task(a.run()))
    # Same guard as run_evaluation() and train_until_stop(): only start the
    # scape when one is present.
    if self.actuator_scape:
        self.tasks.append(asyncio.create_task(self.actuator_scape.run()))
    while True:
        msg = await self.inbox.get()
        tag = msg[0]
        if tag == "evaluation_completed":
            _, fitness, cycles, elapsed = msg
            maybe_stats = await self._on_evaluation_completed(fitness, cycles, elapsed)
            if isinstance(maybe_stats, dict):
                return
        elif tag == "terminate":
            await self._terminate_all()
            return
async def run_evaluation(self):
    """
    Build and start the phenotype, then wait for one evaluation result.

    The network is built and linked, every sensor, neuron and actuator actor
    (plus the scape, if present) is started as an asyncio task, and the
    inbox is read until a decisive message arrives.

    Returns:
    - ``(fitness, 1, cycles, elapsed)`` after an "evaluation_completed"
      message,
    - ``(-inf, 0, 0, 0.0)`` after a "terminate" message.
    In both cases all actors are terminated before returning. Messages with
    any other tag are skipped.
    """
    log.debug(f"exoself: build network and link...")
    self.build_pid_map_and_spawn()
    log.debug(f"exoself: link cortex...")
    self._link_cortex()

    runnables = [*self.sensor_actors, *self.neuron_actors, *self.actuator_actors]
    if self.actuator_scape:
        runnables.append(self.actuator_scape)
    self.tasks.extend(asyncio.create_task(unit.run()) for unit in runnables)

    while True:
        message = await self.inbox.get()
        log.debug("message in exsoself %r: ", message)
        kind = message[0]
        if kind == "terminate":
            await self._terminate_all()
            return float("-inf"), 0, 0, 0.0
        if kind != "evaluation_completed":
            continue
        _, fitness, cycles, elapsed = message
        await self._terminate_all()
        return float(fitness), 1, int(cycles), float(elapsed)
def build_pid_map_and_spawn(self):
    """
    Build the phenotype described by the genotype ``self.g``.

    Steps, in order:
    1. create the Cortex and the CarRacing scape,
    2. create one Neuron per genotype neuron, grouped by ``layer_index``,
    3. wire neuron -> neuron outputs from the ``input_weights`` entries,
    4. create the Actuators and append each one to the outputs of its
       fan-in neurons,
    5. create the Sensors with the neurons that list them as input.

    The created actors are appended to ``self.neuron_actors``,
    ``self.actuator_actors`` and ``self.sensor_actors``; nothing is started
    here.

    Raises:
    - ValueError: if the genotype has no actuator or no sensor section, or
      if an actuator names a fan-in id that is not a neuron.
    """
    cx = self.g["cortex"]
    self.cx_actor = Cortex(
        cid=cx["id"],
        exoself_pid=self,
        sensor_pids=[],
        neuron_pids=[],
        actuator_pids=[]
    )
    env = CarRacing(seed_value=5, render_mode=None)
    self.actuator_scape = CarRacingScape(env=env)

    # Group the neuron records by layer and visit the layers in ascending order.
    layers: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
    for n in self.g["neurons"]:
        layers[n["layer_index"]].append(n)
    ordered_layers = [layers[i] for i in sorted(layers)]

    id2neuron_actor: Dict[Any, Neuron] = {}
    for layer in ordered_layers:
        for n in layer:
            input_idps = [
                (iw["input_id"], iw["weights"], bool(iw.get("recurrent", False)))
                for iw in n["input_weights"]
            ]
            neuron = Neuron(
                nid=n["id"],
                cx_pid=self.cx_actor,
                af_name=n.get("activation_function", "tanh"),
                input_idps=input_idps,
                output_pids=[],
                bias=n.get("bias")
            )
            id2neuron_actor[n["id"]] = neuron
            self.neuron_actors.append(neuron)

    # Invert the input lists: every neuron that appears as an input of a
    # target neuron gets that target as an output. A set removes duplicate
    # edges; the order of the resulting output list is therefore arbitrary.
    out_map: Dict[Any, set] = {nid: set() for nid in id2neuron_actor}
    for layer in ordered_layers:
        for tgt in layer:
            tgt_pid = id2neuron_actor[tgt["id"]]
            for iw in tgt["input_weights"]:
                src_id = iw["input_id"]
                if src_id in id2neuron_actor:
                    out_map[src_id].add(tgt_pid)
    for src_id, targets in out_map.items():
        id2neuron_actor[src_id].outputs = list(targets)

    actuators = self._get_actuators_block()
    if not actuators:
        log.error("genotype does not include 'actuator' or 'actuators' section")
        raise ValueError("Genotype must include 'actuator' or 'actuators'.")
    for a in actuators:
        fanin_ids = a.get("fanin_ids", [])
        expect = len(fanin_ids) if fanin_ids else 0
        actuator = Actuator(
            aid=a["id"],
            cx_pid=self.cx_actor,
            name=a["name"],
            fanin_ids=fanin_ids,
            expect_count=expect,
            scape=self.actuator_scape
        )
        self.actuator_actors.append(actuator)

    for a in self.actuator_actors:
        for src_id in a.fanin_ids:
            na = id2neuron_actor.get(src_id)
            if na is None:
                # Explicit raise instead of ``assert``: asserts are removed
                # under ``python -O`` and the dangling fan-in would then be
                # skipped silently.
                raise ValueError(f"Actuator {a.aid}: fanin_id {src_id} ist kein Neuron")
            if a not in na.outputs:
                na.outputs.append(a)

    sensors = self._get_sensors_block()
    if not sensors:
        log.error("Genotype must include 'sensor' or 'sensors'.")
        raise ValueError("Genotype must include 'sensor' or 'sensors'.")

    # Collect, per input id, the neurons that read from it. Neuron ids end
    # up in this mapping as well, but only sensor ids are looked up below.
    sensor_targets: Dict[Any, List[Neuron]] = defaultdict(list)
    for layer in ordered_layers:
        for tgt in layer:
            tgt_pid = id2neuron_actor[tgt["id"]]
            for iw in tgt["input_weights"]:
                src_id = iw["input_id"]
                if src_id != "bias":
                    sensor_targets[src_id].append(tgt_pid)
    for s in sensors:
        fanout_pids = sensor_targets.get(s["id"], [])
        sensor = Sensor(
            sid=s["id"],
            cx_pid=self.cx_actor,
            name=s["name"],
            vector_length=s["vector_length"],
            fanout_pids=fanout_pids,
            scape=self.actuator_scape
        )
        self.sensor_actors.append(sensor)
def _get_sensors_block(self) -> List[Dict[str, Any]]:
if "sensors" in self.g:
return list(self.g["sensors"])
if "sensor" in self.g:
return [self.g["sensor"]]
return []
def _get_actuators_block(self) -> List[Dict[str, Any]]:
if "actuators" in self.g:
return list(self.g["actuators"])
if "actuator" in self.g:
return [self.g["actuator"]]
return []
def _link_cortex(self):
self.cx_actor.sensors = [a for a in self.sensor_actors if a]
self.cx_actor.neurons = [a for a in self.neuron_actors if a]
self.cx_actor.actuators = [a for a in self.actuator_actors if a]
self.cx_actor.awaiting_sync = set(a.aid for a in self.cx_actor.actuators)
self.tasks.append(asyncio.create_task(self.cx_actor.run()))
async def train_until_stop(self):
    """
    Build and start the phenotype, then train until the stop condition.

    Every sensor, neuron and actuator actor (plus the scape, if present) is
    started as an asyncio task. The inbox is then processed:

    - "evaluation_completed" messages go to ``_on_evaluation_completed``.
      Once that call returns its final stats dict, training is finished.
    - a "terminate" message terminates all actors.
    - any other tag is skipped.

    Returns:
    - ``(highest_fitness, eval_acc, cycle_acc, time_acc)`` - the best
      fitness and the accumulated evaluation count, cycle count and time -
      when training finished,
    - ``(-inf, 0, 0, 0.0)`` when a "terminate" message was received.
    """
    self.build_pid_map_and_spawn()
    self._link_cortex()

    runnables = [*self.sensor_actors, *self.neuron_actors, *self.actuator_actors]
    if self.actuator_scape:
        runnables.append(self.actuator_scape)
    self.tasks.extend(asyncio.create_task(unit.run()) for unit in runnables)

    while True:
        message = await self.inbox.get()
        kind = message[0]
        if kind == "terminate":
            await self._terminate_all()
            return float("-inf"), 0, 0, 0.0
        if kind != "evaluation_completed":
            continue
        _, fitness, cycles, elapsed = message
        outcome = await self._on_evaluation_completed(fitness, cycles, elapsed)
        if not isinstance(outcome, dict):
            continue
        best = float(self.highest_fitness)
        evals = int(self.eval_acc)
        total_cycles = int(self.cycle_acc)
        total_time = float(self.time_acc)
        return best, evals, total_cycles, total_time
async def _on_evaluation_completed(self, fitness: float, cycles: int, elapsed: float):
    """
    Process the result of one evaluation episode.

    Parameters:
    - fitness: fitness reported for the episode. Anything that is not a
      finite int/float is treated as ``-inf``.
    - cycles: number of cycles of the episode.
    - elapsed: duration of the episode in seconds.

    Behaviour:
    - the evaluation, cycle and time accumulators are updated,
    - a finite fitness is reported to the monitor as "episode_done"
      (best effort: a failure is logged and otherwise ignored),
    - if the fitness beats ``highest_fitness`` by more than the threshold
      ``max(1e-2, 1e-4 * |highest_fitness|)``, it becomes the new best, the
      attempt counter is reset and all neurons back up their weights;
      otherwise the attempt counter is incremented and the neurons
      perturbed last time restore their weights,
    - when ``MAX_ATTEMPTS`` is reached, the genotype is backed up and all
      actors are terminated,
    - otherwise each neuron is selected with probability
      ``1/sqrt(number of neurons)``, the selected ones are told to perturb
      their weights and the cortex is reactivated.

    Returns:
    - a dict with "best_fitness", "eval_acc", "cycle_acc" and "time_acc"
      when training stopped, otherwise ``None``.
    """
    cycles = int(cycles)
    elapsed = float(elapsed)
    self.eval_acc += 1
    self.cycle_acc += cycles
    self.time_acc += elapsed

    REL = 1e-4
    ABS_EPS = 1e-2

    # Validate BEFORE formatting: ``{fitness:.6f}`` raises for non-numeric
    # values such as None, which are exactly the inputs handled here.
    valid_fitness = isinstance(fitness, (int, float)) and math.isfinite(fitness)
    if valid_fitness:
        log.info(f"[Exoself] evaluation_completed: fitness={fitness:.6f} cycles={cycles} time={elapsed:.3f}s")
    else:
        log.warning(
            f"[Exoself] evaluation_completed: invalid fitness={fitness!r} "
            f"cycles={cycles} time={elapsed:.3f}s; treating it as -inf")
        fitness = float("-inf")
    log.info(f"[Exoself] attempt {self.attempt}")

    # Per-episode log to the monitor (only for a finite fitness).
    if valid_fitness:
        try:
            # fitness is the episodic return from Cortex.fitness_acc
            await self.monitor.cast(("episode_done", str(self.agent_id), float(fitness), int(self.eval_acc)))
        except Exception as err:
            # Best effort: training must not fail because of reporting.
            log.debug(f"[Exoself] could not report episode_done to monitor: {err!r}")

    # While no finite best exists the relative part is computed against 0,
    # so the absolute epsilon decides.
    thresh = max(ABS_EPS, REL * abs(self.highest_fitness if math.isfinite(self.highest_fitness) else 0.0))
    improved = fitness > self.highest_fitness + thresh
    if improved:
        self.highest_fitness = fitness
        self.attempt = 0
        for n in self.neuron_actors:
            await n.send(("weight_backup",))
    else:
        self.attempt += 1
        for n in self._perturbed:
            await n.send(("weight_restore",))

    if self.attempt >= self.MAX_ATTEMPTS:
        log.info(
            f"[Exoself] STOP. Best fitness={self.highest_fitness:.6f} "
            f"evals={self.eval_acc} cycles={self.cycle_acc}")
        await self._backup_genotype()
        await self._terminate_all()
        return {
            "best_fitness": self.highest_fitness,
            "eval_acc": self.eval_acc,
            "cycle_acc": self.cycle_acc,
            "time_acc": self.time_acc,
        }

    # Perturbation probability shrinks with the square root of the network size.
    tot = len(self.neuron_actors)
    mp = 1.0 / math.sqrt(max(1, tot))
    self._perturbed = [n for n in self.neuron_actors if random.random() < mp]
    for n in self._perturbed:
        await n.send(("weight_perturb",))
    await self.cx_actor.send(("reactivate",))
async def _backup_genotype(self):
    """
    Collect the backed-up weights of every neuron and persist them.

    A "get_backup" request is sent to each neuron, then the inbox is read
    until one "backup_from_neuron" reply per neuron has arrived. Messages
    with any other tag that show up in the meantime are discarded. The
    replies are split into bias rows and edge rows and handed to
    ``persist_neuron_backups``.
    """
    outstanding = len(self.neuron_actors)
    for neuron in self.neuron_actors:
        await neuron.send(("get_backup",))

    collected: List[Tuple[Any, List[Tuple[Any, List[float]]]]] = []
    while outstanding > 0:
        reply = await self.inbox.get()
        if reply[0] != "backup_from_neuron":
            continue
        _, nid, idps = reply
        collected.append((str(nid), idps))
        outstanding -= 1

    bias_rows = []
    edge_rows = []
    for nid, idps in collected:
        target = str(nid)
        for inp_id, weights in idps:
            if inp_id == "bias":
                # The bias is stored as the first entry of its weight list.
                bias_rows.append({"nid": target, "bias": float(weights[0])})
                continue
            edge_rows.append({
                "from_id": str(inp_id),
                "to_id": target,
                "weights": [float(w) for w in weights],
            })

    await persist_neuron_backups(bias_rows, edge_rows)
    if self.file_name:
        log.debug("[Exoself] Hinweis: file_name gesetzt, aber Persistenz läuft über Genotyp-API (Neo4j).")
async def _terminate_all(self):
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
await a.send(("terminate",))
if self.cx_actor:
await self.cx_actor.send(("terminate",))
for t in self.tasks:
if not t.done():
t.cancel()
self.tasks.clear()