Files
neuroevolution/mathema/core/population_monitor.py
2025-12-13 14:12:35 +01:00

659 lines
25 KiB
Python

"""
Monitor a population of neural-network agents that solve a shared or
isolated scape.
"""
from __future__ import annotations
import asyncio
import json
import math
import os
import random
import uuid
import logging
from dataclasses import dataclass, field
from typing import Any, List, Literal, Optional, Sequence, Tuple, Callable, Awaitable
from mathema.genotype.neo4j.genotype import (
neo4j,
construct_agent,
clone_agent,
delete_agent,
update_fingerprint,
)
from mathema.genotype.neo4j.genotype_mutator import GenotypeMutator
from mathema.utils import stats
from mathema.core.exoself import Exoself
# Module-level logger used by every function and method in this file.
log = logging.getLogger(__name__)
# Lifecycle tag stored in MonitorState.op_tag.
OpTag = Literal["continue", "pause", "done"]
# Selection strategies that PopulationMonitor._mutate_specie dispatches on.
SelectionAlgorithm = Literal["competition", "top3"]
# Exponent applied to an agent's neuron count when "competition" selection
# ranks agents: score = fitness / max(neurons, 1) ** EFF.
EFF: float = 0.05
# Fraction of a specie's agents kept as survivors by "competition" selection.
SURVIVAL_PERCENTAGE: float = 0.5
# Target specie size passed to _mutate_population after every generation.
SPECIE_SIZE_LIMIT: int = 10
# Number of agents constructed per specie by init_population.
INIT_SPECIE_SIZE: int = 10
# Run end conditions checked in PopulationMonitor._generation_finished:
# generation count, accumulated evaluations, and best fitness (strictly greater).
GENERATION_LIMIT: int = 1000
EVALUATIONS_LIMIT: int = 100_000
FITNESS_GOAL: float = 6000
# Defaults used by test() (and, for the id, by continue_()).
INIT_POPULATION_ID: str = "test"
# NOTE(review): the meaning of "gt" is not visible in this file; the value is
# only stored on MonitorState.op_mode and logged - confirm against Exoself.
INIT_OP_MODE: str = "gt"
INIT_SELECTION_ALGO: SelectionAlgorithm = "competition"
# One dict per specie; each dict is JSON-encoded onto the specie node and
# passed to construct_agent for every agent of that specie.
INIT_CONSTRAINTS: list[dict] = [
{"morphology": "xor_mimic", "neural_afs": ["tanh"]},
]
# Optional override for how agents are started; when None,
# PopulationMonitor falls back to Exoself.start.
EXOSELF_START: Optional[Callable[[str, "PopulationMonitor"], Awaitable[Any]]] = None
def _new_id() -> str:
return uuid.uuid4().hex
def _mean(xs: Sequence[float]) -> float:
return sum(xs) / len(xs) if xs else 0.0
def _std(xs: Sequence[float]) -> float:
if not xs: return 0.0
m = _mean(xs)
return math.sqrt(sum((x - m) ** 2 for x in xs) / len(xs))
async def _read_all(q: str, **params):
    """Forward query *q* and its parameters to ``neo4j.read_all`` and return its result."""
    records = await neo4j.read_all(q, **params)
    return records
async def _run(q: str, **params):
    """Forward query *q* and its parameters to ``neo4j.run_consume`` and return its result."""
    outcome = await neo4j.run_consume(q, **params)
    return outcome
@dataclass
class MonitorState:
    """Mutable bookkeeping for one PopulationMonitor run."""

    # --- identity / configuration (positional constructor arguments) ---
    op_mode: str
    population_id: str
    selection_algorithm: SelectionAlgorithm

    # --- lifecycle ---
    # "continue" keeps spawning generations; "pause" halts after the current one.
    op_tag: OpTag = "continue"

    # --- current generation ---
    # (agent_id, handle) pairs for agents that were started and have not terminated.
    active: list[tuple[str, Any]] = field(default_factory=list)
    # Ids of all agents read from the database for the current generation.
    agent_ids: list[str] = field(default_factory=list)
    # Number of agents successfully started this generation.
    tot_agents: int = 0
    # Number of started agents that have not yet reported termination.
    agents_left: int = 0

    # --- accumulated over the whole run ---
    pop_gen: int = 0
    eval_acc: int = 0
    cycle_acc: int = 0
    time_acc: int = 0
    # One summary dict per finished generation.
    rows: list[dict] = field(default_factory=list)
async def _population_aggregate(population_id: str) -> dict:
    """Return fitness statistics over every agent of *population_id*.

    Agents whose fitness is NULL are counted with fitness 0.0 (the query
    coalesces them). The result has the keys ``cum_fitness``, ``avg``,
    ``std`` (population standard deviation), ``best``, ``min``, ``n`` and
    ``agents``; ``n`` and ``agents`` both hold the agent count. All values are
    zero when the population has no agents.
    """
    records = await _read_all("""
        MATCH (a:agent {population_id:$pid})
        RETURN collect(coalesce(toFloat(a.fitness),0.0)) AS fs
        """, pid=str(population_id))
    raw = records[0]["fs"] if records else []
    fitnesses = [float(value) for value in raw]
    count = len(fitnesses)
    if count == 0:
        return {"cum_fitness": 0.0, "avg": 0.0, "std": 0.0, "best": 0.0, "min": 0.0, "n": 0, "agents": 0}
    total = sum(fitnesses)
    mean = total / count
    variance = sum((value - mean) ** 2 for value in fitnesses) / count
    return dict(
        cum_fitness=total,
        avg=mean,
        std=math.sqrt(variance),
        best=max(fitnesses),
        min=min(fitnesses),
        n=count,
        agents=count,
    )
class PopulationMonitor:
    """
    Orchestrates generations: spawn -> wait -> select/mutate -> next.

    The monitor behaves like an actor: messages are tuples put on ``inbox``
    (see ``cast``) and are handled one at a time by the ``_run`` task.
    Recognised tags are ``"stop"``, ``"pause"``, ``"continue"``,
    ``"episode_done"`` and ``"terminated"``.

    Expects ``exoself.start(agent_id, monitor)``; the exoself is expected to
    cast ``("terminated", aid, fitness, eval_acc, cycle_acc, time_acc)`` when
    its agent has finished.
    """

    def __init__(self, op_mode: str, population_id: str, selection_algorithm: SelectionAlgorithm):
        """Create an idle monitor; use ``start`` to obtain a running one."""
        self.state = MonitorState(op_mode, population_id, selection_algorithm)
        self.inbox: asyncio.Queue = asyncio.Queue()
        self._task: Optional[asyncio.Task] = None
        # Declared here so _handle_stop can always inspect it.
        self._deadline_task: Optional[asyncio.Task] = None
        self._stopped_evt = asyncio.Event()
        self.mutator = GenotypeMutator(neo4j)
        # Set (and then replaced by a fresh Event) each time a generation ends.
        self.gen_ended = asyncio.Event()
        self._exoself_start = EXOSELF_START or Exoself.start

        # logging stuff
        self.run_id = None
        self._t0 = None
        self._best_so_far = float("-inf")
        # Wall-clock training budget in seconds (30 minutes).
        self.train_time_sec = 30 * 60

        # logging file handles
        self._episodes_f = None
        self._progress_f = None

    @classmethod
    async def start(cls, op_mode: str, population_id: str,
                    selection_algorithm: SelectionAlgorithm) -> "PopulationMonitor":
        """Create a monitor, start its message loop and spawn the first generation.

        Also registers the per-generation rows with ``stats.register_atexit``,
        creates ``runs/<run_id>/episodes.csv`` and ``progress.csv`` (closed
        again when the monitor handles ``"stop"``), and schedules a task that
        sends ``("stop", "normal")`` after ``train_time_sec`` seconds.
        """
        self = cls(op_mode, population_id, selection_algorithm)
        self._task = asyncio.create_task(self._run(), name=f"PopulationMonitor-{population_id}")
        stats.register_atexit(population_id, lambda: list(self.state.rows))

        # init logging
        loop = asyncio.get_running_loop()
        self._t0 = loop.time()
        self.run_id = f"{population_id}__{uuid.uuid4().hex[:8]}"
        os.makedirs(f"runs/{self.run_id}", exist_ok=True)
        # Line-buffered so the CSVs stay usable if the process dies mid-run.
        self._episodes_f = open(f"runs/{self.run_id}/episodes.csv", "w", buffering=1)
        self._episodes_f.write("t_sec,pop_gen,agent_id,eval_idx,episode_return\n")
        self._progress_f = open(f"runs/{self.run_id}/progress.csv", "w", buffering=1)
        self._progress_f.write("t_sec,t_norm,pop_gen,best_gen,best_so_far,avg,std\n")

        self._deadline_task = asyncio.create_task(self._stop_after_deadline(), name=f"Deadline-{self.run_id}")
        await self._init_generation()
        return self

    async def _stop_after_deadline(self):
        """Sleep for the training budget, then ask the monitor to stop."""
        await asyncio.sleep(self.train_time_sec)
        await self.inbox.put(("stop", "normal"))

    async def cast(self, msg: tuple) -> None:
        """Enqueue *msg* for asynchronous handling by the message loop."""
        await self.inbox.put(msg)

    async def stop(self, mode: Literal["normal", "shutdown"] = "normal") -> None:
        """Request a stop and wait until the message loop has exited."""
        await self.inbox.put(("stop", mode))
        await self._stopped_evt.wait()

    async def _run(self) -> None:
        """Message loop: dispatch inbox messages by tag until ``"stop"`` arrives.

        A handler exception still terminates the loop, as before, but is now
        logged first instead of being hidden in the task object.
        ``_stopped_evt`` is set on every exit path.
        """
        try:
            while True:
                msg = await self.inbox.get()
                tag = msg[0] if isinstance(msg, tuple) and msg else None
                if tag == "stop":
                    await self._handle_stop(msg[1] if len(msg) > 1 else "normal")
                    break
                elif tag == "pause":
                    if self.state.op_tag == "continue":
                        self.state.op_tag = "pause"
                        log.debug("Population Monitor will pause after this generation.")
                elif tag == "continue":
                    if self.state.op_tag == "pause":
                        self.state.op_tag = "continue"
                        await self._init_generation()
                elif tag == "episode_done":
                    await self._handle_episode_done(*msg[1:])
                elif tag == "terminated":
                    await self._handle_agent_terminated(*msg[1:])
                else:
                    log.debug("[Monitor] ignoring unknown message: %r", msg)
        except Exception:
            log.exception("[Monitor] message loop for population %s crashed", self.state.population_id)
            raise
        finally:
            self._stopped_evt.set()

    async def _init_generation(self) -> None:
        """Start one exoself per agent of the population and reset the counters.

        Agents that fail to start are logged, given fitness ``-inf`` in the
        database and left out of ``active`` / ``tot_agents``.
        """
        s = self.state
        agent_ids = await self._extract_agent_ids(s.population_id)
        s.agent_ids = agent_ids
        s.tot_agents = len(agent_ids)
        s.agents_left = 0
        # NOTE(review): when called from start(), the message loop runs
        # concurrently, so an agent that terminates before all its siblings are
        # started could drive agents_left to 0 early - confirm whether exoselfs
        # can finish that fast.
        active = []
        for aid in agent_ids:
            try:
                handle = await self._exoself_start(aid, self)
                active.append((aid, handle))
                s.agents_left += 1
            except Exception as e:
                log.error(f"[Monitor] FAILED to start agent {aid}: {e!r}")
                await _run("MATCH (a:agent {id:$aid}) SET a.fitness = toFloat($f)", aid=str(aid), f=float("-inf"))
        s.active = active
        s.tot_agents = s.agents_left
        log.info(f"*** Population monitor started: pop={s.population_id}, mode={s.op_mode}, "
                 f"selection={s.selection_algorithm}, agents={s.tot_agents}")
        if s.tot_agents == 0:
            # No agent will ever report "terminated", so no generation can finish.
            log.warning("[Monitor] population %s has no running agents; waiting for stop", s.population_id)

    async def _handle_stop(self, mode: str) -> None:
        """Terminate active agents, cancel the deadline, close logs and the DB handle.

        Shutdown is best-effort: a failure to stop one agent or to close one
        file is logged and does not prevent the remaining clean-up.
        """
        s = self.state

        # The deadline task would otherwise stay pending for up to
        # train_time_sec after the monitor is gone.
        deadline = self._deadline_task
        if deadline is not None and not deadline.done():
            deadline.cancel()
        self._deadline_task = None

        for (aid, h) in list(s.active):
            try:
                if hasattr(h, "cast"):
                    await h.cast(("terminate",))
                elif hasattr(h, "stop"):
                    await h.stop()
            except Exception:
                log.warning("[Monitor] failed to terminate agent %s", aid, exc_info=True)

        for f in (self._episodes_f, self._progress_f):
            if f is not None:
                try:
                    f.flush()
                    f.close()
                except Exception:
                    log.warning("[Monitor] failed to close log file %r", getattr(f, "name", f), exc_info=True)
        self._episodes_f = None
        self._progress_f = None

        log.info(f"*** Population_Monitor:{s.population_id} shutdown. op_tag={s.op_tag}, op_mode={s.op_mode}")
        await neo4j.close()

    async def _handle_episode_done(self, agent_id: str, ep_return: float, eval_idx: int) -> None:
        """Append one row to episodes.csv (no-op when the file is not open)."""
        s = self.state
        t_sec = asyncio.get_running_loop().time() - (self._t0 or 0.0)
        if self._episodes_f is not None:
            # pop_gen counts finished generations, so the running one is pop_gen + 1.
            self._episodes_f.write(f"{t_sec:.6f},{s.pop_gen+1},{agent_id},{eval_idx},{ep_return:.10f}\n")

    async def _handle_agent_terminated(self, agent_id: str, fitness: float, agent_eval: int, agent_cycle: int,
                                       agent_time: int) -> None:
        """Record an agent's result and finish the generation when it was the last one."""
        log.info(f"agent terminated: , {agent_id}, {fitness}, {agent_eval}, {agent_cycle}, {agent_time}")
        s = self.state
        s.eval_acc += int(agent_eval)
        s.cycle_acc += int(agent_cycle)
        s.time_acc += int(agent_time)
        s.agents_left -= 1
        await _run("MATCH (a:agent {id:$aid}) SET a.fitness = toFloat($f)", aid=str(agent_id), f=float(fitness))
        s.active = [(aid, h) for (aid, h) in s.active if aid != agent_id]
        log.info(f"[Monitor] agent done: {agent_id} | agents_left={s.agents_left}/{s.tot_agents}")
        if 0 < s.agents_left <= 3:
            log.info("[Monitor] still active: %s", [str(aid) for (aid, _h) in s.active])
        if s.agents_left <= 0:
            await self._generation_finished()

    async def _generation_finished(self) -> None:
        """Select/mutate, log generation statistics, then stop, pause or continue."""
        s = self.state
        await self._mutate_population(s.population_id, SPECIE_SIZE_LIMIT, s.selection_algorithm)
        s.pop_gen += 1
        log.info(f"Population {s.population_id} generation {s.pop_gen} ended.\n")

        # NOTE(review): these statistics are read AFTER selection/mutation, so
        # they cover survivors plus new offspring (whose NULL fitness counts as
        # 0.0), not the generation that was just evaluated - confirm intent.
        pop_stats = await _population_aggregate(s.population_id)

        # aggregate logging information
        loop = asyncio.get_running_loop()
        t0 = self._t0 or 0.0
        t_sec = loop.time() - t0
        t_norm = min(1.0, t_sec / self.train_time_sec)
        best_gen = float(pop_stats["best"])
        self._best_so_far = max(self._best_so_far, best_gen)

        # write logs
        if self._progress_f is not None:
            self._progress_f.write(
                f"{t_sec:.6f},{t_norm:.6f},{s.pop_gen},{best_gen:.10f},{self._best_so_far:.10f},"
                f"{float(pop_stats['avg']):.10f},{float(pop_stats['std']):.10f}\n"
            )
        s.rows.append({
            "gen": int(s.pop_gen),
            "t_sec": int(loop.time()),
            "cum_fitness": float(pop_stats["cum_fitness"]),
            "best": float(pop_stats["best"]),
            "avg": float(pop_stats["avg"]),
            "std": float(pop_stats["std"]),
            "agents": int(pop_stats["agents"]),
            "eval_acc": int(s.eval_acc),
            "cycle_acc": float(s.cycle_acc),
            "time_acc": float(s.time_acc),
        })

        # Wake current waiters, then install a fresh event for the next generation.
        self.gen_ended.set()
        self.gen_ended = asyncio.Event()

        # check time (guarded like the other _t0 uses: _t0 is None unless start() ran)
        elapsed = loop.time() - t0
        time_limit_reached = elapsed >= self.train_time_sec
        if time_limit_reached:
            log.info(
                f"[Monitor] time limit reached "
                f"({elapsed:.1f}s >= {self.train_time_sec}s), stopping run"
            )

        best = await self._best_fitness_in_population(s.population_id)
        end_condition = (
            (s.pop_gen >= GENERATION_LIMIT)
            or (s.eval_acc >= EVALUATIONS_LIMIT)
            or (best > FITNESS_GOAL)
            or time_limit_reached
        )
        if s.pop_gen >= GENERATION_LIMIT:
            log.info(f"reached generation limit {GENERATION_LIMIT}, stopping")
        if s.eval_acc >= EVALUATIONS_LIMIT:
            log.info(f"reached evaluation limit {EVALUATIONS_LIMIT}, stopping")
        if best > FITNESS_GOAL:
            log.info(f"reached best fitness {best}, stopping")

        if s.op_tag == "done" or end_condition:
            # Stop via the inbox so shutdown runs in the message loop.
            await self.inbox.put(("stop", "normal"))
            return
        if s.op_tag == "pause":
            log.info("Population Monitor paused.")
            return
        await self._init_generation()

    async def _ensure_population_node(self, population_id: str) -> None:
        """MERGE the population node with the given id."""
        await _run("MERGE (:population {id:$pid})", pid=str(population_id))

    async def _ensure_specie_node(self, specie_id: str, population_id: str, constraint_json: str) -> None:
        """MERGE the specie node and (re)set its population id and constraint JSON."""
        await _run("""
            MERGE (s:specie {id:$sid})
            SET s.population_id = $pid,
            s.constraint_json = $cjson
            """, sid=str(specie_id), pid=str(population_id), cjson=str(constraint_json))

    async def _extract_agent_ids(self, population_id: str) -> List[str]:
        """Return the ids of all agents of the population, ordered by id."""
        rows = await _read_all("MATCH (a:agent {population_id:$pid}) RETURN a.id AS id ORDER BY id",
                               pid=str(population_id))
        return [str(r["id"]) for r in rows]

    async def _extract_specie_ids(self, population_id: str) -> List[str]:
        """Return the ids of all species of the population, ordered by id."""
        rows = await _read_all("""
            MATCH (s:specie {population_id:$pid}) RETURN s.id AS id ORDER BY id
            """, pid=str(population_id))
        return [str(r["id"]) for r in rows]

    async def _extract_specie_agent_ids(self, specie_id: str) -> List[str]:
        """Return the ids of the agents linked to the specie via ``HAS``."""
        rows = await _read_all("""
            MATCH (:specie {id:$sid})-[:HAS]->(a:agent) RETURN a.id AS id
            """, sid=str(specie_id))
        return [str(r["id"]) for r in rows]

    async def _mutate_population(self, population_id: str, keep_tot: int,
                                 selection_algorithm: SelectionAlgorithm) -> None:
        """Apply selection and mutation to every specie of the population."""
        energy_cost = await self._calculate_energy_cost(population_id)
        specie_ids = await self._extract_specie_ids(population_id)
        for sid in specie_ids:
            await self._mutate_specie(sid, keep_tot, energy_cost, selection_algorithm)

    @staticmethod
    def _survivor_count(total: int, survival_percentage: float) -> int:
        """Return how many of *total* agents survive selection.

        A non-empty specie always keeps at least one agent: plain ``round``
        rounds half to even, so ``round(1 * 0.5)`` is 0 and a one-agent specie
        would otherwise be wiped out.
        """
        if total <= 0:
            return 0
        return max(1, round(total * survival_percentage))

    async def _discard_agent(self, specie_id: str, agent_id: str) -> None:
        """Delete an agent and any remaining ``HAS`` link from its specie."""
        await delete_agent(agent_id)
        await _run("MATCH (:specie {id:$sid})-[h:HAS]->(a:agent {id:$aid}) DELETE h",
                   sid=str(specie_id), aid=str(agent_id))

    async def _mutate_specie(self, specie_id: str, population_limit: int, neural_energy_cost: float,
                             selection_algorithm: SelectionAlgorithm) -> None:
        """Select survivors of one specie, refill it with mutants, update specie stats.

        Raises:
            ValueError: if *selection_algorithm* is not a known algorithm
                (raised before any agent is deleted).
        """
        agent_ids = await self._extract_specie_agent_ids(specie_id)
        summaries = await self._construct_agent_summaries(agent_ids)
        if not summaries:
            return
        # Best raw fitness first; each summary is (fitness, neuron_count, agent_id).
        summaries.sort(key=lambda t: t[0], reverse=True)

        if selection_algorithm == "competition":
            tot_survivors = self._survivor_count(len(summaries), SURVIVAL_PERCENTAGE)
            # Rank by fitness discounted by network size; the stable sort keeps
            # the raw-fitness order among equal scores.
            weighted = sorted(summaries, key=lambda t: t[0] / (max(t[1], 1) ** EFF), reverse=True)
            valid = weighted[:tot_survivors]
            top_agent_ids = [aid for (_f, _tn, aid) in valid[:3]]
        elif selection_algorithm == "top3":
            valid = summaries[:3]
            top_agent_ids = [aid for (_f, _tn, aid) in valid]
        else:
            log.error(f"Unknown selection algorithm: {selection_algorithm}")
            raise ValueError(f"Unknown selection algorithm: {selection_algorithm}")

        survivor_ids = {aid for (_f, _tn, aid) in valid}
        for (_f, _tn, aid) in summaries:
            if aid not in survivor_ids:
                await self._discard_agent(specie_id, aid)

        if selection_algorithm == "competition":
            new_ids = await self._competition(valid, population_limit, neural_energy_cost, specie_id)
        else:
            offspring_needed = max(0, population_limit - len(top_agent_ids))
            new_ids = await self._top3(top_agent_ids, offspring_needed, specie_id)

        f_list = [f for (f, _tn, _aid) in summaries]
        avg, std, maxf, minf = _mean(f_list), _std(f_list), max(f_list), min(f_list)
        row = await _read_all("MATCH (s:specie {id:$sid}) RETURN toInteger(s.innovation_factor) AS inv",
                              sid=str(specie_id))
        inv = int(row[0]["inv"]) if row and row[0]["inv"] is not None else 0
        # NOTE(review): this compares the max fitness with the innovation
        # counter itself; it looks like a best-fitness-so-far value was intended
        # - confirm before changing the stored schema.
        u_inv = 0 if (maxf > inv) else (inv - 1)
        await _run("""
            MATCH (s:specie {id:$sid})
            SET s.fitness_avg = toFloat($avg),
            s.fitness_std = toFloat($std),
            s.fitness_max = toFloat($maxf),
            s.fitness_min = toFloat($minf),
            s.innovation_factor = toInteger($inv),
            s.champion_ids = $champs
            """, sid=str(specie_id), avg=float(avg), std=float(std), maxf=float(maxf), minf=float(minf),
                   inv=int(u_inv), champs=[str(x) for x in top_agent_ids])

        for aid in new_ids:
            try:
                await update_fingerprint(aid)
            except Exception:
                # Fingerprints are best-effort; keep going but leave a trace.
                log.warning("[Monitor] update_fingerprint failed for agent %s", aid, exc_info=True)

    async def _competition(self, valid: List[Tuple[float, int, str]], population_limit: int,
                           neural_energy_cost: float, specie_id: str) -> List[str]:
        """Allot offspring to survivors in proportion to fitness per neuron.

        Returns the ids of the agents that make up the specie afterwards.
        """
        alot, est = await self._calculate_alotments(valid, neural_energy_cost)
        # Scale the raw allotments so that they sum to roughly population_limit.
        normalizer = (est / population_limit) if population_limit > 0 else 1.0
        log.debug(f"Population size normalizer: {normalizer:.4f}")
        return await self._gather_survivors(alot, normalizer, specie_id)

    async def _calculate_alotments(self, valid: List[Tuple[float, int, str]], neural_energy_cost: float
                                   ) -> Tuple[List[Tuple[float, float, int, str]], float]:
        """Return ``(allotments, total)`` for the surviving agents.

        Each allotment is ``(mutant_alot, fitness, neurons, agent_id)`` with
        ``mutant_alot = fitness / neural_energy_cost / max(neurons, 1)``; it is
        0.0 when the energy cost is not positive. ``total`` is the sum of all
        ``mutant_alot`` values.
        """
        acc: List[Tuple[float, float, int, str]] = []
        new_pop_acc = 0.0
        for (fit, tn, aid) in valid:
            neural_alot = (fit / neural_energy_cost) if neural_energy_cost > 0 else 0.0
            mutant_alot = (neural_alot / max(tn, 1))
            new_pop_acc += mutant_alot
            acc.append((mutant_alot, fit, tn, aid))
        log.debug(f"NewPopAcc: {new_pop_acc:.4f}")
        return acc, new_pop_acc

    async def _gather_survivors(self, alot: List[Tuple[float, float, int, str]], normalizer: float,
                                specie_id: str) -> List[str]:
        """Keep each agent with a normalized allotment >= 1 and clone it ``count - 1`` times.

        Agents whose allotment rounds below 1 are deleted.
        """
        # NOTE(review): with normalizer <= 0 (e.g. all fitness 0) every count
        # is 0 and the whole specie is deleted - confirm this is intended.
        new_ids: List[str] = []
        for (ma, fit, tn, aid) in alot:
            count = int(round(ma / normalizer)) if normalizer > 0 else 0
            log.info(f"Agent {aid}: normalized allotment = {count}")
            if count >= 1:
                await _run("""
                    MATCH (s:specie {id:$sid}), (a:agent {id:$aid})
                    MERGE (s)-[:HAS]->(a)
                    """, sid=str(specie_id), aid=str(aid))
                new_ids.append(aid)
                # The parent itself fills one of its allotted slots.
                for _ in range(count - 1):
                    cid = await self._create_mutant_offspring(aid, specie_id)
                    new_ids.append(cid)
            else:
                await self._discard_agent(specie_id, aid)
        log.info(f"New Population Size (specie={specie_id}): {len(new_ids)}")
        return new_ids

    async def _create_mutant_offspring(self, parent_id: str, specie_id: str) -> str:
        """Clone *parent_id*, attach the clone to the specie, mutate it once.

        Returns the id of the new agent.
        """
        clone_id = _new_id()
        await clone_agent(parent_id, clone_id)
        rows = await _read_all("MATCH (a:agent {id:$aid}) RETURN a.specie_id AS sid, a.population_id AS pid",
                               aid=str(parent_id))
        sid = rows[0]["sid"] if rows else None
        pid = rows[0]["pid"] if rows else None
        if sid is None:
            sid = specie_id
        # A missing population id stays NULL on the wire instead of becoming
        # the string "None"; coalesce then keeps whatever the clone already has.
        await _run("""
            MATCH (a:agent {id:$cid})
            SET a.specie_id = $sid,
            a.population_id = coalesce($pid, a.population_id),
            a.fitness = NULL
            """, cid=str(clone_id), sid=str(sid), pid=(None if pid is None else str(pid)))
        await _run("""
            MATCH (s:specie {id:$sid}), (a:agent {id:$cid})
            MERGE (s)-[:HAS]->(a)
            """, sid=str(specie_id), cid=str(clone_id))
        await self._mutate_one_step(clone_id)
        return clone_id

    async def _mutate_one_step(self, agent_id: str) -> None:
        """Apply one randomly chosen mutation operator to the agent.

        If the chosen operator fails, ``mutate_weights`` is tried as a
        fallback; if that fails too, the agent is left unmutated. Failures are
        logged rather than raised.
        """
        ops = [
            self.mutator.mutate_weights,
            self.mutator.add_bias,
            self.mutator.remove_bias,
            self.mutator.add_inlink,
            self.mutator.add_outlink,
            self.mutator.add_neuron,
            self.mutator.outsplice,
            self.mutator.add_actuator,
        ]
        op = random.choice(ops)
        try:
            await op(agent_id)
        except Exception:
            log.debug("[Monitor] mutation %s failed for agent %s; falling back to mutate_weights",
                      getattr(op, "__name__", op), agent_id, exc_info=True)
            try:
                await self.mutator.mutate_weights(agent_id)
            except Exception:
                log.warning("[Monitor] fallback mutate_weights failed for agent %s; agent left unmutated",
                            agent_id, exc_info=True)

    async def _top3(self, valid_ids: List[str], offspring_needed: int, specie_id: str) -> List[str]:
        """Keep *valid_ids* and add *offspring_needed* mutants of randomly chosen parents."""
        new_ids = list(valid_ids)
        for _ in range(offspring_needed):
            parent = random.choice(valid_ids)
            cid = await self._create_mutant_offspring(parent, specie_id)
            new_ids.append(cid)
        return new_ids

    async def _construct_agent_summaries(self, agent_ids: Sequence[str]) -> List[Tuple[float, int, str]]:
        """Return ``(fitness, neuron_count, agent_id)`` for each agent that owns a cortex.

        A NULL fitness is reported as 0.0.
        """
        if not agent_ids:
            return []
        rows = await _read_all("""
            UNWIND $ids AS aid
            MATCH (a:agent {id:aid})-[:OWNS]->(cx:cortex)
            OPTIONAL MATCH (cx)-[:HAS]->(n:neuron)
            RETURN aid AS id,
            toFloat(a.fitness) AS f,
            count(n) AS k
            """, ids=[str(x) for x in agent_ids])
        out: List[Tuple[float, int, str]] = []
        for r in rows:
            f = float(r["f"]) if r["f"] is not None else 0.0
            k = int(r["k"])
            out.append((f, k, str(r["id"])))
        return out

    async def _calculate_energy_cost(self, population_id: str) -> float:
        """Return total population fitness divided by total neuron count (0.0 if no neurons)."""
        # Count neurons per agent first: summing fitness over the agent x neuron
        # rows would add each agent's fitness once per neuron.
        rows = await _read_all("""
            MATCH (a:agent {population_id:$pid})-[:OWNS]->(cx:cortex)
            OPTIONAL MATCH (cx)-[:HAS]->(n:neuron)
            WITH a, count(n) AS k
            RETURN sum(coalesce(toFloat(a.fitness),0.0)) AS totE,
            sum(k) AS totN
            """, pid=str(population_id))
        if not rows:
            return 0.0
        totE = float(rows[0]["totE"] or 0.0)
        totN = int(rows[0]["totN"] or 0)
        return (totE / totN) if totN > 0 else 0.0

    async def _best_fitness_in_population(self, population_id: str) -> float:
        """Return the highest agent fitness in the population (0.0 when none is set)."""
        rows = await _read_all("""
            MATCH (a:agent {population_id:$pid})
            RETURN max(toFloat(a.fitness)) AS best
            """, pid=str(population_id))
        return float(rows[0]["best"] or 0.0) if rows else 0.0
async def init_population(params: Tuple[str, List[dict], str, SelectionAlgorithm]) -> PopulationMonitor:
    """
    Build a fresh population in the graph and start a monitor for it.

    params = (Population_Id, Specie_Constraints, OpMode, Selection_Algorithm)
    - deletes any existing population with that id
    - creates the population node and one specie node per constraint
    - constructs INIT_SPECIE_SIZE agents per specie (via construct_agent)
    - links specie -> agent and sets agent.population_id
    - starts the monitor
    """
    population_id, specie_constraints, op_mode, selection_algorithm = params
    pid = str(population_id)

    await delete_population(population_id)
    await _run("MERGE (:population {id:$pid})", pid=pid)

    # TODO: this has to move to the genotype module
    for specon in specie_constraints:
        sid = _new_id()
        # Compact, key-sorted JSON is what gets stored on the specie node.
        constraint_json = json.dumps(specon, separators=(",", ":"), sort_keys=True)
        await _run("""
            MERGE (p:population {id:$pid})
            MERGE (s:specie {id:$sid})
            ON CREATE SET
            s.population_id = $pid,
            s.constraint_json = $cjson,
            s.innovation_factor = 0
            ON MATCH SET
            s.population_id = $pid,
            s.constraint_json = $cjson
            MERGE (p)-[:HAS]->(s)
            """, pid=pid, sid=str(sid), cjson=constraint_json)

        for _ in range(INIT_SPECIE_SIZE):
            aid = _new_id()
            await construct_agent(sid, aid, specon)
            # todo: this needs to move to the genotype
            await _run("""
                MATCH (a:agent {id:$aid}), (s:specie {id:$sid})
                SET a.population_id = $pid, a.specie_id = $sid, a.fitness = NULL
                MERGE (s)-[:HAS]->(a)
                """, aid=str(aid), sid=str(sid), pid=pid)

    return await PopulationMonitor.start(op_mode, population_id, selection_algorithm)
async def continue_(op_mode: str, selection_algorithm: SelectionAlgorithm,
                    population_id: str = INIT_POPULATION_ID) -> PopulationMonitor:
    """Start a monitor on an already existing population and return it.

    Unlike ``init_population`` this neither deletes nor creates any nodes.
    """
    monitor = await PopulationMonitor.start(op_mode, population_id, selection_algorithm)
    return monitor
async def delete_population(population_id: str) -> None:
    """Delete a population node together with its species and their agents.

    Removes the population node, the species carrying its ``population_id``,
    the agents those species link to via ``HAS``, each agent's cortex and the
    cortex's neurons, sensors and actuators. The leading ``MATCH`` is not
    optional, so nothing is deleted when no population node with that id
    exists.
    """
    cypher = """
        MATCH (p:population {id:$pid})
        OPTIONAL MATCH (s:specie {population_id:$pid})
        OPTIONAL MATCH (s)-[:HAS]->(a:agent)-[:OWNS]->(cx:cortex)
        OPTIONAL MATCH (cx)-[:HAS]->(n:neuron)
        OPTIONAL MATCH (cx)-[:HAS]->(sen:sensor)
        OPTIONAL MATCH (cx)-[:HAS]->(act:actuator)
        DETACH DELETE p, s, a, cx, n, sen, act
        """
    await _run(cypher, pid=str(population_id))
async def test() -> PopulationMonitor:
    """Create and start the default test population built from the INIT_* constants."""
    params = (INIT_POPULATION_ID, INIT_CONSTRAINTS, INIT_OP_MODE, INIT_SELECTION_ALGO)
    return await init_population(params)