db_interactor/checkpoint/CheckpointService.py

63 lines
2.4 KiB
Python

import sqlite3
from checkpoint.Checkpoint import Checkpoint, CheckpointMetadata, CheckpointStatus
class CheckpointService:
_table: str
_database: str
_connection: str
_conn: sqlite3.Connection
_insert_metadata_query = """ INSERT INTO checkpoint_metadata (checkpoint_id, metadata, type) VALUES (?, ?, ?) """
def __init__(self, database: str = "local.db"):
self._database = database
self._conn = sqlite3.connect(database)
self._initialize_table()
def _initialize_table(self):
create_checkpoint_table_query = """
CREATE TABLE IF NOT EXISTS checkpoint (
id INTEGER primary key autoincrement,
name TEXT not null,
hash TEXT not null unique,
status TEXT not null
);
"""
create_checkpoint_metadata_table_query = """
CREATE TABLE IF NOT EXISTS checkpoint_metadata (
id INTEGER primary key autoincrement,
checkpoint_id INTEGER NOT NULL,
metadata TEXT NOT NULL,
type TEXT not null,
constraint checkpoint_metadata_checkpoint_fk
foreign key (checkpoint_id) references checkpoint (id)
);
"""
self._conn.execute(create_checkpoint_table_query)
self._conn.execute(create_checkpoint_metadata_table_query)
self._conn.commit()
def start_checkpoint(self, name, hash, metadata: CheckpointMetadata) -> Checkpoint:
insert_checkpoint_query = """INSERT INTO checkpoint (name, hash, status) VALUES (?, ?, ?)"""
checkpoint_id = self._conn.execute(insert_checkpoint_query, (name, hash, CheckpointStatus.STARTED)).lastrowid
self._conn.execute(self._insert_metadata_query, (checkpoint_id, metadata.metadata, metadata.type))
self._conn.commit()
return Checkpoint(checkpoint_id, name, hash, CheckpointStatus.STARTED)
def end_checkpoint(self, checkpoint_id, metadata: CheckpointMetadata):
update_checkpoint_query = """UPDATE checkpoint SET status = ? WHERE checkpoint_id = ?"""
self._conn.execute(update_checkpoint_query, (CheckpointStatus.COMPLETED, checkpoint_id))
self._conn.execute(self._insert_metadata_query, (checkpoint_id, metadata.metadata, metadata.type))
self._conn.commit()
def get_checkpoint(self, checkpoint_id: int, hash: str):
...
def delete_checkpoint(self):
...