from neo4j import AsyncGraphDatabase

# Schema statements run, in order, by ``Neo4jDB.create_schema``.
# Every entry must be exactly ONE Cypher statement: each entry is sent
# through its own ``session.run`` call.
NEO4J_CONSTRAINTS = [
    "CREATE CONSTRAINT cortex_id IF NOT EXISTS FOR (n:cortex) REQUIRE n.id IS UNIQUE",
    "CREATE CONSTRAINT sensor_id IF NOT EXISTS FOR (n:sensor) REQUIRE n.id IS UNIQUE",
    "CREATE CONSTRAINT neuron_id IF NOT EXISTS FOR (n:neuron) REQUIRE n.id IS UNIQUE",
    "CREATE CONSTRAINT actuator_id IF NOT EXISTS FOR (n:actuator) REQUIRE n.id IS UNIQUE",
    "CREATE CONSTRAINT agent_id IF NOT EXISTS FOR (n:agent) REQUIRE n.id IS UNIQUE",
    # NOTE(review): the three ``*_id`` indexes below reuse the name and
    # label/property of a uniqueness constraint declared above, so they are
    # presumably redundant with the constraint's backing index. Kept as-is to
    # preserve existing behaviour -- confirm and remove if so.
    "CREATE INDEX agent_id IF NOT EXISTS FOR (a:agent) ON (a.id)",
    "CREATE INDEX agent_population IF NOT EXISTS FOR (a:agent) ON (a.population_id)",
    "CREATE INDEX cortex_id IF NOT EXISTS FOR (cx:cortex) ON (cx.id)",
    "CREATE INDEX neuron_id IF NOT EXISTS FOR (n:neuron) ON (n.id)",
    # These two entries used to be fused into a single string by implicit
    # literal concatenation (missing comma), yielding one entry that held two
    # ';'-separated statements.
    "CREATE CONSTRAINT spec_id IF NOT EXISTS FOR (s:specie) REQUIRE s.id IS UNIQUE",
    "CREATE CONSTRAINT pop_id IF NOT EXISTS FOR (p:population) REQUIRE p.id IS UNIQUE",
    # constraint record has no id field, no need to make it unique
]


def _quote_identifier(name: str) -> str:
    """Return *name* wrapped in backticks, with embedded backticks doubled."""
    return "`" + name.replace("`", "``") + "`"


class Neo4jDB:
    """Thin async wrapper around a Neo4j driver.

    Every method opens a fresh session on the configured database, runs its
    statement(s), and closes the session before returning.
    """

    def __init__(self, uri: str, user: str, password: str, database: str | None = None):
        """Create the driver.

        Args:
            uri: Connection URI passed to ``AsyncGraphDatabase.driver``.
            user: User name for basic authentication.
            password: Password for basic authentication.
            database: Database name passed to every session; ``None`` is
                forwarded unchanged to the driver.
        """
        self._driver = AsyncGraphDatabase.driver(uri, auth=(user, password))
        self._database = database

    async def close(self) -> None:
        """Close the underlying driver."""
        await self._driver.close()

    async def run_read(self, cypher: str, **params):
        """Run *cypher* and return the driver's result object.

        NOTE(review): the result is returned after the session that produced
        it has been closed, so its records are presumably no longer readable
        by the caller -- confirm against the driver docs. Prefer
        ``read_single`` / ``read_all`` when records are needed, or
        ``run_consume`` when only the summary is needed.
        """
        async with self._driver.session(database=self._database) as s:
            return await s.run(cypher, **params)

    async def read_single(self, cypher: str, **params):
        """Run *cypher* and return the value of ``result.single()``."""
        async with self._driver.session(database=self._database) as s:
            res = await s.run(cypher, **params)
            return await res.single()

    async def read_all(self, cypher: str, **params):
        """Run *cypher* and return all of its records as a list."""
        async with self._driver.session(database=self._database) as s:
            res = await s.run(cypher, **params)
            return [r async for r in res]

    async def run_consume(self, cypher: str, **params):
        """Run *cypher*, discard its records, and return ``result.consume()``."""
        async with self._driver.session(database=self._database) as s:
            res = await s.run(cypher, **params)
            return await res.consume()

    async def create_schema(self) -> None:
        """Execute every statement in ``NEO4J_CONSTRAINTS``, in order."""
        async with self._driver.session(database=self._database) as s:
            for stmt in NEO4J_CONSTRAINTS:
                res = await s.run(stmt)
                # Consume explicitly so a failure is raised here, at the
                # offending statement, instead of being deferred to a later
                # call or to session close.
                await res.consume()

    async def purge_all_nodes(self) -> None:
        """Delete every node, together with its relationships."""
        async with self._driver.session(database=self._database) as s:
            res = await s.run("MATCH (n) DETACH DELETE n")
            # Consume inside the session so errors surface to the caller.
            await res.consume()

    async def drop_schema(self) -> None:
        """Drop every constraint reported by ``SHOW CONSTRAINTS``.

        Only constraints are dropped; indexes that are not owned by a
        constraint (e.g. ``agent_population``) are left in place.
        """
        async with self._driver.session(database=self._database) as s:
            res = await s.run("SHOW CONSTRAINTS")
            # Read all names first so the listing is fully fetched before any
            # further statement is issued on the same session.
            names = [record["name"] async for record in res]
            for name in names:
                # Names cannot be passed as query parameters here, so quote
                # them to keep unusual names from breaking the statement.
                dropped = await s.run(
                    f"DROP CONSTRAINT {_quote_identifier(name)} IF EXISTS"
                )
                await dropped.consume()