"""Minimal sense-think-act loop built from four cooperating threads.

A ``Cortex`` thread reads commands from stdin and drives a ``Sensor``,
which feeds a single ``Neuron``, which in turn feeds an ``Actuator``.
The threads talk to each other only through ``queue.Queue`` objects.
"""

import math
import queue
import random
import threading


class Neuron(threading.Thread):
    """Single neuron: weighted sum of the input plus bias, squashed by tanh."""

    def __init__(self, weights, neuron_queue, actuator_queue):
        """Store the weights and the inbound/outbound queues.

        Args:
            weights: Sequence of numbers; the last element is used as the
                bias, all preceding elements as input weights.
            neuron_queue: Queue this neuron reads its messages from.
            actuator_queue: Queue the computed output is written to.
        """
        super().__init__()
        self.weights = weights
        self.neuron_queue = neuron_queue
        self.actuator_queue = actuator_queue
        self.running = True

    def dot(self, input_vector):
        """Return the weighted sum of ``input_vector`` plus the bias.

        Inputs are paired with ``weights[:-1]`` via ``zip``, so pairing
        stops at the shorter of the two sequences.
        """
        acc = sum(i * w for i, w in zip(input_vector, self.weights[:-1]))
        return acc + self.weights[-1]

    def run(self):
        """Process messages until a ``("terminate",)`` message arrives.

        A ``("forward", input_vector)`` message produces
        ``("forward", [tanh(dot(input_vector))])`` on the actuator queue.
        """
        while self.running:
            try:
                # The timeout turns the blocking get into a poll so the
                # loop condition is re-checked about once per second.
                message = self.neuron_queue.get(timeout=1)
            except queue.Empty:
                continue

            if message[0] == "forward":
                input_vector = message[1]
                print("**** Thinking ****")
                print(f"Input: {input_vector}")
                print(f"With Weights: {self.weights}")
                dot_product = self.dot(input_vector)
                output = [math.tanh(dot_product)]
                self.actuator_queue.put(("forward", output))
            elif message[0] == "terminate":
                self.running = False


class Sensor(threading.Thread):
    """Produces a random two-element signal each time it is synced."""

    def __init__(self, sensor_queue, neuron_queue):
        """Store the queue to read commands from and the queue to feed."""
        super().__init__()
        self.sensor_queue = sensor_queue
        self.neuron_queue = neuron_queue
        self.running = True

    def run(self):
        """Process messages until the string ``"terminate"`` arrives.

        On ``"sync"`` two values drawn with ``random.uniform(0, 1)`` are
        sent to the neuron queue as ``("forward", [a, b])``.
        """
        while self.running:
            try:
                message = self.sensor_queue.get(timeout=1)
            except queue.Empty:
                continue

            if message == "sync":
                sensory_signal = [random.uniform(0, 1), random.uniform(0, 1)]
                print("**** Sensing ****")
                print(f"Signal from the environment: {sensory_signal}")
                self.neuron_queue.put(("forward", sensory_signal))
            elif message == "terminate":
                self.running = False


class Actuator(threading.Thread):
    """Consumes control signals and reports them on stdout."""

    def __init__(self, actuator_queue):
        """Store the queue control signals are read from."""
        super().__init__()
        self.actuator_queue = actuator_queue
        self.running = True

    def pts(self, control_signal):
        """Print the control signal that is being acted upon."""
        print("**** Acting ****")
        print(f"Using: {control_signal} to act on the environment.")

    def run(self):
        """Process messages until a ``("terminate",)`` message arrives.

        A ``("forward", control_signal)`` message is passed to ``pts``.
        """
        while self.running:
            try:
                message = self.actuator_queue.get(timeout=1)
            except queue.Empty:
                continue

            if message[0] == "forward":
                control_signal = message[1]
                self.pts(control_signal)
            elif message[0] == "terminate":
                self.running = False


class Cortex(threading.Thread):
    """Interactive controller that turns typed commands into messages."""

    def __init__(self, sensor_queue, neuron_queue, actuator_queue):
        """Store the three queues used to command the worker threads."""
        super().__init__()
        self.sensor_queue = sensor_queue
        self.neuron_queue = neuron_queue
        self.actuator_queue = actuator_queue
        self.running = True

    def run(self):
        """Read commands from stdin until termination.

        ``sense_think_act`` sends ``"sync"`` to the sensor. ``terminate``
        sends a terminate message to every worker queue and ends the
        loop. Any other input prints a usage hint.
        """
        while self.running:
            try:
                command = input("amna> ").strip()
            except EOFError:
                # stdin is closed, so no 'terminate' can ever be typed.
                # Without this the workers would keep polling forever
                # and the process would never exit.
                command = "terminate"

            if command == "sense_think_act":
                self.sensor_queue.put("sync")
            elif command == "terminate":
                # The sensor expects a bare string, the other two workers
                # inspect message[0], hence the differing message shapes.
                self.sensor_queue.put("terminate")
                self.neuron_queue.put(("terminate",))
                self.actuator_queue.put(("terminate",))
                self.running = False
            else:
                print("Unknown command. Please use 'sense_think_act' or 'terminate'.")
        print("Cortex terminated.")


def main():
    """Wire up the queues and threads, then run until terminated."""
    sensor_queue = queue.Queue()
    neuron_queue = queue.Queue()
    actuator_queue = queue.Queue()

    # Initialize weights: two input weights followed by one bias term.
    weights = [random.uniform(-0.5, 0.5) for _ in range(3)]

    sensor = Sensor(sensor_queue, neuron_queue)
    neuron = Neuron(weights, neuron_queue, actuator_queue)
    actuator = Actuator(actuator_queue)

    sensor.start()
    neuron.start()
    actuator.start()

    cortex = Cortex(sensor_queue, neuron_queue, actuator_queue)
    cortex.start()

    cortex.join()
    sensor.join()
    neuron.join()
    actuator.join()

    print("System terminated.")


if __name__ == "__main__":
    main()