import time
import logging

from mathema.actors.actor import Actor

log = logging.getLogger(__name__)


class Cortex(Actor):
    """Coordinate sense-think-act cycles between sensors, neurons and actuators.

    At the start of every cycle the cortex sends ``("cycle_start",)`` and
    ``("tick",)`` to each neuron and ``("sync",)`` to each sensor.  It then
    waits for one ``("sync", aid, fitness, halt_flag)`` message per expected
    actuator id.  Once all have arrived the cycle is complete: if any halt
    flag was raised during that cycle, the accumulated result is reported to
    the exoself and the cortex goes inactive; otherwise the next cycle starts.

    All messages are tuples whose first element is a string tag.
    """

    def __init__(self, cid, exoself_pid, sensor_pids, neuron_pids, actuator_pids):
        """Store the peer handles and initialise the accumulators.

        Args:
            cid: Identifier of this cortex; used to build the actor name
                ``"Cortex-<cid>"``.
            exoself_pid: Handle that receives ``"evaluation_completed"`` and
                forwarded ``"backup_from_neuron"`` messages.
            sensor_pids: Handles of the sensors (must expose ``send``).
            neuron_pids: Handles of the neurons (must expose ``send``).
            actuator_pids: Handles of the actuators (must expose ``send`` and
                an ``aid`` attribute).
        """
        super().__init__(f"Cortex-{cid}")
        self.cid = cid
        self.sensors = sensor_pids
        self.neurons = neuron_pids
        self.actuators = actuator_pids
        self.exoself_pid = exoself_pid

        # Actuator ids whose "sync" is still outstanding in the current cycle.
        self.awaiting_sync = set()
        # Sum of the fitness values reported during the current episode.
        self.fitness_acc = 0.0
        # Sum of the halt flags reported during the current cycle.
        self.ef_acc = 0
        # Number of cycles completed during the current episode.
        self.cycle_acc = 0
        # "sync" messages are only processed while this is True.
        self.active = False
        # perf_counter() timestamp taken when the current episode started.
        self._t0 = None

    async def _kick_sensors(self):
        """Start a cycle: signal every neuron, then trigger every sensor."""
        for neuron in self.neurons:
            await neuron.send(("cycle_start",))
        for neuron in self.neurons:
            await neuron.send(("tick",))
        for sensor in self.sensors:
            await sensor.send(("sync",))

    def _reset_for_new_cycle(self):
        """Expect one sync from every known actuator again."""
        self.awaiting_sync = {actuator.aid for actuator in self.actuators}

    def _reset_for_new_episode(self):
        """Clear all accumulators, restart the timer and mark the cortex active."""
        self.fitness_acc = 0.0
        self.ef_acc = 0
        self.cycle_acc = 0
        self._reset_for_new_cycle()
        self._t0 = time.perf_counter()
        self.active = True

    async def _handle_sync(self, msg):
        """Process one ``("sync", aid, fitness, halt_flag)`` actuator report.

        Accumulates fitness and halt flag, marks the actuator as synced and,
        when no actuator is outstanding any more, either reports the finished
        evaluation to the exoself or starts the next cycle.
        """
        # Logging uses lazy %-formatting: every extra argument needs a
        # placeholder, otherwise the record fails to format when DEBUG is on.
        log.debug("CORTEX: got sync message: %s", msg)
        _t, aid, fitness, halt_flag = msg
        log.debug("----------------")
        log.debug("_t: %s", _t)
        log.debug("aid: %s", aid)
        log.debug("fitness: %s", fitness)
        log.debug("halt_flag: %s", halt_flag)
        log.debug("----------------")

        # Fitness and halt flag are accumulated even when the aid is not
        # (or no longer) in the awaiting set.
        self.fitness_acc += float(fitness)
        self.ef_acc += int(halt_flag)
        self.awaiting_sync.discard(aid)
        log.debug("CORTEX: awaiting sync: %s", self.awaiting_sync)

        if self.awaiting_sync:
            return  # cycle still in progress

        log.debug("CORTEX: cycle completed.")
        self.cycle_acc += 1

        if self.ef_acc > 0:
            # At least one actuator asked to halt: report and go idle until
            # a "reactivate" message arrives.
            elapsed = time.perf_counter() - self._t0
            await self.exoself_pid.send((
                "evaluation_completed",
                self.fitness_acc,
                self.cycle_acc,
                elapsed,
            ))
            self.active = False
            return

        self.ef_acc = 0
        self._reset_for_new_cycle()
        await self._kick_sensors()

    async def run(self):
        """Main message loop of the cortex.

        If actuators are already known, the first episode is started
        immediately.  The loop then handles these tags:

        * ``"register_actuators"`` -- set the expected actuator ids and start
          an episode if none is running.
        * ``"sync"`` -- actuator report; only handled while active.
        * ``"reactivate"`` -- start a fresh episode.
        * ``"terminate"`` -- forward ``("terminate",)`` to all sensors,
          actuators and neurons, then return.
        * ``"backup_from_neuron"`` -- forwarded unchanged to the exoself.

        Any other message (including ``"sync"`` while inactive) is dropped.
        """
        if self.actuators:
            self._reset_for_new_episode()
            await self._kick_sensors()

        while True:
            msg = await self.inbox.get()
            tag = msg[0]

            if tag == "register_actuators":
                _, aids = msg
                self.awaiting_sync = set(aids)
                if not self.active:
                    # NOTE(review): _reset_for_new_episode() rebuilds
                    # awaiting_sync from self.actuators, so the aids received
                    # here are overwritten in this branch -- confirm intended.
                    self._reset_for_new_episode()
                    await self._kick_sensors()
                continue

            if tag == "sync" and self.active:
                await self._handle_sync(msg)
                continue

            if tag == "reactivate":
                self._reset_for_new_episode()
                await self._kick_sensors()
                continue

            if tag == "terminate":
                # Unpacking (instead of list "+") keeps this working when the
                # three collections are not all the same sequence type.
                for peer in (*self.sensors, *self.actuators, *self.neurons):
                    await peer.send(("terminate",))
                return
            elif tag == "backup_from_neuron":
                await self.exoself_pid.send(msg)