import asyncio

from mathema.actors.neuron import Neuron
from mathema.actors.actor import Actor


class Collector(Actor):
    """Actor that records the payload of every "forward" message it receives.

    Each recorded entry is a ``(from_id, vec)`` tuple appended to
    ``self.events`` in arrival order.  A "terminate" message sets the
    internal stop event and ends the run loop; messages with any other tag
    are ignored.
    """

    def __init__(self, name="Collector"):
        super().__init__(name)
        # Collected (from_id, vec) pairs, oldest first.
        self.events = []
        # Set once a "terminate" message has been processed.
        self._stop = asyncio.Event()

    async def run(self):
        """Consume inbox messages until a "terminate" message arrives."""
        while True:
            message = await self.inbox.get()
            kind = message[0]
            if kind == "terminate":
                self._stop.set()
                return
            if kind != "forward":
                # Unknown tags are dropped silently.
                continue
            _, sender_id, vector = message
            self.events.append((sender_id, vector))


async def start(actor):
    """Schedule ``actor.run()`` as a task on the running loop and return it."""
    task = asyncio.create_task(actor.run())
    return task


async def test_feedforward():
    """Drive one neuron that outputs to a collector through a single cycle.

    Prints the events the collector gathered, then terminates both actors
    and waits for their tasks to finish.
    """
    collector = Collector("COL-ff")
    neuron = Neuron(
        nid="N",
        cx_pid=None,
        af_name="tanh",
        input_idps=[("S", [1.0], False), ("bias", [0.0], False)],
        output_pids=[collector],
        bias=None,
    )
    neuron_task = await start(neuron)
    collector_task = await start(collector)

    await neuron.send(("cycle_start",))
    await neuron.send(("forward", "S", [1.0]))
    # Yield to the loop so the actor tasks can process their inboxes.
    await asyncio.sleep(0.01)
    print("FF events:", collector.events)

    await collector.send(("terminate",))
    await neuron.send(("terminate",))
    await asyncio.gather(neuron_task, collector_task)


async def test_lateral_nonrecurrent():
    """Drive two neurons where N1 feeds N2 and N2 feeds a collector.

    N2 lists "N1" among its inputs with the third tuple field False
    (presumably the non-recurrent marker -- confirm against Neuron).
    """
    collector = Collector("COL-lat")
    first = Neuron(
        "N1",
        None,
        "tanh",
        [("S", [1.0], False), ("bias", [0.0], False)],
        [],
        None,
    )
    second = Neuron(
        "N2",
        None,
        "tanh",
        [("S", [1.0], False), ("N1", [1.0], False), ("bias", [0.0], False)],
        [collector],
        None,
    )
    # Wire N1 -> N2 after construction, since N2 did not exist yet.
    first.outputs = [second]

    tasks = [await start(actor) for actor in (first, second, collector)]

    for neuron in (first, second):
        await neuron.send(("cycle_start",))
    for neuron in (first, second):
        await neuron.send(("forward", "S", [1.0]))

    # Two consecutive short pauses, exactly as the scenario was written.
    await asyncio.sleep(0.01)
    await asyncio.sleep(0.01)
    print("LAT events:", collector.events)

    for actor in (collector, first, second):
        await actor.send(("terminate",))
    await asyncio.gather(*tasks)


async def test_recurrent_edge():
    """Drive two cycles through N1 -> N2 where the N1 input of N2 is flagged True.

    The True flag is presumably the recurrent marker (confirm against
    Neuron).  Collector events are printed after the second cycle.
    """
    collector = Collector("COL-rec")
    first = Neuron(
        "N1",
        None,
        "tanh",
        [("S", [1.0], False), ("bias", [0.0], False)],
        [],
        None,
    )
    second = Neuron(
        "N2",
        None,
        "tanh",
        [("S", [1.0], False), ("N1", [1.0], True), ("bias", [0.0], False)],
        [collector],
        None,
    )
    first.outputs = [second]

    tasks = [await start(actor) for actor in (first, second, collector)]

    # Two identical cycles, each followed by a pause for the actors to run.
    for _ in range(2):
        for neuron in (first, second):
            await neuron.send(("cycle_start",))
        for neuron in (first, second):
            await neuron.send(("forward", "S", [1.0]))
        await asyncio.sleep(0.02)

    print("REC events:", collector.events)

    for actor in (collector, first, second):
        await actor.send(("terminate",))
    await asyncio.gather(*tasks)


async def test_self_loop():
    """Drive two cycles through a neuron that lists itself as input and output.

    The neuron's own id appears in its inputs with the third field True
    (presumably the recurrent marker -- confirm against Neuron), and its
    outputs are itself followed by the collector.
    """
    collector = Collector("COL-self")
    neuron = Neuron(
        "N",
        None,
        "tanh",
        [("S", [1.0], False), ("N", [1.0], True), ("bias", [0.0], False)],
        [],
        None,
    )
    # Outputs are assigned afterwards because the neuron must reference itself.
    neuron.outputs = [neuron, collector]

    neuron_task = await start(neuron)
    collector_task = await start(collector)

    for _ in range(2):
        await neuron.send(("cycle_start",))
        await neuron.send(("forward", "S", [1.0]))
        await asyncio.sleep(0.02)

    print("SELF events:", collector.events)

    await collector.send(("terminate",))
    await neuron.send(("terminate",))
    await asyncio.gather(neuron_task, collector_task)


if __name__ == "__main__":
    # Each scenario runs in its own fresh event loop, in this order.
    for scenario in (
        test_feedforward,
        test_lateral_nonrecurrent,
        test_recurrent_edge,
        test_self_loop,
    ):
        asyncio.run(scenario())