Files
neuroevolution/mathema/core/db.py
2026-02-21 10:58:05 +01:00

161 lines
5.7 KiB
Python

from typing import LiteralString, cast
from neo4j import AsyncGraphDatabase, Query
NEO4J_CONSTRAINTS = [
"CREATE CONSTRAINT cortex_id IF NOT EXISTS FOR (n:cortex) REQUIRE n.id IS UNIQUE",
"CREATE CONSTRAINT sensor_id IF NOT EXISTS FOR (n:sensor) REQUIRE n.id IS UNIQUE",
"CREATE CONSTRAINT neuron_id IF NOT EXISTS FOR (n:neuron) REQUIRE n.id IS UNIQUE",
"CREATE CONSTRAINT actuator_id IF NOT EXISTS FOR (n:actuator) REQUIRE n.id IS UNIQUE",
"CREATE CONSTRAINT agent_id IF NOT EXISTS FOR (n:agent) REQUIRE n.id IS UNIQUE",
"CREATE INDEX agent_id IF NOT EXISTS FOR (a:agent) ON (a.id)",
"CREATE INDEX agent_population IF NOT EXISTS FOR (a:agent) ON (a.population_id)",
"CREATE INDEX cortex_id IF NOT EXISTS FOR (cx:cortex) ON (cx.id)",
"CREATE INDEX neuron_id IF NOT EXISTS FOR (n:neuron) ON (n.id)",
"CREATE CONSTRAINT spec_id IF NOT EXISTS FOR (s:specie) REQUIRE s.id IS UNIQUE;"
"CREATE CONSTRAINT pop_id IF NOT EXISTS FOR (p:population) REQUIRE p.id IS UNIQUE;"
# constraint record has no id field, no need to make it unique
]
class Neo4jDB:
def __init__(self, uri: str, user: str, password: str, database: str | None = None):
self._driver = AsyncGraphDatabase.driver(uri, auth=(user, password))
self._database = database
async def close(self):
await self._driver.close()
"""
async def run(self, cypher: str, **params):
async with self._driver.session(database=self._database) as s:
return await s.run(cypher, **params)
"""
async def execute_write(self, work):
"""
Method: execute_write
Description:
This method is used to execute a write operation on the database using the provided work.
Parameters:
self: The current instance of the class.
work: The work to be executed as a write operation on the database.
"""
async with self._driver.session(database=self._database) as session:
return await session.execute_write(work)
async def run_read(self, cypher: LiteralString | Query, **params):
"""
Method: run_read
Description:
This method allows running a read operation with the provided cypher query and parameters using the underlying
driver session. It returns the result of the read operation.
Parameters:
- cypher: str or Query object representing the cypher query to be executed.
- **params: Additional keyword arguments that represent parameters to be passed to the cypher query.
Returns:
Result of the read operation based on the provided cypher query and parameters.
"""
async with self._driver.session(database=self._database) as s:
return await s.run(cypher, **params)
async def read_single(self, cypher: LiteralString | Query, **params):
"""
Reads a single record from the database using the provided Cypher query and parameters.
:param cypher: The Cypher query to execute.
:param params: Additional parameters to be passed to the query.
:return: A single record retrieved from the database based on the given Cypher query and parameters.
"""
async with self._driver.session(database=self._database) as s:
res = await s.run(cypher, **params)
return await res.single()
async def read_all(self, cypher: LiteralString | Query, **params):
"""
Reads all records from the database based on the given cypher query and parameters.
Parameters:
cypher (str): The Cypher query to execute.
**params: Additional parameters to pass to the Cypher query.
Return Type:
list: A list of retrieved records from the database.
"""
async with self._driver.session(database=self._database) as s:
res = await s.run(cypher, **params)
return [r async for r in res]
async def run_consume(self, cypher: LiteralString | Query, **params):
"""
Run a Cypher query and consume the result.
Parameters:
cypher : Union[LiteralString, Query]
The Cypher query to be executed.
**params : Any
Keyword arguments for query parameters.
Returns:
None
"""
async with self._driver.session(database=self._database) as s:
res = await s.run(cypher, **params)
return await res.consume()
async def create_schema(self):
"""
Creates database schema by running Neo4j constraints queries.
Parameters:
- self: instance of the class
- database: name of the database to be used for creating schema
Returns:
- None
"""
async with self._driver.session(database=self._database) as s:
for stmt in NEO4J_CONSTRAINTS:
await s.run(cast(LiteralString, stmt))
async def purge_all_nodes(self):
"""
Purge all nodes in the database.
Method parameters:
- None
Returns:
- None
"""
async with self._driver.session(database=self._database) as s:
await s.run("MATCH (n) DETACH DELETE n")
async def drop_schema(self):
"""
Drop the current schema by dropping all constraints in the database.
Parameters:
None
Return Type:
None
"""
async with self._driver.session(database=self._database) as s:
res = await s.run("SHOW CONSTRAINTS")
async for record in res:
name = record["name"]
await s.run(cast(LiteralString, f"DROP CONSTRAINT {name} IF EXISTS"))