add basic structure.

This commit is contained in:
2025-09-28 19:46:23 +02:00
parent d742a0c0ce
commit 1761de8acb
12 changed files with 1064 additions and 0 deletions

0
mathema/__init__.py Normal file
View File

13
mathema/actor.py Normal file
View File

@@ -0,0 +1,13 @@
import asyncio
class Actor:
def __init__(self, name: str):
self.name = name
self.inbox = asyncio.Queue()
async def send(self, msg):
await self.inbox.put(msg)
async def run(self):
raise NotImplementedError

55
mathema/actuator.py Normal file
View File

@@ -0,0 +1,55 @@
# actors/actuator.py
import asyncio
from actor import Actor
class Actuator(Actor):
def __init__(self, aid, cx_pid, name, fanin_ids, expect_count, scape=None):
super().__init__(f"Actuator-{aid}")
self.aid = aid
self.cx_pid = cx_pid
self.aname = name
self.fanin_ids = fanin_ids
self.expect = expect_count
self.received = {}
self.scape = scape
self.scape_inbox = asyncio.Queue()
async def run(self):
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "forward":
_, from_id, vec = msg
self.received[from_id] = vec
if len(self.received) == self.expect:
print("ACTUATOR: collected all signals...")
output = []
for fid in self.fanin_ids:
output.extend(self.received[fid])
if self.aname == "pts":
print(f"Actuator output: {output}")
fitness, halt_flag = 1.0, 0
elif self.aname == "xor_SendOutput" and self.scape:
print("ACTUATOR: sending action to scape...")
await self.scape.send(("action", output, self))
while True:
resp = await self.inbox.get()
if resp[0] == "result":
print("ACTUATOR: got scape response: ", resp)
fitness, halt_flag = resp[1], resp[2]
break
else:
fitness, halt_flag = 0.0, 0
await self.cx_pid.send(("sync", self.aid, fitness, halt_flag))
print("ACTUATOR: sent sync message to cortex.")
self.received.clear()
elif tag == "terminate":
return

103
mathema/cortex.py Normal file
View File

@@ -0,0 +1,103 @@
import time
from actor import Actor
class Cortex(Actor):
def __init__(self, cid, exoself_pid, sensor_pids, neuron_pids, actuator_pids):
super().__init__(f"Cortex-{cid}")
self.cid = cid
self.sensors = sensor_pids
self.neurons = neuron_pids
self.actuators = actuator_pids
self.exoself_pid = exoself_pid
self.awaiting_sync = set()
self.fitness_acc = 0.0
self.ef_acc = 0
self.cycle_acc = 0
self.active = False
self._t0 = None
async def _kick_sensors(self):
for s in self.sensors:
await s.send(("sync",))
def _reset_for_new_cycle(self):
self.awaiting_sync = set(a.aid for a in self.actuators)
def _reset_for_new_episode(self):
self.fitness_acc = 0.0
self.ef_acc = 0
self.cycle_acc = 0
self._reset_for_new_cycle()
self._t0 = time.perf_counter()
self.active = True
async def run(self):
if self.actuators:
self._reset_for_new_episode()
await self._kick_sensors()
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "register_actuators":
_, aids = msg
self.awaiting_sync = set(aids)
if not self.active:
self._reset_for_new_episode()
await self._kick_sensors()
continue
if tag == "sync" and self.active:
print("CORTEX: got sync message: ", msg)
_t, aid, fitness, halt_flag = msg
print("----------------")
print("_t:", _t)
print("aid:", aid)
print("fitness:", fitness)
print("halt_flag:", halt_flag)
print("----------------")
self.fitness_acc += float(fitness)
self.ef_acc += int(halt_flag)
if aid in self.awaiting_sync:
self.awaiting_sync.remove(aid)
print("CORTEX: awaiting sync: ", self.awaiting_sync)
if not self.awaiting_sync:
print("CORTEX: cycle completed.")
self.cycle_acc += 1
if self.ef_acc > 0:
elapsed = time.perf_counter() - self._t0
await self.exoself_pid.send((
"evaluation_completed",
self.fitness_acc,
self.cycle_acc,
elapsed
))
self.active = False
else:
self.ef_acc = 0
self._reset_for_new_cycle()
await self._kick_sensors()
continue
if tag == "reactivate":
self._reset_for_new_episode()
await self._kick_sensors()
continue
if tag == "terminate":
for a in (self.sensors + self.actuators + self.neurons):
await a.send(("terminate",))
return
elif tag == "backup_from_neuron":
await self.exoself_pid.send(msg)

312
mathema/exoself.py Normal file
View File

@@ -0,0 +1,312 @@
import asyncio
import json
import math
import random
from collections import defaultdict
from typing import Any, Dict, List, Tuple, Optional
from actor import Actor
from cortex import Cortex
from sensor import Sensor
from neuron import Neuron
from actuator import Actuator
from scape import XorScape
class Exoself(Actor):
def __init__(self, genotype: Dict[str, Any], file_name: Optional[str] = None):
super().__init__("Exoself")
self.g = genotype
self.file_name = file_name
self.cx_actor: Optional[Cortex] = None
self.sensor_actors: List[Sensor] = []
self.neuron_actors: List[Neuron] = []
self.actuator_actors: List[Actuator] = []
self.tasks: List[asyncio.Task] = []
# Training-Stats
self.highest_fitness = float("-inf")
self.eval_acc = 0
self.cycle_acc = 0
self.time_acc = 0.0
self.attempt = 0
self.MAX_ATTEMPTS = 50
self.actuator_scape = None
self._perturbed: List[Neuron] = []
@staticmethod
def from_file(path: str) -> "Exoself":
with open(path, "r") as f:
g = json.load(f)
return Exoself(g, file_name=path)
async def run(self):
self._build_pid_map_and_spawn()
self._link_cortex()
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors + [self.actuator_scape]:
self.tasks.append(asyncio.create_task(a.run()))
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "evaluation_completed":
_, fitness, cycles, elapsed = msg
await self._on_evaluation_completed(fitness, cycles, elapsed)
elif tag == "terminate":
await self._terminate_all()
return
async def run_evaluation(self):
print("build network and link...")
self._build_pid_map_and_spawn()
print("link cortex...")
self._link_cortex()
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
self.tasks.append(asyncio.create_task(a.run()))
if self.actuator_scape:
self.tasks.append(asyncio.create_task(self.actuator_scape.run()))
print("network actors are running...")
while True:
msg = await self.inbox.get()
print("message in exsoself: ", msg)
tag = msg[0]
if tag == "evaluation_completed":
_, fitness, cycles, elapsed = msg
await self._terminate_all()
return float(fitness), 1, int(cycles), float(elapsed)
elif tag == "terminate":
await self._terminate_all()
return float("-inf"), 0, 0, 0.0
# ---------- Build ----------
def _build_pid_map_and_spawn(self):
"""
Baut Cortex, dann alle Neuronen (mit cx_pid=self.cx_actor), dann verlinkt Outputs schichtweise,
dann Sensoren/Aktuatoren (mit cx_pid=self.cx_actor). Achtung: Reihenfolge wichtig.
"""
cx = self.g["cortex"]
# Cortex zuerst (damit wir cx_pid an Kinder übergeben können)
self.cx_actor = Cortex(
cid=cx["id"],
exoself_pid=self,
sensor_pids=[],
neuron_pids=[],
actuator_pids=[]
)
self.actuator_scape = XorScape()
layers: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
for n in self.g["neurons"]:
layers[n["layer_index"]].append(n)
ordered_layers = [layers[i] for i in sorted(layers)]
id2neuron_actor: Dict[Any, Neuron] = {}
for layer in ordered_layers:
for n in layer:
input_idps = [(iw["input_id"], iw["weights"]) for iw in n["input_weights"]]
neuron = Neuron(
nid=n["id"],
cx_pid=self.cx_actor,
af_name=n.get("activation_function", "tanh"),
input_idps=input_idps,
output_pids=[] # füllen wir gleich
)
id2neuron_actor[n["id"]] = neuron
self.neuron_actors.append(neuron)
for li in range(len(ordered_layers) - 1):
next_pids = [id2neuron_actor[nx["id"]] for nx in ordered_layers[li + 1]]
for n in ordered_layers[li]:
id2neuron_actor[n["id"]].outputs = next_pids
actuators = self._get_actuators_block()
if not actuators:
raise ValueError("Genotype must include 'actuator' or 'actuators'.")
for a in actuators:
fanin_ids = a.get("fanin_ids", [])
expect = len(fanin_ids) if fanin_ids else 0
actuator = Actuator(
aid=a["id"],
cx_pid=self.cx_actor,
name=a["name"],
fanin_ids=fanin_ids,
expect_count=expect,
scape=self.actuator_scape
)
self.actuator_actors.append(actuator)
if ordered_layers:
last_layer = ordered_layers[-1]
out_targets = self.actuator_actors
for n in last_layer:
id2neuron_actor[n["id"]].outputs = out_targets
sensors = self._get_sensors_block()
if not sensors:
raise ValueError("Genotype must include 'sensor' or 'sensors'.")
first_layer = ordered_layers[0] if ordered_layers else []
first_layer_pids = [id2neuron_actor[n["id"]] for n in first_layer]
for s in sensors:
sensor = Sensor(
sid=s["id"],
cx_pid=self.cx_actor,
name=s["name"],
vector_length=s["vector_length"],
fanout_pids=first_layer_pids,
scape=self.actuator_scape
)
self.sensor_actors.append(sensor)
def _get_sensors_block(self) -> List[Dict[str, Any]]:
if "sensors" in self.g:
return list(self.g["sensors"])
if "sensor" in self.g:
return [self.g["sensor"]]
return []
def _get_actuators_block(self) -> List[Dict[str, Any]]:
if "actuators" in self.g:
return list(self.g["actuators"])
if "actuator" in self.g:
return [self.g["actuator"]]
return []
def _link_cortex(self):
self.cx_actor.sensors = [a for a in self.sensor_actors if a]
self.cx_actor.neurons = [a for a in self.neuron_actors if a]
self.cx_actor.actuators = [a for a in self.actuator_actors if a]
self.cx_actor.awaiting_sync = set(a.aid for a in self.cx_actor.actuators)
self.tasks.append(asyncio.create_task(self.cx_actor.run()))
async def train_until_stop(self):
self._build_pid_map_and_spawn()
self._link_cortex()
# 2) Start tasks
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
self.tasks.append(asyncio.create_task(a.run()))
if self.actuator_scape:
self.tasks.append(asyncio.create_task(self.actuator_scape.run()))
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "evaluation_completed":
_, fitness, cycles, elapsed = msg
maybe_stats = await self._on_evaluation_completed(fitness, cycles, elapsed)
# _on_evaluation_completed() ruft bei Stop bereits _backup_genotype() und _terminate_all()
if isinstance(maybe_stats, dict):
# Trainingsende Daten aus self.* zurückgeben (wie im Buch: Fitness/Evals/Cycles/Time)
return (
float(self.highest_fitness),
int(self.eval_acc),
int(self.cycle_acc),
float(self.time_acc),
)
elif tag == "terminate":
await self._terminate_all()
return float("-inf"), 0, 0, 0.0
async def _on_evaluation_completed(self, fitness: float, cycles: int, elapsed: float):
self.eval_acc += 1
self.cycle_acc += int(cycles)
self.time_acc += float(elapsed)
print(f"[Exoself] evaluation_completed: fitness={fitness:.6f} cycles={cycles} time={elapsed:.3f}s")
REL = 1e-6
if fitness > self.highest_fitness * (1.0 + REL):
self.highest_fitness = fitness
self.attempt = 0
for n in self.neuron_actors:
await n.send(("weight_backup",))
else:
self.attempt += 1
for n in self._perturbed:
await n.send(("weight_restore",))
if self.attempt >= self.MAX_ATTEMPTS:
print(
f"[Exoself] STOP. Best fitness={self.highest_fitness:.6f} evals={self.eval_acc} cycles={self.cycle_acc}")
await self._backup_genotype()
await self._terminate_all()
return {
"best_fitness": self.highest_fitness,
"eval_acc": self.eval_acc,
"cycle_acc": self.cycle_acc,
"time_acc": self.time_acc,
}
tot = len(self.neuron_actors)
mp = 1.0 / math.sqrt(max(1, tot))
self._perturbed = [n for n in self.neuron_actors if random.random() < mp]
for n in self._perturbed:
await n.send(("weight_perturb",))
await self.cx_actor.send(("reactivate",))
async def _backup_genotype(self):
remaining = len(self.neuron_actors)
for n in self.neuron_actors:
await n.send(("get_backup",))
backups: List[Tuple[Any, List[Tuple[Any, List[float]]]]] = []
while remaining > 0:
msg = await self.inbox.get()
if msg[0] == "backup_from_neuron":
_, nid, idps = msg
backups.append((nid, idps))
remaining -= 1
id2n = {n["id"]: n for n in self.g["neurons"]}
for nid, idps in backups:
if nid not in id2n:
continue
new_iw = []
bias_val = None
for item in idps:
if isinstance(item[0], str) and item[0] == "bias":
bias_val = float(item[1]) if not isinstance(item[1], list) else float(item[1][0])
else:
input_id, weights = item
new_iw.append({"input_id": input_id, "weights": list(weights)})
id2n[nid]["input_weights"] = new_iw
if bias_val is not None:
id2n[nid].setdefault("input_weights", []).append({"input_id": "bias", "weights": [bias_val]})
if self.file_name:
with open(self.file_name, "w") as f:
json.dump(self.g, f, indent=2)
print(f"[Exoself] Genotype updated → {self.file_name}")
async def _terminate_all(self):
for a in self.sensor_actors + self.neuron_actors + self.actuator_actors:
await a.send(("terminate",))
if self.cx_actor:
await self.cx_actor.send(("terminate",))
for t in self.tasks:
if not t.done():
t.cancel()
self.tasks.clear()

182
mathema/genotype.py Normal file
View File

@@ -0,0 +1,182 @@
# genotype.py
import json
import math
import random
import time
from typing import Dict, List, Tuple, Any, Optional
def generate_id() -> float:
return random.random()
def create_neural_weights(vector_length: int) -> List[float]:
return [random.uniform(-2.0, 2.0) for _ in range(vector_length)]
def construct(
morphology_module,
hidden_layer_densities: List[int],
file_name: Optional[str] = None,
*,
add_bias: bool = False,
) -> Dict[str, Any]:
rnd_seed = time.time_ns() & 0xFFFFFFFF
random.seed(rnd_seed)
S = morphology_module.get_InitSensor(morphology_module)
A = morphology_module.get_InitActuator(morphology_module)
sensor = {
"id": S.get("id", generate_id()),
"name": S["name"],
"vector_length": int(S["vector_length"]),
"cx_id": None, # wird später gesetzt
"fanout_ids": [], # wird später gesetzt
# optional:
# "scape": S.get("scape")
}
actuator = {
"id": A.get("id", generate_id()),
"name": A["name"],
"vector_length": int(A["vector_length"]),
"cx_id": None, # wird später gesetzt
"fanin_ids": [], # wird später gesetzt
# optional:
# "scape": A.get("scape")
}
output_vl = actuator["vector_length"]
layer_densities = list(hidden_layer_densities) + [output_vl]
cortex_id = generate_id()
layers = _create_neuro_layers(
cx_id=cortex_id,
sensor=sensor,
actuator=actuator,
layer_densities=layer_densities,
add_bias=add_bias,
)
input_layer = layers[0]
output_layer = layers[-1]
sensor["cx_id"] = cortex_id
sensor["fanout_ids"] = [n["id"] for n in input_layer]
actuator["cx_id"] = cortex_id
actuator["fanin_ids"] = [n["id"] for n in output_layer]
neuron_ids = [n["id"] for layer in layers for n in layer]
cortex = {
"id": cortex_id,
"sensor_ids": [sensor["id"]],
"actuator_ids": [actuator["id"]],
"neuron_ids": neuron_ids,
}
# 7) Genotyp zusammensetzen
genotype = {
"cortex": cortex,
"sensor": sensor,
"actuator": actuator,
"neurons": [n for layer in layers for n in layer],
}
# 8) Optional speichern
if file_name:
save_genotype(file_name, genotype)
return genotype
def _create_neuro_layers(
cx_id: float,
sensor: Dict[str, Any],
actuator: Dict[str, Any],
layer_densities: List[int],
*,
add_bias: bool,
) -> List[List[Dict[str, Any]]]:
layers: List[List[Dict[str, Any]]] = []
input_idps: List[Tuple[float, int]] = [(sensor["id"], sensor["vector_length"])]
for layer_index, layer_density in enumerate(layer_densities):
neuron_ids = [generate_id() for _ in range(layer_density)]
if layer_index < len(layer_densities) - 1:
next_ids = [generate_id() for _ in range(layer_densities[layer_index + 1])]
output_ids = next_ids
else:
output_ids = [actuator["id"]]
this_layer: List[Dict[str, Any]] = []
for _nid in neuron_ids:
proper_input = _create_neural_input(input_idps, add_bias=add_bias)
neuron = {
"id": _nid,
"layer_index": layer_index,
"cx_id": cx_id,
"activation_function": "tanh",
"input_weights": [{"input_id": i, "weights": w} for (i, w) in proper_input],
"output_ids": output_ids[:], # Kopie
}
this_layer.append(neuron)
layers.append(this_layer)
input_idps = [(n["id"], 1) for n in this_layer]
return layers
def _is_bias_tuple(t: Tuple[Any, Any]) -> bool:
key, _ = t
return isinstance(key, str) and key == "bias"
def _create_neural_input(
input_idps: List[Tuple[float, int]],
*,
add_bias: bool,
) -> List[Tuple[Any, List[float]]]:
proper: List[Tuple[Any, List[float]]] = []
for input_id, vl in input_idps:
proper.append((input_id, create_neural_weights(vl)))
if add_bias:
proper.append(("bias", [random.random() - 0.5]))
return proper
def save_genotype(file_name: str, genotype: Dict[str, Any]) -> None:
with open(file_name, "w") as f:
json.dump(genotype, f, indent=2)
def load_from_file(file_name: str) -> Dict[str, Any]:
with open(file_name, "r") as f:
return json.load(f)
def print_genotype(file_name: str) -> None:
g = load_from_file(file_name)
cx = g["cortex"]
print("[CORTEX]", cx)
sids = cx.get("sensor_ids", [])
nids = cx.get("neuron_ids", [])
aids = cx.get("actuator_ids", [])
nid2n = {n["id"]: n for n in g.get("neurons", [])}
sid2s = {g["sensor"]["id"]: g["sensor"]} if "sensor" in g else {s["id"]: s for s in g.get("sensors", [])}
aid2a = {g["actuator"]["id"]: g["actuator"]} if "actuator" in g else {a["id"]: a for a in g.get("actuators", [])}
for sid in sids:
print("[SENSOR]", sid2s.get(sid))
for nid in nids:
print("[NEURON]", nid2n.get(nid))
for aid in aids:
print("[ACTUATOR]", aid2a.get(aid))

80
mathema/morphology.py Normal file
View File

@@ -0,0 +1,80 @@
# morphology.py
import time
from typing import Any, Callable, Dict, List, Union
MorphologyType = Union[str, Callable[[str], List[Dict[str, Any]]]]
def generate_id() -> float:
now = time.time()
return 1.0 / now
def get_InitSensor(morphology: MorphologyType) -> Dict[str, Any]:
sensors = get_Sensors(morphology)
if not sensors:
raise ValueError("Morphology has no sensors.")
return sensors[0]
def get_InitActuator(morphology: MorphologyType) -> Dict[str, Any]:
actuators = get_Actuators(morphology)
if not actuators:
raise ValueError("Morphology has no actuators.")
return actuators[0]
def get_Sensors(morphology: MorphologyType) -> List[Dict[str, Any]]:
fn = _resolve_morphology(morphology)
return fn("sensors")
def get_Actuators(morphology: MorphologyType) -> List[Dict[str, Any]]:
fn = _resolve_morphology(morphology)
return fn("actuators")
def _resolve_morphology(morphology: MorphologyType) -> Callable[[str], List[Dict[str, Any]]]:
if callable(morphology):
return morphology
# 2) String -> Registry
if isinstance(morphology, str):
reg = {
"xor_mimic": xor_mimic,
}
if morphology in reg:
return reg[morphology]
raise ValueError(f"Unknown morphology name: {morphology}")
try:
# Ist es ein Modul mit einer Funktion 'xor_mimic'?
if hasattr(morphology, "xor_mimic") and callable(getattr(morphology, "xor_mimic")):
return getattr(morphology, "xor_mimic")
except Exception:
pass
raise TypeError("morphology must be a callable, a module with 'xor_mimic', or a registered string key")
def xor_mimic(kind: str) -> List[Dict[str, Any]]:
if kind == "sensors":
return [
{
"id": generate_id(),
"name": "xor_GetInput",
"vector_length": 2,
"scape": {"private": "xor_sim"}
}
]
elif kind == "actuators":
return [
{
"id": generate_id(),
"name": "xor_SendOutput",
"vector_length": 1,
"scape": {"private": "xor_sim"}
}
]
else:
raise ValueError(f"xor_mimic: unsupported kind '{kind}', expected 'sensors' or 'actuators'")

108
mathema/neuron.py Normal file
View File

@@ -0,0 +1,108 @@
# actors/neuron.py
import math
import random
from actor import Actor
def tanh(x): return math.tanh(x)
class Neuron(Actor):
def __init__(self, nid, cx_pid, af_name, input_idps, output_pids):
super().__init__(f"Neuron-{nid}")
self.nid = nid
self.cx_pid = cx_pid
self.af = tanh if af_name == "tanh" else tanh
# input_idps: [(input_id, [w1, w2, ...])]
self.inputs = {}
self.order = []
"""
for (inp_id, weights) in input_idps:
self.order.append(inp_id)
self.inputs[inp_id] = {"weights": list(weights), "got": False, "val": None}
"""
self.bias = 0.0
for (inp_id, weights) in input_idps:
if inp_id == "bias":
self.bias = float(weights[0])
else:
self.order.append(inp_id)
self.inputs[inp_id] = {"weights": list(weights), "got": False, "val": None}
self._backup_inputs = None
self._backup_bias = None
self.outputs = output_pids
print(f"Neuron {nid}: inputs={list(self.inputs.keys())}, bias={self.bias}")
async def run(self):
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "forward":
_, from_id, data = msg
if from_id not in self.inputs:
continue
slot = self.inputs[from_id]
slot["got"] = True
slot["val"] = data
if all(self.inputs[i]["got"] for i in self.order):
acc = 0.0
for i in self.order:
w = self.inputs[i]["weights"]
v = self.inputs[i]["val"]
if isinstance(v, list):
acc += sum(wj * vj for wj, vj in zip(w, v))
else:
acc += w[0] * float(v)
out = self.af(acc + self.bias)
for pid in self.outputs:
await pid.send(("forward", self.nid, [out]))
for i in self.order:
self.inputs[i]["got"] = False
self.inputs[i]["val"] = None
print(f"Neuron {self.nid}: input_sum={acc + self.bias:.3f}, output={out:.3f}")
elif tag == "get_backup":
idps = [(i, self.inputs[i]["weights"]) for i in self.order]
idps.append(("bias", self.bias))
await self.cx_pid.send(("backup_from_neuron", self.nid, idps))
elif tag == "weight_backup":
print(f"Neuron {self.nid}: backing up weights")
self._backup_inputs = {k: {"weights": v["weights"][:]} for k, v in self.inputs.items()}
self._backup_bias = self.bias
elif tag == "weight_restore":
if self._backup_inputs is not None:
for k in self.inputs:
self.inputs[k]["weights"] = self._backup_inputs[k]["weights"][:]
self.bias = self._backup_bias
elif tag == "weight_perturb":
print(f"Neuron {self.nid}: perturbing {len([w for i in self.order for w in self.inputs[i]['weights']])} weights")
tot_w = sum(len(self.inputs[i]["weights"]) for i in self.order) + 1
mp = 1 / math.sqrt(tot_w)
delta_mag = 2.0 * math.pi
sat_lim = 2.0 * math.pi
for i in self.order:
ws = self.inputs[i]["weights"]
for j in range(len(ws)):
if random.random() < mp:
ws[j] = _sat(ws[j] + (random.random() - 0.5) * delta_mag, -sat_lim, sat_lim)
if random.random() < mp:
self.bias = _sat(self.bias + (random.random() - 0.5) * delta_mag, -sat_lim, sat_lim)
elif tag == "terminate":
return
def _sat(val, lo, hi):
return lo if val < lo else (hi if val > hi else val)

0
mathema/polis.py Normal file
View File

54
mathema/scape.py Normal file
View File

@@ -0,0 +1,54 @@
from actor import Actor
import math
class XorScape(Actor):
def __init__(self):
super().__init__("XorScape")
self.data = [
([-1, -1], [-1]),
([1, -1], [1]),
([-1, 1], [1]),
([1, 1], [-1])
]
self.index = 0
self.error_acc = 0.0
self.last_actuator = None
async def run(self):
while True:
msg = await self.inbox.get()
tag = msg[0]
if tag == "sense":
print("SCAPE: got sensed by sensor...")
_, sid, from_pid = msg
self.last_actuator = from_pid
inputs, correct = self.data[self.index]
print("SENSOR input /correct: ", inputs, correct)
await from_pid.send(("percept", inputs))
elif tag == "action":
_, output, from_pid = msg
_, correct = self.data[self.index]
step_error = sum((o - c) ** 2 for o, c in zip(output, correct))
step_rmse = math.sqrt(step_error)
print(f"XOR PATTERN: Input={self.data[self.index][0]} → Network={output} → Expected={correct}")
self.index += 1
if self.index >= len(self.data):
total_rmse = math.sqrt(self.error_acc + step_rmse)
fitness = 1.0 / (total_rmse + 1e-5)
await from_pid.send(("result", fitness, 1))
self.index = 0
self.error_acc = 0.0
else:
# Continue episode
self.error_acc += step_rmse
await from_pid.send(("result", 0.0, 0))
elif tag == "terminate":
return

45
mathema/sensor.py Normal file
View File

@@ -0,0 +1,45 @@
# actors/sensor.py
from actor import Actor
import random
class Sensor(Actor):
def __init__(self, sid, cx_pid, name, vector_length, fanout_pids, scape=None):
super().__init__(f"Sensor-{sid}")
self.sid = sid
self.cx_pid = cx_pid
self.sname = name
self.vl = vector_length
self.fanout = fanout_pids
self.scape = scape
async def run(self):
while True:
print("sensor running...")
msg = await self.inbox.get()
tag = msg[0]
print("got sensor message: ", msg)
if tag == "sync":
vec = await self._sense()
print("sensed vec: ", vec)
for pid in self.fanout:
await pid.send(("forward", self.sid, vec))
elif tag == "terminate":
return
async def _sense(self):
if self.sname == "rng":
return [random.random() for _ in range(self.vl)]
elif self.sname == "xor_GetInput" and self.scape:
print("TODO")
await self.scape.send(("sense", self.sid, self))
msg = await self.inbox.get()
if msg[0] == "percept":
return msg[1]
else:
return [0.0] * self.vl
else:
return [0.0] * self.vl

112
mathema/trainer.py Normal file
View File

@@ -0,0 +1,112 @@
import asyncio
import os
import time
from typing import Any, Dict, List, Tuple, Optional
import morphology
from genotype import construct, save_genotype, print_genotype
from exoself import Exoself
class Trainer:
def __init__(
self,
morphology_spec=morphology,
hidden_layer_densities: List[int] = None,
*,
max_attempts: int = 5,
eval_limit: float = float("inf"),
fitness_target: float = float("inf"),
experimental_file: Optional[str] = "experimental.json",
best_file: Optional[str] = "best.json",
exoself_steps_per_eval: int = 0,
):
self.morphology_spec = morphology_spec
self.hds = hidden_layer_densities or []
self.max_attempts = max_attempts
self.eval_limit = eval_limit
self.fitness_target = fitness_target
self.experimental_file = experimental_file
self.best_file = best_file
self.exoself_steps_per_eval = exoself_steps_per_eval
self.best_fitness = float("-inf")
self.best_genotype: Optional[Dict[str, Any]] = None
self.eval_acc = 0
self.cycle_acc = 0
self.time_acc = 0.0
async def _run_one_attempt(self) -> Tuple[float, int, int, float]:
print("constructing genotype...")
geno = construct(
self.morphology_spec,
self.hds,
file_name=self.experimental_file, # <-- schreibt Startnetz nach experimental.json
add_bias=True
)
fitness, evals, cycles, elapsed = await self._evaluate_with_exoself(geno)
return fitness, evals, cycles, elapsed
async def _evaluate_with_exoself(self, genotype: Dict[str, Any]) -> Tuple[float, int, int, float]:
print("creating exoself...")
ex = Exoself(genotype, file_name=self.experimental_file)
best_fitness, evals, cycles, elapsed = await ex.train_until_stop()
return best_fitness, evals, cycles, elapsed
async def go(self):
attempt = 1
while True:
print(".........")
print("current attempt: ", attempt)
print(".........")
if attempt > self.max_attempts or self.eval_acc >= self.eval_limit or self.best_fitness >= self.fitness_target:
# Abschlussausgabe wie im Buch
if self.best_file and os.path.exists(self.best_file):
print_genotype(self.best_file)
print(
f" Morphology: {getattr(self.morphology_spec, '__name__', str(self.morphology_spec))} | "
f"Best Fitness: {self.best_fitness} | EvalAcc: {self.eval_acc}"
)
return {
"best_fitness": self.best_fitness,
"eval_acc": self.eval_acc,
"cycle_acc": self.cycle_acc,
"time_acc": self.time_acc,
"best_file": self.best_file,
}
print("RUN ONE ATTEMPT!")
fitness, evals, cycles, elapsed = await self._run_one_attempt()
print("update akkus...")
self.eval_acc += evals
self.cycle_acc += cycles
self.time_acc += elapsed
# Besser als bisher?
if fitness > self.best_fitness:
self.best_fitness = fitness
if self.best_file and self.experimental_file and os.path.exists(self.experimental_file):
os.replace(self.experimental_file, self.best_file)
attempt = 1
else:
attempt += 1
if __name__ == "__main__":
trainer = Trainer(
morphology_spec=morphology,
hidden_layer_densities=[2],
max_attempts=200,
eval_limit=float("inf"),
fitness_target=99.9,
experimental_file="experimental.json",
best_file="best.json",
exoself_steps_per_eval=0,
)
asyncio.run(trainer.go())