import logging
import random

from mathema.actors.actor import Actor

log = logging.getLogger(__name__)


class Sensor(Actor):
    """
    Sensor actor that produces an input vector for the network and forwards it
    to downstream actors (fanout).

    A Sensor is an *input node* in the actor-based neural architecture.
    It does not compute from other neurons; instead, it generates observations
    either from:
      - a local source (e.g., random numbers), or
      - an external scape/environment actor.

    When the cortex triggers a sensor with a ("sync",) message, the sensor:
      1) calls `_sense()` to obtain a vector,
      2) broadcasts that vector to all downstream targets in `fanout` via
         ("forward", sid, vec).

    Supported sensor types (controlled by `sname`):
      - "rng":
          Produces `vl` random floats in [0, 1).
      - "xor_GetInput" (requires `scape`):
          Requests an input vector from the scape and expects a
          ("percept", vec) reply on its own inbox. The vector is forwarded
          as received (no clamping or resizing).
      - "car_GetFeatures" (requires `scape`):
          Requests a feature vector from the scape and normalizes it:
            * clamps values to [-1, 1],
            * pads with zeros or truncates to exactly `vl` elements.
      - default (unknown `sname`, or a scape-backed type without a scape):
          Returns a zero vector of length `vl`.

    Inbox message protocol:
      - ("sync",):
          Trigger sensing and forwarding to fanout.
      - ("percept", vec):
          Scape reply to a previous ("sense", sid, self) request
          (handled inside `_sense()`).
      - ("terminate",):
          Stop the actor. This is honored even when it arrives while the
          sensor is waiting for a scape reply; in that case the pending
          sync is abandoned and nothing is forwarded.

    Note: while waiting for a scape reply, any other non-"percept" message
    is consumed and the sensor falls back to a zero vector for that sync.
    """

    def __init__(self, sid, cx_pid, name, vector_length, fanout_pids, scape=None):
        """
        Args:
            sid: Sensor identifier; included in every message this sensor sends.
            cx_pid: Handle of the owning cortex (stored, not used in this class).
            name: Sensor type selector, stored as `sname`.
            vector_length: Number of elements in the produced vector (`vl`).
            fanout_pids: Iterable of downstream actors; each must expose an
                awaitable `send(msg)`.
            scape: Optional environment actor with an awaitable `send(msg)`.
        """
        super().__init__(f"Sensor-{sid}")
        self.sid = sid
        self.cx_pid = cx_pid
        self.sname = name
        self.vl = vector_length
        self.fanout = fanout_pids
        self.scape = scape
        # Set when a ("terminate",) message is consumed while waiting for a
        # scape reply, so that run() can still stop the actor.
        self._terminate_requested = False

    async def run(self):
        """Main loop: handle inbox messages until a terminate is received."""
        while True:
            msg = await self.inbox.get()
            tag = msg[0]
            # Lazy %-style arguments: no formatting cost when DEBUG is off,
            # and a '%' inside sid cannot corrupt the format string.
            log.debug("sensor %s got sensor message: %s", self.sid, msg)

            if tag == "sync":
                vec = await self._sense()
                if self._terminate_requested:
                    # A terminate arrived in place of the scape reply; stop
                    # instead of forwarding a fabricated zero vector.
                    return
                log.debug("sensor %s sensed vec: %s", self.sid, vec)
                for pid in self.fanout:
                    await pid.send(("forward", self.sid, vec))

            elif tag == "terminate":
                return

    async def _sense(self):
        """
        Produce one observation vector according to `sname`.

        Returns:
            A list of values. For "xor_GetInput" the scape payload is returned
            unchanged; every other path returns exactly `vl` elements
            (for non-negative `vl`).
        """
        if self.sname == "rng":
            return [random.random() for _ in range(self.vl)]

        if self.sname == "xor_GetInput" and self.scape:
            reply = await self._request_reply()
            if reply[0] == "percept":
                return reply[1]
            return [0.0] * self.vl

        if self.sname == "car_GetFeatures" and self.scape:
            reply = await self._request_reply()
            if reply[0] == "percept":
                return self._fit_to_length(reply[1])
            return [0.0] * self.vl

        return [0.0] * self.vl

    async def _request_reply(self):
        """Ask the scape for a percept and return the next inbox message."""
        await self.scape.send(("sense", self.sid, self))
        reply = await self.inbox.get()
        if reply[0] == "terminate":
            # The message is already consumed from the inbox; remember it so
            # the run loop can exit instead of losing the shutdown request.
            self._terminate_requested = True
        return reply

    def _fit_to_length(self, vec):
        """Clamp each value to [-1, 1], then zero-pad or truncate to `vl`."""
        out = [max(-1.0, min(1.0, float(x))) for x in vec]
        if len(out) < self.vl:
            out = out + [0.0] * (self.vl - len(out))
        elif len(out) > self.vl:
            out = out[:self.vl]
        return out