From 1347c12e29a8101d8dd7143c347c0310ba36008a Mon Sep 17 00:00:00 2001 From: Jimmy Kim Date: Wed, 27 Nov 2024 17:17:18 +0900 Subject: [PATCH 1/2] Add get_max_pk to operation class --- src/sbosc/operations/base.py | 20 ++++++++++++++++++++ src/sbosc/operations/operation.py | 8 ++++++++ src/sbosc/worker/worker.py | 11 +---------- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/sbosc/operations/base.py b/src/sbosc/operations/base.py index 2eda1a6..536fd20 100644 --- a/src/sbosc/operations/base.py +++ b/src/sbosc/operations/base.py @@ -38,6 +38,16 @@ def apply_update(self, db, updated_pks): cursor.execute(query) return cursor + def get_max_pk(self, db, start_pk, end_pk): + metadata = self.redis_data.metadata + with db.cursor(host='dest') as cursor: + cursor: Cursor + cursor.execute(f''' + SELECT MAX({self.pk_column}) FROM {metadata.destination_db}.{metadata.destination_table} + WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk} + ''') + return cursor.fetchone()[0] + def _get_not_imported_pks_query(self, start_pk, end_pk): return f''' SELECT source.{self.pk_column} FROM {self.source_db}.{self.source_table} AS source @@ -181,6 +191,16 @@ def apply_update(self, db, updated_pks): else: return cursor + def get_max_pk(self, db, start_pk, end_pk): + metadata = self.redis_data.metadata + with db.cursor(host='dest') as cursor: + cursor: Cursor + cursor.execute(f''' + SELECT MAX({self.pk_column}) FROM {metadata.destination_db}.{metadata.destination_table} + WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk} + ''') + return cursor.fetchone()[0] + def get_not_imported_pks(self, source_cursor, dest_cursor, start_pk, end_pk): source_cursor.execute(f''' SELECT {self.pk_column} FROM {self.source_db}.{self.source_table} diff --git a/src/sbosc/operations/operation.py b/src/sbosc/operations/operation.py index ffe3193..8148b75 100644 --- a/src/sbosc/operations/operation.py +++ b/src/sbosc/operations/operation.py @@ -50,6 +50,14 @@ def apply_update(self, db: Database, updated_pks: list) -> Cursor: """ pass + @abstractmethod + def get_max_pk(self, db: Database, start_pk: int, end_pk: int) -> int: + """ + Returns the maximum primary key in the destination table. + Used when chunk status is DUPLICATE_KEY to determine starting batch range. + """ + pass + @abstractmethod def get_not_imported_pks(self, source_cursor: Cursor, dest_cursor: Cursor, start_pk: int, end_pk: int) -> list: """ diff --git a/src/sbosc/worker/worker.py b/src/sbosc/worker/worker.py index e0d0d57..76b21e8 100644 --- a/src/sbosc/worker/worker.py +++ b/src/sbosc/worker/worker.py @@ -100,7 +100,7 @@ def get_start_pk(self, chunk_info: ChunkInfo): elif chunk_info.status == ChunkStatus.IN_PROGRESS: return chunk_info.last_pk_inserted + 1 elif chunk_info.status == ChunkStatus.DUPLICATE_KEY: - max_pk = self.get_max_pk(chunk_info.start_pk, chunk_info.end_pk) + max_pk = self.migration_operation.get_max_pk(self.db, chunk_info.start_pk, chunk_info.end_pk) return max_pk + 1 def bulk_import(self): @@ -206,15 +206,6 @@ def apply_dml_events(self): except Exception as e: self.logger.error(e) - def get_max_pk(self, start_pk, end_pk): - metadata = self.redis_data.metadata - with self.db.cursor(host='dest') as cursor: - cursor: Cursor - cursor.execute(f''' - SELECT MAX({metadata.pk_column}) FROM {metadata.destination_db}.{metadata.destination_table} - WHERE {metadata.pk_column} BETWEEN {start_pk} AND {end_pk} - ''') - return cursor.fetchone()[0] @staticmethod def calculate_metrics(func: Callable[..., Cursor]): From 0fd6bd11c7cd13c19375ef7466acb2cdbb958868 Mon Sep 17 00:00:00 2001 From: Jimmy Kim Date: Wed, 27 Nov 2024 17:20:35 +0900 Subject: [PATCH 2/2] lint fix --- src/sbosc/worker/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sbosc/worker/worker.py b/src/sbosc/worker/worker.py index 76b21e8..167344d 100644 --- a/src/sbosc/worker/worker.py +++ b/src/sbosc/worker/worker.py @@ -206,7 +206,6 @@ def apply_dml_events(self): except Exception as e: self.logger.error(e) - @staticmethod def calculate_metrics(func: Callable[..., Cursor]): def wrapper(self: Self, *args, **kwargs):