""" module used to monitor a population of neural networks solving a shared or isolated scape. """ from __future__ import annotations import asyncio import json import math import os import random import uuid import logging from dataclasses import dataclass, field from typing import Any, List, Literal, Optional, Sequence, Tuple, Callable, Awaitable from mathema.genotype.neo4j.genotype import ( neo4j, construct_agent, clone_agent, delete_agent, update_fingerprint, ) from mathema.genotype.neo4j.genotype_mutator_tx import GenotypeMutator from mathema.utils import stats from mathema.core.exoself import Exoself from mathema.settings import (EFF, SURVIVAL_PERCENTAGE, SPECIE_SIZE_LIMIT, INIT_SPECIE_SIZE, GENERATION_LIMIT, EVALUATIONS_LIMIT, FITNESS_GOAL, INIT_POPULATION_ID) log = logging.getLogger(__name__) OpTag = Literal["continue", "pause", "done"] SelectionAlgorithm = Literal["competition", "top3"] EXOSELF_START: Optional[Callable[[str, "PopulationMonitor"], Awaitable[Any]]] = None def _new_id() -> str: return uuid.uuid4().hex def _mean(xs: Sequence[float]) -> float: return sum(xs) / len(xs) if xs else 0.0 def _std(xs: Sequence[float]) -> float: if not xs: return 0.0 m = _mean(xs) return math.sqrt(sum((x - m) ** 2 for x in xs) / len(xs)) async def _read_all(q: str, **params): return await neo4j.read_all(q, **params) async def _run(q: str, **params): return await neo4j.run_consume(q, **params) @dataclass class MonitorState: op_mode: str population_id: str selection_algorithm: SelectionAlgorithm op_tag: OpTag = "continue" active: List[Tuple[str, Any]] = field(default_factory=list) agent_ids: List[str] = field(default_factory=list) tot_agents: int = 0 agents_left: int = 0 pop_gen: int = 0 eval_acc: int = 0 cycle_acc: int = 0 time_acc: int = 0 rows: List[dict] = field(default_factory=list) async def _population_aggregate(population_id: str) -> dict: """ Retrieve fitness values for agents in a population and calculate aggregate statistics. Parameters: population_id (str): The unique identifier of the population. Returns: Dictionary: A dictionary containing the following aggregate statistics: - "cum_fitness": Sum of all fitness values. - "avg": Average fitness value. - "std": Standard deviation of fitness values. - "best": Maximum fitness value. - "min": Minimum fitness value. - "n": Number of fitness values. - "agents": Number of agents in the population. """ rows = await _read_all(""" MATCH (a:agent {population_id:$pid}) RETURN collect(coalesce(toFloat(a.fitness),0.0)) AS fs """, pid=str(population_id)) fs = [float(x) for x in (rows[0]["fs"] if rows else [])] if not fs: return {"cum_fitness": 0.0, "avg": 0.0, "std": 0.0, "best": 0.0, "min": 0.0, "n": 0, "agents": 0} n = len(fs) s = sum(fs) m = s / n var = sum((x - m) ** 2 for x in fs) / n return { "cum_fitness": s, "avg": m, "std": math.sqrt(var), "best": max(fs), "min": min(fs), "n": n, "agents": n } async def _best_fitness_in_population(population_id: str) -> float: """ Get the best fitness score in a given population based on the maximum fitness value of all agents. Parameters: - population_id (str): The ID of the population to search for. Returns: - float: The best fitness value found in the population, or 0.0 if no fitness values are found. """ rows = await _read_all(""" MATCH (a:agent {population_id:$pid}) RETURN max(toFloat(a.fitness)) AS best """, pid=str(population_id)) return float(rows[0]["best"] or 0.0) if rows else 0.0 async def _calculate_energy_cost(population_id: str) -> float: """ Calculate the energy cost based on the fitness of agents and the number of neurons in the cortex associated with the given population ID. Parameters: population_id (str): The ID of the population for which the energy cost needs to be calculated. Returns: float: The calculated energy cost. """ rows = await _read_all(""" MATCH (a:agent {population_id:$pid})-[:OWNS]->(cx:cortex) OPTIONAL MATCH (cx)-[:HAS]->(n:neuron) RETURN sum(coalesce(toFloat(a.fitness),0.0)) AS totE, count(n) AS totN """, pid=str(population_id)) if not rows: return 0.0 totE = float(rows[0]["totE"] or 0.0) totN = int(rows[0]["totN"] or 0) return (totE / totN) if totN > 0 else 0.0 async def _construct_agent_summaries(agent_ids: Sequence[str]): """ Constructs summaries for the given list of agent IDs. Parameters: agent_ids (Sequence[str]): A list of agent IDs for which summaries are to be constructed. Return Type: List[Tuple[float, int, str]]: A list of tuples where each tuple contains the agent's fitness as a float, the count of neurons as an integer, and the agent ID as a string. """ if not agent_ids: return [] rows = await _read_all(""" UNWIND $ids AS aid MATCH (a:agent {id:aid})-[:OWNS]->(cx:cortex) OPTIONAL MATCH (cx)-[:HAS]->(n:neuron) RETURN aid AS id, toFloat(a.fitness) AS f, count(n) AS k """, ids=[str(x) for x in agent_ids]) out: List[Tuple[float, int, str]] = [] for r in rows: f = float(r["f"]) if r["f"] is not None else 0.0 k = int(r["k"]) out.append((f, k, str(r["id"]))) return out async def _calculate_alotments(valid: List[Tuple[float, int, str]], neural_energy_cost: float ): """ Calculate allotments based on fitness values and neural energy cost. Parameters: valid (List[Tuple[float, int, str]]): A list of tuples containing fitness, total neurons, and agent ID. neural_energy_cost (float): The energy cost for neural activity. Returns: Tuple[List[Tuple[float, float, int, str]], float]: A tuple containing a list of tuples with allotments, fitness values, total neurons, and agent IDs, and the total allotments for the new population. """ acc: List[Tuple[float, float, int, str]] = [] new_pop_acc = 0.0 for (fit, tn, aid) in valid: neural_alot = (fit / neural_energy_cost) if neural_energy_cost > 0 else 0.0 mutant_alot = (neural_alot / max(tn, 1)) new_pop_acc += mutant_alot acc.append((mutant_alot, fit, tn, aid)) log.debug(f"NewPopAcc: {new_pop_acc:.4f}") return acc, new_pop_acc async def _extract_specie_agent_ids(specie_id: str) -> List[str]: """ Extracts the IDs of agents associated with a given specie. :param specie_id: The ID of the specie for which to retrieve agent IDs :type specie_id: str :return: List of agent IDs associated with the specie :rtype: List[str] """ rows = await _read_all(""" MATCH (:specie {id:$sid})-[:HAS]->(a:agent) RETURN a.id AS id """, sid=str(specie_id)) return [str(r["id"]) for r in rows] async def _extract_specie_ids(population_id: str) -> List[str]: """ Extract specie IDs from Neo4j database for a given population ID. :param population_id: str - The population ID for which to extract specie IDs. :return: List[str] - A list of specie IDs associated with the given population ID. """ rows = await _read_all(""" MATCH (s:specie {population_id:$pid}) RETURN s.id AS id ORDER BY id """, pid=str(population_id)) return [str(r["id"]) for r in rows] async def _ensure_specie_node(specie_id: str, population_id: str, constraint_json: str): """ Ensure that the specie node exists. Used to keep the agent database coherent. Parameters: specie_id (str): The ID of the specie to create. constraint_json (str): The JSON string of the constraints for the specie. Returns: None """ await _run(""" MERGE (s:specie {id:$sid}) SET s.population_id = $pid, s.constraint_json = $cjson """, sid=str(specie_id), pid=str(population_id), cjson=str(constraint_json)) async def _extract_agent_ids(population_id: str) -> List[str]: """ Extracts the agent IDs associated with a given population ID. Parameters: population_id (str): The ID of the population to extract agent IDs for. Returns: List[str]: A list of agent IDs as strings extracted from the database based on the provided population ID. """ rows = await _read_all("MATCH (a:agent {population_id:$pid}) RETURN a.id AS id ORDER BY id", pid=str(population_id)) return [str(r["id"]) for r in rows] async def _ensure_population_node(population_id: str) -> None: await _run("MERGE (:population {id:$pid})", pid=str(population_id)) class PopulationMonitor: def __init__(self, op_mode: str, population_id: str, selection_algorithm: SelectionAlgorithm): self.state = MonitorState(op_mode, population_id, selection_algorithm) self.inbox: asyncio.Queue = asyncio.Queue() self._task: Optional[asyncio.Task] = None self._stopped_evt = asyncio.Event() self.mutator = GenotypeMutator(neo4j) self.gen_ended = asyncio.Event() self._exoself_start = EXOSELF_START or Exoself.start # logging stuff self.run_id = None self._t0 = None self._best_so_far = float("-inf") self.train_time_sec = 30*60 self._deadline_task = None # logging file handles self._episodes_f = None self._progress_f = None @classmethod async def start(cls, op_mode: str, population_id: str, selection_algorithm: SelectionAlgorithm) -> "PopulationMonitor": """ Create and start a new population monitor instance. This class method initializes a PopulationMonitor for the given population and selection algorithm, starts its asynchronous message processing loop, and prepares all logging and bookkeeping required for a training run. Specifically, it: 1. Instantiates the monitor with the specified operation mode, population identifier, and selection algorithm. 2. Launches the internal actor-style run loop as an asyncio task. 3. Registers an atexit hook to collect generation-level statistics for post-run analysis. 4. Initializes timing, assigns a unique run identifier, and creates the corresponding output directory. 5. Opens and initializes episode-level and progress-level CSV log files. 6. Starts a deadline task to enforce the configured training time limit. 7. Initializes and activates the first generation of the population. The method returns the fully initialized and running PopulationMonitor instance, which can then be controlled via its public interface. """ self = cls(op_mode, population_id, selection_algorithm) self._task = asyncio.create_task(self._run(), name=f"PopulationMonitor-{population_id}") stats.register_atexit(population_id, lambda: list(self.state.rows)) # init logging loop = asyncio.get_running_loop() self._t0 = loop.time() self.run_id = f"{population_id}__{uuid.uuid4().hex[:8]}" os.makedirs(f"runs/{self.run_id}", exist_ok=True) self._episodes_f = open(f"runs/{self.run_id}/episodes.csv", "w", buffering=1) self._episodes_f.write("t_sec,pop_gen,agent_id,eval_idx,episode_return\n") self._progress_f = open(f"runs/{self.run_id}/progress.csv", "w", buffering=1) self._progress_f.write("t_sec,t_norm,pop_gen,best_gen,best_so_far,avg,std\n") self._deadline_task = asyncio.create_task(self._stop_after_deadline(), name=f"Deadline-{self.run_id}") await self._init_generation() return self async def _stop_after_deadline(self): await asyncio.sleep(self.train_time_sec) await self.inbox.put(("stop", "normal")) async def cast(self, msg: tuple) -> None: await self.inbox.put(msg) async def stop(self, mode: Literal["normal", "shutdown"] = "normal"): """ Request termination of the population monitor and await shutdown. This method sends a stop command to the monitor's internal message loop and blocks until the monitor has completed its graceful shutdown procedure. The optional mode parameter indicates the reason for stopping (e.g. normal completion versus external shutdown) and is forwarded to the internal stop handler. It provides a synchronous-style interface for external components to ensure that all agents are terminated, logs are flushed, and final results are persisted before control returns to the caller. """ await self.inbox.put(("stop", mode)) await self._stopped_evt.wait() async def _run(self): """ Main asynchronous message-processing loop of the population monitor. This method continuously consumes messages from the internal inbox and dispatches them to the appropriate handlers, coordinating the lifecycle of an evolutionary run. It implements an actor-style control loop with the following responsibilities: 1. Reacts to control messages: - "stop": triggers graceful shutdown and finalization of the run. - "pause": requests a pause after the current generation completes. - "continue": resumes execution and initializes a new generation after a pause. 2. Handles evaluation-related events: - "episode_done": logs completion of a single evaluation episode. - "terminated": processes termination of an agent and updates generation-level state. 3. Maintains correct sequencing of generations and respects the current operational mode (continue, pause, done). The loop runs until a stop message is received or the task is cancelled. On exit, it signals completion via an internal stopped-event to allow other components to await full shutdown. """ try: while True: msg = await self.inbox.get() tag = msg[0] if isinstance(msg, tuple) else None if tag == "stop": await self._handle_stop(msg[1]) break elif tag == "pause": if self.state.op_tag == "continue": self.state.op_tag = "pause" log.debug("Population Monitor will pause after this generation.") elif tag == "continue": if self.state.op_tag == "pause": self.state.op_tag = "continue" await self._init_generation() elif tag == "episode_done": await self._handle_episode_done(*msg[1:]) elif tag == "terminated": await self._handle_agent_terminated(*msg[1:]) else: pass finally: self._stopped_evt.set() async def _init_generation(self) -> None: """ Initialize the agent generation process by extracting agent ids, setting initial values for the state object, and starting agents asynchronously. If any errors occur during the agent initialization process, the corresponding agent's fitness is set to negative infinity. Parameters: - self: The reference to the current object instance. Returns: - None """ s = self.state agent_ids = await _extract_agent_ids(s.population_id) s.agent_ids = agent_ids s.tot_agents = len(agent_ids) s.agents_left = 0 active = [] for aid in agent_ids: try: handle = await self._exoself_start(aid, self) active.append((aid, handle)) s.agents_left += 1 except Exception as e: log.error(f"[Monitor] FAILED to start agent {aid}: {e!r}") await _run("MATCH (a:agent {id:$aid}) SET a.fitness = toFloat($f)", aid=str(aid), f=float("-inf")) s.active = active s.tot_agents = s.agents_left log.info(f"*** Population monitor started: pop={s.population_id}, mode={s.op_mode}, " f"selection={s.selection_algorithm}, agents={s.tot_agents}") async def _handle_stop(self, mode: str): """ Gracefully stop and finalize the population monitoring run. This method is invoked when the monitoring loop receives a stop signal. It performs an orderly shutdown of the ongoing evolutionary run by: 1. Terminating all currently active agent handlers or actors. 2. Flushing and closing episode-level and progress-level log files. 3. Cancelling any active deadline or time-limit task. 4. Finalizing the global best-so-far fitness value based on the current population state. 5. Writing a run summary to disk, including identifiers, training duration, generation count, accumulated evaluation statistics, and final best-so-far fitness. 6. Clearing internal file handles and logging the shutdown event. The method ensures that partial results are safely persisted and that all asynchronous resources are released before the run terminates. """ s = self.state for (_aid, h) in list(s.active): try: if hasattr(h, "cast"): await h.cast(("terminate",)) elif hasattr(h, "stop"): await h.stop() except Exception: pass for f in (self._episodes_f, self._progress_f): if f is not None: try: f.flush() f.close() except Exception: pass try: if getattr(self, "_deadline_task", None): self._deadline_task.cancel() except Exception: pass try: best = await _best_fitness_in_population(self.state.population_id) self._best_so_far = max(self._best_so_far, float(best)) except Exception: pass summary = { "run_id": self.run_id, "population_id": self.state.population_id, "train_time_sec": self.train_time_sec, "gens": self.state.pop_gen, "eval_acc": self.state.eval_acc, "best_so_far": self._best_so_far, "op_tag": self.state.op_tag, } with open(f"runs/{self.run_id}/summary.json", "w") as f: json.dump(summary, f, indent=2) self._episodes_f = None self._progress_f = None log.info(f"*** Population_Monitor:{s.population_id} shutdown. op_tag={s.op_tag}, op_mode={s.op_mode}") async def _handle_episode_done(self, agent_id: str, ep_return: float, eval_idx: int): """ Handle completion of a single evaluation episode for an agent. This method is called whenever an agent finishes one evaluation episode within the current generation. It records episode-level information for later analysis and monitoring but does not alter population-level state or control flow. Specifically, it: 1. Computes the elapsed wall-clock time since the start of the run. 2. Logs the episode result (time, generation index, agent identifier, evaluation index, and episode return) to the episode-level log file, if enabled. """ s = self.state t_sec = asyncio.get_running_loop().time() - (self._t0 or 0.0) if self._episodes_f is not None: self._episodes_f.write(f"{t_sec:.6f},{s.pop_gen},{agent_id},{eval_idx},{ep_return:.10f}\n") async def _handle_agent_terminated(self, agent_id: str, fitness: float, agent_eval: int, agent_cycle: int, agent_time: int): """ Handle termination of a single agent evaluation. This method is invoked when an agent finishes its evaluation within the current generation. It performs the following actions: 1. Accumulates per-agent evaluation statistics (evaluation count, cycle count, and execution time) into generation-level counters. 2. Decrements the number of remaining active agents in the generation. 3. Persists the agent's final fitness value to the underlying genotype storage. 4. Removes the terminated agent from the list of currently active agents. 5. Logs progress information, including how many agents are still active. 6. Triggers generation finalization once all agents in the generation have completed their evaluations. The method is fully asynchronous and is typically called by the actor supervision or monitoring component when an agent signals termination. """ log.info(f"agent terminated: , {agent_id}, {fitness}, {agent_eval}, {agent_cycle}, {agent_time}") s = self.state s.eval_acc += int(agent_eval) s.cycle_acc += int(agent_cycle) s.time_acc += int(agent_time) s.agents_left -= 1 await _run("MATCH (a:agent {id:$aid}) SET a.fitness = toFloat($f)", aid=str(agent_id), f=float(fitness)) s.active = [(aid, h) for (aid, h) in s.active if aid != agent_id] log.info(f"[Monitor] agent done: {agent_id} | agents_left={s.agents_left}/{s.tot_agents}") if 0 < s.agents_left <= 3: log.info("[Monitor] still active: %s", [str(aid) for (aid, _h) in s.active]) if s.agents_left <= 0: await self._generation_finished() async def _generation_finished(self) -> None: """ Handle the end of a population generation. This method is called once all agents of the current generation have completed their evaluations. It performs the following steps: 1. Mutates and selects the next generation according to the configured selection algorithm and species size limit. 2. Increments the generation counter and logs aggregated population statistics (best, average, standard deviation of fitness). 3. Updates time-based metrics, including elapsed wall-clock time, normalized training time, and the global best-so-far fitness. 4. Writes a progress entry to the progress log (CSV-style) and appends a detailed statistics record to the in-memory history. 5. Signals the end of the generation via an asyncio.Event to unblock dependent tasks. 6. Checks termination conditions, including generation limit, evaluation limit, fitness goal, and wall-clock time limit. 7. Depending on the current operation tag and termination conditions, either stops the run, pauses execution, or initializes the next generation. The method is fully asynchronous and intended to be called from the population monitoring loop that coordinates evolutionary training. """ s = self.state await self._mutate_population(s.population_id, SPECIE_SIZE_LIMIT, s.selection_algorithm) s.pop_gen += 1 log.info(f"Population {s.population_id} generation {s.pop_gen} ended.\n") pop_stats = await _population_aggregate(s.population_id) # aggregate logging information t_sec = asyncio.get_running_loop().time() - (self._t0 or 0.0) t_norm = min(1.0, t_sec / self.train_time_sec) best_gen = float(pop_stats["best"]) self._best_so_far = max(self._best_so_far, best_gen) # write logs if self._progress_f is not None: self._progress_f.write( f"{t_sec:.6f},{t_norm:.6f},{s.pop_gen},{best_gen:.10f},{self._best_so_far:.10f}," f"{float(pop_stats['avg']):.10f},{float(pop_stats['std']):.10f}\n" ) s.rows.append({ "gen": int(s.pop_gen), "t_sec": int(asyncio.get_running_loop().time()), "cum_fitness": float(pop_stats["cum_fitness"]), "best": float(pop_stats["best"]), "avg": float(pop_stats["avg"]), "std": float(pop_stats["std"]), "agents": int(pop_stats["agents"]), "eval_acc": int(s.eval_acc), "cycle_acc": float(s.cycle_acc), "time_acc": float(s.time_acc), }) self.gen_ended.set() self.gen_ended = asyncio.Event() # check time now = asyncio.get_running_loop().time() elapsed = now - self._t0 time_limit_reached = elapsed >= self.train_time_sec if time_limit_reached: log.info( f"[Monitor] time limit reached " f"({elapsed:.1f}s >= {self.train_time_sec}s), stopping run" ) best = await _best_fitness_in_population(s.population_id) end_condition = (s.pop_gen >= GENERATION_LIMIT) or (s.eval_acc >= EVALUATIONS_LIMIT) or (best > FITNESS_GOAL) or time_limit_reached if s.pop_gen >= GENERATION_LIMIT: log.info(f"reached generation limit {GENERATION_LIMIT}, stopping") if s.eval_acc >= EVALUATIONS_LIMIT: log.info(f"reached evaluation limit {EVALUATIONS_LIMIT}, stopping") if best > FITNESS_GOAL: log.info(f"reached best fitness {best}, stopping") if s.op_tag == "done" or end_condition: await self.inbox.put(("stop", "normal")) return if s.op_tag == "pause": log.info("Population Monitor paused.") return await self._init_generation() async def _mutate_population(self, population_id: str, keep_tot: int, selection_algorithm: SelectionAlgorithm): """ Apply evolutionary mutation and selection to an entire population. This method orchestrates the mutation step at the population level at the end of a generation. It first computes shared contextual information, such as the current energy cost of the population, and then iterates over all species belonging to the population. For each species, it delegates the actual selection and mutation process to the species-level mutation routine, ensuring that the total number of individuals retained respects the configured population size limit and the chosen selection algorithm. The method is asynchronous and intended to be invoked as part of the generation finalization phase of the evolutionary loop. """ energy_cost = await _calculate_energy_cost(population_id) specie_ids = await _extract_specie_ids(population_id) for sid in specie_ids: await self._mutate_specie(sid, keep_tot, energy_cost, selection_algorithm) async def _mutate_specie(self, specie_id: str, population_limit: int, neural_energy_cost: float, selection_algorithm: SelectionAlgorithm): """ Mutate and repopulate a single species according to the chosen selection strategy. This method performs the end-of-generation evolutionary step for one species ("specie") within a population. It: 1. Loads all agents belonging to the species and constructs fitness summaries, sorting agents by fitness in descending order. 2. Selects survivors and removes non-survivors from both the genotype store and the species membership relationship. 3. Creates new offspring agents to refill the species up to the target population limit, using one of two selection algorithms: - "competition": keeps a fraction of the best agents (SURVIVAL_PERCENTAGE), ranks them by an efficiency-weighted score (fitness adjusted by network size/complexity), deletes the rest, and produces offspring via the competition routine. - "top3": keeps only the top 3 agents, deletes all others, and produces the required number of offspring from these champions. 4. Computes aggregate fitness statistics for the species (mean, standard deviation, min, max) and updates the species node with these values as well as the list of champion agent IDs. 5. Updates the species' innovation factor based on whether the current best fitness exceeds the stored innovation threshold. 6. Refreshes fingerprints for newly created agents to keep derived metadata consistent. The method is asynchronous and is intended to be called from the population-level mutation step after a generation has completed. """ agent_ids = await _extract_specie_agent_ids(specie_id) summaries = await _construct_agent_summaries(agent_ids) summaries.sort(key=lambda t: t[0], reverse=True) if not summaries: return if selection_algorithm == "competition": tot_survivors = round(len(summaries) * SURVIVAL_PERCENTAGE) weighted = sorted( [(fit / (max(tn, 1) ** EFF), (fit, tn, aid)) for (fit, tn, aid) in summaries], key=lambda x: x[0], reverse=True ) valid = [val for (_score, val) in weighted][:tot_survivors] invalid = [x for x in summaries if x not in valid] top3 = valid[:3] top_agent_ids = [aid for (_f, _tn, aid) in top3] for (_f, _tn, aid) in invalid: await delete_agent(aid) await _run(""" MATCH (:specie {id:$sid})-[h:HAS]->(a:agent {id:$aid}) DELETE h """, sid=str(specie_id), aid=str(aid)) new_ids = await self._competition(valid, population_limit, neural_energy_cost, specie_id) elif selection_algorithm == "top3": valid = summaries[:3] invalid = [x for x in summaries if x not in valid] top_agent_ids = [aid for (_f, _tn, aid) in valid] for (_f, _tn, aid) in invalid: await delete_agent(aid) await _run("MATCH (:specie {id:$sid})-[h:HAS]->(a:agent {id:$aid}) DELETE h", sid=str(specie_id), aid=str(aid)) offspring_needed = max(0, population_limit - len(top_agent_ids)) new_ids = await self._top3(top_agent_ids, offspring_needed, specie_id) else: log.error(f"Unknown selection algorithm: {selection_algorithm}") raise ValueError(f"Unknown selection algorithm: {selection_algorithm}") f_list = [f for (f, _tn, _aid) in summaries] avg, std, maxf, minf = _mean(f_list), _std(f_list), max(f_list), min(f_list) row = await _read_all("MATCH (s:specie {id:$sid}) RETURN toInteger(s.innovation_factor) AS inv", sid=str(specie_id)) inv = int(row[0]["inv"]) if row and row[0]["inv"] is not None else 0 u_inv = 0 if (maxf > inv) else (inv - 1) await _run(""" MATCH (s:specie {id:$sid}) SET s.fitness_avg = toFloat($avg), s.fitness_std = toFloat($std), s.fitness_max = toFloat($maxf), s.fitness_min = toFloat($minf), s.innovation_factor = toInteger($inv), s.champion_ids = $champs """, sid=str(specie_id), avg=float(avg), std=float(std), maxf=float(maxf), minf=float(minf), inv=int(u_inv), champs=[str(x) for x in top_agent_ids]) for aid in new_ids: try: await update_fingerprint(aid) except Exception: pass async def _competition(self, valid: List[Tuple[float, int, str]], population_limit: int, neural_energy_cost: float, specie_id: str) -> List[str]: """ Perform competition-based reproduction for a species. This method implements the competition selection strategy for a single species. Given a set of valid (surviving) agents, it: 1. Computes reproduction allotments for each agent based on fitness and neural energy cost, yielding a total estimated population size. 2. Derives a normalization factor to scale these allotments so that the resulting number of survivors and offspring respects the configured population size limit. 3. Delegates to the survivor-gathering routine to retain parent agents and generate the appropriate number of mutant offspring. The method returns a list of agent identifiers representing the new species population after competition-based selection and mutation. It is asynchronous and intended to be used during the species-level mutation phase of the evolutionary cycle. """ alot, est = await _calculate_alotments(valid, neural_energy_cost) normalizer = (est / population_limit) if population_limit > 0 else 1.0 log.debug(f"Population size normalizer: {normalizer:.4f}") return await self._gather_survivors(alot, normalizer, specie_id) async def _gather_survivors(self, alot: List[Tuple[float, float, int, str]], normalizer: float, specie_id: str) -> \ List[str]: """ Collect survivors and generate offspring for a species based on normalized allotments. This method takes a list of agents with precomputed allotment scores and determines how many times each agent should survive or reproduce into the next generation. For each agent, it: 1. Computes a reproduction count by normalizing the agent's allotment value against a global normalizer. 2. Enforces a hard safety constraint to ensure that each listed agent contributes at least one survivor to the next generation. 3. Ensures that the surviving agent remains linked to the species. 4. Creates the required number of mutant offspring clones for the agent to satisfy its allotted reproduction count. 5. Removes agents that would otherwise receive zero allotment from the species and deletes their genotype representation. The method returns a list of agent identifiers representing the newly formed species population, including both surviving parents and newly created offspring. It is asynchronous and intended to be used as part of the species-level reproduction process. """ new_ids: List[str] = [] for (ma, fit, tn, aid) in alot: count = int(round(ma / normalizer)) if normalizer > 0 else 0 # hard safety: keep at least one survivor if count <= 0: count = 1 log.info(f"Agent {aid}: normalized allotment = {count}") # TODO: this is redundant! if count >= 1: await _run(""" MATCH (s:specie {id:$sid}), (a:agent {id:$aid}) MERGE (s)-[:HAS]->(a) """, sid=str(specie_id), aid=str(aid)) new_ids.append(aid) k = count - 1 for _ in range(k): cid = await self._create_mutant_offspring(aid, specie_id) new_ids.append(cid) else: await delete_agent(aid) await _run("MATCH (:specie {id:$sid})-[h:HAS]->(a:agent {id:$aid}) DELETE h", sid=str(specie_id), aid=str(aid)) log.info(f"New Population Size (specie={specie_id}): {len(new_ids)}") return new_ids async def _create_mutant_offspring(self, parent_id: str, specie_id: str) -> str: """ Create a mutated offspring from a parent agent. This method clones an existing agent to create a new offspring and integrates it into the evolutionary population. It performs the following steps: 1. Generates a new unique agent identifier and clones the parent agent's genotype into the offspring. 2. Inherits and sets species and population identifiers for the cloned agent and clears its fitness value to mark it as unevaluated. 3. Establishes the species membership relationship between the offspring agent and the corresponding species. 4. Applies a single mutation step to the offspring's genotype to introduce variation. The method returns the identifier of the newly created mutant offspring. It is asynchronous and intended to be used during the reproduction phase of species-level evolution. """ clone_id = _new_id() await clone_agent(parent_id, clone_id) rows = await _read_all("MATCH (a:agent {id:$aid}) RETURN a.specie_id AS sid, a.population_id AS pid", aid=str(parent_id)) sid = rows[0]["sid"] if rows else specie_id pid = rows[0]["pid"] if rows else None await _run(""" MATCH (a:agent {id:$cid}) SET a.specie_id = $sid, a.population_id = $pid, a.fitness = NULL """, cid=str(clone_id), sid=str(sid), pid=str(pid)) await _run(""" MATCH (s:specie {id:$sid}), (a:agent {id:$cid}) MERGE (s)-[:HAS]->(a) """, sid=str(specie_id), cid=str(clone_id)) await self._mutate_one_step(clone_id) return clone_id async def _mutate_one_step(self, agent_id: str): """ Apply a single random mutation step to an agent. This method selects one mutation operator at random from the available set of genotype mutation operations (e.g. weight mutation, bias insertion/removal, structural link or neuron changes) and applies it to the specified agent. If the selected mutation operation fails, the method falls back to a simple weight-mutation step as a safety measure. Any further failure is silently ignored to ensure that the evolutionary process can continue without interrupting the run. The method is asynchronous and intended to introduce variation during offspring creation. """ ops = [ self.mutator.mutate_weights_tx, self.mutator.add_bias_tx, self.mutator.remove_bias_tx, self.mutator.add_inlink_tx, self.mutator.add_outlink_tx, self.mutator.add_neuron_tx, self.mutator.outsplice_tx, self.mutator.add_actuator_tx, ] op = random.choice(ops) try: await op(agent_id) except Exception: try: await self.mutator.mutate_weights_tx(agent_id) except Exception: pass async def _top3(self, valid_ids: List[str], offspring_needed: int, specie_id: str) -> List[str]: """ Generate offspring using the top-3 selection strategy. This method implements the "top3" reproduction strategy for a species. It retains the three best-performing agents unchanged and fills the remaining population slots by repeatedly selecting one of these top agents at random and creating a mutated offspring from it. The method returns a list of agent identifiers representing the new species population, consisting of the original top agents and their offspring. It is asynchronous and intended to be used during the species-level mutation phase of the evolutionary cycle. """ new_ids = list(valid_ids) for _ in range(offspring_needed): parent = random.choice(valid_ids) cid = await self._create_mutant_offspring(parent, specie_id) new_ids.append(cid) return new_ids async def init_population(params: Tuple[str, List[dict], str, SelectionAlgorithm]) -> PopulationMonitor: """ Initialize a new evolutionary population and start its monitor. This function creates a fresh population in the genotype store based on the provided species constraints and launches a PopulationMonitor to manage its evolutionary process. It performs the following steps: 1. Deletes any existing population with the given identifier to ensure a clean initialization. 2. Creates a new population node in the datastore. 3. For each specified species constraint: - Creates a new species node associated with the population. - Stores the species constraint configuration and initializes its innovation factor. - Creates an initial set of agents for the species, constructing their genotypes according to the given constraints. - Assigns population and species identifiers to each agent, clears fitness values, and establishes species membership relationships. 4. Starts a PopulationMonitor for the initialized population, using the specified operation mode and selection algorithm. The function returns the running PopulationMonitor instance, which coordinates evaluation, mutation, and logging for the newly created population. """ population_id, specie_constraints, op_mode, selection_algorithm = params await delete_population(population_id) await _run("MERGE (:population {id:$pid})", pid=str(population_id)) # TODO: this has to move to the genotype module for specon in specie_constraints: sid = _new_id() await _run(""" MERGE (p:population {id:$pid}) MERGE (s:specie {id:$sid}) ON CREATE SET s.population_id = $pid, s.constraint_json = $cjson, s.innovation_factor = 0 ON MATCH SET s.population_id = $pid, s.constraint_json = $cjson MERGE (p)-[:HAS]->(s) """, pid=str(population_id), sid=str(sid), cjson=json.dumps(specon, separators=(",", ":"), sort_keys=True)) for _ in range(INIT_SPECIE_SIZE): aid = _new_id() await construct_agent(sid, aid, specon) # todo: this needs to move to the genotype await _run(""" MATCH (a:agent {id:$aid}), (s:specie {id:$sid}) SET a.population_id = $pid, a.specie_id = $sid, a.fitness = NULL MERGE (s)-[:HAS]->(a) """, aid=str(aid), sid=str(sid), pid=str(population_id)) monitor = await PopulationMonitor.start(op_mode, population_id, selection_algorithm) return monitor async def continue_(op_mode: str, selection_algorithm: SelectionAlgorithm, population_id: str = INIT_POPULATION_ID) -> PopulationMonitor: """ Resume or start a population monitor for an existing population. This convenience function starts a PopulationMonitor for the specified population using the given operation mode and selection algorithm, without reinitializing or modifying the underlying population structure. It is intended to continue an existing evolutionary run or to attach a new monitor to a pre-existing population state. The function returns the running PopulationMonitor instance, which immediately begins coordinating evaluation and evolution according to the provided parameters. """ return await PopulationMonitor.start(op_mode, population_id, selection_algorithm) async def delete_population(population_id: str): """ Delete a population and all associated evolutionary entities. This function removes a population and all data linked to it from the genotype store. It performs a cascading deletion that includes: 1. The population node itself. 2. All species belonging to the population. 3. All agents within those species. 4. All cortical structures owned by the agents, including neurons, sensors, and actuators. All relationships are detached prior to deletion to ensure referential integrity. The operation is destructive and intended to be used during population reinitialization or cleanup before starting a new evolutionary run. """ await _run(""" MATCH (p:population {id:$pid}) OPTIONAL MATCH (s:specie {population_id:$pid}) OPTIONAL MATCH (s)-[:HAS]->(a:agent)-[:OWNS]->(cx:cortex) OPTIONAL MATCH (cx)-[:HAS]->(n:neuron) OPTIONAL MATCH (cx)-[:HAS]->(sen:sensor) OPTIONAL MATCH (cx)-[:HAS]->(act:actuator) DETACH DELETE p, s, a, cx, n, sen, act """, pid=str(population_id))