diff --git a/.idea/jupyter-settings.xml b/.idea/jupyter-settings.xml
index 38686d8..afdfb86 100644
--- a/.idea/jupyter-settings.xml
+++ b/.idea/jupyter-settings.xml
@@ -6,9 +6,6 @@
-
-
-
@@ -19,9 +16,6 @@
-
-
-
diff --git a/.idea/misc.xml b/.idea/misc.xml
index fa9a4d0..482855e 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,5 +3,5 @@
-
+
\ No newline at end of file
diff --git a/config.toml b/config.toml
index aa07fc0..756d8b8 100644
--- a/config.toml
+++ b/config.toml
@@ -6,6 +6,6 @@ DATABASE_SSID = "XE"
DATABASE_PORT = "5432"
[keepass]
-PATH = "/Users/frederik/Passwords.kdbx"
+PATH = "/home/frederik/Passwords.kdbx"
DB_CREDENTIALS_NAME = "dbuser"
-DB_CREDENTIALS_GROUP = ""
\ No newline at end of file
+DB_CREDENTIALS_GROUP = ""
diff --git a/database/QueryParameters.py b/database/QueryParameters.py
new file mode 100644
index 0000000..4718d77
--- /dev/null
+++ b/database/QueryParameters.py
@@ -0,0 +1,93 @@
+from dataclasses import dataclass
+
+import sqlparse
+
+from models.ExportType import ExportType
+
+
+@dataclass
+class QueryParameters:
+    """Describes one SQL job: where its SQL comes from and how the result is exported.
+
+    The SQL is supplied either as a file path (``input_path``) or inline (``query``),
+    never both. ``@dataclass`` only contributes the generated ``__repr__``/``__eq__``;
+    the explicit ``__init__`` below is used instead of a generated one.
+    """
+
+    _input_path: str = None
+    _query: str = None
+    _export_type: ExportType = None
+    _query_parameters: dict = None
+    _title: str = None
+    # Backs the ``output_path`` property, which would otherwise raise AttributeError.
+    _output_path: str = None
+
+    def __init__(self, title: str, input_path: str = None, query: str = None,
+                 export_type: ExportType = None, query_parameters: dict = None,
+                 output_path: str = None):
+        """
+        Validates that exactly one SQL source is given and stores the settings.
+        :param title: label for the query; DBAdapter uses it in log lines and output file names
+        :param input_path: path of a file containing the SQL to run
+        :param query: the SQL to run, given inline
+        :param export_type: the file type the result should be exported as
+        :param query_parameters: bind parameters passed along when the SQL is executed
+        :param output_path: optional value returned by the ``output_path`` property
+        :raises ValueError: if neither, or both, of input_path and query are given
+        """
+        if input_path is None and query is None:
+            raise ValueError("Either input_path or query must be provided")
+
+        if input_path is not None and query is not None:
+            raise ValueError("Cannot have both input_path and query")
+
+        self._input_path = input_path
+        self._query = query
+        self._export_type = export_type
+        self._query_parameters = query_parameters
+        self._title = title
+        self._output_path = output_path
+
+    def _split_query_to_singular_queries(self, query: str) -> list[str]:
+        """Strips SQL comments from ``query`` and splits it into single statements."""
+        stripped = sqlparse.format(query, strip_comments=True)
+        return list(sqlparse.split(stripped))
+
+    def _read_and_sql_file_and_strip_for_comments(self, filename: str) -> list[str]:
+        """Reads the SQL file ``filename`` and returns its comment-free statements."""
+        with open(filename, 'r') as f:
+            return self._split_query_to_singular_queries(f.read())
+
+    def get_queries(self) -> list[str]:
+        """
+        Returns the individual SQL statements of this job with comments stripped.
+        :return: statements from the file at input_path if set, otherwise from the inline query
+        :raises Exception: if neither source is set
+        """
+        if self._input_path is not None:
+            return self._read_and_sql_file_and_strip_for_comments(self._input_path)
+
+        if self._query is not None:
+            return self._split_query_to_singular_queries(self._query)
+
+        raise Exception('No query provided')
+
+    @property
+    def query_parameters(self) -> dict[str, str]:
+        return self._query_parameters
+
+    @property
+    def output_path(self) -> str:
+        return self._output_path
+
+    @property
+    def export_type(self) -> ExportType:
+        return self._export_type
+
+    @property
+    def input_path(self) -> str:
+        return self._input_path
+
+    @property
+    def title(self) -> str:
+        return self._title
\ No newline at end of file
diff --git a/database/db_adapter.py b/database/db_adapter.py
index d490445..55e887d 100644
--- a/database/db_adapter.py
+++ b/database/db_adapter.py
@@ -6,8 +6,9 @@ import pandas as pd
import sqlalchemy as sq
import sqlparse
+from database.QueryParameters import QueryParameters
from models.DatabaseConfig import DatabaseConfig
-from models.DatabaseType import DatabaseType
+from models.DatabaseType import DatabaseType
from keepass.Keepass import KeePass
from models.ExportType import ExportType
from models.Municipality import Municipality
@@ -39,18 +40,6 @@ class DBAdapter:
self._engine: sq.Engine = sq.create_engine(connection_string)
logger.info('Database initialized')
- def _read_and_sql_file_and_strip_for_comments(self, filename: str) -> str:
- query: str
- self._logger.info(f'Reading file {filename}')
- with open(filename, 'r') as f:
- query = f.read()
- self._logger.info(f'Stripping comments from file {filename}')
- query = sqlparse.format(query, strip_comments=True)
- return query
-
- def _split_query_to_singular_queries(self, query) -> list[str]:
- return sqlparse.split(query)
-
def _set_transaction_readonly(self, conn: sq.Connection):
self._logger.info('Setting transaction to readonly.')
if not conn.in_transaction():
@@ -60,7 +49,8 @@ class DBAdapter:
case DatabaseType.PSQL | DatabaseType.ORCL:
conn.execute(sq.text('SET TRANSACTION READ ONLY'))
case _:
- raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions')
+ raise Exception(
+ f'Database type {self._database_config.type} is not supported for readonly transactions')
def _set_schema(self, conn: sq.Connection, schema: str):
self._logger.info(f'Setting schema to "{schema}"')
@@ -73,13 +63,8 @@ class DBAdapter:
case DatabaseType.PSQL:
conn.execute(sq.text(f"set schema '{schema}'"))
case _:
- raise Exception(f'Database type {self.database_config.type} is not supported for readonly transactions')
-
- def _verify_singular_query(self, query: str):
- self._logger.info(f'Verifying query')
- if len(self._split_query_to_singular_queries(query)) > 1:
- self._logger.critical(f'Multiple queries found for query: {query}')
- raise Exception(f'Multiple queries found in {query}')
+ raise Exception(
+ f'Database type {self._database_config.type} is not supported for setting schema')
def _generate_filename(self, filename: str) -> str:
try:
@@ -88,7 +73,7 @@ class DBAdapter:
pass
return f'{self._output_folder}{filename}'
- def _export_to_file(self, export_type, output_file_name, result):
+ def _export_to_file(self, export_type, output_file_name, result, query_parameter: QueryParameters = None):
match export_type:
case ExportType.CSV:
output_file_name += '.csv'
@@ -102,68 +87,9 @@ class DBAdapter:
self._logger.info(f'Created file {output_file_name}')
- def run_query(self, query: str, read_only=True) -> sq.CursorResult:
- """
- Runs a single SQL query and returns the result as a CursorResult.
- If more than one query, throws an error
- :param query: the query to run
- :param read_only: if the transaction is read-only
- :return:
- """
-
- self._verify_singular_query(query)
-
- result: sq.CursorResult
- self._logger.info(f'Running query: "{query}"')
- with self._engine.connect() as conn:
- with conn.begin():
- self._logger.info("Starting transaction")
- try:
- if read_only:
- self._set_transaction_readonly(conn)
- start = time.time()
- result = conn.execute(sq.text(query))
- end = time.time()
- self._logger.info(f'Query took {end - start} seconds')
- conn.commit()
- except Exception as e:
- conn.rollback()
- raise e
- self._logger.info(f'Transaction commited')
- return result
-
- def run_sql_file_one_statement(self, filename: str = "query.sql", read_only=True) -> sq.CursorResult:
- query = self._read_and_sql_file_and_strip_for_comments(filename)
- return self.run_query(query, read_only)
-
- def run_sql_file_export_to_file(self, schema: str | None = None, input_name: str = "query.sql",
- output_name: str = "export", read_only=True, export_type=ExportType.CSV):
- """
- Runs a single SQL query and creates a csv file with the given output name and resulting contents.
- If more than one query, throws an error
- :param export_type: the type of file to export
- :param schema: the schema to use
- :param input_name: the name of the sql file to use
- :param output_name: the name of the csv file to create
- :param read_only: if the transaction is read-only
- :return:
- """
- result: pd.DataFrame
-
- query = self._read_and_sql_file_and_strip_for_comments(input_name)
-
- self._logger.info(f'Running query: "{query}"')
-
- with self._engine.connect() as conn:
- if schema is not None:
- self._set_schema(conn, schema)
- result = self._extract_dataframe(conn, query, read_only)
- self._export_to_file(export_type, self._generate_filename(output_name), result)
-
- def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool,
+ def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, query_parameters: QueryParameters,
schema: str | None = None) -> pd.DataFrame:
result: pd.DataFrame
- self._verify_singular_query(query)
with conn.begin():
self._logger.info("Starting transaction")
@@ -172,54 +98,43 @@ class DBAdapter:
self._set_transaction_readonly(conn)
if schema is not None:
self._set_schema(conn, schema)
- result = self._extract_dataframe_no_safeguards(conn, query)
+ result = self._extract_dataframe_no_safeguards(conn, query, query_parameters)
except Exception as e:
conn.rollback()
raise e
return result
- def _extract_dataframe_no_safeguards(self, conn: sq.Connection, query: str) -> pd.DataFrame:
+ def _extract_dataframe_no_safeguards(self, conn: sq.Connection, query: str,
+ query_parameter: QueryParameters) -> pd.DataFrame:
result: pd.DataFrame
start = time.time()
- result = pd.read_sql(query, conn)
+ result = pd.read_sql(query, conn, params=query_parameter.query_parameters)
end = time.time()
self._logger.info(f'Query took {(end - start):.4f} seconds')
return result
def run_sql_file_export_to_file_multiple_schemas(self, municipalities: list[Municipality],
- base_output_name: str = "", input_name: str = "query.sql",
- read_only=True, export_type=ExportType.CSV):
- query = self._read_and_sql_file_and_strip_for_comments(input_name)
- self._logger.info(f'Running on {len(municipalities)} schemas')
- self._logger.info(f'Running query: "{query}"')
- with self._engine.connect() as conn:
- for index, municipality in enumerate(municipalities):
- self._logger.info(f'({index + 1}/{len(municipalities)}) running for municipality {municipality.name}')
- result = self._extract_dataframe(conn, query, read_only, schema=municipality.schema)
- output_file_name = self._generate_filename(f'{base_output_name}{municipality.name}')
- self._export_to_file(export_type, output_file_name, result)
+ query_parameter: QueryParameters,
+ read_only=True):
+ self.run_sql_files_export_to_files_multiple_schemas(municipalities, [query_parameter], read_only)
- def run_sql_file_multiple_statements(self, filename: str = "query.sql", read_only=False):
+ def run_sql_file_multiple_statements(self, query_parameter: QueryParameters):
"""
Runs an SQL file, supports multiple statements, does not support plsql.
If any statements fail, throws an error and rolls back.
- :param filename: the name of the sql file to use
- :param read_only: if the transaction is read-only
+ :param query_parameter: contains data about the queries to run and how to find them
:return: Nothing
"""
- raw_query = self._read_and_sql_file_and_strip_for_comments(filename)
- queries = self._split_query_to_singular_queries(raw_query)
+ queries = query_parameter.get_queries()
self._logger.info(queries)
self._logger.info(f'Running {len(queries)} queries')
with self._engine.connect() as conn:
with conn.begin():
self._logger.info("Starting transaction")
try:
- if read_only:
- self._set_transaction_readonly(conn)
for index, query in enumerate(queries):
start = time.time()
- conn.execute(sq.text(query))
+ conn.execute(sq.text(query), parameters=query_parameter.query_parameters)
end = time.time()
self._logger.info(
f'({index + 1} / {len(queries)}) Query took {(end - start):.4f} seconds ({query})')
@@ -231,13 +146,13 @@ class DBAdapter:
self._logger.info(f'Transaction commited')
def run_sql_files_export_to_files_multiple_schemas(self, municipalities: list[Municipality],
- input_querie_file_names: list[str] = None,
- read_only: bool = True, export_type=ExportType.XML):
+ query_parameters: list[QueryParameters] = None,
+ read_only: bool = True):
""""
Runs the list of granted sql files against the list of municipalities
:param export_type: the type of files to export
:param municipalities: a list of municipalities
- :param input_querie_file_names: a list of sql files to run TODO: make this a pair list with sql file and translation for root_name and row_name to give the XML file the correct namespaces, consider using the stylesheet option from panda to xml https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_xml.html
+ :param query_parameters: a list of QueryParameters, one per query to run; each is expected to contain a single statement, its title is used for the output file name and its export_type for the file type TODO: make this a pair list with sql file and translation for root_name and row_name to give the XML file the correct namespaces, consider using the stylesheet option from panda to xml https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_xml.html
:param read_only: if the transaction should be set too read-only to avoid changes to the database
:return: Nothing
"""
@@ -252,17 +167,21 @@ class DBAdapter:
self._set_schema(conn, municipality.schema)
file_prefix = f'{municipality.kommunekode}/'
- for query_file_index, query_filename in enumerate(input_querie_file_names):
+ for query_file_index, query_parameter in enumerate(query_parameters):
+ queries = query_parameter.get_queries()
self._logger.info(
- f'({query_file_index + 1}/{len(municipalities)}) Starting to process query file: {query_filename}')
- raw_query = self._read_and_sql_file_and_strip_for_comments(query_filename)
+ f'({query_file_index + 1}/{len(query_parameters)}) Starting to process query with title: {query_parameter.title}')
- if not self._verify_singular_query(raw_query):
- self._logger.error(f'Query file {query_filename} failed due to multiple queries')
- raise Exception(f'Query file {query_filename} failed due to multiple queries')
+ # Only queries[0] is exported below, so anything other than exactly one statement is rejected.
+ if len(queries) != 1:
+ self._logger.error(f'Query file {query_parameter.title} failed due to multiple queries')
+ raise Exception(f'Query file {query_parameter.title} failed due to multiple queries')
- dataframe = self._extract_dataframe_no_safeguards(conn, raw_query)
- self._logger.info(f'[{municipality.kommunekode}][{query_filename}][{len(dataframe.index)}]')
- filename = self._generate_filename(f'{file_prefix}{query_filename}')
- self._export_to_file(export_type, filename, dataframe)
- self._logger.info(f'Finished processing {query_filename} generated file {filename}')
+ query = queries[0]
+ dataframe = self._extract_dataframe_no_safeguards(conn, query, query_parameter)
+ self._logger.info(
+ f'[{municipality.kommunekode}][{query_parameter.title}][{len(dataframe.index)}]')
+ filename = self._generate_filename(f'{file_prefix}{query_parameter.title}')
+
+ self._export_to_file(query_parameter.export_type, filename, dataframe, query_parameter)
+ self._logger.info(f'Finished processing {query_parameter.title} generated file {filename}')