diff --git a/src/sbosc/component.py b/src/sbosc/component.py index 84090ea..e27e5f6 100644 --- a/src/sbosc/component.py +++ b/src/sbosc/component.py @@ -41,6 +41,8 @@ def is_preferred_window(): start_time_str, end_time_str = config.PREFERRED_WINDOW.split('-') start_time = datetime.strptime(start_time_str, '%H:%M').time() end_time = datetime.strptime(end_time_str, '%H:%M').time() + if start_time >= end_time: + return start_time <= current_time or current_time <= end_time return start_time <= current_time <= end_time def get_migration_id(self): diff --git a/src/sbosc/controller/validator.py b/src/sbosc/controller/validator.py index f790d82..0dfb97d 100644 --- a/src/sbosc/controller/validator.py +++ b/src/sbosc/controller/validator.py @@ -268,12 +268,11 @@ def __validate_unmatched_pks(self): def validate_apply_dml_events(self, start_timestamp, end_timestamp): unmatched_pks = [] - with self.db.cursor() as cursor: - cursor: Cursor - - if start_timestamp <= end_timestamp: - self.logger.info(f"Start validating DML events from {start_timestamp} to {end_timestamp}") - for table in ['inserted_pk', 'updated_pk', 'deleted_pk']: + if start_timestamp <= end_timestamp: + self.logger.info(f"Start validating DML events from {start_timestamp} to {end_timestamp}") + for table in ['inserted_pk', 'updated_pk', 'deleted_pk']: + with self.db.cursor() as cursor: + cursor: Cursor cursor.execute(f''' ANALYZE TABLE {config.SBOSC_DB}.{table}_{self.migration_id} ''') @@ -301,14 +300,20 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp): for thread in threads: thread.result() - cursor.executemany(f''' - INSERT IGNORE INTO {config.SBOSC_DB}.unmatched_rows (source_pk, migration_id, unmatch_type) - VALUES (%s, {self.migration_id}, %s) - ''', unmatched_pks) + with self.db.cursor() as cursor: + cursor: Cursor + cursor.executemany(f''' + INSERT IGNORE INTO {config.SBOSC_DB}.unmatched_rows (source_pk, migration_id, unmatch_type) + VALUES (%s, {self.migration_id}, %s) + ''', unmatched_pks) + self.__validate_unmatched_pks() - cursor.execute( - f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = {self.migration_id}") - unmatched_rows = cursor.fetchone()[0] + + with self.db.cursor() as cursor: + cursor: Cursor + cursor.execute( + f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = {self.migration_id}") + unmatched_rows = cursor.fetchone()[0] # Even though validation logic is based on data in tables following valid condition can be achieved. # All events are being pushed to redis in validation stage. diff --git a/src/sbosc/worker/worker.py b/src/sbosc/worker/worker.py index c50474a..1b649f1 100644 --- a/src/sbosc/worker/worker.py +++ b/src/sbosc/worker/worker.py @@ -116,6 +116,10 @@ def bulk_import(self): start_pk = self.get_start_pk(chunk_info) if start_pk is None: return + # If the start_pk is greater than the end_pk, set the last_pk_inserted to end_pk + # This can happen when chunk ended with a duplicate key error + elif start_pk > chunk_info.end_pk: + chunk_info.last_pk_inserted = chunk_info.end_pk end_pk = chunk_info.end_pk chunk_info.status = ChunkStatus.IN_PROGRESS