initial.
This commit is contained in:
130
experiments/simplest_nn.py
Normal file
130
experiments/simplest_nn.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import random
|
||||
import math
|
||||
import threading
|
||||
import queue
|
||||
|
||||
|
||||
class Neuron(threading.Thread):
|
||||
def __init__(self, weights, neuron_queue, actuator_queue):
|
||||
super().__init__()
|
||||
self.weights = weights
|
||||
self.neuron_queue = neuron_queue
|
||||
self.actuator_queue = actuator_queue
|
||||
self.running = True
|
||||
|
||||
def dot(self, input_vector):
|
||||
acc = sum(i * w for i, w in zip(input_vector, self.weights[:-1]))
|
||||
return acc + self.weights[-1]
|
||||
|
||||
def run(self):
|
||||
while self.running:
|
||||
try:
|
||||
message = self.neuron_queue.get(timeout=1)
|
||||
if message[0] == "forward":
|
||||
input_vector = message[1]
|
||||
print("**** Thinking ****")
|
||||
print(f"Input: {input_vector}")
|
||||
print(f"With Weights: {self.weights}")
|
||||
|
||||
dot_product = self.dot(input_vector)
|
||||
output = [math.tanh(dot_product)]
|
||||
|
||||
self.actuator_queue.put(("forward", output))
|
||||
elif message[0] == "terminate":
|
||||
self.running = False
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
|
||||
class Sensor(threading.Thread):
|
||||
def __init__(self, sensor_queue, neuron_queue):
|
||||
super().__init__()
|
||||
self.sensor_queue = sensor_queue
|
||||
self.neuron_queue = neuron_queue
|
||||
self.running = True
|
||||
|
||||
def run(self):
|
||||
while self.running:
|
||||
try:
|
||||
message = self.sensor_queue.get(timeout=1)
|
||||
if message == "sync":
|
||||
sensory_signal = [random.uniform(0, 1), random.uniform(0, 1)]
|
||||
print("**** Sensing ****")
|
||||
print(f"Signal from the environment: {sensory_signal}")
|
||||
|
||||
self.neuron_queue.put(("forward", sensory_signal))
|
||||
elif message == "terminate":
|
||||
self.running = False
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
class Actuator(threading.Thread):
|
||||
def __init__(self, actuator_queue):
|
||||
super().__init__()
|
||||
self.actuator_queue = actuator_queue
|
||||
self.running = True
|
||||
|
||||
def pts(self, control_signal):
|
||||
print("**** Acting ****")
|
||||
print(f"Using: {control_signal} to act on the environment.")
|
||||
|
||||
def run(self):
|
||||
while self.running:
|
||||
try:
|
||||
message = self.actuator_queue.get(timeout=1)
|
||||
if message[0] == "forward":
|
||||
control_signal = message[1]
|
||||
self.pts(control_signal)
|
||||
elif message[0] == "terminate":
|
||||
self.running = False
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
class Cortex(threading.Thread):
|
||||
def __init__(self, sensor_queue, neuron_queue, actuator_queue):
|
||||
super().__init__()
|
||||
self.sensor_queue = sensor_queue
|
||||
self.neuron_queue = neuron_queue
|
||||
self.actuator_queue = actuator_queue
|
||||
self.running = True
|
||||
|
||||
def run(self):
|
||||
while self.running:
|
||||
command = input("amna> ").strip()
|
||||
if command == "sense_think_act":
|
||||
self.sensor_queue.put("sync")
|
||||
elif command == "terminate":
|
||||
self.sensor_queue.put("terminate")
|
||||
self.neuron_queue.put(("terminate",))
|
||||
self.actuator_queue.put(("terminate",))
|
||||
self.running = False
|
||||
else:
|
||||
print("Unknown command. Please use 'sense_think_act' or 'terminate'.")
|
||||
print("Cortex terminated.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
sensor_queue = queue.Queue()
|
||||
neuron_queue = queue.Queue()
|
||||
actuator_queue = queue.Queue()
|
||||
|
||||
# initialize weights (including bias)
|
||||
weights = [random.uniform(-0.5, 0.5) for _ in range(2)] + [random.uniform(-0.5, 0.5)] # Zwei Gewichte + Bias
|
||||
|
||||
sensor = Sensor(sensor_queue, neuron_queue)
|
||||
neuron = Neuron(weights, neuron_queue, actuator_queue)
|
||||
actuator = Actuator(actuator_queue)
|
||||
|
||||
sensor.start()
|
||||
neuron.start()
|
||||
actuator.start()
|
||||
|
||||
cortex = Cortex(sensor_queue, neuron_queue, actuator_queue)
|
||||
cortex.start()
|
||||
|
||||
cortex.join()
|
||||
|
||||
sensor.join()
|
||||
neuron.join()
|
||||
actuator.join()
|
||||
|
||||
print("System terminated.")
|
||||
Reference in New Issue
Block a user