171 lines
No EOL
4.6 KiB
Python
171 lines
No EOL
4.6 KiB
Python
# SPDX-License-Identifier: LGPL-3.0-or-later
|
|
|
|
from .logging import setup_logger
|
|
from abc import ABC, abstractmethod
|
|
|
|
log = setup_logger(__name__)
|
|
|
|
class CursorContext:
|
|
"""
|
|
Proxy wrapper around a database cursor.
|
|
Manages cursor lifecycle as a context manager and delegates
|
|
all attribute access to the underlying cursor object.
|
|
"""
|
|
def __init__(self, cursor):
|
|
self._cursor = cursor
|
|
|
|
def __enter__(self):
|
|
"""
|
|
Return the underlying cursor.
|
|
"""
|
|
return self._cursor
|
|
|
|
def __exit__(self, *args):
|
|
"""
|
|
Close the cursor on exit.
|
|
"""
|
|
self._cursor.close()
|
|
return False
|
|
|
|
def __getattr__(self, item):
|
|
return getattr(self._cursor, item)
|
|
|
|
def __iter__(self):
|
|
return iter(self._cursor)
|
|
|
|
def __next__(self):
|
|
return next(self._cursor)
|
|
|
|
class AbstractDatabase(ABC):
|
|
"""
|
|
Abstract base class for database connections.
|
|
|
|
Provides a unified interface for connecting, executing queries,
|
|
and managing transactions. Subclasses must implement _connect()
|
|
and placeholder.
|
|
|
|
Args:
|
|
strict: If True, raises RuntimeError when connect() is called
|
|
on an already open connection. If False, closes the
|
|
existing connection silently.
|
|
"""
|
|
def __init__(self, strict: bool = True):
|
|
self._conn = None
|
|
self._strict = strict
|
|
|
|
@abstractmethod
|
|
def _connect(self):
|
|
"""
|
|
Establish and return a database connection.
|
|
|
|
Must be implemented by subclasses. Read configuration from
|
|
environment variables or any other source, then return a
|
|
DB-API 2.0 compatible connection object.
|
|
|
|
Returns:
|
|
A database connection object.
|
|
"""
|
|
pass
|
|
|
|
def connect(self, *args, **kwargs):
|
|
"""
|
|
Open the database connection.
|
|
|
|
Calls _connect() internally. Should not be overridden.
|
|
|
|
Raises:
|
|
RuntimeError: If connection is already open and strict=True.
|
|
RuntimeError: If _connect() returns None.
|
|
"""
|
|
if self._conn is not None:
|
|
if self._strict:
|
|
raise RuntimeError("Database connection already open")
|
|
else:
|
|
try:
|
|
self._conn.close()
|
|
except Exception as e:
|
|
log.error(f"Query execution failed: {e}")
|
|
return None
|
|
|
|
conn = self._connect(*args, **kwargs)
|
|
|
|
if conn is None:
|
|
raise RuntimeError("_connect() must return a connection object")
|
|
|
|
self._conn = conn
|
|
return conn
|
|
|
|
def disconnect(self):
|
|
"""
|
|
Close the database connection and release resources.
|
|
"""
|
|
if self._conn is not None:
|
|
try:
|
|
self._conn.close()
|
|
finally:
|
|
self._conn = None
|
|
|
|
@property
|
|
def connection(self):
|
|
"""
|
|
Return the active connection object.
|
|
|
|
Raises:
|
|
RuntimeError: If the database is not connected.
|
|
"""
|
|
if self._conn is None:
|
|
raise RuntimeError("Database not connected")
|
|
return self._conn
|
|
|
|
@property
|
|
@abstractmethod
|
|
def placeholder(self) -> str:
|
|
"""
|
|
SQL parameter placeholder for the target dialect.
|
|
Must be implemented by subclasses.
|
|
"""
|
|
pass
|
|
|
|
def execute(self, sql: str, params: tuple = (), autocommit: bool = False) -> CursorContext | None:
|
|
"""
|
|
Execute a SQL query and return a CursorContext.
|
|
|
|
Args:
|
|
sql: SQL query string with placeholders.
|
|
params: Query parameters as a tuple.
|
|
autocommit: If True, commits the transaction after execution.
|
|
|
|
Returns:
|
|
CursorContext wrapping the cursor, or raises on failure.
|
|
|
|
Raises:
|
|
Exception: Re-raises any database error after rollback.
|
|
"""
|
|
cursor = self.connection.cursor()
|
|
|
|
try:
|
|
cursor.execute(sql, params)
|
|
|
|
if autocommit:
|
|
self.connection.commit()
|
|
|
|
return CursorContext(cursor)
|
|
|
|
except Exception:
|
|
self.connection.rollback()
|
|
log.exception("Query execution failed")
|
|
raise
|
|
|
|
def __enter__(self):
|
|
"""
|
|
Support use as a context manager for connection lifecycle.
|
|
Calls disconnect() on exit.
|
|
"""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""
|
|
Support use as a context manager for connection lifecycle.
|
|
Calls disconnect() on exit.
|
|
"""
|
|
self.disconnect() |