Added a way to run the same query on multiple schemas, and added logging.

Still to do: running update queries that mix PL/SQL and plain SQL statements, producing multiple extracts (and therefore multiple output files) from a single SQL file, and probably also exporting to the xlsx file format.
This commit is contained in:
Frederik Jacobsen 2024-12-12 21:50:37 +01:00
parent 3ff2db97b6
commit 8f736f58f3
8 changed files with 224 additions and 37 deletions
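For context, a minimal sketch of how the new multi-schema export is driven, pieced together from the main.py changes further down; the Municipality field values and the 'extract_' prefix are placeholders:

from config import Config
from db_adapter import DBAdapter
from keepass import KeePass
from logger import init_logger
from models import Municipality

logger = init_logger()
config = Config()
keepass = KeePass(config.kee_pass, logger)
db_adapter = DBAdapter(keepass, config.database, logger)

# Writes one CSV per municipality, named <base_output_name><municipality.name>.csv
db_adapter.run_sql_file_export_to_csv_multiple_schemas(
    [Municipality('test', '123456789', 'op_test', '123')],  # placeholder values, as in main.py
    base_output_name='extract_',
    input_name='query.sql',
)

The planned xlsx export would presumably just swap the result.to_csv(...) call for pandas' DataFrame.to_excel.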

3
.gitignore vendored
View File

@@ -8,4 +8,5 @@ wheels/
# Virtual environments
.venv
*.csv
*.csv
local.db

7
.idea/dataSources.xml generated
View File

@@ -8,5 +8,12 @@
<jdbc-url>jdbc:postgresql://192.168.1.44:5432/postgres</jdbc-url>
<working-dir>$ProjectFileDir$</working-dir>
</data-source>
<data-source source="LOCAL" name="local.db" uuid="e9e62481-d705-4a08-8b68-06e3d11d1a93">
<driver-ref>sqlite.xerial</driver-ref>
<synchronize>true</synchronize>
<jdbc-driver>org.sqlite.JDBC</jdbc-driver>
<jdbc-url>jdbc:sqlite:local.db</jdbc-url>
<working-dir>$ProjectFileDir$</working-dir>
</data-source>
</component>
</project>

config.py
View File

@@ -1,3 +1,4 @@
import logging
from enum import Enum
import toml
@@ -7,6 +8,7 @@ from dataclasses import dataclass
class DatabaseType(Enum):
PSQL = 1
ORCL = 2
SQLITE = 3
@dataclass
class DatabaseConfig:
@@ -25,6 +27,8 @@ class DatabaseConfig:
self._database_type = DatabaseType.PSQL
case 'ORCL':
self._database_type = DatabaseType.ORCL
case 'SQLITE':
self._database_type = DatabaseType.SQLITE
case _:
self._database_type = None
@@ -32,6 +36,19 @@
self._database_ssid = config["DATABASE_SSID"]
self._database_port = config["DATABASE_PORT"]
@classmethod
def create(cls, database_name: str, database_type: DatabaseType, database_ssid: str, database_port: str, host: str):
config = {
"HOST": host,
"DATABASE_TYPE": database_type,
"DATABASE_NAME": database_name,
"DATABASE_SSID": database_ssid,
"DATABASE_PORT": database_port,
}
return cls(config)
@property
def host(self) -> str:
return self._host

db_adapter.py
View File

@@ -1,78 +1,135 @@
import logging
import sqlalchemy as sq
import sqlparse
import pandas as pd
from config import DatabaseConfig, DatabaseType
from keepass import KeePass
def _read_and_sql_file_and_strip_for_comments(filename: str):
query: str
with open(filename, 'r') as f:
query = f.read()
query = sqlparse.format(query, strip_comments=True)
return query
from models import Municipality
import time
class DBAdapter:
_engine: sq.Engine
_database_config: DatabaseConfig
def __init__(self, keepass: KeePass, database_config: DatabaseConfig):
_has_tables_been_initialized: bool = False
_logger: logging.Logger
def __init__(self, keepass: KeePass, database_config: DatabaseConfig, logger: logging.Logger):
self._database_config = database_config
connection_string: str
keepass_entry = keepass.get_db_credentials()
self._logger = logger
match self._database_config.type:
case DatabaseType.PSQL:
connection_string = f'postgresql+pg8000://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}/{self._database_config.name}'
case DatabaseType.ORCL:
connection_string = f'oracle://{keepass_entry.name}:{keepass_entry.password}@{self._database_config.host}:{self._database_config.port}/{self._database_config.ssid}'
case DatabaseType.SQLITE:
connection_string = f'sqlite:///{self._database_config.host}'
case _:
raise Exception(f'Database type {database_config.type} is not supported')
logger.info(f'Initializing database {database_config.host}:{database_config.name}')
self._engine: sq.Engine = sq.create_engine(connection_string)
logger.info('Database initialized')
def _read_and_sql_file_and_strip_for_comments(self, filename: str):
query: str
self._logger.info(f'Reading file {filename}')
with open(filename, 'r') as f:
query = f.read()
self._logger.info(f'Stripping comments from file {filename}')
query = sqlparse.format(query, strip_comments=True)
return query
def _set_transaction_readonly(self, conn: sq.Connection):
self._logger.info('Setting transaction to readonly.')
if not conn.in_transaction():
raise Exception('Connection is not in a transaction')
match self._database_config.type:
case DatabaseType.PSQL:
conn.execute(sq.text('SET TRANSACTION READ ONLY'))
case DatabaseType.ORCL:
case DatabaseType.PSQL | DatabaseType.ORCL:
conn.execute(sq.text('SET TRANSACTION READ ONLY'))
case _:
raise Exception(f'Database type {self._database_config.type} is not supported for readonly transactions')
def _set_schema(self, conn: sq.Connection, schema: str):
self._logger.info(f'Setting schema to "{schema}"')
if not conn.in_transaction():
raise Exception('Connection is not in a transaction')
match self._database_config.type:
case DatabaseType.ORCL:
conn.execute(sq.text(f"alter session set current_schema = '{schema}'"))
case DatabaseType.PSQL:
conn.execute(sq.text(f"set schema '{schema}'"))
case _:
raise Exception(f'Database type {self._database_config.type} is not supported for setting the schema')
def run_query(self, query: str, read_only = True) -> sq.CursorResult:
result: sq.CursorResult
self._logger.info(f'Running query: "{query}"')
with self._engine.connect() as conn:
with conn.begin():
self._logger.info("Starting transaction")
try:
if read_only:
self._set_transaction_readonly(conn)
start = time.time()
result = conn.execute(sq.text(query))
end = time.time()
self._logger.info(f'Query took {(end - start):.4f} seconds')
conn.commit()
except Exception as e:
conn.rollback()
raise e
self._logger.info('Transaction committed')
return result
def run_sql_file_one_statement(self, filename: str = "query.sql", read_only = True) -> sq.CursorResult:
query = _read_and_sql_file_and_strip_for_comments(filename)
query = self._read_and_sql_file_and_strip_for_comments(filename)
return self.run_query(query, read_only)
def run_sql_file_export_to_csv(self, input_name: str = "query.sql", output_name: str = "export.csv", read_only = True):
def run_sql_file_export_to_csv(self, schema: str | None = None, input_name: str = "query.sql", output_name: str = "export.csv", read_only = True):
result: pd.DataFrame
query = _read_and_sql_file_and_strip_for_comments(input_name)
query = self._read_and_sql_file_and_strip_for_comments(input_name)
self._logger.info(f'Running query: "{query}"')
with self._engine.connect() as conn:
with conn.begin():
try:
if read_only:
self._set_transaction_readonly(conn)
result = pd.read_sql(query, conn)
conn.commit()
except Exception as e:
conn.rollback()
raise e
if schema is not None:
self._set_schema(conn, schema)
result = self._extract_dataframe(conn, query, read_only)
result.to_csv(output_name, index=False, sep=';')
def _extract_dataframe(self, conn: sq.Connection, query: str, read_only: bool, schema: str | None = None) -> pd.DataFrame:
result: pd.DataFrame
with conn.begin():
self._logger.info("Starting transaction")
try:
if read_only:
self._set_transaction_readonly(conn)
if schema is not None:
self._set_schema(conn, schema)
start = time.time()
result = pd.read_sql(query, conn)
end = time.time()
self._logger.info(f'Query took {(end - start):.4f} seconds')
conn.commit()
except Exception as e:
conn.rollback()
raise e
return result
def run_sql_file_export_to_csv_multiple_schemas(self, municipalities: list[Municipality], base_output_name: str = "", input_name: str = "query.sql", read_only = True):
query = self._read_and_sql_file_and_strip_for_comments(input_name)
self._logger.info(f'Running on {len(municipalities)} municipalities')
self._logger.info(f'Running query: "{query}"')
with self._engine.connect() as conn:
for index, municipality in enumerate(municipalities):
self._logger.info(f'({index + 1}/{len(municipalities)}) running for municipality {municipality.name}')
result = self._extract_dataframe(conn, query, read_only, schema = municipality.schema)
output_file_name = f'{base_output_name}{municipality.name}.csv'
result.to_csv(output_file_name, index=False, sep=';')
self._logger.info(f'Created file {output_file_name}')

keepass.py
View File

@@ -1,3 +1,5 @@
import logging
from pykeepass import PyKeePass
from config import KeePassConfig
import getpass
@@ -20,22 +22,32 @@ class KeePassEntry:
class KeePass:
def __init__(self, config: KeePassConfig):
def __init__(self, config: KeePassConfig, logger: logging.Logger):
self._logger: logging.Logger = logger
self._kee_pass_config: KeePassConfig = config
self._password: str = getpass.getpass(f'KeePass password for {config.path}: ')
self._logger.info(f'Initializing connection to keepass file {config.path}')
self._kee_pass: PyKeePass = PyKeePass(config.path, password=self._password)
self._logger.info(f'Successfully connected to keepass')
def get_db_credentials(self) -> KeePassEntry:
self._logger.info(f'Searching for database credentials on credential: {self._kee_pass_config.db_credentials_name}')
group = self._kee_pass
if self._kee_pass_config.db_credentials_group.strip() != "" and self._kee_pass_config.db_credentials_group.strip() is not None:
if self._kee_pass_config.db_credentials_group is not None and self._kee_pass_config.db_credentials_group.strip() != "":
self._logger.info(f'Searching in group {self._kee_pass_config.db_credentials_group}')
group = self._kee_pass.find_entries(title=self._kee_pass_config.db_credentials_name)
else:
self._logger.info('Searching in root')
group = group.find_entries(title=self._kee_pass_config.db_credentials_name)
if group is None:
self._logger.critical(f'Group {self._kee_pass_config.db_credentials_group} not found')
raise Exception(f'Group {self._kee_pass_config.db_credentials_group} not found')
if len(group) != 1:
self._logger.critical(f'Could not find password, found {len(group)} entries')
raise Exception(f'Could not find password, found {len(group)} entries')
self._logger.info(f'Found credentials for database for username {group[0].username}')
return KeePassEntry(group[0].username, group[0].password)

92
logger.py Normal file
View File

@@ -0,0 +1,92 @@
import logging
import os
import sqlite3
DEFAULT_SEPARATOR = '|'
DEFAULT_DATA_TYPE = 'TEXT'
# WARNING: attributes must be chosen from https://docs.python.org/3/library/logging.html#formatter-objects
DEFAULT_ATTRIBUTES_LIST = ['asctime', 'levelname', 'name', 'message']
class SQLiteHandler(logging.Handler):
"""
Logging handler for SQLite
Based on Yarin Kessler's sqlite_handler.py https://gist.github.com/ykessler/2662203#file_sqlite_handler.py
"""
def __init__(self, database:str = "local.db", table = "log", attributes_list=None):
"""
SQLiteHandler class constructor
Parameters:
self: instance of the class
database: database
table: log table name
attributes_list: log table columns
Returns:
None
"""
#super(SQLiteHandler, self).__init__() # for python 2.X
super().__init__() # for python 3.X
if attributes_list is None:
attributes_list = DEFAULT_ATTRIBUTES_LIST
self.database = database
self.table = table
self.attributes = attributes_list
# Create table if needed
create_table_sql = 'CREATE TABLE IF NOT EXISTS ' + self.table + ' (' + ((' ' + DEFAULT_DATA_TYPE + ', ').join(self.attributes)) + ' ' + DEFAULT_DATA_TYPE + ');'
#print(create_table_sql)
conn = sqlite3.connect(self.database)
conn.execute(create_table_sql)
conn.commit()
conn.close()
def emit(self, record):
"""
Save the log record
Parameters:
self: instance of the class
record: log record to be saved
Returns:
None
"""
# Use default formatting if no formatter is set
self.format(record)
#print(record.__dict__)
record_values = [record.__dict__[k] for k in self.attributes]
str_record_values = ', '.join("'{0}'".format(v.replace("'", '').replace('"', '').replace('\n', ' ')) for v in record_values)
#print(str_record_values)
insert_sql = 'INSERT INTO ' + self.table + ' (' + (', '.join(self.attributes)) + ') VALUES (' + str_record_values + ');'
#print(insert_sql)
conn = sqlite3.connect(self.database)
conn.execute(insert_sql)
conn.commit()
conn.close()
def init_logger() -> logging.Logger:
# Changing this level controls from what level events are logged
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
attributes_list = ['asctime', 'levelname', 'message']
formatter = logging.Formatter('%(' + ((')s' + DEFAULT_SEPARATOR + '%(').join(attributes_list)) + ')s')
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(formatter)
sql_handler = SQLiteHandler()
sql_handler.setLevel(logging.INFO)
sql_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(sql_handler)
return logger
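For reference, a quick sketch of how to inspect the rows the handler captures, assuming the default local.db database and log table created above:

import sqlite3

# Table columns come from DEFAULT_ATTRIBUTES_LIST: asctime, levelname, name, message
conn = sqlite3.connect('local.db')
for row in conn.execute('SELECT asctime, levelname, message FROM log ORDER BY asctime DESC LIMIT 10'):
    print(row)
conn.close()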

15
main.py
View File

@@ -1,13 +1,14 @@
from config import Config, DatabaseConfig, KeePassConfig
from config import Config
from db_adapter import DBAdapter
from keepass import KeePass
from logger import init_logger
from models import Municipality
logger = init_logger()
config = Config()
print(config.kee_pass)
print(config.database)
keepass = KeePass(config.kee_pass)
print(keepass.get_db_credentials())
keepass = KeePass(config.kee_pass, logger)
db_adapter = DBAdapter(keepass, config.database)
db_adapter.run_sql_file_export_to_csv()
db_adapter = DBAdapter(keepass, config.database, logger)
db_adapter.run_sql_file_export_to_csv_multiple_schemas([Municipality('test', '123456789', 'op_test', '123')])

query.sql
View File

@@ -1 +1 @@
SELECT * FROM simple_healthcheck_counters shc
SELECT * FROM public.simple_healthcheck_counters shc