# Source: db_interactor/database/db_adapter.py (Python; originally listed as 191 lines, 9.3 KiB)

import logging
import os
import time
from pathlib import Path
from urllib.parse import quote

import pandas as pd
import sqlalchemy as sq
import sqlparse

from database.QueryParameters import QueryParameters
from keepass.Keepass import KeePass
from models.DatabaseConfig import DatabaseConfig
from models.DatabaseType import DatabaseType
from models.ExportType import ExportType
from models.Municipality import Municipality
class DBAdapter:
_engine: sq.Engine
_database_config: DatabaseConfig
_has_tables_been_initialized: bool = False
_logger: logging.Logger
_output_folder: str = 'output/'
def __init__(self, keepass: KeePass, database_config: DatabaseConfig, logger: logging.Logger):
self._database_config = database_config
connection_string: str
keepass_entry = keepass.get_db_credentials()
self._logger = logger
match self._database_config.type:
case DatabaseType.PSQL:
connection_string = f'postgresql+pg8000://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}/{self._database_config.name}'
case DatabaseType.ORCL:
connection_string = f'oracle+cx_oracle://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}:{self._database_config.port}/{self._database_config.ssid}'
case DatabaseType.SQLITE:
connection_string = f'sqlite:///{self._database_config.host}'
case _:
raise Exception(f'Database type {database_config.type} is not supported')
logger.info(f'Initializing database {database_config.host}:{database_config.name}')
self._engine: sq.Engine = sq.create_engine(connection_string)
logger.info('Database initialized')
def _set_transaction_readonly(self, conn: sq.Connection):
self._logger.info('Setting transaction to readonly.')
if not conn.in_transaction():
raise Exception('Connection is not in a transaction')
match self._database_config.type:
case DatabaseType.PSQL | DatabaseType.ORCL:
conn.execute(sq.text('SET TRANSACTION READ ONLY'))
case _:
raise Exception(
f'Database type {self._database_config.type} is not supported for readonly transactions')
def _set_schema(self, conn: sq.Connection, schema: str):
self._logger.info(f'Setting schema to "{schema}"')
if not conn.in_transaction():
raise Exception('Connection is not in a transaction')
match self._database_config.type:
case DatabaseType.ORCL:
conn.execute(sq.text(f"alter session set current_schema = {schema}"))
case DatabaseType.PSQL:
conn.execute(sq.text(f"set schema '{schema}'"))
conn.execute(sq.text(f"set search_path = public"))
case _:
raise Exception(
f'Database type {self._database_config.type} is not supported for readonly transactions')
def _generate_filename(self, filename: str) -> str:
try:
os.mkdir(self._output_folder)
except FileExistsError:
pass
return f'{self._output_folder}{filename}'
def _export_to_file(self, export_type: ExportType, output_file_name: str, result: pd.DataFrame, query_parameter: QueryParameters = None):
os.makedirs(os.path.dirname(output_file_name + ".txt"), exist_ok=True)
match export_type:
case ExportType.CSV:
output_file_name += '.csv'
result.to_csv(output_file_name, index=False, sep=';', encoding='utf-8')
case ExportType.EXCEL:
output_file_name += '.xlsx'
result.to_excel(output_file_name, index=False)
case ExportType.XML:
output_file_name += '.xml'
result.to_xml(output_file_name, index=False)
self._logger.info(f'Created file {output_file_name}')
def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, query_parameters: QueryParameters,
schema: str | None = None) -> pd.DataFrame:
result: pd.DataFrame
with conn.begin():
self._logger.info("Starting transaction")
try:
if read_only:
self._set_transaction_readonly(conn)
if schema is not None:
self._set_schema(conn, schema)
result = self._extract_dataframe_no_safeguards(conn, query, query_parameters)
except Exception as e:
conn.rollback()
raise e
return result
def _extract_dataframe_no_safeguards(self, conn: sq.Connection, query: str,
query_parameter: QueryParameters) -> pd.DataFrame:
result: pd.DataFrame
start = time.time()
result = pd.read_sql(query, conn, params=query_parameter.query_parameters)
end = time.time()
self._logger.info(f'Query took {(end - start):.4f} seconds')
return result
def run_sql_file_export_to_file_multiple_schemas(self, municipalities: list[Municipality],
query_parameter: QueryParameters,
read_only=True):
self.run_sql_files_export_to_files_multiple_schemas(municipalities, [query_parameter], read_only)
def run_sql_file_multiple_statements(self, query_parameter: QueryParameters):
"""
Runs an SQL file, supports multiple statements, does not support plsql.
If any statements fail, throws an error and rolls back.
:param query_parameter: contains data about the queries to run and how to find them
:return: Nothing
"""
queries = query_parameter.get_queries()
self._logger.info(queries)
self._logger.info(f'Running {len(queries)} queries')
with self._engine.connect() as conn:
with conn.begin():
self._logger.info("Starting transaction")
try:
for index, query in enumerate(queries):
start = time.time()
conn.execute(sq.text(query), parameters=query_parameter.query_parameters)
end = time.time()
self._logger.info(
f'({index + 1} / {len(queries)}) Query took {(end - start):.4f} seconds ({query})')
conn.commit()
except Exception as e:
self._logger.info(f'Transaction rollback')
conn.rollback()
raise e
self._logger.info(f'Transaction commited')
def run_sql_files_export_to_files_multiple_schemas(self, municipalities: list[Municipality],
query_parameters: list[QueryParameters] = None,
read_only: bool = True):
""""
Runs the list of granted sql files against the list of municipalities
:param export_type: the type of files to export
:param municipalities: a list of municipalities
:param query_parameters: a list of sql files to run TODO: make this a pair list with sql file and translation for root_name and row_name to give the XML file the correct namespaces, consider using the stylesheet option from panda to xml https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_xml.html
:param read_only: if the transaction should be set too read-only to avoid changes to the database
:return: Nothing
"""
with self._engine.connect() as conn:
with conn.begin():
if read_only:
self._set_transaction_readonly(conn)
for municipality_index, municipality in enumerate(municipalities):
self._logger.info(
f'({municipality_index + 1}/{len(municipalities)}) Starting to process municipality {municipality.name} ({municipality.schema})')
self._set_schema(conn, municipality.schema)
file_prefix = f'{municipality.kommunekode}/'
for query_file_index, query_parameter in enumerate(query_parameters):
queries = query_parameter.get_queries()
self._logger.info(
f'({query_file_index + 1}/{len(municipalities)}) Starting to process query with title: {query_parameter.title}')
if not len(queries) == 1:
self._logger.error(f'Query file {query_parameter.title} failed due to multiple queries')
raise Exception(f'Query file {query_parameter.title} failed due to multiple queries')
query = queries[0]
dataframe = self._extract_dataframe_no_safeguards(conn, query, query_parameter)
self._logger.info(
f'[{municipality.kommunekode}][{query_parameter.title}][{len(dataframe.index)}]')
filename = self._generate_filename(f'{file_prefix}{query_parameter.title}')
self._export_to_file(query_parameter.export_type, filename, dataframe, query_parameter)
self._logger.info(f'Finished processing {query_parameter.title} generated file {filename}')