Need to work on: running update queries that combine PL/SQL and plain SQL statements, producing multiple extracts (and therefore multiple output files) from a single SQL file, and exporting to the xlsx file format. Rough sketches for each of these are included further down in the file.
import logging
import time

import pandas as pd
import sqlalchemy as sq
import sqlparse

from config import DatabaseConfig, DatabaseType
from keepass import KeePass
from models import Municipality


class DBAdapter:
    _engine: sq.Engine
    _database_config: DatabaseConfig
    _has_tables_been_initialized: bool = False
    _logger: logging.Logger

    def __init__(self, keepass: KeePass, database_config: DatabaseConfig, logger: logging.Logger):
        self._database_config = database_config
        self._logger = logger

        connection_string: str
        keepass_entry = keepass.get_db_credentials()

        match self._database_config.type:
            case DatabaseType.PSQL:
                connection_string = f'postgresql+pg8000://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}/{self._database_config.name}'
            case DatabaseType.ORCL:
                connection_string = f'oracle://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}:{self._database_config.port}/{self._database_config.ssid}'
            case DatabaseType.SQLITE:
                # For SQLite the configured "host" is the path to the database file.
                connection_string = f'sqlite:///{self._database_config.host}'
            case _:
                raise Exception(f'Database type {database_config.type} is not supported')

        logger.info(f'Initializing database {database_config.host}:{database_config.name}')
        self._engine = sq.create_engine(connection_string)
        logger.info('Database initialized')

    def _read_sql_file_and_strip_comments(self, filename: str) -> str:
        query: str

        self._logger.info(f'Reading file {filename}')
        with open(filename, 'r') as f:
            query = f.read()

        self._logger.info(f'Stripping comments from file {filename}')
        query = sqlparse.format(query, strip_comments=True)
        return query

    def _set_transaction_readonly(self, conn: sq.Connection):
        self._logger.info('Setting transaction to readonly.')
        if not conn.in_transaction():
            raise Exception('Connection is not in a transaction')

        match self._database_config.type:
            case DatabaseType.PSQL | DatabaseType.ORCL:
                conn.execute(sq.text('SET TRANSACTION READ ONLY'))
            case _:
                raise Exception(f'Database type {self._database_config.type} is not supported for readonly transactions')

    def _set_schema(self, conn: sq.Connection, schema: str):
        self._logger.info(f'Setting schema to "{schema}"')
        if not conn.in_transaction():
            raise Exception('Connection is not in a transaction')

        match self._database_config.type:
            case DatabaseType.ORCL:
                # current_schema takes an identifier, not a quoted string literal.
                conn.execute(sq.text(f'alter session set current_schema = {schema}'))
            case DatabaseType.PSQL:
                conn.execute(sq.text(f"set schema '{schema}'"))
            case _:
                raise Exception(f'Database type {self._database_config.type} is not supported for setting the schema')

    def run_query(self, query: str, read_only=True) -> sq.CursorResult:
        result: sq.CursorResult

        self._logger.info(f'Running query: "{query}"')
        with self._engine.connect() as conn:
            # conn.begin() commits on success and rolls back on any exception,
            # so no explicit commit/rollback is needed inside the block.
            with conn.begin():
                self._logger.info('Starting transaction')
                if read_only:
                    self._set_transaction_readonly(conn)

                start = time.time()
                result = conn.execute(sq.text(query))
                end = time.time()
                self._logger.info(f'Query took {(end - start):.4f} seconds')

            self._logger.info('Transaction committed')

        return result

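    # Sketch for the "update queries that combine PL/SQL and normal SQL" item
    # in the note at the top of this file. An anonymous PL/SQL block is a
    # single statement from the driver's point of view, so it can be routed
    # through run_query() with read_only=False, the same as a plain UPDATE.
    # This wrapper and the example calls are assumptions, not part of the
    # current API.
    def run_update(self, statement: str) -> sq.CursorResult:
        return self.run_query(statement, read_only=False)

    # Hypothetical usage:
    #   adapter.run_update('UPDATE some_table SET flag = 1 WHERE flag IS NULL')
    #   adapter.run_update('BEGIN some_package.refresh_stats; END;')
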
    def run_sql_file_one_statement(self, filename: str = "query.sql", read_only=True) -> sq.CursorResult:
        query = self._read_sql_file_and_strip_comments(filename)
        return self.run_query(query, read_only)

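    # Sketch for the "multiple extracts for one sql file" item in the note at
    # the top of this file. sqlparse.split() breaks a script into individual
    # statements, each of which is run through run_query(). Trailing
    # semicolons are stripped on the assumption that the Oracle driver rejects
    # them; the method name and return type are likewise assumptions, not part
    # of the current API.
    def run_sql_file_multiple_statements(self, filename: str = "query.sql", read_only=True) -> list[sq.CursorResult]:
        script = self._read_sql_file_and_strip_comments(filename)
        results: list[sq.CursorResult] = []
        for statement in sqlparse.split(script):
            statement = statement.strip().rstrip(';')
            if statement:
                results.append(self.run_query(statement, read_only))
        return results
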
    def run_sql_file_export_to_csv(self, schema: str | None = None, input_name: str = "query.sql", output_name: str = "export.csv", read_only=True):
        result: pd.DataFrame

        query = self._read_sql_file_and_strip_comments(input_name)
        self._logger.info(f'Running query: "{query}"')

        with self._engine.connect() as conn:
            # The schema has to be applied inside the transaction that
            # _extract_dataframe opens, so it is passed through rather than
            # set here, where no transaction is active yet.
            result = self._extract_dataframe(conn, query, read_only, schema=schema)

        result.to_csv(output_name, index=False, sep=';')

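    # Sketch for the "export to xlsx file format" item in the note at the top
    # of this file: the same extraction path as the CSV export, ending in
    # DataFrame.to_excel() (which requires an Excel writer such as openpyxl to
    # be installed). The method name and defaults are assumptions, not part of
    # the current API.
    def run_sql_file_export_to_xlsx(self, schema: str | None = None, input_name: str = "query.sql", output_name: str = "export.xlsx", read_only=True):
        query = self._read_sql_file_and_strip_comments(input_name)
        self._logger.info(f'Running query: "{query}"')

        with self._engine.connect() as conn:
            result = self._extract_dataframe(conn, query, read_only, schema=schema)

        result.to_excel(output_name, index=False)
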
    def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, schema: str | None = None) -> pd.DataFrame:
        result: pd.DataFrame

        # conn.begin() commits on success and rolls back on any exception,
        # so no explicit commit/rollback is needed inside the block.
        with conn.begin():
            self._logger.info('Starting transaction')
            if read_only:
                self._set_transaction_readonly(conn)
            if schema is not None:
                self._set_schema(conn, schema)

            start = time.time()
            result = pd.read_sql(query, conn)
            end = time.time()
            self._logger.info(f'Query took {(end - start):.4f} seconds')

        return result

    def run_sql_file_export_to_csv_multiple_schemas(self, municipalities: list[Municipality], base_output_name: str = "", input_name: str = "query.sql", read_only=True):
        query = self._read_sql_file_and_strip_comments(input_name)

        self._logger.info(f'Running on {len(municipalities)} municipalities')
        self._logger.info(f'Running query: "{query}"')

        with self._engine.connect() as conn:
            for index, municipality in enumerate(municipalities, start=1):
                self._logger.info(f'({index}/{len(municipalities)}) running for municipality {municipality.name}')
                result = self._extract_dataframe(conn, query, read_only, schema=municipality.schema)

                output_file_name = f'{base_output_name}{municipality.name}.csv'
                result.to_csv(output_file_name, index=False, sep=';')
                self._logger.info(f'Created file {output_file_name}')
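
# Hypothetical usage sketch. The KeePass and DatabaseConfig constructors below
# are assumptions about the imported interfaces; this file only relies on
# get_db_credentials() and the type/host/name/port/ssid fields.
#
#   logger = logging.getLogger(__name__)
#   config = DatabaseConfig(type=DatabaseType.SQLITE, host='local.db', name='local')
#   adapter = DBAdapter(KeePass(), config, logger)
#   adapter.run_sql_file_export_to_csv(input_name='query.sql', output_name='export.csv')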