130 lines
4.3 KiB
Python
130 lines
4.3 KiB
Python
import random
|
|
import math
|
|
import threading
|
|
import queue
|
|
|
|
|
|
class Neuron(threading.Thread):
|
|
def __init__(self, weights, neuron_queue, actuator_queue):
|
|
super().__init__()
|
|
self.weights = weights
|
|
self.neuron_queue = neuron_queue
|
|
self.actuator_queue = actuator_queue
|
|
self.running = True
|
|
|
|
def dot(self, input_vector):
|
|
acc = sum(i * w for i, w in zip(input_vector, self.weights[:-1]))
|
|
return acc + self.weights[-1]
|
|
|
|
def run(self):
|
|
while self.running:
|
|
try:
|
|
message = self.neuron_queue.get(timeout=1)
|
|
if message[0] == "forward":
|
|
input_vector = message[1]
|
|
print("**** Thinking ****")
|
|
print(f"Input: {input_vector}")
|
|
print(f"With Weights: {self.weights}")
|
|
|
|
dot_product = self.dot(input_vector)
|
|
output = [math.tanh(dot_product)]
|
|
|
|
self.actuator_queue.put(("forward", output))
|
|
elif message[0] == "terminate":
|
|
self.running = False
|
|
except queue.Empty:
|
|
continue
|
|
|
|
|
|
class Sensor(threading.Thread):
|
|
def __init__(self, sensor_queue, neuron_queue):
|
|
super().__init__()
|
|
self.sensor_queue = sensor_queue
|
|
self.neuron_queue = neuron_queue
|
|
self.running = True
|
|
|
|
def run(self):
|
|
while self.running:
|
|
try:
|
|
message = self.sensor_queue.get(timeout=1)
|
|
if message == "sync":
|
|
sensory_signal = [random.uniform(0, 1), random.uniform(0, 1)]
|
|
print("**** Sensing ****")
|
|
print(f"Signal from the environment: {sensory_signal}")
|
|
|
|
self.neuron_queue.put(("forward", sensory_signal))
|
|
elif message == "terminate":
|
|
self.running = False
|
|
except queue.Empty:
|
|
continue
|
|
|
|
class Actuator(threading.Thread):
|
|
def __init__(self, actuator_queue):
|
|
super().__init__()
|
|
self.actuator_queue = actuator_queue
|
|
self.running = True
|
|
|
|
def pts(self, control_signal):
|
|
print("**** Acting ****")
|
|
print(f"Using: {control_signal} to act on the environment.")
|
|
|
|
def run(self):
|
|
while self.running:
|
|
try:
|
|
message = self.actuator_queue.get(timeout=1)
|
|
if message[0] == "forward":
|
|
control_signal = message[1]
|
|
self.pts(control_signal)
|
|
elif message[0] == "terminate":
|
|
self.running = False
|
|
except queue.Empty:
|
|
continue
|
|
|
|
class Cortex(threading.Thread):
|
|
def __init__(self, sensor_queue, neuron_queue, actuator_queue):
|
|
super().__init__()
|
|
self.sensor_queue = sensor_queue
|
|
self.neuron_queue = neuron_queue
|
|
self.actuator_queue = actuator_queue
|
|
self.running = True
|
|
|
|
def run(self):
|
|
while self.running:
|
|
command = input("amna> ").strip()
|
|
if command == "sense_think_act":
|
|
self.sensor_queue.put("sync")
|
|
elif command == "terminate":
|
|
self.sensor_queue.put("terminate")
|
|
self.neuron_queue.put(("terminate",))
|
|
self.actuator_queue.put(("terminate",))
|
|
self.running = False
|
|
else:
|
|
print("Unknown command. Please use 'sense_think_act' or 'terminate'.")
|
|
print("Cortex terminated.")
|
|
|
|
if __name__ == "__main__":
|
|
sensor_queue = queue.Queue()
|
|
neuron_queue = queue.Queue()
|
|
actuator_queue = queue.Queue()
|
|
|
|
# initialize weights (including bias)
|
|
weights = [random.uniform(-0.5, 0.5) for _ in range(2)] + [random.uniform(-0.5, 0.5)] # Zwei Gewichte + Bias
|
|
|
|
sensor = Sensor(sensor_queue, neuron_queue)
|
|
neuron = Neuron(weights, neuron_queue, actuator_queue)
|
|
actuator = Actuator(actuator_queue)
|
|
|
|
sensor.start()
|
|
neuron.start()
|
|
actuator.start()
|
|
|
|
cortex = Cortex(sensor_queue, neuron_queue, actuator_queue)
|
|
cortex.start()
|
|
|
|
cortex.join()
|
|
|
|
sensor.join()
|
|
neuron.join()
|
|
actuator.join()
|
|
|
|
print("System terminated.") |