146 lines
5.3 KiB
Python
146 lines
5.3 KiB
Python
import time
|
|
import logging
|
|
|
|
from mathema.actors.actor import Actor
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class Cortex(Actor):
    """
    Cortex actor coordinating a network of Sensors, Neurons, and Actuators.

    The Cortex is responsible for driving the network forward in discrete
    computation cycles, collecting fitness feedback from all actuators, and
    reporting evaluation results to an Exoself (supervisor) actor.

    High-level behavior:
      - At the start of an episode, the cortex triggers a new cycle:
          1) It tells all neurons to prepare recurrent state for the new cycle
             via ("cycle_start",).
          2) It sends ("tick",) to every neuron (scheduler hook).
          3) It tells all sensors to produce outputs via ("sync",).
      - Actuators eventually send back ("sync", aid, fitness, halt_flag).
      - Once all actuators have synchronized for the current cycle, the cortex
        either:
          * ends the evaluation if the halt flags received during the cycle
            sum to a positive value, and reports
            ("evaluation_completed", total_fitness, cycles, elapsed)
            to the exoself (elapsed is measured with time.perf_counter()), or
          * starts the next cycle.

    Message protocol (inbox):
      - ("register_actuators", aids):
          Provide/replace the set of actuator IDs that must sync each cycle.
          Used when actuators are created dynamically or not known at init.
          The registered IDs are remembered and used for every later cycle.
          If no episode is running, registering also starts one.
      - ("sync", aid, fitness, halt_flag):
          Fitness feedback from an actuator for the current cycle.
          The cortex accumulates fitness and checks halt conditions.
          Ignored while no episode is active.
      - ("reactivate",):
          Restart a new evaluation episode (reset counters and kick sensors).
      - ("terminate",):
          Terminate the cortex and cascade termination to sensors/neurons/actuators.
      - ("backup_from_neuron", nid, idps...):
          Forward neuron backup data upstream to the exoself (message is
          passed through unchanged).

    Messages with any other tag are silently dropped.
    """

    def __init__(self, cid, exoself_pid, sensor_pids, neuron_pids, actuator_pids):
        """
        Create a cortex.

        Args:
            cid: Identifier of this cortex; used to build the actor name.
            exoself_pid: Actor handle that receives evaluation results and
                forwarded neuron backups (must expose an awaitable ``send``).
            sensor_pids: Sensor actor handles (each exposes ``send``).
            neuron_pids: Neuron actor handles (each exposes ``send``).
            actuator_pids: Actuator actor handles (each exposes ``send`` and
                an ``aid`` attribute).
        """
        super().__init__(f"Cortex-{cid}")
        self.cid = cid
        self.sensors = sensor_pids
        self.neurons = neuron_pids
        self.actuators = actuator_pids
        self.exoself_pid = exoself_pid

        # IDs supplied through ("register_actuators", aids). None means
        # "nothing registered yet": fall back to the aids of self.actuators.
        self._registered_aids = None

        self.awaiting_sync = set()   # actuator IDs still expected this cycle
        self.fitness_acc = 0.0       # fitness summed over the whole episode
        self.ef_acc = 0              # halt flags summed over the current cycle
        self.cycle_acc = 0           # completed cycles in the current episode
        self.active = False          # True while an episode is running
        self._t0 = None              # perf_counter() timestamp of episode start

    async def _kick_sensors(self):
        """Start one computation cycle.

        All neurons receive ("cycle_start",) before any of them receives
        ("tick",); sensors are triggered last with ("sync",).
        """
        for neuron in self.neurons:
            await neuron.send(("cycle_start",))
        for neuron in self.neurons:
            await neuron.send(("tick",))
        for sensor in self.sensors:
            await sensor.send(("sync",))

    def _reset_for_new_cycle(self):
        """Re-arm the set of actuator IDs that must sync in the next cycle."""
        if self._registered_aids is not None:
            # Explicit registration takes precedence over the constructor list.
            self.awaiting_sync = set(self._registered_aids)
        else:
            self.awaiting_sync = {a.aid for a in self.actuators}

    def _reset_for_new_episode(self):
        """Clear all accumulators, restart the timer and mark the cortex active."""
        self.fitness_acc = 0.0
        self.ef_acc = 0
        self.cycle_acc = 0
        self._reset_for_new_cycle()
        self._t0 = time.perf_counter()
        self.active = True

    async def _handle_sync(self, msg):
        """Process one ("sync", aid, fitness, halt_flag) message.

        Accumulates fitness and halt flag, marks the actuator as synced and,
        when no actuator is outstanding, either reports the finished
        evaluation to the exoself or starts the next cycle.
        """
        # Lazy %-style arguments: the record is only formatted when DEBUG is on.
        log.debug("CORTEX: got sync message: %s", msg)
        _t, aid, fitness, halt_flag = msg

        log.debug("----------------")
        log.debug("_t: %s", _t)
        log.debug("aid: %s", aid)
        log.debug("fitness: %s", fitness)
        log.debug("halt_flag: %s", halt_flag)
        log.debug("----------------")

        # NOTE(review): fitness/halt are accumulated even when `aid` is not in
        # awaiting_sync (unknown or duplicate sender) - confirm this is intended.
        self.fitness_acc += float(fitness)
        self.ef_acc += int(halt_flag)

        self.awaiting_sync.discard(aid)

        log.debug("CORTEX: awaiting sync: %s", self.awaiting_sync)

        if self.awaiting_sync:
            return  # cycle still in progress

        log.debug("CORTEX: cycle completed.")
        self.cycle_acc += 1

        if self.ef_acc > 0:
            # At least one actuator asked to halt: report and go idle.
            elapsed = time.perf_counter() - self._t0
            await self.exoself_pid.send((
                "evaluation_completed",
                self.fitness_acc,
                self.cycle_acc,
                elapsed,
            ))
            self.active = False
        else:
            self.ef_acc = 0
            self._reset_for_new_cycle()
            await self._kick_sensors()

    async def run(self):
        """Main actor loop.

        Starts an episode immediately when actuators were supplied at
        construction time, then dispatches inbox messages until a
        ("terminate",) message is received.
        """
        if self.actuators:
            self._reset_for_new_episode()
            await self._kick_sensors()

        while True:
            msg = await self.inbox.get()
            tag = msg[0]

            if tag == "register_actuators":
                _, aids = msg
                # Remember the IDs so later cycle/episode resets keep using them.
                self._registered_aids = frozenset(aids)
                self.awaiting_sync = set(self._registered_aids)
                if not self.active:
                    self._reset_for_new_episode()
                    await self._kick_sensors()

            elif tag == "sync":
                # Syncs arriving while idle are dropped.
                if self.active:
                    await self._handle_sync(msg)

            elif tag == "reactivate":
                self._reset_for_new_episode()
                await self._kick_sensors()

            elif tag == "terminate":
                # Unpacking (instead of `+`) tolerates mixed list/tuple inputs
                # while keeping the sensors -> actuators -> neurons order.
                for child in (*self.sensors, *self.actuators, *self.neurons):
                    await child.send(("terminate",))
                return

            elif tag == "backup_from_neuron":
                await self.exoself_pid.send(msg)
|