diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 140263c..7159586 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,9 +45,6 @@ jobs: otp-version: ${{matrix.otp}} rebar3-version: '3.20.0' - # Check versions and syntax of app and appup - - name: Ensure version consistency - run: ./check-vsns.escript # Compile - name: Compile run: | diff --git a/changelog.md b/changelog.md index bcab36e..71d5f8e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,6 @@ +* 4.0.0 + - Delete global stats (deprecated since 1.9). + * 3.0.4 - Upgrade to kafka_protocol-4.1.8 diff --git a/check-vsns.escript b/check-vsns.escript deleted file mode 100755 index 66adbc1..0000000 --- a/check-vsns.escript +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env escript - --mode(compile). - -main(_) -> - {ok, [{application, wolff, App}]} = file:consult("src/wolff.app.src"), - {ok, [{VsnAppup, _Up, _Down}]} = file:consult("src/wolff.appup.src"), - {vsn, VsnApp} = lists:keyfind(vsn, 1, App), - case VsnAppup =:= VsnApp of - true -> ok; - false -> error([{appup, VsnAppup}, {app, VsnApp}]) - end. diff --git a/src/wolff.app.src b/src/wolff.app.src index dace319..efb0a11 100644 --- a/src/wolff.app.src +++ b/src/wolff.app.src @@ -1,6 +1,6 @@ {application, wolff, [{description, "Kafka's publisher"}, - {vsn, "3.0.4"}, + {vsn, "4.0.0"}, {registered, []}, {applications, [kernel, diff --git a/src/wolff.appup.src b/src/wolff.appup.src deleted file mode 100644 index 4e08a5f..0000000 --- a/src/wolff.appup.src +++ /dev/null @@ -1,7 +0,0 @@ -%% -*- mode: erlang; -*- -{"3.0.4", - [ - ], - [ - ] -}. diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index 9ccba6d..e3bb5f3 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -64,7 +64,6 @@ compression => kpro:compress_option(), drop_if_highmem => boolean(), telemetry_meta_data => map(), - enable_global_stats => boolean(), max_partitions => pos_integer() }. @@ -80,7 +79,6 @@ compression => kpro:compress_option(), drop_if_highmem => boolean(), telemetry_meta_data => map(), - enable_global_stats => boolean(), max_partitions => pos_integer() }. @@ -117,7 +115,6 @@ , sent_reqs_count := non_neg_integer() , inflight_calls := non_neg_integer() , topic := topic() - , enable_global_stats := boolean() }. %% @doc Start a per-partition producer worker. @@ -136,9 +133,6 @@ %% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for %% the last request. Must be 0 if messages must be delivered in strict order. %% * `compression': `no_compression', `snappy' or `gzip'. -%% * `enable_global_stats': `true' | `false'. -%% Introduced in 1.9.0, default is `false'. Set to `true' to enalbe a global -%% send/receive stats table created in `wolff_stats' module. -spec start_link(wolff:client_id(), topic(), partition(), pid() | ?conn_down(any()), config_in()) -> {ok, pid()} | {error, any()}. start_link(ClientId, Topic, Partition, MaybeConnPid, Config) -> @@ -269,8 +263,7 @@ do_init(#{client_id := ClientId, sent_reqs_count => 0, inflight_calls => 0, conn := undefined, - client_id => ClientId, - enable_global_stats => maps:get(enable_global_stats, Config0, false) + client_id => ClientId }. handle_call(stop, From, St) -> @@ -285,8 +278,7 @@ handle_info({do_init, St0}, _) -> handle_info(?linger_expire, St) -> {noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})}; handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) -> - {Calls, Cnt, Oct} = collect_send_calls([Call], 1, batch_bytes(Batch), Limit), - ok = recv_stats(St0, #{cnt => Cnt, oct => Oct}), + {Calls, _CollectedBytes} = collect_send_calls([Call], batch_bytes(Batch), Limit), St1 = enqueue_calls(Calls, St0), St = maybe_send_to_kafka(St1), {noreply, St}; @@ -510,7 +502,6 @@ send_to_kafka(#{sent_reqs := SentReqs, inflight_calls := NewInflightCalls }, ok = request_async(Conn, Req), - ok = send_stats(St2, Items), St3 = maybe_fake_kafka_ack(NoAck, Sent, St2), maybe_send_to_kafka(St3). @@ -781,19 +772,6 @@ log_error(Topic, Partition, Msg, Args) -> log(Level, Report) -> logger:log(Level, Report). -send_stats(#{enable_global_stats := false}, _) -> - ok; -send_stats(#{client_id := ClientId, topic := Topic, partition := Partition}, Items) -> - Batch = get_batch_from_queue_items(Items), - {Cnt, Oct} = - lists:foldl(fun(Msg, {C, O}) -> {C + 1, O + oct(Msg)} end, {0, 0}, Batch), - ok = wolff_stats:sent(ClientId, Topic, Partition, #{cnt => Cnt, oct => Oct}). - -recv_stats(#{enable_global_stats := false}, _) -> - ok; -recv_stats(#{client_id := ClientId, topic := Topic, partition := Partition}, Increments) -> - ok = wolff_stats:recv(ClientId, Topic, Partition, Increments). - %% Estimation of size in bytes of one payload sent to Kafka. %% According to Kafka protocol, a v2 record consists of below fields: %% Length => varint # varint_bytes(SizeOfAllRestFields) @@ -848,15 +826,15 @@ request_async(Conn, Req) when is_pid(Conn) -> ok = kpro:send(Conn, Req). %% Collect all send requests which are already in process mailbox -collect_send_calls(Calls, Count, Size, Limit) when Size >= Limit -> - {lists:reverse(Calls), Count, Size}; -collect_send_calls(Calls, Count, Size, Limit) -> +collect_send_calls(Calls, Size, Limit) when Size >= Limit -> + {lists:reverse(Calls), Size}; +collect_send_calls(Calls, Size, Limit) -> receive ?SEND_REQ(_, Batch, _) = Call -> - collect_send_calls([Call | Calls], Count + 1, Size + batch_bytes(Batch), Limit) + collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit) after 0 -> - {lists:reverse(Calls), Count, Size} + {lists:reverse(Calls), Size} end. enqueue_calls(Calls, #{replayq := Q, diff --git a/src/wolff_stats.erl b/src/wolff_stats.erl deleted file mode 100644 index 5035248..0000000 --- a/src/wolff_stats.erl +++ /dev/null @@ -1,99 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(wolff_stats). - --behaviour(gen_server). - -%% APIs --export([start_link/0, recv/4, sent/4, getstat/0, getstat/3]). - -%% gen_server callbacks --export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]). - --define(SERVER, ?MODULE). --define(ETS, ?MODULE). --define(send_cnt(C, T, P), {send_cnt, C, T, P}). --define(send_oct(C, T, P), {send_oct, C, T, P}). --define(recv_cnt(C, T, P), {recv_cnt, C, T, P}). --define(recv_oct(C, T, P), {recv_oct, C, T, P}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -%% @doc Bump numbers as input to wolff. -recv(ClientId, Topic, Partition, #{cnt := Cnt, oct := Oct} = Numbers) -> - ok = bump_counter(?recv_cnt(ClientId, Topic, Partition), Cnt), - ok = bump_counter(?recv_oct(ClientId, Topic, Partition), Oct), - gen_server:cast(?SERVER, {recv, Numbers}). - -%% @doc Bump numbers as output of wolff. -sent(ClientId, Topic, Partition, #{cnt := Cnt, oct := Oct} = Numbers) -> - ok = bump_counter(?send_cnt(ClientId, Topic, Partition), Cnt), - ok = bump_counter(?send_oct(ClientId, Topic, Partition), Oct), - gen_server:cast(?SERVER, {sent, Numbers}). - -getstat() -> - gen_server:call(?SERVER, getstat, infinity). - -getstat(ClientId, Topic, Partition) -> - #{send_cnt => get_counter(?send_cnt(ClientId, Topic, Partition)), - send_oct => get_counter(?send_oct(ClientId, Topic, Partition)), - recv_cnt => get_counter(?recv_cnt(ClientId, Topic, Partition)), - recv_oct => get_counter(?recv_oct(ClientId, Topic, Partition)) - }. - -init([]) -> - {ok, #{ets => ets:new(?ETS, [named_table, public, {write_concurrency, true}]), - send_cnt => 0, - send_oct => 0, - recv_cnt => 0, - recv_oct => 0 - }}. - -handle_call(getstat, _From, St) -> - Result = maps:with([send_cnt, send_oct, recv_cnt, recv_oct], St), - {reply, Result, St}; -handle_call(_Call, _From, St) -> - {noreply, St}. - -handle_cast({recv, Numbers}, #{recv_oct := TotalOct, recv_cnt := TotalCnt} = St) -> - #{cnt := Cnt, oct := Oct} = Numbers, - {noreply, St#{recv_oct := TotalOct + Oct, recv_cnt := TotalCnt + Cnt}}; -handle_cast({sent, Numbers}, #{send_oct := TotalOct, send_cnt := TotalCnt} = St) -> - #{cnt := Cnt, oct := Oct} = Numbers, - {noreply, St#{send_oct := TotalOct + Oct, send_cnt := TotalCnt + Cnt}}; -handle_cast(_Cast, St) -> - {noreply, St}. - -handle_info(_Info, St) -> - {noreply, St}. - -code_change(_OldVsn, St, _Extra) -> - {ok, St}. - -terminate(_Reason, _St) -> - ok. - -bump_counter(Key, Inc) -> - try _ = ets:update_counter(?ETS, Key, Inc, {Key, 0}), ok - catch _ : _ -> ok - end. - -get_counter(Key) -> - case ets:lookup(?ETS, Key) of - [] -> 0; - [{_, Value}] -> Value - end. - diff --git a/src/wolff_sup.erl b/src/wolff_sup.erl index b561702..74c9173 100644 --- a/src/wolff_sup.erl +++ b/src/wolff_sup.erl @@ -26,19 +26,10 @@ init([]) -> SupFlags = #{strategy => one_for_all, intensity => 10, period => 5}, - Children = [stats_worker(), client_sup(), producers_sup()], + Children = [client_sup(), producers_sup()], ets:new(?WOLFF_PRODUCERS_GLOBAL_TABLE, [named_table, public, ordered_set, {read_concurrency, true}]), {ok, {SupFlags, Children}}. -stats_worker() -> - #{id => wolff_stats, - start => {wolff_stats, start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => [wolff_stats] - }. - client_sup() -> #{id => wolff_client_sup, start => {wolff_client_sup, start_link, []}, diff --git a/test/wolff_bench.erl b/test/wolff_bench.erl index 75729ad..9139aa6 100644 --- a/test/wolff_bench.erl +++ b/test/wolff_bench.erl @@ -15,15 +15,13 @@ start(WorkersCnt) -> SendFun = fun(Msgs) -> {_, _} = wolff:send_sync(Producers, Msgs, timer:seconds(10)) end, - ok = spawn_workers(SendFun, WorkersCnt), - ok = spawn_reporter(ClientId, maps:size(Producers)). + ok = spawn_workers(SendFun, WorkersCnt). start_producers(Client) -> ProducerCfg = #{required_acks => all_isr, max_batch_bytes => 800*1000, max_linger_ms => 1000, - max_send_ahead => 100, - enable_global_stats => true + max_send_ahead => 100 }, wolff:start_producers(Client, ?TOPIC, ProducerCfg). @@ -41,17 +39,3 @@ worker_loop(SendFun) -> Msgs = [#{key => <>, value => Value} || I <- lists:seq(1,100)], SendFun(Msgs), worker_loop(SendFun). - -spawn_reporter(ClientId, Partitions) -> - _ = spawn_link(fun() -> reporter_loop(ClientId, Partitions, 0, 0) end), - ok. - -reporter_loop(ClientId, Partitions, LastCnt, LastOct) -> - IntervalSec = 5, - #{send_cnt := Cnt, send_oct := Oct} = wolff_stats:getstat(), - io:format("count=~p/s bytes=~p/s\n", [(Cnt - LastCnt) / IntervalSec, - (Oct - LastOct) / IntervalSec]), - timer:sleep(timer:seconds(IntervalSec)), - reporter_loop(ClientId, Partitions, Cnt, Oct). - - diff --git a/test/wolff_supervised_tests.erl b/test/wolff_supervised_tests.erl index df3fe2b..b4f60cb 100644 --- a/test/wolff_supervised_tests.erl +++ b/test/wolff_supervised_tests.erl @@ -25,16 +25,11 @@ supervised_client_test() -> {Partition, BaseOffset} = wolff:send_sync(Producers, [Msg], 3000), io:format(user, "\nmessage produced to partition ~p at offset ~p\n", [Partition, BaseOffset]), - ?assertMatch(#{send_oct := O, send_cnt := C} when O > 0 andalso C > 0, - wolff_stats:getstat()), - ?assertMatch(#{send_oct := O, send_cnt := C} when O > 0 andalso C > 0, - wolff_stats:getstat(ClientId, <<"test-topic">>, Partition)), ok = wolff:stop_producers(Producers), ok = wolff:stop_and_delete_supervised_client(ClientId), ?assertEqual([], supervisor:which_children(wolff_client_sup)), ok = application:stop(wolff), ?assertEqual(undefined, whereis(wolff_sup)), - ?assertEqual(undefined, whereis(wolff_stats)), assert_last_event_is_zero(queuing, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), [1] = get_telemetry_seq(CntrEventsTable, [wolff,success]), @@ -531,7 +526,6 @@ client_config() -> #{}. producer_config(Name) -> #{replayq_dir => "test-data", - enable_global_stats => true, name => Name }. diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index bce8e46..d394a04 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -491,49 +491,8 @@ replayq_offload_test() -> ets:delete(CntrEventsTable), deinstall_event_logging(?FUNCTION_NAME). -stats_test() -> - CntrEventsTable = ets:new(cntr_events, [public]), - install_event_logging(?FUNCTION_NAME, CntrEventsTable, false), - ClientId = <<"client-stats-test">>, - _ = application:stop(wolff), %% ensure stopped - {ok, _} = application:ensure_all_started(wolff), - ?assertMatch(#{send_oct := 0, send_cnt := 0, recv_cnt := 0, recv_oct := 0}, - wolff_stats:getstat()), - ?assertMatch(#{send_oct := 0, send_cnt := 0, recv_cnt := 0, recv_oct := 0}, - wolff_stats:getstat(ClientId, <<"nonexisting-topic">>, 0)), - ClientCfg = client_config(), - {ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg), - ProducerCfg0 = producer_config(), - ProducerCfg = ProducerCfg0#{required_acks => leader_only}, - {ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg), - Msg = #{key => ?KEY, value => <<"value">>}, - {Partition, BaseOffset} = wolff:send_sync(Producers, [Msg], 3000), - io:format(user, "\nmessage produced to partition ~p at offset ~p\n", - [Partition, BaseOffset]), - ?assertMatch(#{send_oct := O, send_cnt := C, - recv_oct := O, recv_cnt := C} when O > 0 andalso C > 0, - wolff_stats:getstat()), - ?assertMatch(#{send_oct := O, send_cnt := C, - recv_oct := O, recv_cnt := C} when O > 0 andalso C > 0, - wolff_stats:getstat(ClientId, <<"test-topic">>, Partition)), - ok = wolff:stop_producers(Producers), - ok = stop_client(Client), - ok = application:stop(wolff), - ?assertEqual(undefined, whereis(wolff_sup)), - ?assertEqual(undefined, whereis(wolff_stats)), - ?assert_eq_optional_tail( - wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), - [0, 1, 0]), - ?assert_eq_optional_tail( - wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), - [0, 1, 0]), - [1] = get_telemetry_seq(CntrEventsTable, [wolff, success]), - ets:delete(CntrEventsTable), - deinstall_event_logging(?FUNCTION_NAME), - ok. - check_connectivity_test() -> - ClientId = <<"client-stats-test">>, + ClientId = <<"client-connectivity-test">>, _ = application:stop(wolff), %% ensure stopped {ok, _} = application:ensure_all_started(wolff), ClientCfg = client_config(), @@ -551,7 +510,7 @@ check_connectivity_test() -> ok = application:stop(wolff). client_state_upgrade_test() -> - ClientId = <<"client-stats-test">>, + ClientId = <<"client-state-upgrade-test">>, _ = application:stop(wolff), %% ensure stopped {ok, _} = application:ensure_all_started(wolff), ClientCfg = client_config(), @@ -772,8 +731,7 @@ to_old_client_state(St0) -> client_config() -> #{}. producer_config() -> - #{replayq_dir => "test-data", - enable_global_stats => true}. + #{replayq_dir => "test-data"}. key(Name) -> iolist_to_binary(io_lib:format("~p/~p", [Name, calendar:local_time()])).