diff --git a/changelog.md b/changelog.md index d84a85f..9e0c0b1 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,7 @@ +* 1.9.0 + - No global stats collection by default. + There is a ets table based stats collector to record the number of sent bytes and messages. Consider this feature deprecated. + Since 1.7.0, there there is a better integration for metrics. * 1.8.0 - Add wolff:check_if_topic_exists/2 for checking if a topic exists making use of an existing client process. [#52](https://github.com/kafka4beam/wolff/pull/52) - Improved logs when reporting connection errors. (merged 1.5.12) @@ -20,8 +24,9 @@ - Upgrade `kafka_protocol` from version 4.1.0 to 4.1.1 to enable customizing the SNI without needing to set the `verify_peer` option. * 1.7.1 (merged 1.5.9) - Fix: when picking a producer PID, if it was dead, it could lead to an error being raised. [#37](https://github.com/kafka4beam/wolff/pull/37) -* 1.6.5 +* 1.7.0 - Upgrade `kafka_protocol` from version 4.0.3 to version to 4.1.0 for SASL/GSSAPI auth support. + - Also added beam-telemetry for better metrics report. * 1.6.4 (merged 1.5.8) * 1.6.3 (merged 1.5.7) - Stop supervised producer if failed to start. Otherwise the caller may have to call the wolff:stop_and_delete_supervised_producers/3 diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index c48e516..158fffe 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -107,6 +107,9 @@ %% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for %% the last request. Must be 0 if messages must be delivered in strict order. %% * `compression': `no_compression', `snappy' or `gzip'. +%% * `enable_global_stats': `true' | `false'. +%% Introduced in 1.9.0, default is `false'. Set to `true' to enalbe a global +%% send/receive stats table created in `wolff_stats' module. -spec start_link(wolff:client_id(), topic(), partition(), pid() | ?conn_down(any()), config()) -> {ok, pid()} | {error, any()}. start_link(ClientId, Topic, Partition, MaybeConnPid, Config) -> @@ -224,8 +227,9 @@ do_init(#{client_id := ClientId, sent_reqs_count => 0, inflight_calls => 0, conn := undefined, - client_id => ClientId - }. + client_id => ClientId, + enable_global_stats => maps:get(enable_global_stats, Config0, false) + }. handle_call(stop, From, St) -> gen_server:reply(From, ok), @@ -238,13 +242,9 @@ handle_info({do_init, St0}, _) -> {noreply, St}; handle_info(?linger_expire, St) -> {noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})}; -handle_info(?SEND_REQ(_, Batch, _) = Call, #{client_id := ClientId, - topic := Topic, - partition := Partition, - config := #{max_batch_bytes := Limit} - } = St0) -> +handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) -> {Calls, Cnt, Oct} = collect_send_calls([Call], 1, batch_bytes(Batch), Limit), - ok = wolff_stats:recv(ClientId, Topic, Partition, #{cnt => Cnt, oct => Oct}), + ok = recv_stats(St0, #{cnt => Cnt, oct => Oct}), St1 = enqueue_calls(Calls, St0), St = maybe_send_to_kafka(St1), {noreply, St}; @@ -653,11 +653,18 @@ log_error(Topic, Partition, Msg, Args) -> log(Level, Report) -> logger:log(Level, Report). +send_stats(#{enable_global_stats := false}, _) -> + ok; send_stats(#{client_id := ClientId, topic := Topic, partition := Partition}, Batch) -> {Cnt, Oct} = lists:foldl(fun(Msg, {C, O}) -> {C + 1, O + oct(Msg)} end, {0, 0}, Batch), ok = wolff_stats:sent(ClientId, Topic, Partition, #{cnt => Cnt, oct => Oct}). +recv_stats(#{enable_global_stats := false}, _) -> + ok; +recv_stats(#{client_id := ClientId, topic := Topic, partition := Partition}, Increments) -> + ok = wolff_stats:recv(ClientId, Topic, Partition, Increments). + %% Estimation of size in bytes of one payload sent to Kafka. %% According to Kafka protocol, a v2 record consists of below fields: %% Length => varint # varint_bytes(SizeOfAllRestFields) diff --git a/test/wolff_bench.erl b/test/wolff_bench.erl index 0912c91..75729ad 100644 --- a/test/wolff_bench.erl +++ b/test/wolff_bench.erl @@ -22,7 +22,8 @@ start_producers(Client) -> ProducerCfg = #{required_acks => all_isr, max_batch_bytes => 800*1000, max_linger_ms => 1000, - max_send_ahead => 100 + max_send_ahead => 100, + enable_global_stats => true }, wolff:start_producers(Client, ?TOPIC, ProducerCfg). diff --git a/test/wolff_supervised_tests.erl b/test/wolff_supervised_tests.erl index 9232851..eabd95a 100644 --- a/test/wolff_supervised_tests.erl +++ b/test/wolff_supervised_tests.erl @@ -440,7 +440,9 @@ fetch(Connection, Topic, Partition, Offset, MaxBytes) -> client_config() -> #{}. producer_config() -> - #{replayq_dir => "test-data"}. + #{replayq_dir => "test-data", + enable_global_stats => true + }. key(Name) -> iolist_to_binary(io_lib:format("~p/~p/~p", [Name, calendar:local_time(), diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index 1772c35..9cefc5e 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -627,7 +627,8 @@ to_old_client_state(St0) -> client_config() -> #{}. producer_config() -> - #{replayq_dir => "test-data"}. + #{replayq_dir => "test-data", + enable_global_stats => true}. key(Name) -> iolist_to_binary(io_lib:format("~p/~p", [Name, calendar:local_time()])).