77 lines
3.0 KiB
Python
77 lines
3.0 KiB
Python
import asyncio
|
|
import logging
|
|
|
|
from mathema.actors.actor import Actor
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class Actuator(Actor):
    """Actor that gathers fan-in vectors, acts on them and syncs with the cortex.

    Messages read from ``self.inbox`` (the inbox itself is presumably set up
    by the ``Actor`` base class -- confirm there):

    * ``("forward", from_id, vec)`` -- stores ``vec`` under ``from_id``. Once
      ``expect_count`` distinct senders have reported, the vectors are
      concatenated in ``fanin_ids`` order and handled according to ``name``.
    * ``("result", fitness, halt_flag)`` -- the scape's reply to an action;
      only consumed while an action is outstanding.
    * ``("terminate", ...)`` -- ends :meth:`run`.

    After every completed batch ``("sync", aid, fitness, halt_flag)`` is sent
    to ``cx_pid`` and the collected vectors are cleared.
    """

    def __init__(self, aid, cx_pid, name, fanin_ids, expect_count, scape=None):
        """Store the actuator's wiring.

        Args:
            aid: Identifier of this actuator; echoed in the "sync" message.
            cx_pid: Object with an awaitable ``send`` that receives "sync".
            name: Behaviour selector: "pts", "xor_SendOutput",
                "car_ApplyAction"; any other value yields fitness 0.0.
            fanin_ids: Sender ids, in the order their vectors are concatenated.
            expect_count: Number of distinct senders that completes a batch.
            scape: Optional object with an awaitable ``send`` that receives
                ``("action", action, self)``.
        """
        super().__init__(f"Actuator-{aid}")
        self.aid = aid
        self.cx_pid = cx_pid
        self.aname = name
        self.fanin_ids = fanin_ids
        self.expect = expect_count
        self.received = {}
        self.scape = scape
        # Not read anywhere in this class; kept because it is a public attribute.
        self.scape_inbox = asyncio.Queue()

    async def run(self):
        """Process inbox messages until a "terminate" message arrives.

        Raises:
            KeyError: If a batch is complete by count but contains no entry
                for one of ``fanin_ids``.
        """
        while True:
            msg = await self.inbox.get()
            tag = msg[0]

            if tag == "terminate":
                return
            if tag != "forward":
                # Unknown tags are ignored, as before.
                continue

            _, from_id, vec = msg
            self.received[from_id] = vec
            if len(self.received) != self.expect:
                continue

            log.debug("ACTUATOR: collected all signals...")
            output = []
            for fid in self.fanin_ids:
                output.extend(self.received[fid])

            outcome = await self._dispatch(output)
            if outcome is None:
                # "terminate" arrived while waiting for the scape's reply.
                return

            fitness, halt_flag = outcome
            await self.cx_pid.send(("sync", self.aid, fitness, halt_flag))
            log.debug("ACTUATOR: sent sync message to cortex.")
            self.received.clear()

    async def _dispatch(self, output):
        """Act on ``output``; return ``(fitness, halt_flag)`` or ``None`` on terminate."""
        if self.aname == "pts":
            print(f"Actuator output: {output}")
            return 1.0, 0

        if self.aname == "xor_SendOutput" and self.scape:
            log.debug("ACTUATOR: sending action to scape...")
            await self.scape.send(("action", output, self))
            return await self._await_scape_result()

        if self.aname == "car_ApplyAction" and self.scape:
            action = self._car_action(output)
            log.debug("ACTUATOR: sending action to car scape: %s", action)
            await self.scape.send(("action", action, self))
            return await self._await_scape_result()

        # Unknown name, or a scape-backed name without a scape.
        return 0.0, 0

    @staticmethod
    def _car_action(output):
        """Map up to three raw outputs to a clamped ``[steer, gas, brake]`` list."""
        # Missing components default to 0.0.
        y0 = float(output[0]) if len(output) > 0 else 0.0
        y1 = float(output[1]) if len(output) > 1 else 0.0
        y2 = float(output[2]) if len(output) > 2 else 0.0

        steer = max(-1.0, min(1.0, y0))
        # gas/brake: shift by +1 and halve before clamping to [0, 1].
        gas = max(0.0, min(1.0, 0.5 * (y1 + 1.0)))
        brake = max(0.0, min(1.0, 0.5 * (y2 + 1.0)))
        return [steer, gas, brake]

    async def _await_scape_result(self):
        """Wait for the scape's "result"; return ``(fitness, halt_flag)`` or ``None``.

        ``None`` means a "terminate" message arrived first, so the caller can
        stop instead of blocking forever. Any other message is still dropped,
        but now with a warning rather than silently.
        """
        while True:
            resp = await self.inbox.get()
            tag = resp[0]
            if tag == "result":
                log.debug("ACTUATOR: got scape response: %s", resp)
                return resp[1], resp[2]
            if tag == "terminate":
                log.debug("ACTUATOR: terminate received while awaiting scape result.")
                return None
            log.warning(
                "ACTUATOR: dropping %r message received while awaiting scape result",
                tag,
            )
|