# Listing metadata: 187 lines, 9.1 KiB, Python
import logging
import os
import time
from urllib.parse import quote

import pandas as pd
import sqlalchemy as sq
import sqlparse

from database.QueryParameters import QueryParameters
from models.DatabaseConfig import DatabaseConfig
from models.DatabaseType import DatabaseType
from keepass.Keepass import KeePass
from models.ExportType import ExportType
from models.Municipality import Municipality
|
|
|
|
|
|
class DBAdapter:
    """
    Wraps a SQLAlchemy engine to run SQL queries and export their results to files.
    """
    # Engine created in __init__ from the connection string of the configured database.
    _engine: sq.Engine
    # Connection settings (type, host, port, name, ssid) handed to the constructor.
    _database_config: DatabaseConfig
    # NOTE(review): not read or written by any method visible here - confirm it is still needed.
    _has_tables_been_initialized: bool = False
    # Logger handed to the constructor; used for the progress messages of this adapter.
    _logger: logging.Logger
    # Prefix that _generate_filename puts in front of every export file name.
    _output_folder: str = 'output/'
|
|
|
|
def __init__(self, keepass: KeePass, database_config: DatabaseConfig, logger: logging.Logger):
|
|
self._database_config = database_config
|
|
connection_string: str
|
|
keepass_entry = keepass.get_db_credentials()
|
|
self._logger = logger
|
|
|
|
match self._database_config.type:
|
|
case DatabaseType.PSQL:
|
|
connection_string = f'postgresql+pg8000://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}/{self._database_config.name}'
|
|
case DatabaseType.ORCL:
|
|
connection_string = f'oracle+cx_oracle://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}:{self._database_config.port}/{self._database_config.ssid}'
|
|
case DatabaseType.SQLITE:
|
|
connection_string = f'sqlite:///{self._database_config.host}'
|
|
case _:
|
|
raise Exception(f'Database type {database_config.type} is not supported')
|
|
logger.info(f'Initializing database {database_config.host}:{database_config.name}')
|
|
self._engine: sq.Engine = sq.create_engine(connection_string)
|
|
logger.info('Database initialized')
|
|
|
|
def _set_transaction_readonly(self, conn: sq.Connection):
    """
    Marks the transaction that is open on the connection as read-only.

    :param conn: connection that already has an open transaction
    :raises Exception: if no transaction is open or the database type is neither PSQL nor ORCL
    """
    self._logger.info('Setting transaction to readonly.')

    # Guard: the statement below only makes sense inside a transaction.
    if not conn.in_transaction():
        raise Exception('Connection is not in a transaction')

    db_type = self._database_config.type
    is_supported = db_type == DatabaseType.PSQL or db_type == DatabaseType.ORCL
    if not is_supported:
        raise Exception(
            f'Database type {db_type} is not supported for readonly transactions')

    conn.execute(sq.text('SET TRANSACTION READ ONLY'))
|
|
|
|
def _set_schema(self, conn: sq.Connection, schema: str):
|
|
self._logger.info(f'Setting schema to "{schema}"')
|
|
if not conn.in_transaction():
|
|
raise Exception('Connection is not in a transaction')
|
|
|
|
match self._database_config.type:
|
|
case DatabaseType.ORCL:
|
|
conn.execute(sq.text(f"alter session set current_schema = {schema}"))
|
|
case DatabaseType.PSQL:
|
|
conn.execute(sq.text(f"set schema '{schema}'"))
|
|
case _:
|
|
raise Exception(
|
|
f'Database type {self._database_config.type} is not supported for readonly transactions')
|
|
|
|
def _generate_filename(self, filename: str) -> str:
    """
    Builds the output path for a file and makes sure its directory exists.

    The file name may contain sub-folders (e.g. '<kommunekode>/<title>'); every missing
    directory on the way is created, not only the output folder itself.

    :param filename: file name relative to the output folder, without extension
    :return: the output folder prefix followed by the file name
    """
    path = f'{self._output_folder}{filename}'
    directory = os.path.dirname(path)
    if directory:
        # exist_ok avoids the race/except dance and also creates nested folders.
        os.makedirs(directory, exist_ok=True)
    return path
|
|
|
|
def _export_to_file(self, export_type, output_file_name, result, query_parameter: QueryParameters = None):
|
|
match export_type:
|
|
case ExportType.CSV:
|
|
output_file_name += '.csv'
|
|
result.to_csv(output_file_name, index=False, sep=';', encoding='utf-8')
|
|
case ExportType.EXCEL:
|
|
output_file_name += '.xlsx'
|
|
result.to_excel(output_file_name, index=False)
|
|
case ExportType.XML:
|
|
output_file_name += '.xml'
|
|
result.to_xml(output_file_name, index=False)
|
|
|
|
self._logger.info(f'Created file {output_file_name}')
|
|
|
|
def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, query_parameters: QueryParameters,
                       schema: str | None = None) -> pd.DataFrame:
    """
    Runs a query inside its own transaction and returns the rows as a dataframe.

    Before the query is executed the transaction is optionally switched to read-only
    and the schema is optionally changed.

    :param conn: connection on which the transaction is started
    :param query: the sql statement to run
    :param read_only: if True the transaction is set to read-only first
    :param query_parameters: carries the bind parameters for the query
    :param schema: schema to switch to, or None to keep the current one
    :return: the query result
    """
    with conn.begin():
        self._logger.info("Starting transaction")
        try:
            if read_only:
                self._set_transaction_readonly(conn)
            if schema is not None:
                self._set_schema(conn, schema)
            dataframe = self._extract_dataframe_no_safeguards(conn, query, query_parameters)
        except Exception:
            # Undo whatever was done so far and hand the original error to the caller.
            conn.rollback()
            raise
    return dataframe
|
|
|
|
def _extract_dataframe_no_safeguards(self, conn: sq.Connection, query: str,
                                     query_parameter: QueryParameters) -> pd.DataFrame:
    """
    Executes the query on the given connection and logs how long it took.

    No transaction, read-only or schema handling happens here; that is up to the caller.

    :param conn: connection to run the query on
    :param query: the sql statement to run
    :param query_parameter: its query_parameters attribute is passed on as bind parameters
    :return: the query result as a dataframe
    """
    started_at = time.time()
    dataframe = pd.read_sql(query, conn, params=query_parameter.query_parameters)
    elapsed = time.time() - started_at
    self._logger.info(f'Query took {elapsed:.4f} seconds')
    return dataframe
|
|
|
|
def run_sql_file_export_to_file_multiple_schemas(self, municipalities: list[Municipality],
                                                 query_parameter: QueryParameters,
                                                 read_only=True):
    """
    Runs a single sql file against every given municipality and exports the results.

    Wraps the query in a one-element list and delegates to
    run_sql_files_export_to_files_multiple_schemas.

    :param municipalities: a list of municipalities
    :param query_parameter: the sql file to run
    :param read_only: passed through unchanged to the multi-file variant
    :return: Nothing
    """
    single_file = [query_parameter]
    self.run_sql_files_export_to_files_multiple_schemas(municipalities, single_file, read_only)
|
|
|
|
def run_sql_file_multiple_statements(self, query_parameter: QueryParameters):
    """
    Executes all statements of an SQL file in one transaction (plsql is not supported).

    The transaction is committed after the last statement. If any statement fails,
    the transaction is rolled back and the error is raised to the caller.

    :param query_parameter: contains data about the queries to run and how to find them
    :return: Nothing
    """
    statements = query_parameter.get_queries()
    total = len(statements)
    self._logger.info(statements)
    self._logger.info(f'Running {total} queries')

    with self._engine.connect() as conn, conn.begin():
        self._logger.info("Starting transaction")
        try:
            for position, statement in enumerate(statements, start=1):
                started_at = time.time()
                conn.execute(sq.text(statement), parameters=query_parameter.query_parameters)
                elapsed = time.time() - started_at
                self._logger.info(
                    f'({position} / {total}) Query took {elapsed:.4f} seconds ({statement})')
            conn.commit()
        except Exception:
            self._logger.info('Transaction rollback')
            conn.rollback()
            raise

    self._logger.info('Transaction commited')
|
|
|
|
def run_sql_files_export_to_files_multiple_schemas(self, municipalities: list[Municipality],
                                                   query_parameters: list[QueryParameters] | None = None,
                                                   read_only: bool = True):
    """
    Runs the list of granted sql files against the list of municipalities.

    Everything runs on one connection inside a single transaction. For each municipality
    the schema is switched, then every sql file is executed and its result is exported to
    '<kommunekode>/<title>' below the output folder, in the export type of that sql file.

    :param municipalities: a list of municipalities
    :param query_parameters: a list of sql files to run, None is treated as an empty list. TODO: make this a pair list with sql file and translation for root_name and row_name to give the XML file the correct namespaces, consider using the stylesheet option from panda to xml https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_xml.html
    :param read_only: if the transaction should be set to read-only to avoid changes to the database
    :return: Nothing
    :raises Exception: if a sql file does not contain exactly one query
    """
    if query_parameters is None:
        query_parameters = []

    with self._engine.connect() as conn:
        with conn.begin():
            if read_only:
                self._set_transaction_readonly(conn)

            for municipality_index, municipality in enumerate(municipalities):
                self._logger.info(
                    f'({municipality_index + 1}/{len(municipalities)}) Starting to process municipality {municipality.name} ({municipality.schema})')
                self._set_schema(conn, municipality.schema)
                file_prefix = f'{municipality.kommunekode}/'

                for query_file_index, query_parameter in enumerate(query_parameters):
                    queries = query_parameter.get_queries()
                    # Progress is counted against the number of sql files, not municipalities.
                    self._logger.info(
                        f'({query_file_index + 1}/{len(query_parameters)}) Starting to process query with title: {query_parameter.title}')

                    # The export needs exactly one result set per file. The previous check
                    # was inverted and rejected precisely the valid single-query files.
                    if len(queries) != 1:
                        message = (f'Query file {query_parameter.title} failed: expected exactly one query, '
                                   f'found {len(queries)}')
                        self._logger.error(message)
                        raise Exception(message)

                    query = queries[0]
                    dataframe = self._extract_dataframe_no_safeguards(conn, query, query_parameter)
                    self._logger.info(
                        f'[{municipality.kommunekode}][{query_parameter.title}][{len(dataframe.index)}]')
                    filename = self._generate_filename(f'{file_prefix}{query_parameter.title}')

                    self._export_to_file(query_parameter.export_type, filename, dataframe, query_parameter)
                    self._logger.info(f'Finished processing {query_parameter.title} generated file {filename}')
|