diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java index c873b7987a..92738b971e 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java @@ -165,7 +165,7 @@ void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) } private BinlogEventListener createObjectUnderTest() { - return new BinlogEventListener(buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, + return BinlogEventListener.create(streamPartition, buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager, dbTableMetadata, cascadingActionDetector); } @@ -176,4 +176,4 @@ private void verifyHandlerCallHelper() { Runnable capturedRunnable = runnableArgumentCaptor.getValue(); capturedRunnable.run(); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java index c15b02f717..4e6edadd5b 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java @@ -125,7 +125,7 @@ void test_initialize_then_process_stream() { dbTableMetadataMockedStatic.when(() -> DbTableMetadata.fromMap(progressState)).thenReturn(dbTableMetadata); streamWorkerMockedStatic.when(() -> StreamWorker.create(eq(sourceCoordinator), any(BinaryLogClient.class), eq(pluginMetrics))) .thenReturn(streamWorker); - binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(buffer), any(RdsSourceConfig.class), + binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(streamPartition), eq(buffer), any(RdsSourceConfig.class), any(String.class), eq(pluginMetrics), eq(binlogClient), eq(streamCheckpointer), eq(acknowledgementSetManager), eq(dbTableMetadata), any(CascadingActionDetector.class))) .thenReturn(binlogEventListener); @@ -163,7 +163,7 @@ void test_update_when_credentials_changed_then_refresh_task() { dbTableMetadataMockedStatic.when(() -> DbTableMetadata.fromMap(progressState)).thenReturn(dbTableMetadata); streamWorkerMockedStatic.when(() -> StreamWorker.create(eq(sourceCoordinator), any(BinaryLogClient.class), eq(pluginMetrics))) .thenReturn(streamWorker); - binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(buffer), any(RdsSourceConfig.class), + binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(streamPartition), eq(buffer), any(RdsSourceConfig.class), any(String.class), eq(pluginMetrics), eq(binlogClient), eq(streamCheckpointer), eq(acknowledgementSetManager), eq(dbTableMetadata), any(CascadingActionDetector.class))) .thenReturn(binlogEventListener);