import logging import sqlalchemy as sq import sqlparse import pandas as pd from config import DatabaseConfig, DatabaseType from keepass import KeePass from models import Municipality import time class DBAdapter: _engine: sq.Engine _database_config: DatabaseConfig _has_tables_been_initialized: bool = False _logger: logging.Logger def __init__(self, keepass: KeePass, database_config: DatabaseConfig, logger: logging.Logger): self._database_config = database_config connection_string: str keepass_entry = keepass.get_db_credentials() self._logger = logger match self._database_config.type: case DatabaseType.PSQL: connection_string = f'postgresql+pg8000://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}/{self._database_config.name}' case DatabaseType.ORCL: connection_string = f'oracle:{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}:{self._database_config.port}:{self._database_config.ssid}' case DatabaseType.SQLITE: connection_string = f'sqlite:///{self._database_config.host}' case _: raise Exception(f'Database type {database_config.type} is not supported') logger.info(f'Initializing database {database_config.host}:{database_config.name}') self._engine: sq.Engine = sq.create_engine(connection_string) logger.info('Database initialized') def _read_and_sql_file_and_strip_for_comments(self, filename: str) -> str: query: str self._logger.info(f'Reading file {filename}') with open(filename, 'r') as f: query = f.read() self._logger.info(f'Stripping comments from file {filename}') query = sqlparse.format(query, strip_comments=True) return query def _split_query_to_singular_queries(self, query) -> list[str]: return sqlparse.split(query) def _set_transaction_readonly(self, conn: sq.Connection): self._logger.info('Setting transaction to readonly.') if not conn.in_transaction(): raise Exception('Connection is not in a transaction') match self._database_config.type: case DatabaseType.PSQL | DatabaseType.ORCL: conn.execute(sq.text('SET TRANSACTION READ ONLY')) case _: raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions') def _set_schema(self, conn: sq.Connection, schema: str): self._logger.info(f'Setting schema to "{schema}"') if not conn.in_transaction(): raise Exception('Connection is not in a transaction') match self._database_config.type: case DatabaseType.ORCL: conn.execute(sq.text(f"alter session set current_schema = '{schema}'")) case DatabaseType.PSQL: conn.execute(sq.text(f"set schema '{schema}'")) case _: raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions') def _verify_singular_query(self, query: str): self._logger.info(f'Verifying query: {query}') if len(self._split_query_to_singular_queries(query)) > 1: self._logger.critical(f'Multiple queries found for query: {query}') raise Exception(f'Multiple queries found in {query}') def run_query(self, query: str, read_only = True) -> sq.CursorResult: """ Runs a single SQL query and returns the result as a CursorResult. If more than one query, throws an error :param query: the query to run :param read_only: if the transaction is read-only :return: """ self._verify_singular_query(query) result: sq.CursorResult self._logger.info(f'Running query: "{query}"') with self._engine.connect() as conn: with conn.begin(): self._logger.info("Starting transaction") try: if read_only: self._set_transaction_readonly(conn) start = time.time() result = conn.execute(sq.text(query)) end = time.time() self._logger.info(f'Query took {end - start} seconds') conn.commit() except Exception as e: conn.rollback() raise e self._logger.info(f'Transaction commited') return result def run_sql_file_one_statement(self, filename: str = "query.sql", read_only = True) -> sq.CursorResult: query = self._read_and_sql_file_and_strip_for_comments(filename) return self.run_query(query, read_only) def run_sql_file_export_to_csv(self, schema: str | None = None, input_name: str = "query.sql", output_name: str = "export.csv", read_only = True): """ Runs a single SQL query and creates a csv file with the given output name and resulting contents. If more than one query, throws an error :param schema: the schema to use :param input_name: the name of the sql file to use :param output_name: the name of the csv file to create :param read_only: if the transaction is read-only :return: """ result: pd.DataFrame query = self._read_and_sql_file_and_strip_for_comments(input_name) self._logger.info(f'Running query: "{query}"') with self._engine.connect() as conn: if schema is not None: self._set_schema(conn, schema) result = self._extract_dataframe(conn, query, read_only) result.to_csv(output_name, index=False, sep=';') def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, schema: str | None = None) -> pd.DataFrame: result: pd.DataFrame self._verify_singular_query(query) with conn.begin(): self._logger.info("Starting transaction") try: if read_only: self._set_transaction_readonly(conn) if schema is not None: self._set_schema(conn, schema) start = time.time() result = pd.read_sql(query, conn) end = time.time() self._logger.info(f'Query took {(end - start):.4f} seconds') conn.commit() except Exception as e: conn.rollback() raise e return result def run_sql_file_export_to_csv_multiple_schemas(self, municipalities: list[Municipality], base_output_name: str = "", input_name: str = "query.sql", read_only = True): query = self._read_and_sql_file_and_strip_for_comments(input_name) self._logger.info(f'Running on {len(municipalities)}') self._logger.info(f'Running query: "{query}"') with self._engine.connect() as conn: for index, municipality in enumerate(municipalities): self._logger.info(f'({index + 1}/{len(municipalities)}) running for municipality {municipality.name}') result = self._extract_dataframe(conn, query, read_only, schema = municipality.schema) output_file_name = f'{base_output_name}{municipality.name}.csv' result.to_csv(output_file_name, index=False, sep=';') self._logger.info(f'Created file {output_file_name}') def run_sql_file_multiple_statements(self, filename: str = "query.sql", read_only = False): """ Runs an SQL file, supports multiple statements, does not support plsql. If any statements fail, throws an error and rolls back. :param filename: the name of the sql file to use :param read_only: if the transaction is read-only :return: Nothing """ raw_query = self._read_and_sql_file_and_strip_for_comments(filename) queries = self._split_query_to_singular_queries(raw_query) self._logger.info(queries) self._logger.info(f'Running {len(queries)} queries') with self._engine.connect() as conn: with conn.begin(): self._logger.info("Starting transaction") try: if read_only: self._set_transaction_readonly(conn) for index, query in enumerate(queries): start = time.time() conn.execute(sq.text(query)) end = time.time() self._logger.info(f'({index + 1} / {len(queries)}) Query took {(end - start):.4f} seconds ({query})') conn.commit() except Exception as e: self._logger.info(f'Transaction rollback') conn.rollback() raise e self._logger.info(f'Transaction commited')