# db_interactor/database/db_adapter.py
#
# 269 lines
# 13 KiB
# Python

import logging
import os
import time
import pandas as pd
import sqlalchemy as sq
import sqlparse
from models.DatabaseConfig import DatabaseConfig
from models.DatabaseType import DatabaseType
from keepass.Keepass import KeePass
from models.ExportType import ExportType
from models.Municipality import Municipality
class DBAdapter:
_engine: sq.Engine
_database_config: DatabaseConfig
_has_tables_been_initialized: bool = False
_logger: logging.Logger
_output_folder: str = 'output/'
def __init__(self, keepass: KeePass, database_config: DatabaseConfig, logger: logging.Logger):
self._database_config = database_config
connection_string: str
keepass_entry = keepass.get_db_credentials()
self._logger = logger
match self._database_config.type:
case DatabaseType.PSQL:
connection_string = f'postgresql+pg8000://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}/{self._database_config.name}'
case DatabaseType.ORCL:
connection_string = f'oracle+cx_oracle://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}:{self._database_config.port}/{self._database_config.ssid}'
case DatabaseType.SQLITE:
connection_string = f'sqlite:///{self._database_config.host}'
case _:
raise Exception(f'Database type {database_config.type} is not supported')
logger.info(f'Initializing database {database_config.host}:{database_config.name}')
self._engine: sq.Engine = sq.create_engine(connection_string)
logger.info('Database initialized')
def _read_and_sql_file_and_strip_for_comments(self, filename: str) -> str:
query: str
self._logger.info(f'Reading file {filename}')
with open(filename, 'r') as f:
query = f.read()
self._logger.info(f'Stripping comments from file {filename}')
query = sqlparse.format(query, strip_comments=True)
return query
def _split_query_to_singular_queries(self, query) -> list[str]:
return sqlparse.split(query)
def _set_transaction_readonly(self, conn: sq.Connection):
self._logger.info('Setting transaction to readonly.')
if not conn.in_transaction():
raise Exception('Connection is not in a transaction')
match self._database_config.type:
case DatabaseType.PSQL | DatabaseType.ORCL:
conn.execute(sq.text('SET TRANSACTION READ ONLY'))
case _:
raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions')
def _set_schema(self, conn: sq.Connection, schema: str):
self._logger.info(f'Setting schema to "{schema}"')
if not conn.in_transaction():
raise Exception('Connection is not in a transaction')
match self._database_config.type:
case DatabaseType.ORCL:
conn.execute(sq.text(f"alter session set current_schema = {schema}"))
case DatabaseType.PSQL:
conn.execute(sq.text(f"set schema '{schema}'"))
case _:
raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions')
def _verify_singular_query(self, query: str):
self._logger.info(f'Verifying query')
if len(self._split_query_to_singular_queries(query)) > 1:
self._logger.critical(f'Multiple queries found for query: {query}')
raise Exception(f'Multiple queries found in {query}')
def _generate_filename(self, filename: str) -> str:
try:
os.mkdir(self._output_folder)
except FileExistsError:
pass
return f'{self._output_folder}{filename}'
def _export_to_file(self, export_type, output_file_name, result):
match export_type:
case ExportType.CSV:
output_file_name += '.csv'
result.to_csv(output_file_name, index=False, sep=';', encoding='utf-8')
case ExportType.EXCEL:
output_file_name += '.xlsx'
result.to_excel(output_file_name, index=False)
case ExportType.XML:
output_file_name += '.xml'
result.to_xml(output_file_name, index=False)
self._logger.info(f'Created file {output_file_name}')
def run_query(self, query: str, read_only=True) -> sq.CursorResult:
"""
Runs a single SQL query and returns the result as a CursorResult.
If more than one query, throws an error
:param query: the query to run
:param read_only: if the transaction is read-only
:return:
"""
self._verify_singular_query(query)
result: sq.CursorResult
self._logger.info(f'Running query: "{query}"')
with self._engine.connect() as conn:
with conn.begin():
self._logger.info("Starting transaction")
try:
if read_only:
self._set_transaction_readonly(conn)
start = time.time()
result = conn.execute(sq.text(query))
end = time.time()
self._logger.info(f'Query took {end - start} seconds')
conn.commit()
except Exception as e:
conn.rollback()
raise e
self._logger.info(f'Transaction commited')
return result
def run_sql_file_one_statement(self, filename: str = "query.sql", read_only=True) -> sq.CursorResult:
query = self._read_and_sql_file_and_strip_for_comments(filename)
return self.run_query(query, read_only)
def run_sql_file_export_to_file(self, schema: str | None = None, input_name: str = "query.sql",
output_name: str = "export", read_only=True, export_type=ExportType.CSV):
"""
Runs a single SQL query and creates a csv file with the given output name and resulting contents.
If more than one query, throws an error
:param export_type: the type of file to export
:param schema: the schema to use
:param input_name: the name of the sql file to use
:param output_name: the name of the csv file to create
:param read_only: if the transaction is read-only
:return:
"""
result: pd.DataFrame
query = self._read_and_sql_file_and_strip_for_comments(input_name)
self._logger.info(f'Running query: "{query}"')
with self._engine.connect() as conn:
if schema is not None:
self._set_schema(conn, schema)
result = self._extract_dataframe(conn, query, read_only)
self._export_to_file(export_type, self._generate_filename(output_name), result)
def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool,
schema: str | None = None) -> pd.DataFrame:
result: pd.DataFrame
self._verify_singular_query(query)
with conn.begin():
self._logger.info("Starting transaction")
try:
if read_only:
self._set_transaction_readonly(conn)
if schema is not None:
self._set_schema(conn, schema)
result = self._extract_dataframe_no_safeguards(conn, query)
except Exception as e:
conn.rollback()
raise e
return result
def _extract_dataframe_no_safeguards(self, conn: sq.Connection, query: str) -> pd.DataFrame:
result: pd.DataFrame
start = time.time()
result = pd.read_sql(query, conn)
end = time.time()
self._logger.info(f'Query took {(end - start):.4f} seconds')
return result
def run_sql_file_export_to_file_multiple_schemas(self, municipalities: list[Municipality],
base_output_name: str = "", input_name: str = "query.sql",
read_only=True, export_type=ExportType.CSV):
query = self._read_and_sql_file_and_strip_for_comments(input_name)
self._logger.info(f'Running on {len(municipalities)} schemas')
self._logger.info(f'Running query: "{query}"')
with self._engine.connect() as conn:
for index, municipality in enumerate(municipalities):
self._logger.info(f'({index + 1}/{len(municipalities)}) running for municipality {municipality.name}')
result = self._extract_dataframe(conn, query, read_only, schema=municipality.schema)
output_file_name = self._generate_filename(f'{base_output_name}{municipality.name}')
self._export_to_file(export_type, output_file_name, result)
def run_sql_file_multiple_statements(self, filename: str = "query.sql", read_only=False):
"""
Runs an SQL file, supports multiple statements, does not support plsql.
If any statements fail, throws an error and rolls back.
:param filename: the name of the sql file to use
:param read_only: if the transaction is read-only
:return: Nothing
"""
raw_query = self._read_and_sql_file_and_strip_for_comments(filename)
queries = self._split_query_to_singular_queries(raw_query)
self._logger.info(queries)
self._logger.info(f'Running {len(queries)} queries')
with self._engine.connect() as conn:
with conn.begin():
self._logger.info("Starting transaction")
try:
if read_only:
self._set_transaction_readonly(conn)
for index, query in enumerate(queries):
start = time.time()
conn.execute(sq.text(query))
end = time.time()
self._logger.info(
f'({index + 1} / {len(queries)}) Query took {(end - start):.4f} seconds ({query})')
conn.commit()
except Exception as e:
self._logger.info(f'Transaction rollback')
conn.rollback()
raise e
self._logger.info(f'Transaction commited')
def run_sql_files_export_to_files_multiple_schemas(self, municipalities: list[Municipality],
input_querie_file_names: list[str] = None,
read_only: bool = True, export_type=ExportType.XML):
""""
Runs the list of granted sql files against the list of municipalities
:param export_type: the type of files to export
:param municipalities: a list of municipalities
:param input_querie_file_names: a list of sql files to run TODO: make this a pair list with sql file and translation for root_name and row_name to give the XML file the correct namespaces, consider using the stylesheet option from panda to xml https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_xml.html
:param read_only: if the transaction should be set too read-only to avoid changes to the database
:return: Nothing
"""
with self._engine.connect() as conn:
with conn.begin():
if read_only:
self._set_transaction_readonly(conn)
for municipality_index, municipality in enumerate(municipalities):
self._logger.info(
f'({municipality_index + 1}/{len(municipalities)}) Starting to process municipality {municipality.name} ({municipality.schema})')
self._set_schema(conn, municipality.schema)
file_prefix = f'{municipality.kommunekode}/'
for query_file_index, query_filename in enumerate(input_querie_file_names):
self._logger.info(
f'({query_file_index + 1}/{len(municipalities)}) Starting to process query file: {query_filename}')
raw_query = self._read_and_sql_file_and_strip_for_comments(query_filename)
if not self._verify_singular_query(raw_query):
self._logger.error(f'Query file {query_filename} failed due to multiple queries')
raise Exception(f'Query file {query_filename} failed due to multiple queries')
dataframe = self._extract_dataframe_no_safeguards(conn, raw_query)
self._logger.info(f'[{municipality.kommunekode}][{query_filename}][{len(dataframe.index)}]')
filename = self._generate_filename(f'{file_prefix}{query_filename}')
self._export_to_file(export_type, filename, dataframe)
self._logger.info(f'Finished processing {query_filename} generated file {filename}')