From 6c305784581a665514bcef32dc523d896d5f8ade Mon Sep 17 00:00:00 2001 From: Frederik Jacobsen Date: Sun, 9 Mar 2025 19:18:56 +0100 Subject: [PATCH] Did some cleanup of the db_adapter.py --- .idea/jupyter-settings.xml | 6 -- .idea/misc.xml | 2 +- config.toml | 4 +- database/QueryParameters.py | 68 ++++++++++++++++ database/db_adapter.py | 154 +++++++++--------------------------- 5 files changed, 107 insertions(+), 127 deletions(-) create mode 100644 database/QueryParameters.py diff --git a/.idea/jupyter-settings.xml b/.idea/jupyter-settings.xml index 38686d8..afdfb86 100644 --- a/.idea/jupyter-settings.xml +++ b/.idea/jupyter-settings.xml @@ -6,9 +6,6 @@ @@ -19,9 +16,6 @@ diff --git a/.idea/misc.xml b/.idea/misc.xml index fa9a4d0..482855e 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/config.toml b/config.toml index aa07fc0..756d8b8 100644 --- a/config.toml +++ b/config.toml @@ -6,6 +6,6 @@ DATABASE_SSID = "XE" DATABASE_PORT = "5432" [keepass] -PATH = "/Users/frederik/Passwords.kdbx" +PATH = "/home/frederik/Passwords.kdbx" DB_CREDENTIALS_NAME = "dbuser" -DB_CREDENTIALS_GROUP = "" \ No newline at end of file +DB_CREDENTIALS_GROUP = "" diff --git a/database/QueryParameters.py b/database/QueryParameters.py new file mode 100644 index 0000000..4718d77 --- /dev/null +++ b/database/QueryParameters.py @@ -0,0 +1,68 @@ +from dataclasses import dataclass + +import sqlparse + +from models.ExportType import ExportType + + +@dataclass +class QueryParameters: + _input_path: str = None + _query: str = None + _export_type: ExportType = None + _query_parameters: dict = None + _title: str = None + + + def __init__(self, title: str, input_path: str = None, query: str = None, export_type: ExportType = None, query_parameters: dict = None): + if input_path is None and query is None: + raise ValueError("Either input_path or query must be provided") + + if input_path is not None and query is not None: + raise ValueError("Cannot have both input_path and query") + + self._input_path = input_path + self._query = query + self._export_type = export_type + self._query_parameters = query_parameters + self._title = title + + def _split_query_to_singular_queries(self, query: str) -> list[str]: + query = sqlparse.format(query, strip_comments=True) + return [query for query in sqlparse.split(query)] + + def _read_and_sql_file_and_strip_for_comments(self, filename: str) -> list[str]: + query: str + with open(filename, 'r') as f: + query = f.read() + + return self._split_query_to_singular_queries(query) + + def get_queries(self) -> list[str]: + if self._input_path is not None: + return self._read_and_sql_file_and_strip_for_comments(self._input_path) + + if self._query is not None: + return self._split_query_to_singular_queries(self._query) + + raise Exception(f'No query provided') + + @property + def query_parameters(self) -> dict[str, str]: + return self._query_parameters + + @property + def output_path(self) -> str: + return self._output_path + + @property + def export_type(self) -> ExportType: + return self._export_type + + @property + def input_path(self) -> str: + return self._input_path + + @property + def title(self) -> str: + return self._title \ No newline at end of file diff --git a/database/db_adapter.py b/database/db_adapter.py index d490445..55e887d 100644 --- a/database/db_adapter.py +++ b/database/db_adapter.py @@ -6,8 +6,9 @@ import pandas as pd import sqlalchemy as sq import sqlparse +from database.QueryParameters import QueryParameters from models.DatabaseConfig import DatabaseConfig -from models.DatabaseType import DatabaseType +from models.DatabaseType import DatabaseType from keepass.Keepass import KeePass from models.ExportType import ExportType from models.Municipality import Municipality @@ -39,18 +40,6 @@ class DBAdapter: self._engine: sq.Engine = sq.create_engine(connection_string) logger.info('Database initialized') - def _read_and_sql_file_and_strip_for_comments(self, filename: str) -> str: - query: str - self._logger.info(f'Reading file {filename}') - with open(filename, 'r') as f: - query = f.read() - self._logger.info(f'Stripping comments from file {filename}') - query = sqlparse.format(query, strip_comments=True) - return query - - def _split_query_to_singular_queries(self, query) -> list[str]: - return sqlparse.split(query) - def _set_transaction_readonly(self, conn: sq.Connection): self._logger.info('Setting transaction to readonly.') if not conn.in_transaction(): @@ -60,7 +49,8 @@ class DBAdapter: case DatabaseType.PSQL | DatabaseType.ORCL: conn.execute(sq.text('SET TRANSACTION READ ONLY')) case _: - raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions') + raise Exception( + f'Database type {self._database_config.type} is not supported for readonly transactions') def _set_schema(self, conn: sq.Connection, schema: str): self._logger.info(f'Setting schema to "{schema}"') @@ -73,13 +63,8 @@ class DBAdapter: case DatabaseType.PSQL: conn.execute(sq.text(f"set schema '{schema}'")) case _: - raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions') - - def _verify_singular_query(self, query: str): - self._logger.info(f'Verifying query') - if len(self._split_query_to_singular_queries(query)) > 1: - self._logger.critical(f'Multiple queries found for query: {query}') - raise Exception(f'Multiple queries found in {query}') + raise Exception( + f'Database type {self._database_config.type} is not supported for readonly transactions') def _generate_filename(self, filename: str) -> str: try: @@ -88,7 +73,7 @@ class DBAdapter: pass return f'{self._output_folder}{filename}' - def _export_to_file(self, export_type, output_file_name, result): + def _export_to_file(self, export_type, output_file_name, result, query_parameter: QueryParameters = None): match export_type: case ExportType.CSV: output_file_name += '.csv' @@ -102,68 +87,9 @@ class DBAdapter: self._logger.info(f'Created file {output_file_name}') - def run_query(self, query: str, read_only=True) -> sq.CursorResult: - """ - Runs a single SQL query and returns the result as a CursorResult. - If more than one query, throws an error - :param query: the query to run - :param read_only: if the transaction is read-only - :return: - """ - - self._verify_singular_query(query) - - result: sq.CursorResult - self._logger.info(f'Running query: "{query}"') - with self._engine.connect() as conn: - with conn.begin(): - self._logger.info("Starting transaction") - try: - if read_only: - self._set_transaction_readonly(conn) - start = time.time() - result = conn.execute(sq.text(query)) - end = time.time() - self._logger.info(f'Query took {end - start} seconds') - conn.commit() - except Exception as e: - conn.rollback() - raise e - self._logger.info(f'Transaction commited') - return result - - def run_sql_file_one_statement(self, filename: str = "query.sql", read_only=True) -> sq.CursorResult: - query = self._read_and_sql_file_and_strip_for_comments(filename) - return self.run_query(query, read_only) - - def run_sql_file_export_to_file(self, schema: str | None = None, input_name: str = "query.sql", - output_name: str = "export", read_only=True, export_type=ExportType.CSV): - """ - Runs a single SQL query and creates a csv file with the given output name and resulting contents. - If more than one query, throws an error - :param export_type: the type of file to export - :param schema: the schema to use - :param input_name: the name of the sql file to use - :param output_name: the name of the csv file to create - :param read_only: if the transaction is read-only - :return: - """ - result: pd.DataFrame - - query = self._read_and_sql_file_and_strip_for_comments(input_name) - - self._logger.info(f'Running query: "{query}"') - - with self._engine.connect() as conn: - if schema is not None: - self._set_schema(conn, schema) - result = self._extract_dataframe(conn, query, read_only) - self._export_to_file(export_type, self._generate_filename(output_name), result) - - def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, + def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, query_parameters: QueryParameters, schema: str | None = None) -> pd.DataFrame: result: pd.DataFrame - self._verify_singular_query(query) with conn.begin(): self._logger.info("Starting transaction") @@ -172,54 +98,43 @@ class DBAdapter: self._set_transaction_readonly(conn) if schema is not None: self._set_schema(conn, schema) - result = self._extract_dataframe_no_safeguards(conn, query) + result = self._extract_dataframe_no_safeguards(conn, query, query_parameters) except Exception as e: conn.rollback() raise e return result - def _extract_dataframe_no_safeguards(self, conn: sq.Connection, query: str) -> pd.DataFrame: + def _extract_dataframe_no_safeguards(self, conn: sq.Connection, query: str, + query_parameter: QueryParameters) -> pd.DataFrame: result: pd.DataFrame start = time.time() - result = pd.read_sql(query, conn) + result = pd.read_sql(query, conn, params=query_parameter.query_parameters) end = time.time() self._logger.info(f'Query took {(end - start):.4f} seconds') return result def run_sql_file_export_to_file_multiple_schemas(self, municipalities: list[Municipality], - base_output_name: str = "", input_name: str = "query.sql", - read_only=True, export_type=ExportType.CSV): - query = self._read_and_sql_file_and_strip_for_comments(input_name) - self._logger.info(f'Running on {len(municipalities)} schemas') - self._logger.info(f'Running query: "{query}"') - with self._engine.connect() as conn: - for index, municipality in enumerate(municipalities): - self._logger.info(f'({index + 1}/{len(municipalities)}) running for municipality {municipality.name}') - result = self._extract_dataframe(conn, query, read_only, schema=municipality.schema) - output_file_name = self._generate_filename(f'{base_output_name}{municipality.name}') - self._export_to_file(export_type, output_file_name, result) + query_parameter: QueryParameters, + read_only=True): + self.run_sql_files_export_to_files_multiple_schemas(municipalities, [query_parameter], read_only) - def run_sql_file_multiple_statements(self, filename: str = "query.sql", read_only=False): + def run_sql_file_multiple_statements(self, query_parameter: QueryParameters): """ Runs an SQL file, supports multiple statements, does not support plsql. If any statements fail, throws an error and rolls back. - :param filename: the name of the sql file to use - :param read_only: if the transaction is read-only + :param query_parameter: contains data about the queries to run and how to find them :return: Nothing """ - raw_query = self._read_and_sql_file_and_strip_for_comments(filename) - queries = self._split_query_to_singular_queries(raw_query) + queries = query_parameter.get_queries() self._logger.info(queries) self._logger.info(f'Running {len(queries)} queries') with self._engine.connect() as conn: with conn.begin(): self._logger.info("Starting transaction") try: - if read_only: - self._set_transaction_readonly(conn) for index, query in enumerate(queries): start = time.time() - conn.execute(sq.text(query)) + conn.execute(sq.text(query), parameters=query_parameter.query_parameters) end = time.time() self._logger.info( f'({index + 1} / {len(queries)}) Query took {(end - start):.4f} seconds ({query})') @@ -231,13 +146,13 @@ class DBAdapter: self._logger.info(f'Transaction commited') def run_sql_files_export_to_files_multiple_schemas(self, municipalities: list[Municipality], - input_querie_file_names: list[str] = None, - read_only: bool = True, export_type=ExportType.XML): + query_parameters: list[QueryParameters] = None, + read_only: bool = True): """" Runs the list of granted sql files against the list of municipalities :param export_type: the type of files to export :param municipalities: a list of municipalities - :param input_querie_file_names: a list of sql files to run TODO: make this a pair list with sql file and translation for root_name and row_name to give the XML file the correct namespaces, consider using the stylesheet option from panda to xml https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_xml.html + :param query_parameters: a list of sql files to run TODO: make this a pair list with sql file and translation for root_name and row_name to give the XML file the correct namespaces, consider using the stylesheet option from panda to xml https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_xml.html :param read_only: if the transaction should be set too read-only to avoid changes to the database :return: Nothing """ @@ -252,17 +167,20 @@ class DBAdapter: self._set_schema(conn, municipality.schema) file_prefix = f'{municipality.kommunekode}/' - for query_file_index, query_filename in enumerate(input_querie_file_names): + for query_file_index, query_parameter in enumerate(query_parameters): + queries = query_parameter.get_queries() self._logger.info( - f'({query_file_index + 1}/{len(municipalities)}) Starting to process query file: {query_filename}') - raw_query = self._read_and_sql_file_and_strip_for_comments(query_filename) + f'({query_file_index + 1}/{len(municipalities)}) Starting to process query with title: {query_parameter.title}') - if not self._verify_singular_query(raw_query): - self._logger.error(f'Query file {query_filename} failed due to multiple queries') - raise Exception(f'Query file {query_filename} failed due to multiple queries') + if not len(queries) != 1: + self._logger.error(f'Query file {query_parameter.title} failed due to multiple queries') + raise Exception(f'Query file {query_parameter.title} failed due to multiple queries') - dataframe = self._extract_dataframe_no_safeguards(conn, raw_query) - self._logger.info(f'[{municipality.kommunekode}][{query_filename}][{len(dataframe.index)}]') - filename = self._generate_filename(f'{file_prefix}{query_filename}') - self._export_to_file(export_type, filename, dataframe) - self._logger.info(f'Finished processing {query_filename} generated file {filename}') + query = queries[0] + dataframe = self._extract_dataframe_no_safeguards(conn, query, query_parameter) + self._logger.info( + f'[{municipality.kommunekode}][{query_parameter.title}][{len(dataframe.index)}]') + filename = self._generate_filename(f'{file_prefix}{query_parameter.title}') + + self._export_to_file(query_parameter.export_type, filename, dataframe, query_parameter) + self._logger.info(f'Finished processing {query_parameter.title} generated file {filename}')