96 lines
3.4 KiB
Python
96 lines
3.4 KiB
Python
import random
|
|
import logging
|
|
|
|
from mathema.actors.actor import Actor
|
|
# Module-level logger, named after this module via `__name__`.
log = logging.getLogger(__name__)
|
|
|
|
|
|
class Sensor(Actor):
    """
    Sensor actor that produces an input vector for the network and forwards it
    to downstream actors (fanout).

    A Sensor is an *input node* in the actor-based neural architecture. It does
    not compute from other neurons; instead, it generates observations either
    from:
      - a local source (e.g., random numbers), or
      - an external scape/environment actor.

    When the sensor receives a ("sync",) message, it:
      1) calls `_sense()` to obtain a vector,
      2) broadcasts that vector to all downstream targets in `fanout` via
         ("forward", sid, vec).

    Supported sensor types (controlled by `sname`):
      - "rng":
          Produces `vl` random floats in [0, 1).
      - "xor_GetInput" (requires `scape`):
          Requests an input vector from the scape and expects a
          ("percept", vec) reply on its own inbox. The vector is passed
          through unchanged.
      - "car_GetFeatures" (requires `scape`):
          Requests a feature vector from the scape and normalizes it:
            * clamps values to [-1, 1],
            * pads with zeros or truncates to exactly `vl` elements.
      - default (unknown `sname`, or a scape-backed type without a scape):
          Returns a zero vector of length `vl`.

    Inbox message protocol:
      - ("sync",):
          Trigger sensing and forwarding to fanout.
      - ("percept", vec):
          Scape reply to a previous ("sense", sid, self) request (consumed
          inside `_sense()`).
      - ("terminate",):
          Stop the actor. This is honoured even when it arrives while the
          sensor is waiting for a percept; in that case nothing is forwarded
          for the interrupted cycle.
    """

    # Unique marker for "no percept received". A plain None cannot be used
    # because a scape may legitimately send None as the percept payload.
    _NO_PERCEPT = object()

    # Class-level default so the flag exists even on instances created
    # without running __init__ (e.g. by a subclass that overrides it).
    _terminate_pending = False

    def __init__(self, sid, cx_pid, name, vector_length, fanout_pids, scape=None):
        """
        Args:
            sid: Sensor id; used in the actor name and in outgoing messages.
            cx_pid: Handle of the owning cortex (stored, not used in this class).
            name: Sensor type selector, stored as `sname`.
            vector_length: Length of the produced vector, stored as `vl`.
            fanout_pids: Iterable of targets exposing an awaitable `send(msg)`.
            scape: Optional environment actor exposing an awaitable `send(msg)`.
        """
        super().__init__(f"Sensor-{sid}")
        self.sid = sid
        self.cx_pid = cx_pid
        self.sname = name
        self.vl = vector_length
        self.fanout = fanout_pids
        self.scape = scape

    async def run(self):
        """Process inbox messages until a ("terminate",) message is seen."""
        while True:
            msg = await self.inbox.get()
            tag = msg[0]

            # Lazy %-style arguments only: the format string stays constant,
            # so a '%' inside `sid` cannot corrupt it, and nothing is
            # formatted when debug logging is disabled.
            log.debug("sensor %s got sensor message: %s", self.sid, msg)

            if tag == "sync":
                vec = await self._sense()
                if self._terminate_pending:
                    # A terminate arrived while waiting for the scape reply;
                    # stop now instead of forwarding a placeholder vector.
                    return
                log.debug("sensor %s sensed vec: %s", self.sid, vec)
                for pid in self.fanout:
                    await pid.send(("forward", self.sid, vec))

            elif tag == "terminate":
                return

    async def _sense(self):
        """
        Produce one observation vector according to `sname`.

        Returns:
            The sensed vector. A zero vector of length `vl` is returned when
            the sensor type is unknown, no scape is configured for a
            scape-backed type, or the scape request was not answered with a
            ("percept", vec) message.
        """
        if self.sname == "rng":
            return [random.random() for _ in range(self.vl)]

        if self.sname in ("xor_GetInput", "car_GetFeatures") and self.scape:
            percept = await self._request_percept()
            if percept is self._NO_PERCEPT:
                return self._zero_vector()
            if self.sname == "car_GetFeatures":
                return self._clamp_and_fit(percept)
            return percept

        return self._zero_vector()

    async def _request_percept(self):
        """
        Send ("sense", sid, self) to the scape and read the next inbox message.

        Returns:
            The payload of a ("percept", vec) reply, or `_NO_PERCEPT` if the
            next message is anything else. A ("terminate",) received here is
            recorded in `_terminate_pending` so that `run()` can stop; any
            other unexpected message is logged and dropped.
        """
        await self.scape.send(("sense", self.sid, self))
        msg = await self.inbox.get()
        tag = msg[0]
        if tag == "percept":
            return msg[1]
        if tag == "terminate":
            self._terminate_pending = True
        else:
            log.warning(
                "sensor %s expected a percept but got: %s", self.sid, msg
            )
        return self._NO_PERCEPT

    def _zero_vector(self):
        """Return the fallback observation: `vl` zeros."""
        return [0.0] * self.vl

    def _clamp_and_fit(self, vec):
        """Clamp every value to [-1, 1] and pad/truncate to exactly `vl` items."""
        out = [max(-1.0, min(1.0, float(x))) for x in vec]
        if len(out) < self.vl:
            out = out + [0.0] * (self.vl - len(out))
        elif len(out) > self.vl:
            out = out[:self.vl]
        return out
|