136 lines
6.2 KiB
Python
136 lines
6.2 KiB
Python
import logging
import time
from urllib.parse import quote

import pandas as pd
import sqlalchemy as sq
import sqlparse

from config import DatabaseConfig, DatabaseType
from keepass import KeePass
from models import Municipality
|
|
|
|
|
|
class DBAdapter:
|
|
_engine: sq.Engine
|
|
_database_config: DatabaseConfig
|
|
_has_tables_been_initialized: bool = False
|
|
_logger: logging.Logger
|
|
def __init__(self, keepass: KeePass, database_config: DatabaseConfig, logger: logging.Logger):
|
|
self._database_config = database_config
|
|
connection_string: str
|
|
keepass_entry = keepass.get_db_credentials()
|
|
self._logger = logger
|
|
|
|
match self._database_config.type:
|
|
case DatabaseType.PSQL:
|
|
connection_string = f'postgresql+pg8000://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}/{self._database_config.name}'
|
|
case DatabaseType.ORCL:
|
|
connection_string = f'oracle:{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}:{self._database_config.port}:{self._database_config.ssid}'
|
|
case DatabaseType.SQLITE:
|
|
connection_string = f'sqlite:///{self._database_config.host}'
|
|
case _:
|
|
raise Exception(f'Database type {database_config.type} is not supported')
|
|
logger.info(f'Initializing database {database_config.host}:{database_config.name}')
|
|
self._engine: sq.Engine = sq.create_engine(connection_string)
|
|
logger.info('Database initialized')
|
|
|
|
def _read_and_sql_file_and_strip_for_comments(self, filename: str):
|
|
query: str
|
|
self._logger.info(f'Reading file {filename}')
|
|
with open(filename, 'r') as f:
|
|
query = f.read()
|
|
self._logger.info(f'Stripping comments from file {filename}')
|
|
query = sqlparse.format(query, strip_comments=True)
|
|
return query
|
|
|
|
def _set_transaction_readonly(self, conn: sq.Connection):
|
|
self._logger.info('Setting transaction to readonly.')
|
|
if not conn.in_transaction():
|
|
raise Exception('Connection is not in a transaction')
|
|
|
|
match self._database_config.type:
|
|
case DatabaseType.PSQL | DatabaseType.ORCL:
|
|
conn.execute(sq.text('SET TRANSACTION READ ONLY'))
|
|
case _:
|
|
raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions')
|
|
|
|
def _set_schema(self, conn: sq.Connection, schema: str):
|
|
self._logger.info(f'Setting schema to "{schema}"')
|
|
if not conn.in_transaction():
|
|
raise Exception('Connection is not in a transaction')
|
|
|
|
match self._database_config.type:
|
|
case DatabaseType.ORCL:
|
|
conn.execute(sq.text(f"alter session set current_schema = '{schema}'"))
|
|
case DatabaseType.PSQL:
|
|
conn.execute(sq.text(f"set schema '{schema}'"))
|
|
case _:
|
|
raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions')
|
|
|
|
def run_query(self, query: str, read_only = True) -> sq.CursorResult:
|
|
result: sq.CursorResult
|
|
self._logger.info(f'Running query: "{query}"')
|
|
with self._engine.connect() as conn:
|
|
with conn.begin():
|
|
self._logger.info("Starting transaction")
|
|
try:
|
|
if read_only:
|
|
self._set_transaction_readonly(conn)
|
|
start = time.time()
|
|
result = conn.execute(sq.text(query))
|
|
end = time.time()
|
|
self._logger.info(f'Query took {end - start} seconds')
|
|
conn.commit()
|
|
except Exception as e:
|
|
conn.rollback()
|
|
raise e
|
|
self._logger.info(f'Transaction commited')
|
|
return result
|
|
|
|
def run_sql_file_one_statement(self, filename: str = "query.sql", read_only = True) -> sq.CursorResult:
|
|
query = self._read_and_sql_file_and_strip_for_comments(filename)
|
|
return self.run_query(query, read_only)
|
|
|
|
def run_sql_file_export_to_csv(self, schema: str | None = None, input_name: str = "query.sql", output_name: str = "export.csv", read_only = True):
|
|
result: pd.DataFrame
|
|
|
|
query = self._read_and_sql_file_and_strip_for_comments(input_name)
|
|
|
|
self._logger.info(f'Running query: "{query}"')
|
|
|
|
with self._engine.connect() as conn:
|
|
if schema is not None:
|
|
self._set_schema(conn, schema)
|
|
result = self._extract_dataframe(conn, query, read_only)
|
|
result.to_csv(output_name, index=False, sep=';')
|
|
|
|
def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, schema: str | None = None) -> pd.DataFrame:
|
|
result: pd.DataFrame
|
|
with conn.begin():
|
|
self._logger.info("Starting transaction")
|
|
try:
|
|
if read_only:
|
|
self._set_transaction_readonly(conn)
|
|
if schema is not None:
|
|
self._set_schema(conn, schema)
|
|
start = time.time()
|
|
result = pd.read_sql(query, conn)
|
|
end = time.time()
|
|
self._logger.info(f'Query took {(end - start):.4f} seconds')
|
|
conn.commit()
|
|
except Exception as e:
|
|
conn.rollback()
|
|
raise e
|
|
return result
|
|
|
|
def run_sql_file_export_to_csv_multiple_schemas(self, municipalities: list[Municipality], base_output_name: str = "", input_name: str = "query.sql", read_only = True):
|
|
query = self._read_and_sql_file_and_strip_for_comments(input_name)
|
|
self._logger.info(f'Running on {len(municipalities)}')
|
|
self._logger.info(f'Running query: "{query}"')
|
|
with self._engine.connect() as conn:
|
|
for index, municipality in enumerate(municipalities):
|
|
self._logger.info(f'({index + 1}/{len(municipalities)}) running for municipality {municipality.name}')
|
|
result = self._extract_dataframe(conn, query, read_only, schema = municipality.schema)
|
|
output_file_name = f'{base_output_name}{municipality.name}.csv'
|
|
result.to_csv(output_file_name, index=False, sep=';')
|
|
self._logger.info(f'Created file {output_file_name}')
|