add car racing sim and first geno to phenotype mapper experiment.

This commit is contained in:
2025-09-24 14:15:18 +02:00
parent 581109117c
commit 86c67b66f8
17 changed files with 1015 additions and 2 deletions

View File

@@ -0,0 +1,133 @@
import random
import math
import threading
import queue
class Neuron(threading.Thread):
def __init__(self, weights, neuron_queue, actuator_queue):
super().__init__()
self.weights = weights
self.neuron_queue = neuron_queue
self.actuator_queue = actuator_queue
self.running = True
def dot(self, input_vector):
acc = sum(i * w for i, w in zip(input_vector, self.weights[:-1]))
return acc + self.weights[-1]
def run(self):
while self.running:
try:
message = self.neuron_queue.get(timeout=1)
if message[0] == "forward":
input_vector = message[1]
print("**** Thinking ****")
print(f"Input: {input_vector}")
print(f"With Weights: {self.weights}")
dot_product = self.dot(input_vector)
output = [math.tanh(dot_product)]
self.actuator_queue.put(("forward", output))
elif message[0] == "terminate":
self.running = False
except queue.Empty:
continue
class Sensor(threading.Thread):
def __init__(self, sensor_queue, neuron_queue):
super().__init__()
self.sensor_queue = sensor_queue
self.neuron_queue = neuron_queue
self.running = True
def run(self):
while self.running:
try:
message = self.sensor_queue.get(timeout=1)
if message == "sync":
sensory_signal = [random.uniform(0, 1), random.uniform(0, 1)]
print("**** Sensing ****")
print(f"Signal from the environment: {sensory_signal}")
self.neuron_queue.put(("forward", sensory_signal))
elif message == "terminate":
self.running = False
except queue.Empty:
continue
class Actuator(threading.Thread):
def __init__(self, actuator_queue):
super().__init__()
self.actuator_queue = actuator_queue
self.running = True
def pts(self, control_signal):
print("**** Acting ****")
print(f"Using: {control_signal} to act on the environment.")
def run(self):
while self.running:
try:
message = self.actuator_queue.get(timeout=1)
if message[0] == "forward":
control_signal = message[1]
self.pts(control_signal)
elif message[0] == "terminate":
self.running = False
except queue.Empty:
continue
class Cortex(threading.Thread):
def __init__(self, sensor_queue, neuron_queue, actuator_queue):
super().__init__()
self.sensor_queue = sensor_queue
self.neuron_queue = neuron_queue
self.actuator_queue = actuator_queue
self.running = True
def run(self):
while self.running:
command = input("amna> ").strip()
if command == "sense_think_act":
self.sensor_queue.put("sync")
elif command == "terminate":
self.sensor_queue.put("terminate")
self.neuron_queue.put(("terminate",))
self.actuator_queue.put(("terminate",))
self.running = False
else:
print("Unknown command. Please use 'sense_think_act' or 'terminate'.")
print("Cortex terminated.")
if __name__ == "__main__":
sensor_queue = queue.Queue()
neuron_queue = queue.Queue()
actuator_queue = queue.Queue()
# initialize weights (including bias)
weights = [random.uniform(-0.5, 0.5) for _ in range(2)] + [random.uniform(-0.5, 0.5)] # Zwei Gewichte + Bias
sensor = Sensor(sensor_queue, neuron_queue)
neuron = Neuron(weights, neuron_queue, actuator_queue)
actuator = Actuator(actuator_queue)
sensor.start()
neuron.start()
actuator.start()
cortex = Cortex(sensor_queue, neuron_queue, actuator_queue)
cortex.start()
cortex.join()
sensor.join()
neuron.join()
actuator.join()
print("System terminated.")