import sqlalchemy as sq import sqlparse import pandas as pd from config import DatabaseConfig, DatabaseType from keepass import KeePass def _read_and_sql_file_and_strip_for_comments(filename: str): query: str with open(filename, 'r') as f: query = f.read() query = sqlparse.format(query, strip_comments=True) return query class DBAdapter: _engine: sq.Engine _database_config: DatabaseConfig def __init__(self, keepass: KeePass, database_config: DatabaseConfig): self._database_config = database_config connection_string: str keepass_entry = keepass.get_db_credentials() match self._database_config.type: case DatabaseType.PSQL: connection_string = f'postgresql+pg8000://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}/{self._database_config.name}' case DatabaseType.ORCL: connection_string = f'oracle:{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}:{self._database_config.port}:{self._database_config.ssid}' case _: raise Exception(f'Database type {database_config.type} is not supported') self._engine: sq.Engine = sq.create_engine(connection_string) def _set_transaction_readonly(self, conn: sq.Connection): if not conn.in_transaction(): raise Exception('Connection is not in a transaction') match self._database_config.type: case DatabaseType.PSQL: conn.execute(sq.text('SET TRANSACTION READ ONLY')) case DatabaseType.ORCL: conn.execute(sq.text('SET TRANSACTION READ ONLY')) case _: raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions') def run_query(self, query: str, read_only = True) -> sq.CursorResult: result: sq.CursorResult with self._engine.connect() as conn: with conn.begin(): try: if read_only: self._set_transaction_readonly(conn) result = conn.execute(sq.text(query)) conn.commit() except Exception as e: conn.rollback() raise e return result def run_sql_file_one_statement(self, filename: str = "query.sql", read_only = True) -> sq.CursorResult: query = _read_and_sql_file_and_strip_for_comments(filename) return self.run_query(query, read_only) def run_sql_file_export_to_csv(self, input_name: str = "query.sql", output_name: str = "export.csv", read_only = True): result: pd.DataFrame query = _read_and_sql_file_and_strip_for_comments(input_name) with self._engine.connect() as conn: with conn.begin(): try: if read_only: self._set_transaction_readonly(conn) result = pd.read_sql(query, conn) conn.commit() except Exception as e: conn.rollback() raise e result.to_csv(output_name, index=False, sep=';')