1093 lines
44 KiB
Python
1093 lines
44 KiB
Python
"""
|
|
Module used to monitor a population of neural networks
solving a shared or isolated scape.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import asyncio
|
|
import json
|
|
import math
|
|
import os
|
|
import random
|
|
import uuid
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, List, Literal, Optional, Sequence, Tuple, Callable, Awaitable
|
|
|
|
from mathema.genotype.neo4j.genotype import (
|
|
neo4j,
|
|
construct_agent,
|
|
clone_agent,
|
|
delete_agent,
|
|
update_fingerprint,
|
|
)
|
|
from mathema.genotype.neo4j.genotype_mutator_tx import GenotypeMutator
|
|
from mathema.utils import stats
|
|
from mathema.core.exoself import Exoself
|
|
from mathema.settings import (EFF, SURVIVAL_PERCENTAGE, SPECIE_SIZE_LIMIT,
|
|
INIT_SPECIE_SIZE, GENERATION_LIMIT, EVALUATIONS_LIMIT,
|
|
FITNESS_GOAL, INIT_POPULATION_ID)
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
OpTag = Literal["continue", "pause", "done"]
|
|
SelectionAlgorithm = Literal["competition", "top3"]
|
|
EXOSELF_START: Optional[Callable[[str, "PopulationMonitor"], Awaitable[Any]]] = None
|
|
|
|
|
|
def _new_id() -> str:
    """Return a fresh random identifier: the 32-character hex form of a UUID4."""
    fresh = uuid.uuid4()
    return fresh.hex
|
|
|
|
|
|
def _mean(xs: Sequence[float]) -> float:
    """Return the arithmetic mean of *xs*, or 0.0 when *xs* is empty."""
    if not xs:
        return 0.0
    return sum(xs) / len(xs)
|
|
|
|
|
|
def _std(xs: Sequence[float]) -> float:
    """
    Return the population standard deviation of *xs* (divisor ``len(xs)``).

    An empty sequence yields 0.0.
    """
    if not xs:
        return 0.0
    centre = _mean(xs)
    squared_deviations = sum((value - centre) ** 2 for value in xs)
    return math.sqrt(squared_deviations / len(xs))
|
|
|
|
|
|
async def _read_all(q: str, **params):
    """Forward the query *q* and its keyword parameters to ``neo4j.read_all`` and return the result."""
    records = await neo4j.read_all(q, **params)
    return records
|
|
|
|
|
|
async def _run(q: str, **params):
    """Forward the query *q* and its keyword parameters to ``neo4j.run_consume`` and return the result."""
    outcome = await neo4j.run_consume(q, **params)
    return outcome
|
|
|
|
|
|
@dataclass
class MonitorState:
    """Mutable bookkeeping record held by a ``PopulationMonitor``."""

    # --- identity / configuration (positional constructor arguments) ---
    op_mode: str                              # operation mode label; only stored and logged in this module
    population_id: str                        # id of the monitored population
    selection_algorithm: SelectionAlgorithm   # "competition" or "top3"
    op_tag: OpTag = "continue"                # control state: "continue", "pause" or "done"

    # --- agents of the generation currently running ---
    active: List[Tuple[str, Any]] = field(default_factory=list)   # (agent_id, handle) pairs
    agent_ids: List[str] = field(default_factory=list)            # ids read from the database

    tot_agents: int = 0      # agents started in the current generation
    agents_left: int = 0     # agents that have not reported termination yet

    # --- accumulators over the whole run ---
    pop_gen: int = 0         # number of finished generations
    eval_acc: int = 0        # sum of the evaluation counts reported by agents
    cycle_acc: int = 0       # sum of the cycle counts reported by agents
    time_acc: int = 0        # sum of the time values reported by agents (unit not defined here)
    rows: List[dict] = field(default_factory=list)   # one statistics dict per finished generation
|
|
|
|
|
|
async def _population_aggregate(population_id: str) -> dict:
    """
    Collect the fitness of every agent in a population and summarise it.

    Parameters:
        population_id (str): Identifier of the population; it is stringified
            before being passed to the query.

    Returns:
        dict: Aggregate statistics with the keys
            - "cum_fitness": sum of all fitness values
            - "avg": arithmetic mean
            - "std": population standard deviation (divisor n)
            - "best": largest fitness value
            - "min": smallest fitness value
            - "n": number of fitness values
            - "agents": number of agents (same value as "n")
        All entries are zero when the query yields no fitness values.
    """
    rows = await _read_all(
        """
        MATCH (a:agent {population_id:$pid})
        RETURN collect(coalesce(toFloat(a.fitness),0.0)) AS fs
        """,
        pid=str(population_id),
    )
    # A missing fitness is already mapped to 0.0 by the query's coalesce().
    collected = rows[0]["fs"] if rows else []
    fitnesses = [float(value) for value in collected]

    if not fitnesses:
        return {"cum_fitness": 0.0, "avg": 0.0, "std": 0.0, "best": 0.0, "min": 0.0, "n": 0, "agents": 0}

    count = len(fitnesses)
    total = sum(fitnesses)
    mean = total / count
    variance = sum((value - mean) ** 2 for value in fitnesses) / count

    summary = {
        "cum_fitness": total,
        "avg": mean,
        "std": math.sqrt(variance),
        "best": max(fitnesses),
        "min": min(fitnesses),
        "n": count,
        "agents": count,
    }
    return summary
|
|
|
|
|
|
async def _best_fitness_in_population(population_id: str) -> float:
    """
    Return the highest agent fitness stored for a population.

    Parameters:
        - population_id (str): The ID of the population to search.

    Returns:
        - float: The maximum fitness reported by the query, or 0.0 when the
          query returns no rows or a null/zero maximum.
    """
    rows = await _read_all(
        """
        MATCH (a:agent {population_id:$pid})
        RETURN max(toFloat(a.fitness)) AS best
        """,
        pid=str(population_id),
    )
    if not rows:
        return 0.0
    best = rows[0]["best"]
    # `or` maps a null maximum (no fitness stored) to 0.0
    return float(best or 0.0)
|
|
|
|
|
|
async def _calculate_energy_cost(population_id: str) -> float:
    """
    Calculate the energy cost of a population: total agent fitness divided by
    the total number of neurons in the cortices owned by those agents.

    Parameters:
        population_id (str): The ID of the population for which the energy
            cost is calculated.

    Returns:
        float: ``total_fitness / total_neurons``, or 0.0 when the query
        returns nothing or no neurons are found.
    """
    # The OPTIONAL MATCH produces one row per (agent, neuron) pair. Neurons are
    # therefore counted per agent first (WITH a, count(n)), so that each
    # agent's fitness enters the sum exactly once instead of once per neuron.
    rows = await _read_all(
        """
        MATCH (a:agent {population_id:$pid})-[:OWNS]->(cx:cortex)
        OPTIONAL MATCH (cx)-[:HAS]->(n:neuron)
        WITH a, count(n) AS k
        RETURN sum(coalesce(toFloat(a.fitness),0.0)) AS totE,
               sum(k) AS totN
        """,
        pid=str(population_id),
    )
    if not rows:
        return 0.0
    total_energy = float(rows[0]["totE"] or 0.0)
    total_neurons = int(rows[0]["totN"] or 0)
    return (total_energy / total_neurons) if total_neurons > 0 else 0.0
|
|
|
|
|
|
async def _construct_agent_summaries(agent_ids: Sequence[str]):
    """
    Build ``(fitness, neuron_count, agent_id)`` summaries for the given agents.

    Parameters:
        agent_ids (Sequence[str]): Agent IDs to summarise; each is stringified
            before being passed to the query.

    Returns:
        List[Tuple[float, int, str]]: One tuple per row returned by the query.
        A null fitness is reported as 0.0. Only agents matched by the query
        (an agent with that id owning a cortex) appear in the result. An empty
        *agent_ids* returns ``[]`` without querying.
    """
    if not agent_ids:
        return []

    rows = await _read_all(
        """
        UNWIND $ids AS aid
        MATCH (a:agent {id:aid})-[:OWNS]->(cx:cortex)
        OPTIONAL MATCH (cx)-[:HAS]->(n:neuron)
        RETURN aid AS id,
               toFloat(a.fitness) AS f,
               count(n) AS k
        """,
        ids=[str(agent_id) for agent_id in agent_ids],
    )
    summaries: List[Tuple[float, int, str]] = [
        (
            0.0 if rec["f"] is None else float(rec["f"]),
            int(rec["k"]),
            str(rec["id"]),
        )
        for rec in rows
    ]
    return summaries
|
|
|
|
|
|
async def _calculate_alotments(valid: List[Tuple[float, int, str]],
                               neural_energy_cost: float
                               ):
    """
    Derive a reproduction allotment for each agent from its fitness.

    For every ``(fitness, total_neurons, agent_id)`` entry the neural allotment
    is ``fitness / neural_energy_cost`` (0.0 when the cost is not positive) and
    the mutant allotment is that value divided by ``max(total_neurons, 1)``.

    Parameters:
        valid (List[Tuple[float, int, str]]): Fitness, total neurons and agent ID per agent.
        neural_energy_cost (float): The energy cost used as divisor.

    Returns:
        Tuple[List[Tuple[float, float, int, str]], float]: The list of
        ``(mutant_allotment, fitness, total_neurons, agent_id)`` tuples in input
        order, and the sum of all mutant allotments.
    """
    allotments: List[Tuple[float, float, int, str]] = []
    allotment_sum = 0.0
    for fitness, neuron_total, agent_id in valid:
        if neural_energy_cost > 0:
            neural_share = fitness / neural_energy_cost
        else:
            neural_share = 0.0
        # max(..., 1) avoids dividing by zero for agents without neurons
        mutant_share = neural_share / max(neuron_total, 1)
        allotment_sum += mutant_share
        allotments.append((mutant_share, fitness, neuron_total, agent_id))
    log.debug(f"NewPopAcc: {allotment_sum:.4f}")
    return allotments, allotment_sum
|
|
|
|
|
|
async def _extract_specie_agent_ids(specie_id: str) -> List[str]:
    """
    Return the IDs of all agents linked to a specie through a ``HAS`` relationship.

    :param specie_id: The ID of the specie whose agents are looked up
    :type specie_id: str
    :return: Agent IDs (as strings) in the order the query returns them
    :rtype: List[str]
    """
    records = await _read_all(
        """
        MATCH (:specie {id:$sid})-[:HAS]->(a:agent) RETURN a.id AS id
        """,
        sid=str(specie_id),
    )
    ids: List[str] = []
    for rec in records:
        ids.append(str(rec["id"]))
    return ids
|
|
|
|
|
|
async def _extract_specie_ids(population_id: str) -> List[str]:
    """
    Return the IDs of all specie nodes whose ``population_id`` matches.

    :param population_id: str - The population ID to look up.
    :return: List[str] - Specie IDs as strings, ordered by id by the query.
    """
    records = await _read_all(
        """
        MATCH (s:specie {population_id:$pid}) RETURN s.id AS id ORDER BY id
        """,
        pid=str(population_id),
    )
    ids: List[str] = []
    for rec in records:
        ids.append(str(rec["id"]))
    return ids
|
|
|
|
|
|
async def _ensure_specie_node(specie_id: str, population_id: str, constraint_json: str):
    """
    MERGE the specie node and (re)set its population and constraint properties.

    Parameters:
        specie_id (str): The ID of the specie node to merge.
        population_id (str): Stored on the node as ``population_id``.
        constraint_json (str): Stored on the node as ``constraint_json``.

    All three arguments are stringified before being sent to the database.

    Returns:
        None
    """
    query = """
        MERGE (s:specie {id:$sid})
        SET s.population_id = $pid,
            s.constraint_json = $cjson
        """
    await _run(
        query,
        sid=str(specie_id),
        pid=str(population_id),
        cjson=str(constraint_json),
    )
|
|
|
|
|
|
async def _extract_agent_ids(population_id: str) -> List[str]:
    """
    Return the IDs of all agents whose ``population_id`` matches.

    Parameters:
        population_id (str): The ID of the population to look up.

    Returns:
        List[str]: Agent IDs as strings, ordered by id by the query.
    """
    query = "MATCH (a:agent {population_id:$pid}) RETURN a.id AS id ORDER BY id"
    records = await _read_all(query, pid=str(population_id))
    ids: List[str] = []
    for rec in records:
        ids.append(str(rec["id"]))
    return ids
|
|
|
|
|
|
async def _ensure_population_node(population_id: str) -> None:
    """MERGE a ``population`` node whose ``id`` is the stringified *population_id*."""
    pid = str(population_id)
    await _run("MERGE (:population {id:$pid})", pid=pid)
|
|
|
|
|
|
class PopulationMonitor:
|
|
def __init__(self, op_mode: str, population_id: str, selection_algorithm: SelectionAlgorithm):
    """
    Set up the monitor's state and resources without starting anything.

    No task is created and no file is opened here; ``start`` does that.
    """
    # actor state and plumbing
    self.state = MonitorState(op_mode, population_id, selection_algorithm)
    self.inbox: asyncio.Queue = asyncio.Queue()
    self._task: Optional[asyncio.Task] = None
    self._stopped_evt = asyncio.Event()      # set when the run loop exits
    self.mutator = GenotypeMutator(neo4j)
    self.gen_ended = asyncio.Event()         # set (and then replaced) at the end of each generation
    # module-level EXOSELF_START, when set, overrides the default agent launcher
    self._exoself_start = EXOSELF_START if EXOSELF_START else Exoself.start

    # run bookkeeping used for logging
    self.run_id = None
    self._t0 = None                          # loop time at start; filled in by start()
    self._best_so_far = float("-inf")
    self.train_time_sec = 30 * 60            # wall-clock training budget in seconds
    self._deadline_task = None

    # log file handles, opened by start()
    self._episodes_f = None
    self._progress_f = None
|
|
|
|
@classmethod
async def start(cls, op_mode: str, population_id: str,
                selection_algorithm: SelectionAlgorithm) -> "PopulationMonitor":
    """
    Create a monitor, launch its message loop and start the first generation.

    In order, this method:
    1. Instantiates the monitor with the given operation mode, population
       identifier and selection algorithm.
    2. Schedules ``_run`` as an asyncio task named
       ``PopulationMonitor-<population_id>``.
    3. Calls ``stats.register_atexit`` with the population id and a callable
       returning a copy of the collected generation rows.
    4. Records the running loop's current time as the run start, builds the
       run id ``<population_id>__<8 hex chars>`` and creates ``runs/<run_id>``.
    5. Opens line-buffered ``episodes.csv`` and ``progress.csv`` in that
       directory and writes their header lines.
    6. Schedules the deadline task named ``Deadline-<run_id>``.
    7. Awaits ``_init_generation``.

    Returns the running ``PopulationMonitor`` instance.
    """
    monitor = cls(op_mode, population_id, selection_algorithm)
    monitor._task = asyncio.create_task(monitor._run(), name=f"PopulationMonitor-{population_id}")
    stats.register_atexit(population_id, lambda: list(monitor.state.rows))

    # timing and run identity
    monitor._t0 = asyncio.get_running_loop().time()
    monitor.run_id = f"{population_id}__{uuid.uuid4().hex[:8]}"
    run_dir = f"runs/{monitor.run_id}"
    os.makedirs(run_dir, exist_ok=True)

    # CSV logs; buffering=1 selects line buffering
    episodes = open(f"{run_dir}/episodes.csv", "w", buffering=1)
    monitor._episodes_f = episodes
    episodes.write("t_sec,pop_gen,agent_id,eval_idx,episode_return\n")

    progress = open(f"{run_dir}/progress.csv", "w", buffering=1)
    monitor._progress_f = progress
    progress.write("t_sec,t_norm,pop_gen,best_gen,best_so_far,avg,std\n")

    monitor._deadline_task = asyncio.create_task(
        monitor._stop_after_deadline(),
        name=f"Deadline-{monitor.run_id}",
    )

    await monitor._init_generation()
    return monitor
|
|
|
|
async def _stop_after_deadline(self):
|
|
await asyncio.sleep(self.train_time_sec)
|
|
await self.inbox.put(("stop", "normal"))
|
|
|
|
async def cast(self, msg: tuple) -> None:
|
|
await self.inbox.put(msg)
|
|
|
|
async def stop(self, mode: Literal["normal", "shutdown"] = "normal"):
|
|
"""
|
|
Request termination of the population monitor and await shutdown.
|
|
|
|
This method sends a stop command to the monitor's internal message loop
|
|
and blocks until the monitor has completed its graceful shutdown
|
|
procedure. The optional mode parameter indicates the reason for stopping
|
|
(e.g. normal completion versus external shutdown) and is forwarded to
|
|
the internal stop handler.
|
|
|
|
It provides a synchronous-style interface for external components to
|
|
ensure that all agents are terminated, logs are flushed, and final
|
|
results are persisted before control returns to the caller.
|
|
"""
|
|
await self.inbox.put(("stop", mode))
|
|
await self._stopped_evt.wait()
|
|
|
|
async def _run(self):
|
|
"""
|
|
Main asynchronous message-processing loop of the population monitor.
|
|
|
|
This method continuously consumes messages from the internal inbox and
|
|
dispatches them to the appropriate handlers, coordinating the lifecycle
|
|
of an evolutionary run. It implements an actor-style control loop with
|
|
the following responsibilities:
|
|
|
|
1. Reacts to control messages:
|
|
- "stop": triggers graceful shutdown and finalization of the run.
|
|
- "pause": requests a pause after the current generation completes.
|
|
- "continue": resumes execution and initializes a new generation
|
|
after a pause.
|
|
2. Handles evaluation-related events:
|
|
- "episode_done": logs completion of a single evaluation episode.
|
|
- "terminated": processes termination of an agent and updates
|
|
generation-level state.
|
|
3. Maintains correct sequencing of generations and respects the current
|
|
operational mode (continue, pause, done).
|
|
|
|
The loop runs until a stop message is received or the task is cancelled.
|
|
On exit, it signals completion via an internal stopped-event to allow
|
|
other components to await full shutdown.
|
|
"""
|
|
try:
|
|
while True:
|
|
msg = await self.inbox.get()
|
|
tag = msg[0] if isinstance(msg, tuple) else None
|
|
|
|
if tag == "stop":
|
|
await self._handle_stop(msg[1])
|
|
break
|
|
elif tag == "pause":
|
|
if self.state.op_tag == "continue":
|
|
self.state.op_tag = "pause"
|
|
log.debug("Population Monitor will pause after this generation.")
|
|
elif tag == "continue":
|
|
if self.state.op_tag == "pause":
|
|
self.state.op_tag = "continue"
|
|
await self._init_generation()
|
|
elif tag == "episode_done":
|
|
await self._handle_episode_done(*msg[1:])
|
|
elif tag == "terminated":
|
|
await self._handle_agent_terminated(*msg[1:])
|
|
else:
|
|
pass
|
|
finally:
|
|
self._stopped_evt.set()
|
|
|
|
async def _init_generation(self) -> None:
    """
    Start one agent per genotype of the population and reset generation state.

    The agent ids of the population are read from the database and each agent
    is launched through the configured exoself start function. An agent that
    fails to start is logged and its stored fitness is set to negative
    infinity; it is not counted as active.

    If no agent could be started at all (empty population or every launch
    failed), no "terminated" message can ever arrive, so a normal stop request
    is enqueued instead of idling until the training deadline.

    Returns:
        None
    """
    s = self.state
    agent_ids = await _extract_agent_ids(s.population_id)
    s.agent_ids = agent_ids
    s.tot_agents = len(agent_ids)
    s.agents_left = 0

    active = []
    for aid in agent_ids:
        try:
            handle = await self._exoself_start(aid, self)
            active.append((aid, handle))
            s.agents_left += 1
        except Exception as e:
            log.error(f"[Monitor] FAILED to start agent {aid}: {e!r}")
            # mark the agent as the worst possible candidate for selection
            await _run("MATCH (a:agent {id:$aid}) SET a.fitness = toFloat($f)", aid=str(aid), f=float("-inf"))

    s.active = active
    # only successfully started agents count towards the generation
    s.tot_agents = s.agents_left
    log.info(f"*** Population monitor started: pop={s.population_id}, mode={s.op_mode}, "
             f"selection={s.selection_algorithm}, agents={s.tot_agents}")

    if not active:
        # The local list is checked rather than agents_left, which the run
        # loop may already have decremented for agents that finished early.
        log.warning("[Monitor] no agent could be started for population %s; requesting stop",
                    s.population_id)
        await self.inbox.put(("stop", "normal"))
|
|
|
|
async def _handle_stop(self, mode: str):
    """
    Shut the run down and persist a summary.

    Called by the run loop when a stop message is received. Steps:

    1. Ask every active agent handle to terminate (``cast(("terminate",))``
       if available, otherwise ``stop()``).
    2. Flush and close the episode and progress log files and drop the
       handles immediately, so nothing can write to a closed file afterwards.
    3. Cancel the deadline task, if any.
    4. Fold the population's current best fitness into the best-so-far value.
    5. Write ``runs/<run_id>/summary.json``.
    6. Log the shutdown.

    Steps 1-4 are best effort: a failure is logged and the shutdown goes on.
    A failure to write the summary propagates to the caller.

    Parameters:
        mode (str): Stop reason forwarded by the run loop; currently unused.
    """
    s = self.state

    for (aid, h) in list(s.active):
        try:
            if hasattr(h, "cast"):
                await h.cast(("terminate",))
            elif hasattr(h, "stop"):
                await h.stop()
        except Exception:
            log.warning("[Monitor] failed to terminate agent %s during shutdown", aid, exc_info=True)

    for attr in ("_episodes_f", "_progress_f"):
        f = getattr(self, attr, None)
        setattr(self, attr, None)
        if f is None:
            continue
        try:
            f.flush()
            f.close()
        except Exception:
            log.warning("[Monitor] failed to close log file %s", attr, exc_info=True)

    deadline = getattr(self, "_deadline_task", None)
    if deadline:
        try:
            deadline.cancel()
        except Exception:
            log.warning("[Monitor] failed to cancel deadline task", exc_info=True)

    try:
        best = await _best_fitness_in_population(s.population_id)
        self._best_so_far = max(self._best_so_far, float(best))
    except Exception:
        log.warning("[Monitor] could not read final best fitness", exc_info=True)

    summary = {
        "run_id": self.run_id,
        "population_id": s.population_id,
        "train_time_sec": self.train_time_sec,
        "gens": s.pop_gen,
        "eval_acc": s.eval_acc,
        "best_so_far": self._best_so_far,
        "op_tag": s.op_tag,
    }
    with open(f"runs/{self.run_id}/summary.json", "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)

    log.info(f"*** Population_Monitor:{s.population_id} shutdown. op_tag={s.op_tag}, op_mode={s.op_mode}")
|
|
|
|
async def _handle_episode_done(self, agent_id: str, ep_return: float, eval_idx: int):
|
|
"""
|
|
Handle completion of a single evaluation episode for an agent.
|
|
|
|
This method is called whenever an agent finishes one evaluation episode
|
|
within the current generation. It records episode-level information for
|
|
later analysis and monitoring but does not alter population-level state
|
|
or control flow.
|
|
|
|
Specifically, it:
|
|
1. Computes the elapsed wall-clock time since the start of the run.
|
|
2. Logs the episode result (time, generation index, agent identifier,
|
|
evaluation index, and episode return) to the episode-level log file,
|
|
if enabled.
|
|
"""
|
|
s = self.state
|
|
t_sec = asyncio.get_running_loop().time() - (self._t0 or 0.0)
|
|
if self._episodes_f is not None:
|
|
self._episodes_f.write(f"{t_sec:.6f},{s.pop_gen},{agent_id},{eval_idx},{ep_return:.10f}\n")
|
|
|
|
async def _handle_agent_terminated(self, agent_id: str, fitness: float, agent_eval: int, agent_cycle: int,
                                   agent_time: int):
    """
    Process the termination report of one agent.

    Steps:
    1. Add the agent's evaluation, cycle and time counts to the run-level
       accumulators and decrement the number of agents still running.
    2. Store the reported fitness on the agent node in the database.
    3. Drop the agent from the list of active agents.
    4. Log progress; when one to three agents remain, also log their ids.
    5. Once no agents remain, finalize the generation.
    """
    log.info(f"agent terminated: , {agent_id}, {fitness}, {agent_eval}, {agent_cycle}, {agent_time}")
    state = self.state

    # counters first, so they are up to date before the database write is awaited
    state.eval_acc = state.eval_acc + int(agent_eval)
    state.cycle_acc = state.cycle_acc + int(agent_cycle)
    state.time_acc = state.time_acc + int(agent_time)
    state.agents_left = state.agents_left - 1

    await _run(
        "MATCH (a:agent {id:$aid}) SET a.fitness = toFloat($f)",
        aid=str(agent_id),
        f=float(fitness),
    )

    state.active = [(aid, h) for (aid, h) in state.active if aid != agent_id]

    log.info(f"[Monitor] agent done: {agent_id} | agents_left={state.agents_left}/{state.tot_agents}")
    if 0 < state.agents_left <= 3:
        remaining = [str(aid) for (aid, _h) in state.active]
        log.info("[Monitor] still active: %s", remaining)

    if state.agents_left > 0:
        return
    await self._generation_finished()
|
|
|
|
async def _generation_finished(self) -> None:
    """
    Close the current generation and decide what happens next.

    Called once every agent of the generation has reported termination. Steps:

    1. Mutate/select the population with the configured selection algorithm
       and ``SPECIE_SIZE_LIMIT``.
    2. Increment the generation counter and read aggregate fitness statistics
       (note: these are read after the mutation step).
    3. Compute elapsed and normalized training time and update best-so-far.
    4. Append a line to the progress log (if open) and a statistics record to
       the in-memory history.
    5. Set the current ``gen_ended`` event and replace it with a fresh one.
    6. Evaluate the end conditions: generation limit, evaluation limit,
       fitness goal and wall-clock limit. The wall-clock limit is only
       enforced when a start time was recorded.
    7. Enqueue a stop request, stay paused, or start the next generation.
    """
    s = self.state
    await self._mutate_population(s.population_id, SPECIE_SIZE_LIMIT, s.selection_algorithm)
    s.pop_gen += 1
    log.info(f"Population {s.population_id} generation {s.pop_gen} ended.\n")

    pop_stats = await _population_aggregate(s.population_id)

    # aggregate logging information
    loop = asyncio.get_running_loop()
    t_sec = loop.time() - (self._t0 or 0.0)
    # a non-positive budget counts as fully used instead of dividing by zero
    t_norm = min(1.0, t_sec / self.train_time_sec) if self.train_time_sec > 0 else 1.0
    best_gen = float(pop_stats["best"])
    self._best_so_far = max(self._best_so_far, best_gen)

    # write logs
    if self._progress_f is not None:
        self._progress_f.write(
            f"{t_sec:.6f},{t_norm:.6f},{s.pop_gen},{best_gen:.10f},{self._best_so_far:.10f},"
            f"{float(pop_stats['avg']):.10f},{float(pop_stats['std']):.10f}\n"
        )

    s.rows.append({
        "gen": int(s.pop_gen),
        # NOTE(review): this is the raw loop clock, not the time elapsed since
        # the run started (unlike t_sec in progress.csv) - confirm intent.
        "t_sec": int(loop.time()),
        "cum_fitness": float(pop_stats["cum_fitness"]),
        "best": float(pop_stats["best"]),
        "avg": float(pop_stats["avg"]),
        "std": float(pop_stats["std"]),
        "agents": int(pop_stats["agents"]),
        "eval_acc": int(s.eval_acc),
        "cycle_acc": float(s.cycle_acc),
        "time_acc": float(s.time_acc),
    })

    # wake current waiters, then hand out a fresh event for the next generation
    self.gen_ended.set()
    self.gen_ended = asyncio.Event()

    # check time; without a recorded start there is nothing to measure against
    elapsed = loop.time() - (self._t0 or 0.0)
    time_limit_reached = self._t0 is not None and elapsed >= self.train_time_sec
    if time_limit_reached:
        log.info(
            f"[Monitor] time limit reached "
            f"({elapsed:.1f}s >= {self.train_time_sec}s), stopping run"
        )

    best = await _best_fitness_in_population(s.population_id)
    generation_limit_hit = s.pop_gen >= GENERATION_LIMIT
    evaluation_limit_hit = s.eval_acc >= EVALUATIONS_LIMIT
    fitness_goal_hit = best > FITNESS_GOAL
    end_condition = generation_limit_hit or evaluation_limit_hit or fitness_goal_hit or time_limit_reached
    if generation_limit_hit:
        log.info(f"reached generation limit {GENERATION_LIMIT}, stopping")
    if evaluation_limit_hit:
        log.info(f"reached evaluation limit {EVALUATIONS_LIMIT}, stopping")
    if fitness_goal_hit:
        log.info(f"reached best fitness {best}, stopping")

    if s.op_tag == "done" or end_condition:
        await self.inbox.put(("stop", "normal"))
        return

    if s.op_tag == "pause":
        log.info("Population Monitor paused.")
        return

    await self._init_generation()
|
|
|
|
async def _mutate_population(self, population_id: str, keep_tot: int,
                             selection_algorithm: SelectionAlgorithm):
    """
    Run the end-of-generation selection/mutation step for a whole population.

    The population's energy cost is computed once and then every specie of
    the population is processed in turn by ``_mutate_specie``, which receives
    *keep_tot* as its population limit together with the shared energy cost
    and the selection algorithm.
    """
    shared_energy_cost = await _calculate_energy_cost(population_id)
    for specie_id in await _extract_specie_ids(population_id):
        await self._mutate_specie(specie_id, keep_tot, shared_energy_cost, selection_algorithm)
|
|
|
|
async def _mutate_specie(self, specie_id: str, population_limit: int, neural_energy_cost: float,
                         selection_algorithm: SelectionAlgorithm):
    """
    Select survivors of one specie, delete the rest and create offspring.

    Steps:
    1. Load the specie's agents as ``(fitness, neuron_count, agent_id)``
       summaries, sorted by fitness in descending order. Nothing happens for
       a specie without agents.
    2. Choose the survivors:
       - "competition": rank agents by ``fitness / max(neurons, 1) ** EFF``
         and keep the best ``round(n * SURVIVAL_PERCENTAGE)``, but never fewer
         than one, so that rounding cannot wipe out the specie.
       - "top3": keep the three fittest agents.
    3. Delete all other agents and their ``HAS`` link to the specie.
    4. Produce offspring through ``_competition`` or ``_top3``.
    5. Store mean/std/max/min fitness (over all agents loaded in step 1), the
       updated innovation factor and the champion ids on the specie node.
    6. Refresh the fingerprint of every id returned by the reproduction step;
       failures are logged and do not abort the step.

    Raises:
        ValueError: if *selection_algorithm* is neither "competition" nor "top3".
    """
    agent_ids = await _extract_specie_agent_ids(specie_id)

    summaries = await _construct_agent_summaries(agent_ids)
    summaries.sort(key=lambda t: t[0], reverse=True)

    if not summaries:
        return

    if selection_algorithm == "competition":
        # round() may yield 0 (e.g. round(0.5) == 0); always keep one survivor
        tot_survivors = max(1, round(len(summaries) * SURVIVAL_PERCENTAGE))
        # stable sort: agents with equal efficiency score keep their fitness order
        weighted = sorted(summaries, key=lambda t: t[0] / (max(t[1], 1) ** EFF), reverse=True)
        valid = weighted[:tot_survivors]
        top_agent_ids = [aid for (_f, _tn, aid) in valid[:3]]
    elif selection_algorithm == "top3":
        valid = summaries[:3]
        top_agent_ids = [aid for (_f, _tn, aid) in valid]
    else:
        log.error(f"Unknown selection algorithm: {selection_algorithm}")
        raise ValueError(f"Unknown selection algorithm: {selection_algorithm}")

    # remove every non-survivor from the genotype store and from the specie
    survivor_ids = {aid for (_f, _tn, aid) in valid}
    for (_f, _tn, aid) in summaries:
        if aid in survivor_ids:
            continue
        await delete_agent(aid)
        await _run("MATCH (:specie {id:$sid})-[h:HAS]->(a:agent {id:$aid}) DELETE h",
                   sid=str(specie_id), aid=str(aid))

    if selection_algorithm == "competition":
        new_ids = await self._competition(valid, population_limit, neural_energy_cost, specie_id)
    else:
        offspring_needed = max(0, population_limit - len(top_agent_ids))
        new_ids = await self._top3(top_agent_ids, offspring_needed, specie_id)

    f_list = [f for (f, _tn, _aid) in summaries]
    avg, std, maxf, minf = _mean(f_list), _std(f_list), max(f_list), min(f_list)

    row = await _read_all("MATCH (s:specie {id:$sid}) RETURN toInteger(s.innovation_factor) AS inv",
                          sid=str(specie_id))
    inv = int(row[0]["inv"]) if row and row[0]["inv"] is not None else 0
    # NOTE(review): the best fitness is compared against the stored innovation
    # counter itself; no separate fitness threshold is stored - confirm intent.
    u_inv = 0 if (maxf > inv) else (inv - 1)

    await _run("""
        MATCH (s:specie {id:$sid})
        SET s.fitness_avg = toFloat($avg),
            s.fitness_std = toFloat($std),
            s.fitness_max = toFloat($maxf),
            s.fitness_min = toFloat($minf),
            s.innovation_factor = toInteger($inv),
            s.champion_ids = $champs
        """, sid=str(specie_id), avg=float(avg), std=float(std), maxf=float(maxf), minf=float(minf),
               inv=int(u_inv), champs=[str(x) for x in top_agent_ids])

    for aid in new_ids:
        try:
            await update_fingerprint(aid)
        except Exception:
            # best effort: a stale fingerprint must not abort the generation
            log.warning("[Monitor] fingerprint update failed for agent %s", aid, exc_info=True)
|
|
|
|
async def _competition(self, valid: List[Tuple[float, int, str]], population_limit: int,
                       neural_energy_cost: float, specie_id: str) -> List[str]:
    """
    Reproduce a specie with the competition strategy.

    1. Compute a reproduction allotment per surviving agent, together with
       the sum of all allotments.
    2. Derive the normalizer ``sum / population_limit`` (1.0 when the limit is
       not positive) used to scale allotments to the population limit.
    3. Hand allotments and normalizer to ``_gather_survivors`` and return the
       list of agent ids it produces.
    """
    allotments, estimated_total = await _calculate_alotments(valid, neural_energy_cost)
    if population_limit > 0:
        normalizer = estimated_total / population_limit
    else:
        normalizer = 1.0
    log.debug(f"Population size normalizer: {normalizer:.4f}")
    survivors = await self._gather_survivors(allotments, normalizer, specie_id)
    return survivors
|
|
|
|
async def _gather_survivors(
    self,
    alot: List[Tuple[float, float, int, str]],
    normalizer: float,
    specie_id: str,
) -> List[str]:
    """
    Keep every listed agent and create its share of mutant offspring.

    Each entry of ``alot`` is a 4-tuple whose first element is the agent's
    allotment value and whose last element is the agent id; the two middle
    elements are not used here. For each entry the method:

    1. Computes a reproduction count as ``round(allotment / normalizer)``.
       A non-positive ``normalizer`` yields a raw count of zero.
    2. Clamps the count to a minimum of one, so every listed agent survives.
       No agent is removed or deleted by this method.
    3. Re-asserts the ``HAS`` relationship between the species and the
       surviving agent (``MERGE``, so an existing link is left untouched).
    4. Creates ``count - 1`` mutant offspring from the survivor.

    Returns the ids of the new species population: each parent followed
    directly by its offspring, in the order the parents appear in ``alot``.
    """
    new_ids: List[str] = []

    for ma, _unused_fit, _unused_tn, aid in alot:
        raw_count = int(round(ma / normalizer)) if normalizer > 0 else 0
        # Hard safety: every listed agent contributes at least itself.
        count = max(1, raw_count)
        log.info(f"Agent {aid}: normalized allotment = {count}")

        await _run("""
            MATCH (s:specie {id:$sid}), (a:agent {id:$aid})
            MERGE (s)-[:HAS]->(a)
        """, sid=str(specie_id), aid=str(aid))
        new_ids.append(aid)

        # The parent occupies one slot; the remainder are mutated clones.
        for _ in range(count - 1):
            cid = await self._create_mutant_offspring(aid, specie_id)
            new_ids.append(cid)

    log.info(f"New Population Size (specie={specie_id}): {len(new_ids)}")
    return new_ids
|
|
|
|
async def _create_mutant_offspring(self, parent_id: str, specie_id: str) -> str:
    """
    Clone ``parent_id`` into a new agent, attach it to the species and mutate it.

    Steps:

    1. Generate a fresh agent id and clone the parent's genotype into it
       via ``clone_agent``.
    2. Read the parent's ``specie_id`` and ``population_id`` properties and
       copy them onto the clone, clearing the clone's ``fitness``.
       If the parent row is missing or its ``specie_id`` is null, the
       ``specie_id`` argument is used instead. If no population id is
       available, the property is set to null rather than to the text
       ``"None"``.
    3. ``MERGE`` the ``HAS`` relationship from the species identified by the
       ``specie_id`` argument to the clone.
    4. Apply one mutation step to the clone via ``_mutate_one_step``.

    Returns the id of the newly created offspring.
    """
    clone_id = _new_id()

    await clone_agent(parent_id, clone_id)

    rows = await _read_all(
        "MATCH (a:agent {id:$aid}) RETURN a.specie_id AS sid, a.population_id AS pid",
        aid=str(parent_id),
    )
    sid = rows[0]["sid"] if rows else None
    pid = rows[0]["pid"] if rows else None
    if sid is None:
        # Parent carries no species id of its own: use the one we were asked to join.
        sid = specie_id

    # str(None) would persist the literal string "None"; pass a real null instead.
    pid_param = None if pid is None else str(pid)

    await _run("""
        MATCH (a:agent {id:$cid})
        SET a.specie_id = $sid,
            a.population_id = $pid,
            a.fitness = NULL
    """, cid=str(clone_id), sid=str(sid), pid=pid_param)

    await _run("""
        MATCH (s:specie {id:$sid}), (a:agent {id:$cid})
        MERGE (s)-[:HAS]->(a)
    """, sid=str(specie_id), cid=str(clone_id))

    await self._mutate_one_step(clone_id)
    return clone_id
|
|
|
|
async def _mutate_one_step(self, agent_id: str):
|
|
"""
|
|
Apply a single random mutation step to an agent.
|
|
|
|
This method selects one mutation operator at random from the available
|
|
set of genotype mutation operations (e.g. weight mutation, bias
|
|
insertion/removal, structural link or neuron changes) and applies it to
|
|
the specified agent.
|
|
|
|
If the selected mutation operation fails, the method falls back to a
|
|
simple weight-mutation step as a safety measure. Any further failure is
|
|
silently ignored to ensure that the evolutionary process can continue
|
|
without interrupting the run.
|
|
|
|
The method is asynchronous and intended to introduce variation during
|
|
offspring creation.
|
|
"""
|
|
ops = [
|
|
self.mutator.mutate_weights_tx,
|
|
self.mutator.add_bias_tx,
|
|
self.mutator.remove_bias_tx,
|
|
self.mutator.add_inlink_tx,
|
|
self.mutator.add_outlink_tx,
|
|
self.mutator.add_neuron_tx,
|
|
self.mutator.outsplice_tx,
|
|
self.mutator.add_actuator_tx,
|
|
]
|
|
op = random.choice(ops)
|
|
try:
|
|
await op(agent_id)
|
|
except Exception:
|
|
|
|
try:
|
|
await self.mutator.mutate_weights_tx(agent_id)
|
|
except Exception:
|
|
pass
|
|
|
|
async def _top3(self, valid_ids: List[str], offspring_needed: int, specie_id: str) -> List[str]:
|
|
"""
|
|
Generate offspring using the top-3 selection strategy.
|
|
|
|
This method implements the "top3" reproduction strategy for a species.
|
|
It retains the three best-performing agents unchanged and fills the
|
|
remaining population slots by repeatedly selecting one of these top
|
|
agents at random and creating a mutated offspring from it.
|
|
|
|
The method returns a list of agent identifiers representing the new
|
|
species population, consisting of the original top agents and their
|
|
offspring. It is asynchronous and intended to be used during the
|
|
species-level mutation phase of the evolutionary cycle.
|
|
"""
|
|
new_ids = list(valid_ids)
|
|
for _ in range(offspring_needed):
|
|
parent = random.choice(valid_ids)
|
|
cid = await self._create_mutant_offspring(parent, specie_id)
|
|
new_ids.append(cid)
|
|
return new_ids
|
|
|
|
|
|
async def init_population(params: Tuple[str, List[dict], str, SelectionAlgorithm]) -> PopulationMonitor:
    """
    Build a fresh population in the genotype store and start monitoring it.

    ``params`` is the 4-tuple ``(population_id, specie_constraints,
    op_mode, selection_algorithm)``. The function:

    1. Calls ``delete_population`` for ``population_id`` so initialization
       starts from a clean state.
    2. Merges the population node.
    3. For every constraint in ``specie_constraints``:
       - creates a species with a new id, linked to the population, storing
         the constraint as compact, key-sorted JSON and an innovation
         factor of 0 on creation;
       - constructs ``INIT_SPECIE_SIZE`` agents through ``construct_agent``
         and, for each, sets its population and species ids, clears its
         fitness and merges the species ``HAS`` relationship.
    4. Starts a ``PopulationMonitor`` with the given operation mode and
       selection algorithm.

    Returns the monitor produced by ``PopulationMonitor.start``.
    """
    population_id, specie_constraints, op_mode, selection_algorithm = params
    pid = str(population_id)

    await delete_population(population_id)
    await _run("MERGE (:population {id:$pid})", pid=pid)

    # TODO: this has to move to the genotype module
    for specon in specie_constraints:
        sid = _new_id()
        constraint_json = json.dumps(specon, separators=(",", ":"), sort_keys=True)
        await _run("""
            MERGE (p:population {id:$pid})
            MERGE (s:specie {id:$sid})
              ON CREATE SET
                s.population_id = $pid,
                s.constraint_json = $cjson,
                s.innovation_factor = 0
              ON MATCH SET
                s.population_id = $pid,
                s.constraint_json = $cjson
            MERGE (p)-[:HAS]->(s)
        """, pid=pid, sid=str(sid), cjson=constraint_json)

        members_created = 0
        while members_created < INIT_SPECIE_SIZE:
            aid = _new_id()
            await construct_agent(sid, aid, specon)

            # todo: this needs to move to the genotype
            await _run("""
                MATCH (a:agent {id:$aid}), (s:specie {id:$sid})
                SET a.population_id = $pid, a.specie_id = $sid, a.fitness = NULL
                MERGE (s)-[:HAS]->(a)
            """, aid=str(aid), sid=str(sid), pid=pid)
            members_created += 1

    return await PopulationMonitor.start(op_mode, population_id, selection_algorithm)
|
|
|
|
|
|
async def continue_(
    op_mode: str,
    selection_algorithm: SelectionAlgorithm,
    population_id: str = INIT_POPULATION_ID,
) -> PopulationMonitor:
    """
    Attach a monitor to an already existing population.

    Unlike ``init_population``, nothing in the datastore is created or
    deleted here: the call simply forwards ``op_mode``, ``population_id``
    (defaulting to ``INIT_POPULATION_ID``) and ``selection_algorithm`` to
    ``PopulationMonitor.start`` and returns the resulting monitor.
    """
    monitor = await PopulationMonitor.start(op_mode, population_id, selection_algorithm)
    return monitor
|
|
|
|
|
|
async def delete_population(population_id: str):
    """
    Delete a population and the evolutionary entities attached to it.

    A single Cypher statement matches the population node with the given id
    and, optionally:

    1. every species whose ``population_id`` property equals that id,
    2. every agent those species ``HAS``,
    3. every cortex those agents ``OWNS``,
    4. every neuron, sensor or actuator those cortices ``HAS``,

    and removes all of them with ``DETACH DELETE`` (relationships included).
    If no population node with that id exists, nothing is deleted.

    Agents and cortices are matched in separate optional steps so that an
    agent without a cortex is still removed, and cortex elements are matched
    through one optional pattern so that the intermediate row count grows
    with the number of elements rather than with the product
    neurons x sensors x actuators.

    The operation is destructive.
    """
    await _run("""
        MATCH (p:population {id:$pid})
        OPTIONAL MATCH (s:specie {population_id:$pid})
        OPTIONAL MATCH (s)-[:HAS]->(a:agent)
        OPTIONAL MATCH (a)-[:OWNS]->(cx:cortex)
        OPTIONAL MATCH (cx)-[:HAS]->(e)
          WHERE e:neuron OR e:sensor OR e:actuator
        DETACH DELETE p, s, a, cx, e
    """, pid=str(population_id))
|