Feature: List all known keys #448

Draft: wants to merge 1 commit into base: master
12 changes: 11 additions & 1 deletion src/antidote.erl
@@ -50,7 +50,8 @@
get_objects/3,
get_log_operations/1,
get_default_txn_properties/0,
get_txn_property/2
get_txn_property/2,
all_keys/1
]).

%% Public API
@@ -102,6 +103,15 @@ delete_object({_Key, _Type, _Bucket}) ->
%% TODO: Object deletion is not currently supported
{error, operation_not_supported}.


%% @doc Returns all keys known to the system at the given snapshot time.
%% Only the unfiltered variant (snapshot time 'ignore') is supported so far.
-spec all_keys(snapshot_time()) -> {ok, [bound_object()]} | {error, term()}.
all_keys(ignore) ->
    {ok, logging_vnode:all_keys(ignore)};
all_keys(_SnapshotTime) ->
    %% TODO: filter the keys by the given snapshot time
    {error, operation_not_supported}.

%% Register a post commit hook.
%% Module:Function({Key, Type, Op}) will be executed after successful commit of
%% each transaction that updates Key.
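Usage note (not part of the diff): the new public entry point lists every bound object the logging subsystem knows about. A minimal sketch of calling it from a remote shell, as the tests below do; the Node variable and the printed result are illustrative:

    %% list all known keys via the new API (snapshot filtering is not
    %% supported yet, so the snapshot time must be 'ignore')
    {ok, AllKeys} = rpc:call(Node, antidote, all_keys, [ignore]),
    %% AllKeys is a list of bound objects, i.e. {Key, Type, Bucket} tuples:
    %% [{my_key, antidote_crdt_counter_pn, my_bucket}, ...]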
71 changes: 70 additions & 1 deletion src/logging_vnode.erl
@@ -61,7 +61,9 @@
get_range/6,
get_all/4,
request_bucket_op_id/4,
request_op_id/3]).
request_op_id/3,
all_keys/1
]).

-export([init/1,
terminate/2,
@@ -222,6 +224,21 @@ get_all(IndexNode, LogId, Continuation, PrevOps) ->
?LOGGING_MASTER,
infinity).


%% @doc Queries every partition in the ring and returns all bound objects
%% found in the partitions' logs. Note that this reads every log in full,
%% so the cost grows with the total size of the logs.
-spec all_keys(ignore) -> [bound_object()].
all_keys(ignore) ->
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    lists:foldl(fun(IndexAndNode, Responses) ->
                        {keys, KeysOfIndex} =
                            riak_core_vnode_master:sync_spawn_command(IndexAndNode,
                                                                      {list_keys, ignore},
                                                                      ?LOGGING_MASTER),
                        Responses ++ KeysOfIndex
                end,
                [],
                riak_core_ring:all_owners(Ring)).

%% @doc Gets the last id of operations stored in the log for the given DCID
-spec request_op_id(index_node(), dcid(), partition()) -> {ok, non_neg_integer()}.
request_op_id(IndexNode, DCID, Partition) ->
@@ -565,6 +582,18 @@ handle_command({get_all, LogId, Continuation, Ops}, _Sender,
{reply, {error, Reason}, State}
end;

handle_command({list_keys, ignore}, _Sender,
               #state{logs_map = Map} = State) ->
    %% Collect the keys of every log owned by this vnode; sync each log
    %% once so that recently appended records are visible to the read.
    FoldKeysFun = fun(_, Log, Acc) ->
                          ok = disk_log:sync(Log),
                          case get_all_keys_from_log(Log, start, ignore) of
                              {error, Reason} -> error({list_keys_failed, Reason});
                              LogKeys -> LogKeys ++ Acc
                          end
                  end,
    Keys = dict:fold(FoldKeysFun, [], Map),
    %% deduplicate: a key appears once per update record in the log
    KeysU = sets:to_list(sets:from_list(Keys)),
    {reply, {keys, KeysU}, State};

handle_command(_Message, _Sender, State) ->
{noreply, State}.

@@ -675,6 +704,28 @@ get_last_op_from_log(Log, Continuation, ClockTable, PrevMaxVector) ->
end
end.

%% Reads the given log chunk by chunk and collects the bound objects of all
%% update records it contains. The caller is expected to have synced the log.
-spec get_all_keys_from_log(term(), start | disk_log:continuation(), ignore) ->
          [bound_object()] | {error, term()}.
get_all_keys_from_log(Log, Continuation, ignore) ->
    case disk_log:chunk(Log, Continuation) of
        eof ->
            [];
        {error, Reason} ->
            {error, Reason};
        {NewContinuation, NewTerms} ->
            all_keys_of_chunk(Log, NewContinuation, NewTerms);
        {NewContinuation, NewTerms, BadBytes} ->
            case BadBytes > 0 of
                true -> {error, bad_bytes};
                false -> all_keys_of_chunk(Log, NewContinuation, NewTerms)
            end
    end.

%% Collects the keys of one chunk and recurses into the rest of the log;
%% propagates errors instead of appending them to the key list.
all_keys_of_chunk(Log, Continuation, Terms) ->
    Keys = filter_all_keys(Terms, ignore),
    case get_all_keys_from_log(Log, Continuation, ignore) of
        {error, Reason} -> {error, Reason};
        RestKeys -> Keys ++ RestKeys
    end.

%% This is called when the vnode starts and loads into the cache
%% the id of the last operation appended to the log, so that new ops will
%% be assigned correct ids (after crash and restart)
@@ -791,6 +842,24 @@ filter_terms_for_key([{_, LogRecord}|T], Key, MinSnapshotTime, MaxSnapshotTime,
filter_terms_for_key(T, Key, MinSnapshotTime, MaxSnapshotTime, Ops, CommittedOpsDict)
end.

%% Extracts the bound object of every update record in a list of log terms.
-spec filter_all_keys([{non_neg_integer(), log_record()}], ignore) -> [bound_object()].
filter_all_keys([], ignore) ->
    [];
filter_all_keys([{_, LogRecord} | T], ignore) ->
    #log_record{log_operation = LogOperation} = log_utilities:check_log_record_version(LogRecord),
    #log_operation{op_type = OpType, log_payload = OpPayload} = LogOperation,
    case OpType of
        update ->
            #update_log_payload{key = {Key1, Bucket}, type = Type} = OpPayload,
            %% TODO: check whether the bucket is really part of the key tuple
            %% or a separate field of #update_log_payload{}
            [{Key1, Type, Bucket} | filter_all_keys(T, ignore)];
        %% TODO: should commit records be considered as well (or instead)?
        _ ->
            filter_all_keys(T, ignore)
    end.

-spec handle_update(txid(), update_log_payload(), [{non_neg_integer(), log_record()}], key() | undefined, snapshot_time() | undefined,
snapshot_time() | undefined, dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [clocksi_payload()])) ->
{dict:dict(txid(), [any_log_payload()]), dict:dict(key(), [clocksi_payload()])}.
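Aside: get_all_keys_from_log above follows the standard disk_log chunk protocol. A minimal standalone sketch of that traversal pattern, with illustrative names that are not part of the PR:

    %% Fold Fun over every term stored in a disk_log, chunk by chunk.
    fold_log(Log, Fun, Acc) ->
        fold_log(Log, start, Fun, Acc).

    fold_log(Log, Cont, Fun, Acc) ->
        case disk_log:chunk(Log, Cont) of
            eof ->
                {ok, Acc};
            {error, Reason} ->
                {error, Reason};
            {NextCont, Terms} ->
                fold_log(Log, NextCont, Fun, lists:foldl(Fun, Acc, Terms));
            {NextCont, Terms, _BadBytes} ->
                %% a chunk with bad bytes still yields the terms it could read
                fold_log(Log, NextCont, Fun, lists:foldl(Fun, Acc, Terms))
        end.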
53 changes: 51 additions & 2 deletions test/multidc/antidote_SUITE.erl
@@ -48,7 +48,8 @@
random_test/1,
shard_count/1,
dc_count/1,
meta_data_env_test/1
meta_data_env_test/1,
list_known_keys/1
]).

-include_lib("common_test/include/ct.hrl").
@@ -76,7 +77,8 @@ all() ->
dc_count,
dummy_test,
random_test,
meta_data_env_test
meta_data_env_test,
list_known_keys
].

%% Tests that add_nodes_to_dc is idempotent
@@ -202,3 +204,50 @@ meta_data_env_test(Config) ->
lists:foreach(fun(Node) ->
time_utils:wait_until(fun() -> Value = rpc:call(Node, logging_vnode, is_sync_log, []), Value == OldValue end)
end, DC).

list_known_keys(Config) ->
Bucket = ?BUCKET,
[[Node, _Node2] | _] = proplists:get_value(clusters, Config),
Type = antidote_crdt_counter_pn,
Keys = [antidote_int_m1, antidote_int_m2, antidote_int_m3, antidote_int_m4],
IncValues = [1, 2, 3, 4],
Objects = lists:map(fun(Key) -> {Key, Type, Bucket} end, Keys),
Updates = lists:map(fun({Object, IncVal}) ->
{Object, increment, IncVal}
end, lists:zip(Objects, IncValues)),
{ok, TxId} = rpc:call(Node, antidote, start_transaction, [ignore, []]),
%% update objects one by one.
txn_seq_update_check(Node, TxId, Updates),
%% read objects one by one
txn_seq_read_check(Node, TxId, Objects, [1, 2, 3, 4]),
{ok, Clock} = rpc:call(Node, antidote, commit_transaction, [TxId]),

{ok, TxId2} = rpc:call(Node, antidote, start_transaction, [Clock, []]),
%% read objects all at once
{ok, Res} = rpc:call(Node, antidote, read_objects, [Objects, TxId2]),
{ok, _} = rpc:call(Node, antidote, commit_transaction, [TxId2]),
?assertEqual([1, 2, 3, 4], Res),

%% list known keys; the result also contains keys written by earlier
%% tests in this suite
{ok, AllKeys} = rpc:call(Node, antidote, all_keys, [ignore]),
ct:pal("Keys: ~p", [AllKeys]),
%% at least the four keys written above must be present
?assert(length(AllKeys) >= 4),
%% every key written above must be found as a bound object
lists:foreach(fun(K) ->
?assert(lists:member({K, Type, Bucket}, AllKeys))
end, Keys).

txn_seq_read_check(Node, TxId, Objects, ExpectedValues) ->
lists:map(fun({Object, Expected}) ->
{ok, [Val]} = rpc:call(Node, antidote, read_objects, [[Object], TxId]),
?assertEqual(Expected, Val)
end, lists:zip(Objects, ExpectedValues)).


txn_seq_update_check(Node, TxId, Updates) ->
lists:map(fun(Update) ->
Res = rpc:call(Node, antidote, update_objects, [[Update], TxId]),
?assertMatch(ok, Res)
end, Updates).
40 changes: 38 additions & 2 deletions test/singledc/antidote_SUITE.erl
@@ -48,7 +48,8 @@
static_txn_multi_objects/1,
static_txn_multi_objects_clock/1,
interactive_txn/1,
interactive_txn_abort/1
interactive_txn_abort/1,
list_known_keys/1
]).

-include_lib("common_test/include/ct.hrl").
@@ -80,7 +81,8 @@ all() ->
static_txn_multi_objects,
static_txn_multi_objects_clock,
interactive_txn,
interactive_txn_abort
interactive_txn_abort,
list_known_keys
].


@@ -200,6 +202,40 @@ interactive_txn_abort(Config) ->
?assertEqual([0], Res). % prev txn is aborted so read returns 0


list_known_keys(Config) ->
Bucket = ?BUCKET,
Node = proplists:get_value(node, Config),
Type = antidote_crdt_counter_pn,
Keys = [antidote_int_m1, antidote_int_m2, antidote_int_m3, antidote_int_m4],
IncValues = [1, 2, 3, 4],
Objects = lists:map(fun(Key) -> {Key, Type, Bucket} end, Keys),
Updates = lists:map(fun({Object, IncVal}) ->
{Object, increment, IncVal}
end, lists:zip(Objects, IncValues)),
{ok, TxId} = rpc:call(Node, antidote, start_transaction, [ignore, []]),
%% update objects one by one.
txn_seq_update_check(Node, TxId, Updates),
%% read objects one by one
txn_seq_read_check(Node, TxId, Objects, [1, 2, 3, 4]),
{ok, Clock} = rpc:call(Node, antidote, commit_transaction, [TxId]),

{ok, TxId2} = rpc:call(Node, antidote, start_transaction, [Clock, []]),
%% read objects all at once
{ok, Res} = rpc:call(Node, antidote, read_objects, [Objects, TxId2]),
{ok, _} = rpc:call(Node, antidote, commit_transaction, [TxId2]),
?assertEqual([1, 2, 3, 4], Res),

%% list known keys; the result also contains keys written by earlier
%% tests in this suite
{ok, AllKeys} = rpc:call(Node, antidote, all_keys, [ignore]),
%% ct:pal("Keys: ~p", [AllKeys]),
%% at least the four keys written above must be present
?assert(length(AllKeys) >= 4),
%% the keys written above must be found as bound objects
BoundKey = {antidote_int_m1, Type, Bucket},
?assert(lists:member(BoundKey, AllKeys)).


txn_seq_read_check(Node, TxId, Objects, ExpectedValues) ->
lists:map(fun({Object, Expected}) ->
{ok, [Val]} = rpc:call(Node, antidote, read_objects, [[Object], TxId]),
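To run just the new test case, the suites can be invoked individually; a sketch assuming the project's usual rebar3 setup (standard rebar3 ct flags):

    rebar3 ct --suite=test/singledc/antidote_SUITE --case=list_known_keys
    rebar3 ct --suite=test/multidc/antidote_SUITE --case=list_known_keys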