from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

from mathema.core.db import Neo4jDB
from mathema.actors.actor import Actor

log = logging.getLogger(__name__)


# ===============================================================
# Data types
# ===============================================================
@dataclass
class ScapeEntry:
    """Bookkeeping record for one started scape."""

    type: str
    actor: Actor  # actor instance (the Pid equivalent)
    task: asyncio.Task  # task running the actor's run() coroutine
    parameters: dict = field(default_factory=dict)


@dataclass
class PolisState:
    """Snapshot of what the polis considers active."""

    active_mods: List[str] = field(default_factory=list)
    active_scapes: List[ScapeEntry] = field(default_factory=list)


# ===============================================================
# Polis class
# ===============================================================
class Polis:
    """
    Python asyncio port of Erlang's polis.

    - start/stop, create/reset as in the book
    - Mods = auxiliary modules with start/stop
    - Scapes = actor-based environments (e.g. XorScape)
    - Neo4j instead of Mnesia
    """

    def __init__(
        self,
        mods: Optional[List[tuple[str, Callable[[], Any], Callable[[], Any]]]] = None,
        public_scapes: Optional[List[tuple[str, dict, Callable[[], Actor]]]] = None,
        neo4j_uri: str = "bolt://localhost:7687",
        neo4j_user: str = "neo4j",
        neo4j_pass: str = "mathema2",
        neo4j_db: Optional[str] = None,
    ):
        """Store the configuration and create the (not yet started) polis.

        Args:
            mods: ``(name, start, stop)`` triples; ``None`` means no mods.
            public_scapes: ``(type, params, factory)`` triples where
                ``factory()`` returns the scape's ``Actor``; ``None`` means
                no scapes.
            neo4j_uri: Passed through to ``Neo4jDB``.
            neo4j_user: Passed through to ``Neo4jDB``.
            neo4j_pass: Passed through to ``Neo4jDB``.
            neo4j_db: Passed through to ``Neo4jDB``.
        """
        self._mods = mods or []  # (name, async_start, async_stop)
        self._public_scapes = public_scapes or []  # (type, params, factory->Actor)
        self._state = PolisState()
        self._db = Neo4jDB(neo4j_uri, neo4j_user, neo4j_pass, neo4j_db)
        self._lock = asyncio.Lock()  # serializes start() and stop()
        self._online_evt = asyncio.Event()  # set while the polis is online
        self._closing = False
        self._scapes: Dict[str, ScapeEntry] = {}  # type -> ScapeEntry

    # -----------------------------------------------------------
    # API
    # -----------------------------------------------------------
    async def sync(self) -> None:
        """In the Erlang code: make:all([load]).

        Placeholder here (hot reload); currently does nothing.
        """
        return

    async def create(self) -> None:
        """Create the schema in Neo4j (constraints/indexes)."""
        await self._db.create_schema()

    async def reset(self) -> None:
        """Delete all data; the schema is re-created afterwards."""
        await self._db.purge_all_nodes()
        await self._db.create_schema()

    async def start(self) -> None:
        """Bring the polis online: create the schema and start all scapes.

        Calling it while already online only logs a message. If starting the
        scapes fails, the exception propagates and the polis stays offline.
        """
        async with self._lock:
            if self._online_evt.is_set():
                log.info("polis already online.")
                return
            await self._db.create_schema()
            # NOTE: mod start-up is disabled; see the commented-out helpers
            # in the "internal: mods" section below.
            # await self._start_supmods(self._mods)
            active_scapes = await self._start_scapes(self._public_scapes)
            self._state = PolisState(
                active_mods=[name for (name, _s, _t) in self._mods],
                active_scapes=active_scapes,
            )
            # A previous stop() left this flag set; clear it for the new run.
            self._closing = False
            self._online_evt.set()
            log.info("******** Polis: ##MATHEMA## is now online.")

    async def stop(self, reason: str = "normal") -> None:
        """Take the polis offline: stop all scapes and close the database.

        Calling it while offline only logs a message.

        Args:
            reason: Free-form text included in the final log line.
        """
        async with self._lock:
            if not self._online_evt.is_set():
                log.info("polis is offline")
                return
            self._closing = True
            try:
                await self._stop_scapes(self._state.active_scapes)
                # await self._stop_supmods(self._mods)
            finally:
                # Close the database even if stopping the scapes raised or
                # was cancelled, so the connection is not leaked.
                await self._db.close()
            # Nothing is active any more; do not keep stale entries around.
            self._state = PolisState()
            self._online_evt.clear()
            log.info(
                "******** Polis: ##MATHEMA## is now offline, terminated with reason:%s",
                reason,
            )

    async def get_scape(self, scape_type: str) -> Optional[Actor]:
        """Return the Pid equivalent (actor instance) for a scape type.

        Returns ``None`` when no scape of that type is registered.
        """
        entry = self._scapes.get(scape_type)
        return entry.actor if entry else None

    # -----------------------------------------------------------
    # internal: mods
    # -----------------------------------------------------------
    # NOTE: mod supervision is currently disabled. The helpers below are kept
    # for reference only; the matching calls in start()/stop() are commented
    # out as well.
    #
    # async def _start_supmods(self, mods: List[tuple[str, Callable[[], Any], Callable[[], Any]]]) -> None:
    #     for name, start_fn, _stop_fn in mods:
    #         print(f"Starting mod: {name}")
    #         res = start_fn()
    #         if asyncio.iscoroutine(res):
    #             await res
    #
    # async def _stop_supmods(self, mods: List[tuple[str, Callable[[], Any], Callable[[], Any]]]) -> None:
    #     for name, _start_fn, stop_fn in mods:
    #         print(f"Stopping mod: {name}")
    #         res = stop_fn()
    #         if asyncio.iscoroutine(res):
    #             await res

    # -----------------------------------------------------------
    # internal: scapes
    # -----------------------------------------------------------
    async def _start_scapes(
        self, scapes_cfg: List[tuple[str, dict, Callable[[], Actor]]]
    ) -> List[ScapeEntry]:
        """Create every configured scape actor and run it in its own task.

        Each started scape is registered in ``self._scapes`` under its type.

        Args:
            scapes_cfg: ``(type, params, factory)`` triples.

        Returns:
            The entries of all started scapes, in configuration order.

        Raises:
            Exception: Whatever a factory or task creation raised; scapes
                that were already started are stopped before re-raising.
        """
        active: List[ScapeEntry] = []
        try:
            for scape_type, params, factory in scapes_cfg:
                actor: Actor = factory()  # create the actor instance
                task = asyncio.create_task(actor.run(), name=f"scape:{scape_type}")
                entry = ScapeEntry(
                    type=scape_type,
                    actor=actor,
                    task=task,
                    parameters=params or {},
                )
                self._scapes[scape_type] = entry
                active.append(entry)
                log.debug("Scape started: %s -> task=%s", scape_type, task.get_name())
        except Exception:
            # Do not leave half a polis running: stop what was started so
            # far, then let the caller see the original error.
            await self._stop_scapes(active)
            raise
        return active

    async def _stop_scapes(self, scapes: List[ScapeEntry]) -> None:
        """Ask every scape to terminate, wait for its task, clear the registry.

        A failure to send the terminate message or an error raised by a scape
        task is logged as a warning and does not stop the remaining scapes
        from being shut down.
        """
        # First notify all scapes, so they can shut down concurrently ...
        for e in scapes:
            try:
                await e.actor.send(("terminate",))
            except Exception as ex:
                log.warning("Warn: terminate send failed for %s: %s", e.type, ex)

        # ... then wait for each of them in turn.
        for e in scapes:
            await self._await_scape_exit(e)

        self._scapes.clear()

    async def _await_scape_exit(self, entry: ScapeEntry, timeout: float = 2.0) -> None:
        """Wait up to ``timeout`` seconds for one scape task, then cancel it."""
        task = entry.task
        if task.cancelled():
            # Already cancelled elsewhere. Awaiting it would raise
            # CancelledError here and abort the whole shutdown.
            return
        try:
            await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception as ex:
                # The task failed while handling the cancellation.
                log.warning("Warn: scape %s exit error: %s", entry.type, ex)
        except Exception as ex:
            log.warning("Warn: scape %s exit error: %s", entry.type, ex)