Use multiprocessing for eventhandler
jjh-kim committed Jan 9, 2025
1 parent 085eb94 commit 98de164
Showing 1 changed file with 26 additions and 31 deletions.
src/sbosc/eventhandler/eventhandler.py (57 changes: 26 additions & 31 deletions)
@@ -1,6 +1,5 @@
 import concurrent.futures
 import time
-from queue import Queue, Empty
 from threading import Thread
 
 from MySQLdb.cursors import Cursor, DictCursor
@@ -85,7 +84,7 @@ def __init__(self):
             'passwd': secret.PASSWORD,
         }
 
-        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.thread_count)
+        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=self.thread_count)
 
         self.log_file = None
         self.log_pos = None
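
Note: the executor swap above is the core of this commit. Binlog parsing is CPU-bound pure-Python work, so CPython's GIL keeps a ThreadPoolExecutor from running it in parallel, while a ProcessPoolExecutor uses worker processes that each get their own interpreter. A minimal, self-contained sketch of the difference (the parse function below is a hypothetical stand-in, not SB-OSC code):

    import concurrent.futures
    import time

    def parse(n):
        # Stand-in for CPU-bound binlog decoding: pure-Python work that
        # holds the GIL, so threads cannot run it concurrently.
        return sum(x * x for x in range(n))

    if __name__ == '__main__':
        jobs = [2_000_000] * 4
        for pool_cls in (concurrent.futures.ThreadPoolExecutor,
                         concurrent.futures.ProcessPoolExecutor):
            start = time.monotonic()
            with pool_cls(max_workers=4) as pool:
                list(pool.map(parse, jobs))
            print(pool_cls.__name__, f'{time.monotonic() - start:.2f}s')

On a multi-core machine the process pool typically finishes much faster here, which is presumably the motivation for the change.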
@@ -258,26 +257,21 @@ def apply_dml_events_pre_validation(self):
         else:
             self.redis_data.set_current_stage(Stage.ADD_INDEX)
 
-    def parse_binlog_batch(self, thread_id, batch_queue: Queue, done_batch: list):
+    @staticmethod
+    def parse_binlog_batch(stream):
         event_store = EventStore()
-        while batch_queue.qsize() > 0 and not self.stop_flag:
-            try:
-                binlog_file, start_pos = batch_queue.get_nowait()
-            except Empty:
-                self.logger.warning('Binlog batch queue is empty')
-                continue
-            stream = self.create_binlog_stream(binlog_file, start_pos, thread_id)
-            for event in stream:
-                event_store.add_event(event)
-                if stream.log_file != binlog_file:
-                    break
-
-            done_batch.append((stream.log_file, stream.log_pos))
-            stream.close()
-        return event_store
+        start_file = stream.log_file
+        for event in stream:
+            event_store.add_event(event)
+            if stream.log_file != start_file:
+                break
+        end_file = stream.log_file
+        end_pos = stream.log_pos
+        stream.close()
+        return event_store, (end_file, end_pos)
 
     def follow_event_stream(self):
-        file_queue = Queue()
+        target_files = []
 
         # Create binlog batch queue
         with self.db.cursor(DictCursor) as cursor:
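
Note: parse_binlog_batch becomes a @staticmethod because ProcessPoolExecutor pickles everything passed to submit(), and a bound method would drag self, with its database connections and logger, across the process boundary. A hedged sketch of that constraint (class and attribute names are illustrative, not the real handler):

    import concurrent.futures

    class Handler:
        def __init__(self):
            # Stand-in for unpicklable per-instance state (DB connection,
            # logger, ...) that a bound method would try to pickle.
            self.conn = open(__file__)

        @staticmethod
        def work(item):
            # No reference to self: only `item` and the function's
            # qualified name cross the process boundary.
            return item * 2

    if __name__ == '__main__':
        handler = Handler()
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
            futures = [pool.submit(handler.work, i) for i in range(4)]
            print([f.result() for f in futures])
        handler.conn.close()

The same reasoning plausibly explains the new return value: results must travel back through pickle as plain data, so the worker returns the EventStore together with an (end_file, end_pos) tuple instead of appending to a shared done_batch list, whose mutations would not be visible across processes.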
@@ -293,25 +287,31 @@ def follow_event_stream(self):
             ]
             for log_file in binlog_files[:self.thread_count]:
                 start_pos = self.log_pos if log_file == self.log_file else 4
-                file_queue.put((log_file, start_pos))
+                target_files.append((log_file, start_pos))
 
         # Parse binlog batches
         threads = []
-        done_files = []
-        queued_files = file_queue.qsize()
         event_store = EventStore()
         result_event_stores = []
+        done_files = []
 
-        for i in range(self.thread_count):
-            threads.append(self.executor.submit(self.parse_binlog_batch, i, file_queue, done_files))
+        for thread_id in range(len(target_files)):
+            binlog_file, start_pos = target_files[thread_id]
+            stream = self.create_binlog_stream(binlog_file, start_pos, thread_id)
+            threads.append(self.executor.submit(self.parse_binlog_batch, stream))
         done, not_done = concurrent.futures.wait(threads, timeout=self.thread_timeout)
         if len(not_done) > 0:
             self.set_stop_flag()
             raise Exception('Binlog batch parsing timed out')
 
         for thread in threads:
-            result_event_stores.append(thread.result())
+            result_event_store, done_file = thread.result()
+            result_event_stores.append(result_event_store)
+            done_files.append(done_file)
 
-        if len(done_files) == queued_files:
+        if self.stop_flag:
+            self.logger.info('Binlog parsing stopped')
+        else:
             self.log_file, self.log_pos = max(done_files)
             self.handled_binlog_files = self.handled_binlog_files | set([binlog_file for binlog_file, _ in done_files])

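Note: the rewritten fan-out follows a map/reduce shape: the parent builds one stream per binlog file, each worker returns its events plus the (file, position) where it stopped, and the parent reduces with max() over those tuples, which compare lexicographically, so the furthest file/position wins. A simplified, hypothetical model of that flow (parse_one stands in for parse_binlog_batch):

    import concurrent.futures

    def parse_one(job):
        # Hypothetical stand-in for parse_binlog_batch: return the parsed
        # events plus the (file, position) where this worker stopped.
        binlog_file, start_pos = job
        events = [f'{binlog_file}@{start_pos}']
        return events, (binlog_file, start_pos + 100)

    if __name__ == '__main__':
        jobs = [('binlog.000001', 4), ('binlog.000002', 4)]
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
            futures = [pool.submit(parse_one, job) for job in jobs]
            done, not_done = concurrent.futures.wait(futures, timeout=30)
            if not_done:
                raise Exception('Binlog batch parsing timed out')
            done_files = [f.result()[1] for f in futures]
        # Tuples compare element-wise, so max() picks the furthest
        # (file, position) pair reached by any worker.
        print(max(done_files))
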
@@ -340,8 +340,3 @@
 
             if len(binlog_files) == 1:
                 self.redis_data.set_last_catchup_timestamp(last_binlog_check_timestamp)
-
-        elif self.stop_flag:
-            self.logger.info('Binlog parsing stopped')
-        else:
-            self.logger.error('Binlog parsing failed')
