from typing import LiteralString, cast from neo4j import AsyncGraphDatabase, Query NEO4J_CONSTRAINTS = [ "CREATE CONSTRAINT cortex_id IF NOT EXISTS FOR (n:cortex) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT sensor_id IF NOT EXISTS FOR (n:sensor) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT neuron_id IF NOT EXISTS FOR (n:neuron) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT actuator_id IF NOT EXISTS FOR (n:actuator) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT agent_id IF NOT EXISTS FOR (n:agent) REQUIRE n.id IS UNIQUE", "CREATE INDEX agent_id IF NOT EXISTS FOR (a:agent) ON (a.id)", "CREATE INDEX agent_population IF NOT EXISTS FOR (a:agent) ON (a.population_id)", "CREATE INDEX cortex_id IF NOT EXISTS FOR (cx:cortex) ON (cx.id)", "CREATE INDEX neuron_id IF NOT EXISTS FOR (n:neuron) ON (n.id)", "CREATE CONSTRAINT spec_id IF NOT EXISTS FOR (s:specie) REQUIRE s.id IS UNIQUE;" "CREATE CONSTRAINT pop_id IF NOT EXISTS FOR (p:population) REQUIRE p.id IS UNIQUE;" # constraint record has no id field, no need to make it unique ] class Neo4jDB: def __init__(self, uri: str, user: str, password: str, database: str | None = None): self._driver = AsyncGraphDatabase.driver(uri, auth=(user, password)) self._database = database async def close(self): await self._driver.close() """ async def run(self, cypher: str, **params): async with self._driver.session(database=self._database) as s: return await s.run(cypher, **params) """ async def execute_write(self, work): """ Method: execute_write Description: This method is used to execute a write operation on the database using the provided work. Parameters: self: The current instance of the class. work: The work to be executed as a write operation on the database. """ async with self._driver.session(database=self._database) as session: return await session.execute_write(work) async def run_read(self, cypher: LiteralString | Query, **params): """ Method: run_read Description: This method allows running a read operation with the provided cypher query and parameters using the underlying driver session. It returns the result of the read operation. Parameters: - cypher: str or Query object representing the cypher query to be executed. - **params: Additional keyword arguments that represent parameters to be passed to the cypher query. Returns: Result of the read operation based on the provided cypher query and parameters. """ async with self._driver.session(database=self._database) as s: return await s.run(cypher, **params) async def read_single(self, cypher: LiteralString | Query, **params): """ Reads a single record from the database using the provided Cypher query and parameters. :param cypher: The Cypher query to execute. :param params: Additional parameters to be passed to the query. :return: A single record retrieved from the database based on the given Cypher query and parameters. """ async with self._driver.session(database=self._database) as s: res = await s.run(cypher, **params) return await res.single() async def read_all(self, cypher: LiteralString | Query, **params): """ Reads all records from the database based on the given cypher query and parameters. Parameters: cypher (str): The Cypher query to execute. **params: Additional parameters to pass to the Cypher query. Return Type: list: A list of retrieved records from the database. """ async with self._driver.session(database=self._database) as s: res = await s.run(cypher, **params) return [r async for r in res] async def run_consume(self, cypher: LiteralString | Query, **params): """ Run a Cypher query and consume the result. Parameters: cypher : Union[LiteralString, Query] The Cypher query to be executed. **params : Any Keyword arguments for query parameters. Returns: None """ async with self._driver.session(database=self._database) as s: res = await s.run(cypher, **params) return await res.consume() async def create_schema(self): """ Creates database schema by running Neo4j constraints queries. Parameters: - self: instance of the class - database: name of the database to be used for creating schema Returns: - None """ async with self._driver.session(database=self._database) as s: for stmt in NEO4J_CONSTRAINTS: await s.run(cast(LiteralString, stmt)) async def purge_all_nodes(self): """ Purge all nodes in the database. Method parameters: - None Returns: - None """ async with self._driver.session(database=self._database) as s: await s.run("MATCH (n) DETACH DELETE n") async def drop_schema(self): """ Drop the current schema by dropping all constraints in the database. Parameters: None Return Type: None """ async with self._driver.session(database=self._database) as s: res = await s.run("SHOW CONSTRAINTS") async for record in res: name = record["name"] await s.run(cast(LiteralString, f"DROP CONSTRAINT {name} IF EXISTS"))