import logging import os import sqlalchemy as sq import sqlparse import pandas as pd from config import DatabaseConfig, DatabaseType from keepass import KeePass from models import Municipality, ExportType import time class DBAdapter: _engine: sq.Engine _database_config: DatabaseConfig _has_tables_been_initialized: bool = False _logger: logging.Logger _output_folder: str = 'output/' def __init__(self, keepass: KeePass, database_config: DatabaseConfig, logger: logging.Logger): self._database_config = database_config connection_string: str keepass_entry = keepass.get_db_credentials() self._logger = logger match self._database_config.type: case DatabaseType.PSQL: connection_string = f'postgresql+pg8000://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}/{self._database_config.name}' case DatabaseType.ORCL: connection_string = f'oracle+cx_oracle://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}:{self._database_config.port}/{self._database_config.ssid}' case DatabaseType.SQLITE: connection_string = f'sqlite:///{self._database_config.host}' case _: raise Exception(f'Database type {database_config.type} is not supported') logger.info(f'Initializing database {database_config.host}:{database_config.name}') self._engine: sq.Engine = sq.create_engine(connection_string) logger.info('Database initialized') def _read_and_sql_file_and_strip_for_comments(self, filename: str) -> str: query: str self._logger.info(f'Reading file {filename}') with open(filename, 'r') as f: query = f.read() self._logger.info(f'Stripping comments from file {filename}') query = sqlparse.format(query, strip_comments=True) return query def _split_query_to_singular_queries(self, query) -> list[str]: return sqlparse.split(query) def _set_transaction_readonly(self, conn: sq.Connection): self._logger.info('Setting transaction to readonly.') if not conn.in_transaction(): raise Exception('Connection is not in a transaction') match self._database_config.type: case DatabaseType.PSQL | DatabaseType.ORCL: conn.execute(sq.text('SET TRANSACTION READ ONLY')) case _: raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions') def _set_schema(self, conn: sq.Connection, schema: str): self._logger.info(f'Setting schema to "{schema}"') if not conn.in_transaction(): raise Exception('Connection is not in a transaction') match self._database_config.type: case DatabaseType.ORCL: conn.execute(sq.text(f"alter session set current_schema = {schema}")) case DatabaseType.PSQL: conn.execute(sq.text(f"set schema '{schema}'")) case _: raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions') def _verify_singular_query(self, query: str): self._logger.info(f'Verifying query') if len(self._split_query_to_singular_queries(query)) > 1: self._logger.critical(f'Multiple queries found for query: {query}') raise Exception(f'Multiple queries found in {query}') def _generate_filename(self, filename: str) -> str: try: os.mkdir(self._output_folder) except FileExistsError: pass return f'{self._output_folder}{filename}' def _export_to_file(self, export_type, output_file_name, result): match export_type: case ExportType.CSV: output_file_name += '.csv' result.to_csv(output_file_name, index=False, sep=';', encoding='utf-8') case ExportType.EXCEL: output_file_name += '.xlsx' result.to_excel(output_file_name, index=False) case ExportType.XML: output_file_name += '.xml' result.to_xml(output_file_name, index=False) self._logger.info(f'Created file {output_file_name}') def run_query(self, query: str, read_only = True) -> sq.CursorResult: """ Runs a single SQL query and returns the result as a CursorResult. If more than one query, throws an error :param query: the query to run :param read_only: if the transaction is read-only :return: """ self._verify_singular_query(query) result: sq.CursorResult self._logger.info(f'Running query: "{query}"') with self._engine.connect() as conn: with conn.begin(): self._logger.info("Starting transaction") try: if read_only: self._set_transaction_readonly(conn) start = time.time() result = conn.execute(sq.text(query)) end = time.time() self._logger.info(f'Query took {end - start} seconds') conn.commit() except Exception as e: conn.rollback() raise e self._logger.info(f'Transaction commited') return result def run_sql_file_one_statement(self, filename: str = "query.sql", read_only = True) -> sq.CursorResult: query = self._read_and_sql_file_and_strip_for_comments(filename) return self.run_query(query, read_only) def run_sql_file_export_to_file(self, schema: str | None = None, input_name: str = "query.sql", output_name: str = "export", read_only = True, export_type = ExportType.CSV): """ Runs a single SQL query and creates a csv file with the given output name and resulting contents. If more than one query, throws an error :param export_type: the type of file to export :param schema: the schema to use :param input_name: the name of the sql file to use :param output_name: the name of the csv file to create :param read_only: if the transaction is read-only :return: """ result: pd.DataFrame query = self._read_and_sql_file_and_strip_for_comments(input_name) self._logger.info(f'Running query: "{query}"') with self._engine.connect() as conn: if schema is not None: self._set_schema(conn, schema) result = self._extract_dataframe(conn, query, read_only) self._export_to_file(export_type, self._generate_filename(output_name), result) def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, schema: str | None = None) -> pd.DataFrame: result: pd.DataFrame self._verify_singular_query(query) with conn.begin(): self._logger.info("Starting transaction") try: if read_only: self._set_transaction_readonly(conn) if schema is not None: self._set_schema(conn, schema) result = self._extract_dataframe_no_safeguards(conn, query) except Exception as e: conn.rollback() raise e return result def _extract_dataframe_no_safeguards(self, conn: sq.Connection, query: str) -> pd.DataFrame: result: pd.DataFrame start = time.time() result = pd.read_sql(query, conn) end = time.time() self._logger.info(f'Query took {(end - start):.4f} seconds') return result def run_sql_file_export_to_file_multiple_schemas(self, municipalities: list[Municipality], base_output_name: str = "", input_name: str = "query.sql", read_only = True, export_type = ExportType.CSV): query = self._read_and_sql_file_and_strip_for_comments(input_name) self._logger.info(f'Running on {len(municipalities)} schemas') self._logger.info(f'Running query: "{query}"') with self._engine.connect() as conn: for index, municipality in enumerate(municipalities): self._logger.info(f'({index + 1}/{len(municipalities)}) running for municipality {municipality.name}') result = self._extract_dataframe(conn, query, read_only, schema = municipality.schema) output_file_name = self._generate_filename(f'{base_output_name}{municipality.name}') self._export_to_file(export_type, output_file_name, result) def run_sql_file_multiple_statements(self, filename: str = "query.sql", read_only = False): """ Runs an SQL file, supports multiple statements, does not support plsql. If any statements fail, throws an error and rolls back. :param filename: the name of the sql file to use :param read_only: if the transaction is read-only :return: Nothing """ raw_query = self._read_and_sql_file_and_strip_for_comments(filename) queries = self._split_query_to_singular_queries(raw_query) self._logger.info(queries) self._logger.info(f'Running {len(queries)} queries') with self._engine.connect() as conn: with conn.begin(): self._logger.info("Starting transaction") try: if read_only: self._set_transaction_readonly(conn) for index, query in enumerate(queries): start = time.time() conn.execute(sq.text(query)) end = time.time() self._logger.info(f'({index + 1} / {len(queries)}) Query took {(end - start):.4f} seconds ({query})') conn.commit() except Exception as e: self._logger.info(f'Transaction rollback') conn.rollback() raise e self._logger.info(f'Transaction commited') def run_sql_files_export_to_files_multiple_schemas(self, municipalities: list[Municipality], input_querie_file_names: list[str] = None, read_only: bool = True, export_type = ExportType.XML): """" Runs the list of granted sql files against the list of municipalities :param export_type: the type of files to export :param municipalities: a list of municipalities :param input_querie_file_names: a list of sql files to run TODO: make this a pair list with sql file and translation for root_name and row_name to give the XML file the correct namespaces, consider using the stylesheet option from panda to xml https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_xml.html :param read_only: if the transaction should be set too read-only to avoid changes to the database :return: Nothing """ with self._engine.connect() as conn: with conn.begin(): if read_only: self._set_transaction_readonly(conn) for municipality_index, municipality in enumerate(municipalities): self._logger.info(f'({municipality_index + 1}/{len(municipalities)}) Starting to process municipality {municipality.name} ({municipality.schema})') self._set_schema(conn, municipality.schema) file_prefix = f'{municipality.kommunekode}/' for query_file_index, query_filename in enumerate(input_querie_file_names): self._logger.info(f'({query_file_index + 1}/{len(municipalities)}) Starting to process query file: {query_filename}') raw_query = self._read_and_sql_file_and_strip_for_comments(query_filename) if not self._verify_singular_query(raw_query): self._logger.error(f'Query file {query_filename} failed due to multiple queries') raise Exception(f'Query file {query_filename} failed due to multiple queries') dataframe = self._extract_dataframe_no_safeguards(conn, raw_query) self._logger.info(f'[{municipality.kommunekode}][{query_filename}][{len(dataframe.index)}]') filename = self._generate_filename(f'{file_prefix}{query_filename}') self._export_to_file(export_type, filename, dataframe) self._logger.info(f'Finished processing {query_filename} generated file {filename}')