""" module used to monitor a population of neural networks solving a shared or isolated scape. """ from __future__ import annotations import asyncio import json import math import os import random import uuid import logging from dataclasses import dataclass, field from typing import Any, List, Literal, Optional, Sequence, Tuple, Callable, Awaitable from mathema.genotype.neo4j.genotype import ( neo4j, construct_agent, clone_agent, delete_agent, update_fingerprint, ) from mathema.genotype.neo4j.genotype_mutator import GenotypeMutator from mathema.utils import stats from mathema.core.exoself import Exoself log = logging.getLogger(__name__) OpTag = Literal["continue", "pause", "done"] SelectionAlgorithm = Literal["competition", "top3"] EFF: float = 0.05 SURVIVAL_PERCENTAGE: float = 0.5 SPECIE_SIZE_LIMIT: int = 10 INIT_SPECIE_SIZE: int = 10 GENERATION_LIMIT: int = 1000 EVALUATIONS_LIMIT: int = 100_000 FITNESS_GOAL: float = 6000 INIT_POPULATION_ID: str = "test" INIT_OP_MODE: str = "gt" INIT_SELECTION_ALGO: SelectionAlgorithm = "competition" INIT_CONSTRAINTS: list[dict] = [ {"morphology": "xor_mimic", "neural_afs": ["tanh"]}, ] EXOSELF_START: Optional[Callable[[str, "PopulationMonitor"], Awaitable[Any]]] = None def _new_id() -> str: return uuid.uuid4().hex def _mean(xs: Sequence[float]) -> float: return sum(xs) / len(xs) if xs else 0.0 def _std(xs: Sequence[float]) -> float: if not xs: return 0.0 m = _mean(xs) return math.sqrt(sum((x - m) ** 2 for x in xs) / len(xs)) async def _read_all(q: str, **params): return await neo4j.read_all(q, **params) async def _run(q: str, **params): return await neo4j.run_consume(q, **params) @dataclass class MonitorState: op_mode: str population_id: str selection_algorithm: SelectionAlgorithm op_tag: OpTag = "continue" active: List[Tuple[str, Any]] = field(default_factory=list) agent_ids: List[str] = field(default_factory=list) tot_agents: int = 0 agents_left: int = 0 pop_gen: int = 0 eval_acc: int = 0 cycle_acc: int = 0 time_acc: int = 0 
rows: List[dict] = field(default_factory=list) async def _population_aggregate(population_id: str) -> dict: rows = await _read_all(""" MATCH (a:agent {population_id:$pid}) RETURN collect(coalesce(toFloat(a.fitness),0.0)) AS fs """, pid=str(population_id)) fs = [float(x) for x in (rows[0]["fs"] if rows else [])] if not fs: return {"cum_fitness": 0.0, "avg": 0.0, "std": 0.0, "best": 0.0, "min": 0.0, "n": 0, "agents": 0} n = len(fs) s = sum(fs) m = s / n var = sum((x - m) ** 2 for x in fs) / n return { "cum_fitness": s, "avg": m, "std": math.sqrt(var), "best": max(fs), "min": min(fs), "n": n, "agents": n } class PopulationMonitor: """ Orchestriert Generationen: spawn -> warten -> selektieren/mutieren -> next. Expects exoself.start(agent_id, monitor) and exoself will cast(("terminated", aid, fitness, eval_acc, cycle_acc, time_acc)). """ def __init__(self, op_mode: str, population_id: str, selection_algorithm: SelectionAlgorithm): self.state = MonitorState(op_mode, population_id, selection_algorithm) self.inbox: asyncio.Queue = asyncio.Queue() self._task: Optional[asyncio.Task] = None self._stopped_evt = asyncio.Event() self.mutator = GenotypeMutator(neo4j) self.gen_ended = asyncio.Event() self._exoself_start = EXOSELF_START or Exoself.start # logging stuff self.run_id = None self._t0 = None self._best_so_far = float("-inf") self.train_time_sec = 30*60 # logging file handles self._episodes_f = None self._progress_f = None @classmethod async def start(cls, op_mode: str, population_id: str, selection_algorithm: SelectionAlgorithm) -> "PopulationMonitor": self = cls(op_mode, population_id, selection_algorithm) self._task = asyncio.create_task(self._run(), name=f"PopulationMonitor-{population_id}") stats.register_atexit(population_id, lambda: list(self.state.rows)) # init logging loop = asyncio.get_running_loop() self._t0 = loop.time() self.run_id = f"{population_id}__{uuid.uuid4().hex[:8]}" os.makedirs(f"runs/{self.run_id}", exist_ok=True) self._episodes_f = 
open(f"runs/{self.run_id}/episodes.csv", "w", buffering=1) self._episodes_f.write("t_sec,pop_gen,agent_id,eval_idx,episode_return\n") self._progress_f = open(f"runs/{self.run_id}/progress.csv", "w", buffering=1) self._progress_f.write("t_sec,t_norm,pop_gen,best_gen,best_so_far,avg,std\n") self._deadline_task = asyncio.create_task(self._stop_after_deadline(), name=f"Deadline-{self.run_id}") await self._init_generation() return self async def _stop_after_deadline(self): await asyncio.sleep(self.train_time_sec) await self.inbox.put(("stop", "normal")) async def cast(self, msg: tuple) -> None: await self.inbox.put(msg) async def stop(self, mode: Literal["normal", "shutdown"] = "normal") -> None: await self.inbox.put(("stop", mode)) await self._stopped_evt.wait() async def _run(self) -> None: try: while True: msg = await self.inbox.get() tag = msg[0] if isinstance(msg, tuple) else None if tag == "stop": await self._handle_stop(msg[1]) break elif tag == "pause": if self.state.op_tag == "continue": self.state.op_tag = "pause" log.debug("Population Monitor will pause after this generation.") elif tag == "continue": if self.state.op_tag == "pause": self.state.op_tag = "continue" await self._init_generation() elif tag == "episode_done": await self._handle_episode_done(*msg[1:]) elif tag == "terminated": await self._handle_agent_terminated(*msg[1:]) else: pass finally: self._stopped_evt.set() async def _init_generation(self) -> None: s = self.state agent_ids = await self._extract_agent_ids(s.population_id) s.agent_ids = agent_ids s.tot_agents = len(agent_ids) s.agents_left = 0 active = [] for aid in agent_ids: try: handle = await self._exoself_start(aid, self) active.append((aid, handle)) s.agents_left += 1 except Exception as e: log.error(f"[Monitor] FAILED to start agent {aid}: {e!r}") await _run("MATCH (a:agent {id:$aid}) SET a.fitness = toFloat($f)", aid=str(aid), f=float("-inf")) s.active = active s.tot_agents = s.agents_left log.info(f"*** Population monitor started: 
pop={s.population_id}, mode={s.op_mode}, " f"selection={s.selection_algorithm}, agents={s.tot_agents}") async def _handle_stop(self, mode: str) -> None: s = self.state for (_aid, h) in list(s.active): try: if hasattr(h, "cast"): await h.cast(("terminate",)) elif hasattr(h, "stop"): await h.stop() except Exception: pass for f in (self._episodes_f, self._progress_f): if f is not None: try: f.flush() f.close() except Exception: pass self._episodes_f = None self._progress_f = None log.info(f"*** Population_Monitor:{s.population_id} shutdown. op_tag={s.op_tag}, op_mode={s.op_mode}") await neo4j.close() async def _handle_episode_done(self, agent_id: str, ep_return: float, eval_idx: int) -> None: s = self.state t_sec = asyncio.get_running_loop().time() - (self._t0 or 0.0) if self._episodes_f is not None: self._episodes_f.write(f"{t_sec:.6f},{s.pop_gen+1},{agent_id},{eval_idx},{ep_return:.10f}\n") async def _handle_agent_terminated(self, agent_id: str, fitness: float, agent_eval: int, agent_cycle: int, agent_time: int) -> None: log.info(f"agent terminated: , {agent_id}, {fitness}, {agent_eval}, {agent_cycle}, {agent_time}") s = self.state s.eval_acc += int(agent_eval) s.cycle_acc += int(agent_cycle) s.time_acc += int(agent_time) s.agents_left -= 1 await _run("MATCH (a:agent {id:$aid}) SET a.fitness = toFloat($f)", aid=str(agent_id), f=float(fitness)) s.active = [(aid, h) for (aid, h) in s.active if aid != agent_id] log.info(f"[Monitor] agent done: {agent_id} | agents_left={s.agents_left}/{s.tot_agents}") if 0 < s.agents_left <= 3: log.info("[Monitor] still active: %s", [str(aid) for (aid, _h) in s.active]) if s.agents_left <= 0: await self._generation_finished() async def _generation_finished(self) -> None: s = self.state await self._mutate_population(s.population_id, SPECIE_SIZE_LIMIT, s.selection_algorithm) s.pop_gen += 1 log.info(f"Population {s.population_id} generation {s.pop_gen} ended.\n") pop_stats = await _population_aggregate(s.population_id) # aggregate logging 
information t_sec = asyncio.get_running_loop().time() - (self._t0 or 0.0) t_norm = min(1.0, t_sec / self.train_time_sec) best_gen = float(pop_stats["best"]) self._best_so_far = max(self._best_so_far, best_gen) # write logs if self._progress_f is not None: self._progress_f.write( f"{t_sec:.6f},{t_norm:.6f},{s.pop_gen},{best_gen:.10f},{self._best_so_far:.10f}," f"{float(pop_stats['avg']):.10f},{float(pop_stats['std']):.10f}\n" ) s.rows.append({ "gen": int(s.pop_gen), "t_sec": int(asyncio.get_running_loop().time()), "cum_fitness": float(pop_stats["cum_fitness"]), "best": float(pop_stats["best"]), "avg": float(pop_stats["avg"]), "std": float(pop_stats["std"]), "agents": int(pop_stats["agents"]), "eval_acc": int(s.eval_acc), "cycle_acc": float(s.cycle_acc), "time_acc": float(s.time_acc), }) self.gen_ended.set() self.gen_ended = asyncio.Event() # check time now = asyncio.get_running_loop().time() elapsed = now - self._t0 time_limit_reached = elapsed >= self.train_time_sec if time_limit_reached: log.info( f"[Monitor] time limit reached " f"({elapsed:.1f}s >= {self.train_time_sec}s), stopping run" ) best = await self._best_fitness_in_population(s.population_id) end_condition = (s.pop_gen >= GENERATION_LIMIT) or (s.eval_acc >= EVALUATIONS_LIMIT) or (best > FITNESS_GOAL) or time_limit_reached if s.pop_gen >= GENERATION_LIMIT: log.info(f"reached generation limit {GENERATION_LIMIT}, stopping") if s.eval_acc >= EVALUATIONS_LIMIT: log.info(f"reached evaluation limit {EVALUATIONS_LIMIT}, stopping") if best > FITNESS_GOAL: log.info(f"reached best fitness {best}, stopping") if s.op_tag == "done" or end_condition: await self.inbox.put(("stop", "normal")) return if s.op_tag == "pause": log.info("Population Monitor paused.") return await self._init_generation() async def _ensure_population_node(self, population_id: str) -> None: await _run("MERGE (:population {id:$pid})", pid=str(population_id)) async def _ensure_specie_node(self, specie_id: str, population_id: str, constraint_json: 
str) -> None: await _run(""" MERGE (s:specie {id:$sid}) SET s.population_id = $pid, s.constraint_json = $cjson """, sid=str(specie_id), pid=str(population_id), cjson=str(constraint_json)) async def _extract_agent_ids(self, population_id: str) -> List[str]: rows = await _read_all("MATCH (a:agent {population_id:$pid}) RETURN a.id AS id ORDER BY id", pid=str(population_id)) return [str(r["id"]) for r in rows] async def _extract_specie_ids(self, population_id: str) -> List[str]: rows = await _read_all(""" MATCH (s:specie {population_id:$pid}) RETURN s.id AS id ORDER BY id """, pid=str(population_id)) return [str(r["id"]) for r in rows] async def _extract_specie_agent_ids(self, specie_id: str) -> List[str]: rows = await _read_all(""" MATCH (:specie {id:$sid})-[:HAS]->(a:agent) RETURN a.id AS id """, sid=str(specie_id)) return [str(r["id"]) for r in rows] async def _mutate_population(self, population_id: str, keep_tot: int, selection_algorithm: SelectionAlgorithm) -> None: energy_cost = await self._calculate_energy_cost(population_id) specie_ids = await self._extract_specie_ids(population_id) for sid in specie_ids: await self._mutate_specie(sid, keep_tot, energy_cost, selection_algorithm) async def _mutate_specie(self, specie_id: str, population_limit: int, neural_energy_cost: float, selection_algorithm: SelectionAlgorithm) -> None: agent_ids = await self._extract_specie_agent_ids(specie_id) summaries = await self._construct_agent_summaries(agent_ids) summaries.sort(key=lambda t: t[0], reverse=True) if not summaries: return if selection_algorithm == "competition": tot_survivors = round(len(summaries) * SURVIVAL_PERCENTAGE) weighted = sorted( [(fit / (max(tn, 1) ** EFF), (fit, tn, aid)) for (fit, tn, aid) in summaries], key=lambda x: x[0], reverse=True ) valid = [val for (_score, val) in weighted][:tot_survivors] invalid = [x for x in summaries if x not in valid] top3 = valid[:3] top_agent_ids = [aid for (_f, _tn, aid) in top3] for (_f, _tn, aid) in invalid: await 
delete_agent(aid) await _run(""" MATCH (:specie {id:$sid})-[h:HAS]->(a:agent {id:$aid}) DELETE h """, sid=str(specie_id), aid=str(aid)) new_ids = await self._competition(valid, population_limit, neural_energy_cost, specie_id) elif selection_algorithm == "top3": valid = summaries[:3] invalid = [x for x in summaries if x not in valid] top_agent_ids = [aid for (_f, _tn, aid) in valid] for (_f, _tn, aid) in invalid: await delete_agent(aid) await _run("MATCH (:specie {id:$sid})-[h:HAS]->(a:agent {id:$aid}) DELETE h", sid=str(specie_id), aid=str(aid)) offspring_needed = max(0, population_limit - len(top_agent_ids)) new_ids = await self._top3(top_agent_ids, offspring_needed, specie_id) else: log.error(f"Unknown selection algorithm: {selection_algorithm}") raise ValueError(f"Unknown selection algorithm: {selection_algorithm}") f_list = [f for (f, _tn, _aid) in summaries] avg, std, maxf, minf = _mean(f_list), _std(f_list), max(f_list), min(f_list) row = await _read_all("MATCH (s:specie {id:$sid}) RETURN toInteger(s.innovation_factor) AS inv", sid=str(specie_id)) inv = int(row[0]["inv"]) if row and row[0]["inv"] is not None else 0 u_inv = 0 if (maxf > inv) else (inv - 1) await _run(""" MATCH (s:specie {id:$sid}) SET s.fitness_avg = toFloat($avg), s.fitness_std = toFloat($std), s.fitness_max = toFloat($maxf), s.fitness_min = toFloat($minf), s.innovation_factor = toInteger($inv), s.champion_ids = $champs """, sid=str(specie_id), avg=float(avg), std=float(std), maxf=float(maxf), minf=float(minf), inv=int(u_inv), champs=[str(x) for x in top_agent_ids]) for aid in new_ids: try: await update_fingerprint(aid) except Exception: pass async def _competition(self, valid: List[Tuple[float, int, str]], population_limit: int, neural_energy_cost: float, specie_id: str) -> List[str]: alot, est = await self._calculate_alotments(valid, neural_energy_cost) normalizer = (est / population_limit) if population_limit > 0 else 1.0 log.debug(f"Population size normalizer: {normalizer:.4f}") return 
await self._gather_survivors(alot, normalizer, specie_id) async def _calculate_alotments(self, valid: List[Tuple[float, int, str]], neural_energy_cost: float ) -> Tuple[List[Tuple[float, float, int, str]], float]: acc: List[Tuple[float, float, int, str]] = [] new_pop_acc = 0.0 for (fit, tn, aid) in valid: neural_alot = (fit / neural_energy_cost) if neural_energy_cost > 0 else 0.0 mutant_alot = (neural_alot / max(tn, 1)) new_pop_acc += mutant_alot acc.append((mutant_alot, fit, tn, aid)) log.debug(f"NewPopAcc: {new_pop_acc:.4f}") return acc, new_pop_acc async def _gather_survivors(self, alot: List[Tuple[float, float, int, str]], normalizer: float, specie_id: str) -> \ List[str]: new_ids: List[str] = [] for (ma, fit, tn, aid) in alot: count = int(round(ma / normalizer)) if normalizer > 0 else 0 log.info(f"Agent {aid}: normalized allotment = {count}") if count >= 1: await _run(""" MATCH (s:specie {id:$sid}), (a:agent {id:$aid}) MERGE (s)-[:HAS]->(a) """, sid=str(specie_id), aid=str(aid)) new_ids.append(aid) k = count - 1 for _ in range(k): cid = await self._create_mutant_offspring(aid, specie_id) new_ids.append(cid) else: await delete_agent(aid) await _run("MATCH (:specie {id:$sid})-[h:HAS]->(a:agent {id:$aid}) DELETE h", sid=str(specie_id), aid=str(aid)) log.info(f"New Population Size (specie={specie_id}): {len(new_ids)}") return new_ids async def _create_mutant_offspring(self, parent_id: str, specie_id: str) -> str: clone_id = _new_id() await clone_agent(parent_id, clone_id) rows = await _read_all("MATCH (a:agent {id:$aid}) RETURN a.specie_id AS sid, a.population_id AS pid", aid=str(parent_id)) sid = rows[0]["sid"] if rows else specie_id pid = rows[0]["pid"] if rows else None await _run(""" MATCH (a:agent {id:$cid}) SET a.specie_id = $sid, a.population_id = $pid, a.fitness = NULL """, cid=str(clone_id), sid=str(sid), pid=str(pid)) await _run(""" MATCH (s:specie {id:$sid}), (a:agent {id:$cid}) MERGE (s)-[:HAS]->(a) """, sid=str(specie_id), cid=str(clone_id)) await 
self._mutate_one_step(clone_id) return clone_id async def _mutate_one_step(self, agent_id: str) -> None: ops = [ self.mutator.mutate_weights, self.mutator.add_bias, self.mutator.remove_bias, self.mutator.add_inlink, self.mutator.add_outlink, self.mutator.add_neuron, self.mutator.outsplice, self.mutator.add_actuator, ] op = random.choice(ops) try: await op(agent_id) except Exception: try: await self.mutator.mutate_weights(agent_id) except Exception: pass async def _top3(self, valid_ids: List[str], offspring_needed: int, specie_id: str) -> List[str]: new_ids = list(valid_ids) for _ in range(offspring_needed): parent = random.choice(valid_ids) cid = await self._create_mutant_offspring(parent, specie_id) new_ids.append(cid) return new_ids async def _construct_agent_summaries(self, agent_ids: Sequence[str]) -> List[Tuple[float, int, str]]: if not agent_ids: return [] rows = await _read_all(""" UNWIND $ids AS aid MATCH (a:agent {id:aid})-[:OWNS]->(cx:cortex) OPTIONAL MATCH (cx)-[:HAS]->(n:neuron) RETURN aid AS id, toFloat(a.fitness) AS f, count(n) AS k """, ids=[str(x) for x in agent_ids]) out: List[Tuple[float, int, str]] = [] for r in rows: f = float(r["f"]) if r["f"] is not None else 0.0 k = int(r["k"]) out.append((f, k, str(r["id"]))) return out async def _calculate_energy_cost(self, population_id: str) -> float: rows = await _read_all(""" MATCH (a:agent {population_id:$pid})-[:OWNS]->(cx:cortex) OPTIONAL MATCH (cx)-[:HAS]->(n:neuron) RETURN sum(coalesce(toFloat(a.fitness),0.0)) AS totE, count(n) AS totN """, pid=str(population_id)) if not rows: return 0.0 totE = float(rows[0]["totE"] or 0.0) totN = int(rows[0]["totN"] or 0) return (totE / totN) if totN > 0 else 0.0 async def _best_fitness_in_population(self, population_id: str) -> float: rows = await _read_all(""" MATCH (a:agent {population_id:$pid}) RETURN max(toFloat(a.fitness)) AS best """, pid=str(population_id)) return float(rows[0]["best"] or 0.0) if rows else 0.0 async def init_population(params: Tuple[str, 
List[dict], str, SelectionAlgorithm]) -> PopulationMonitor: """ params = (Population_Id, Specie_Constraints, OpMode, Selection_Algorithm) - erzeugt Population/Spezies-Knoten - konstruiert Agents (über dein construct_agent) - verknüpft Spezies->Agent, setzt agent.population_id - startet den Monitor """ population_id, specie_constraints, op_mode, selection_algorithm = params await delete_population(population_id) await _run("MERGE (:population {id:$pid})", pid=str(population_id)) # TODO: this has to move to the genotype module for specon in specie_constraints: sid = _new_id() await _run(""" MERGE (p:population {id:$pid}) MERGE (s:specie {id:$sid}) ON CREATE SET s.population_id = $pid, s.constraint_json = $cjson, s.innovation_factor = 0 ON MATCH SET s.population_id = $pid, s.constraint_json = $cjson MERGE (p)-[:HAS]->(s) """, pid=str(population_id), sid=str(sid), cjson=json.dumps(specon, separators=(",", ":"), sort_keys=True)) for _ in range(INIT_SPECIE_SIZE): aid = _new_id() await construct_agent(sid, aid, specon) # todo: this needs to move to the genotype await _run(""" MATCH (a:agent {id:$aid}), (s:specie {id:$sid}) SET a.population_id = $pid, a.specie_id = $sid, a.fitness = NULL MERGE (s)-[:HAS]->(a) """, aid=str(aid), sid=str(sid), pid=str(population_id)) monitor = await PopulationMonitor.start(op_mode, population_id, selection_algorithm) return monitor async def continue_(op_mode: str, selection_algorithm: SelectionAlgorithm, population_id: str = INIT_POPULATION_ID) -> PopulationMonitor: return await PopulationMonitor.start(op_mode, population_id, selection_algorithm) async def delete_population(population_id: str) -> None: await _run(""" MATCH (p:population {id:$pid}) OPTIONAL MATCH (s:specie {population_id:$pid}) OPTIONAL MATCH (s)-[:HAS]->(a:agent)-[:OWNS]->(cx:cortex) OPTIONAL MATCH (cx)-[:HAS]->(n:neuron) OPTIONAL MATCH (cx)-[:HAS]->(sen:sensor) OPTIONAL MATCH (cx)-[:HAS]->(act:actuator) DETACH DELETE p, s, a, cx, n, sen, act """, pid=str(population_id)) 
async def test() -> PopulationMonitor:
    """Bootstrap a fresh population from the module-level INIT_* defaults."""
    init_params = (
        INIT_POPULATION_ID,
        INIT_CONSTRAINTS,
        INIT_OP_MODE,
        INIT_SELECTION_ALGO,
    )
    monitor = await init_population(init_params)
    return monitor