From ccb01bfa78cd9654c1f40eca5ab8a52dd4023ef3 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 23 Oct 2024 09:54:04 -0300 Subject: [PATCH] feat: add `[wolff, queuing_bytes]` telemetry event for ram/disk usage observability Part of https://emqx.atlassian.net/browse/EMQX-13074 --- changelog.md | 3 +++ src/wolff_metrics.erl | 7 ++++++ src/wolff_producer.erl | 10 ++++++-- test/wolff_supervised_tests.erl | 5 ++++ test/wolff_tests.erl | 44 +++++++++++++++++++++++++++------ 5 files changed, 60 insertions(+), 9 deletions(-) diff --git a/changelog.md b/changelog.md index 5c99f92..8851d4e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,6 @@ +* 4.0.3 + - Added the `[wolff, queuing_bytes]` telemetry event which reports the amount of RAM/disk used by producer queues. + * 4.0.2 - Fix dynamic topic producer initialization failure handling (introduced in 3.0.0). - Fix `unexpected_id` crash when replayq overflow (introduced in 4.0.1). diff --git a/src/wolff_metrics.erl b/src/wolff_metrics.erl index 6afdd02..3f3443d 100644 --- a/src/wolff_metrics.erl +++ b/src/wolff_metrics.erl @@ -4,6 +4,7 @@ -export([ inflight_set/2, queuing_set/2, + queuing_bytes_set/2, dropped_inc/1, dropped_inc/2, dropped_queue_full_inc/1, @@ -29,6 +30,12 @@ queuing_set(Config, Val) -> #{gauge_set => Val}, telemetry_meta_data(Config)). +%% @doc Number of bytes (RAM and/or disk) currently queuing. [Gauge] +queuing_bytes_set(Config, Val) -> + telemetry:execute([wolff, queuing_bytes], + #{gauge_set => Val}, + telemetry_meta_data(Config)). + %% @doc Count of messages that were sent asynchronously but ACKs are not %% received. [Gauge] inflight_set(Config, Val) -> diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index adaaedd..af1ae60 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -275,6 +275,7 @@ do_init(#{client_id := ClientId, Config2 = resolve_max_linger_bytes(Config1, Q), Config = maps:without([replayq_dir, replayq_seg_bytes], Config2), wolff_metrics:queuing_set(Config, replayq:count(Q)), + wolff_metrics:queuing_bytes_set(Config, replayq:bytes(Q)), wolff_metrics:inflight_set(Config, 0), St#{replayq => Q, config := Config, @@ -376,9 +377,11 @@ clear_gauges(#{config := Config}, Q) -> maybe_reset_queuing(Config, Q) -> case {replayq:count(Q), is_replayq_durable(Config, Q)} of {0, _} -> - wolff_metrics:queuing_set(Config, 0); + wolff_metrics:queuing_set(Config, 0), + wolff_metrics:queuing_bytes_set(Config, 0); {_, false} -> - wolff_metrics:queuing_set(Config, 0); + wolff_metrics:queuing_set(Config, 0), + wolff_metrics:queuing_bytes_set(Config, 0); {_, _} -> ok end. @@ -505,6 +508,7 @@ send_to_kafka(#{sent_reqs := SentReqs, IDs = lists:map(fun({ID, _}) -> ID end, get_calls_from_queue_items(Items)), NewPendingAcks = wolff_pendack:move_backlog_to_inflight(PendingAcks, IDs), wolff_metrics:queuing_set(Config, replayq:count(NewQ)), + wolff_metrics:queuing_bytes_set(Config, replayq:bytes(NewQ)), NewSentReqsCount = SentReqsCount + 1, NrOfCalls = count_calls(Items), NewInflightCalls = InflightCalls + NrOfCalls, @@ -923,6 +927,7 @@ enqueue_calls2(Calls, end, {[], PendingAcks0, 0}, Calls), NewQ = replayq:append(Q, lists:reverse(QueueItems)), wolff_metrics:queuing_set(Config0, replayq:count(NewQ)), + wolff_metrics:queuing_bytes_set(Config0, replayq:bytes(NewQ)), lists:foreach(fun maybe_reply_queued/1, Calls), Overflow = case maps:get(drop_if_highmem, Config0, false) andalso replayq:is_mem_only(NewQ) @@ -968,6 +973,7 @@ handle_overflow(#{replayq := Q, wolff_metrics:dropped_queue_full_inc(Config, NrOfCalls), wolff_metrics:dropped_inc(Config, NrOfCalls), wolff_metrics:queuing_set(Config, replayq:count(NewQ)), + wolff_metrics:queuing_bytes_set(Config, replayq:bytes(NewQ)), ok = maybe_log_discard(St, NrOfCalls), {CbList, NewPendingAcks} = wolff_pendack:drop_backlog(PendingAcks, CallIDs), lists:foreach(fun(Cb) -> eval_ack_cb(Cb, ?buffer_overflow_discarded) end, CbList), diff --git a/test/wolff_supervised_tests.erl b/test/wolff_supervised_tests.erl index b4f60cb..a6c7d01 100644 --- a/test/wolff_supervised_tests.erl +++ b/test/wolff_supervised_tests.erl @@ -31,6 +31,7 @@ supervised_client_test() -> ok = application:stop(wolff), ?assertEqual(undefined, whereis(wolff_sup)), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), [1] = get_telemetry_seq(CntrEventsTable, [wolff,success]), ets:delete(CntrEventsTable), @@ -65,6 +66,7 @@ test_supervised_producers(Name) -> ?assertEqual([], supervisor:which_children(wolff_client_sup)), ok = application:stop(wolff), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), [1] = get_telemetry_seq(CntrEventsTable, [wolff,success]), ets:delete(CntrEventsTable), @@ -175,6 +177,7 @@ test_client_restart(ClientId, Topic, Partition) -> ok = application:stop(wolff), [1,1] = get_telemetry_seq(CntrEventsTable, [wolff,success]), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), ets:delete(CntrEventsTable), wolff_tests:deinstall_event_logging(?FUNCTION_NAME), @@ -256,6 +259,7 @@ producer_restart_test() -> ok = application:stop(wolff), [1,2] = get_telemetry_seq(CntrEventsTable, [wolff,success]), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), ets:delete(CntrEventsTable), wolff_tests:deinstall_event_logging(?FUNCTION_NAME), @@ -322,6 +326,7 @@ test_partition_count_refresh() -> ?assertEqual(Partitions0, Partition1), [1,1] = get_telemetry_seq(CntrEventsTable, [wolff,success]), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), ets:delete(CntrEventsTable), wolff_tests:deinstall_event_logging(?FUNCTION_NAME), diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index e65e538..2cafa0e 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -117,6 +117,10 @@ send_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0,1,0]), + ?assertMatch( + [0,N,0] when N > 0, + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])) + ), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -155,6 +159,9 @@ send_one_msg_max_batch_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0,1,0]), + ?assert_eq_optional_tail( + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])), + [0,EstimatedBytes,0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -192,6 +199,9 @@ send_smallest_msg_max_batch_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0,1,0]), + ?assert_eq_optional_tail( + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])), + [0,batch_bytes(Batch),0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -432,6 +442,9 @@ replayq_overflow_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0,2,1,0]), + ?assert_eq_optional_tail( + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])), + [0,64,32,0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -503,6 +516,9 @@ replayq_highmem_overflow_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0, 1, 0, 1, 0, 1, 0]), + ?assert_eq_optional_tail( + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])), + [0,32,0,32,0,32,0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -599,6 +615,10 @@ replayq_offload_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0, 1, 0]), + ?assertMatch( + [0,N,0] when N > 0, + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])) + ), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0, 1, 0]), @@ -949,13 +969,7 @@ handle_telemetry_event( ) -> case EventRecordTable =/= none of true -> - PastEvents = case ets:lookup(EventRecordTable, EventId) of - [] -> []; - [{_EventId, PE}] -> PE - end, - NewEventList = [ #{metrics_data => MetricsData, - meta_data => MetaData} | PastEvents], - ets:insert(EventRecordTable, {EventId, NewEventList}); + do_handle_telemetry_event(EventRecordTable, EventId, MetricsData, MetaData); false -> ok end, @@ -967,6 +981,21 @@ handle_telemetry_event( ok end. +do_handle_telemetry_event(EventRecordTable, EventId, MetricsData, MetaData) -> + try + PastEvents = case ets:lookup(EventRecordTable, EventId) of + [] -> []; + [{_EventId, PE}] -> PE + end, + NewEventList = [ #{metrics_data => MetricsData, + meta_data => MetaData} | PastEvents], + ets:insert(EventRecordTable, {EventId, NewEventList}) + catch + error:badarg:Stacktrace -> + ct:pal("<<< error handling telemetry event >>>\n[event id]: ~p\n[metrics data]: ~p\n[meta data]: ~p\n\nStacktrace:\n ~p\n", + [EventId, MetricsData, MetaData, Stacktrace]) + end. + telemetry_id() -> <<"emqx-bridge-kafka-producer-telemetry-handler">>. @@ -1001,6 +1030,7 @@ telemetry_events() -> [wolff, dropped_queue_full], [wolff, matched], [wolff, queuing], + [wolff, queuing_bytes], [wolff, retried], [wolff, failed], [wolff, inflight],