From 65d9dd809d6939bf1e8cd314cca5bc5bdc6acb59 Mon Sep 17 00:00:00 2001 From: Frederik Jacobsen Date: Sun, 15 Dec 2024 22:05:23 +0100 Subject: [PATCH] Added a primitive run multiple sql statements, that does NOT support plsql --- db_adapter.py | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++- main.py | 4 +++- query.sql | 3 ++- 3 files changed, 67 insertions(+), 3 deletions(-) diff --git a/db_adapter.py b/db_adapter.py index 6fc44b3..33dc11d 100644 --- a/db_adapter.py +++ b/db_adapter.py @@ -33,7 +33,8 @@ class DBAdapter: self._engine: sq.Engine = sq.create_engine(connection_string) logger.info('Database initialized') - def _read_and_sql_file_and_strip_for_comments(self, filename: str): + + def _read_and_sql_file_and_strip_for_comments(self, filename: str) -> str: query: str self._logger.info(f'Reading file {filename}') with open(filename, 'r') as f: @@ -42,6 +43,9 @@ class DBAdapter: query = sqlparse.format(query, strip_comments=True) return query + def _split_query_to_singular_queries(self, query) -> list[str]: + return sqlparse.split(query) + def _set_transaction_readonly(self, conn: sq.Connection): self._logger.info('Setting transaction to readonly.') if not conn.in_transaction(): @@ -66,7 +70,23 @@ class DBAdapter: case _: raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions') + def _verify_singular_query(self, query: str): + self._logger.info(f'Verifying query: {query}') + if len(self._split_query_to_singular_queries(query)) > 1: + self._logger.critical(f'Multiple queries found for query: {query}') + raise Exception(f'Multiple queries found in {query}') + def run_query(self, query: str, read_only = True) -> sq.CursorResult: + """ + Runs a single SQL query and returns the result as a CursorResult. + If more than one query, throws an error + :param query: the query to run + :param read_only: if the transaction is read-only + :return: + """ + + self._verify_singular_query(query) + result: sq.CursorResult self._logger.info(f'Running query: "{query}"') with self._engine.connect() as conn: @@ -91,6 +111,15 @@ class DBAdapter: return self.run_query(query, read_only) def run_sql_file_export_to_csv(self, schema: str | None = None, input_name: str = "query.sql", output_name: str = "export.csv", read_only = True): + """ + Runs a single SQL query and creates a csv file with the given output name and resulting contents. + If more than one query, throws an error + :param schema: the schema to use + :param input_name: the name of the sql file to use + :param output_name: the name of the csv file to create + :param read_only: if the transaction is read-only + :return: + """ result: pd.DataFrame query = self._read_and_sql_file_and_strip_for_comments(input_name) @@ -105,6 +134,8 @@ class DBAdapter: def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, schema: str | None = None) -> pd.DataFrame: result: pd.DataFrame + self._verify_singular_query(query) + with conn.begin(): self._logger.info("Starting transaction") try: @@ -133,3 +164,33 @@ class DBAdapter: output_file_name = f'{base_output_name}{municipality.name}.csv' result.to_csv(output_file_name, index=False, sep=';') self._logger.info(f'Created file {output_file_name}') + + def run_sql_file_multiple_statements(self, filename: str = "query.sql", read_only = False): + """ + Runs an SQL file, supports multiple statements, does not support plsql. + If any statements fail, throws an error and rolls back. + :param filename: the name of the sql file to use + :param read_only: if the transaction is read-only + :return: Nothing + """ + raw_query = self._read_and_sql_file_and_strip_for_comments(filename) + queries = self._split_query_to_singular_queries(raw_query) + self._logger.info(queries) + self._logger.info(f'Running {len(queries)} queries') + with self._engine.connect() as conn: + with conn.begin(): + self._logger.info("Starting transaction") + try: + if read_only: + self._set_transaction_readonly(conn) + for index, query in enumerate(queries): + start = time.time() + conn.execute(sq.text(query)) + end = time.time() + self._logger.info(f'({index + 1} / {len(queries)}) Query took {(end - start):.4f} seconds ({query})') + conn.commit() + except Exception as e: + self._logger.info(f'Transaction rollback') + conn.rollback() + raise e + self._logger.info(f'Transaction commited') diff --git a/main.py b/main.py index d29900e..51b6faf 100644 --- a/main.py +++ b/main.py @@ -11,4 +11,6 @@ config = Config() keepass = KeePass(config.kee_pass, logger) db_adapter = DBAdapter(keepass, config.database, logger) -db_adapter.run_sql_file_export_to_csv_multiple_schemas([Municipality('test', '123456789', 'op_test', '123')]) \ No newline at end of file +#db_adapter.run_sql_file_export_to_csv_multiple_schemas([Municipality('test', '123456789', 'op_test', '123')]) + +db_adapter.run_sql_file_multiple_statements() \ No newline at end of file diff --git a/query.sql b/query.sql index 64b92e3..69fbc56 100644 --- a/query.sql +++ b/query.sql @@ -1 +1,2 @@ -SELECT * FROM public.simple_healthcheck_counters shc \ No newline at end of file +SELECT * FROM public.simple_healthcheck_counters shc; +SELECT * FROM public.simple_healthcheck_counters shc; \ No newline at end of file