From 93dd9b331f0598e7f2081ad5d3bf75d13e04df99 Mon Sep 17 00:00:00 2001 From: Tsonglew Date: Sat, 29 Jun 2024 21:51:24 +0800 Subject: [PATCH] Fix: no attribute '_SkyWalkingAgent__log_queue' using kafka plain text (#343) --- CHANGELOG.md | 1 + skywalking/agent/__init__.py | 30 ++++++++++++++++++++---------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 68c05638..8aaedf01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - **Tentative**: Set upper bound <=5.9.5 for psutil package due to test failure. (#326) - Remove `DeprecationWarning` from `pkg_resources` by replace it with `importlib_metadata` (#329) - Fix unexpected 'decode' AttributeError when MySQLdb module is mapped by PyMySQL (#336) + - Fix SkyWalking agent failed to start if using kafka protocol with sasl_mechanism=PLAIN. (#343) ### 1.0.1 diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 9ee4d928..70b28e98 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -119,6 +119,10 @@ def __init__(self): def __bootstrap(self): # when forking, already instrumented modules must not be instrumented again # otherwise it will cause double instrumentation! (we should provide an un-instrument method) + + # Initialize queues for segment, log, meter and profiling snapshots + self.__init_queues() + if config.agent_protocol == 'grpc': from skywalking.agent.protocol.grpc import GrpcProtocol self.__protocol = GrpcProtocol() @@ -129,18 +133,29 @@ def __bootstrap(self): from skywalking.agent.protocol.kafka import KafkaProtocol self.__protocol = KafkaProtocol() - # Initialize queues for segment, log, meter and profiling snapshots - self.__segment_queue: Optional[Queue] = None + # Start reporter threads and register queues + self.__init_threading() + + def __init_queues(self) -> None: + """ + This method initializes all the queues for the agent and reporters. + """ + self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size) self.__log_queue: Optional[Queue] = None self.__meter_queue: Optional[Queue] = None self.__snapshot_queue: Optional[Queue] = None - # Start reporter threads and register queues - self.__init_threading() + if config.agent_meter_reporter_active: + self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size) + if config.agent_log_reporter_active: + self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size) + if config.agent_profile_active: + self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size) + def __init_threading(self) -> None: """ - This method initializes all the queues and threads for the agent and reporters. + This method initializes all the threads for the agent and reporters. Upon os.fork(), callback will reinitialize threads and queues by calling this method Heartbeat thread is started by default. @@ -152,12 +167,10 @@ def __init_threading(self) -> None: __heartbeat_thread = Thread(name='HeartbeatThread', target=self.__heartbeat, daemon=True) __heartbeat_thread.start() - self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size) __segment_report_thread = Thread(name='SegmentReportThread', target=self.__report_segment, daemon=True) __segment_report_thread.start() if config.agent_meter_reporter_active: - self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size) __meter_report_thread = Thread(name='MeterReportThread', target=self.__report_meter, daemon=True) __meter_report_thread.start() @@ -173,7 +186,6 @@ def __init_threading(self) -> None: ThreadDataSource().register() if config.agent_log_reporter_active: - self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size) __log_report_thread = Thread(name='LogReportThread', target=self.__report_log, daemon=True) __log_report_thread.start() @@ -183,8 +195,6 @@ def __init_threading(self) -> None: daemon=True) __command_dispatch_thread.start() - self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size) - __query_profile_thread = Thread(name='QueryProfileCommandThread', target=self.__query_profile_command, daemon=True) __query_profile_thread.start()