Iterator support
Add iterator support, used initially only for retrieving bucket statistics. The iterator is supported by exporting a function, and when the function is claled it will take a snapshot of the ledger, run the iterator and hten close the snapshot. This required a numbe rof underlying changes, in particular to get key comparison to work as "expected". The code had previously misunderstood how comparison worked between Erlang terms, and in particular did not account for tuple length being compared first by size of the tuple (and not just by each element in order).
This commit is contained in:
parent
d2cc07a9eb
commit
0a08867280
6 changed files with 762 additions and 202 deletions
|
@ -71,7 +71,7 @@
|
|||
%% The Bookie should generate a series of ledger key changes from this
|
||||
%% information, using a function passed in at startup. For Riak this will be
|
||||
%% of the form:
|
||||
%% {{o, Bucket, Key},
|
||||
%% {{o, Bucket, Key, SubKey|null},
|
||||
%% SQN,
|
||||
%% {Hash, Size, {Riak_Metadata}},
|
||||
%% {active, TS}|{tomb, TS}} or
|
||||
|
@ -136,6 +136,7 @@
|
|||
book_riakput/3,
|
||||
book_riakget/3,
|
||||
book_riakhead/3,
|
||||
book_returnfolder/2,
|
||||
book_snapshotstore/3,
|
||||
book_snapshotledger/3,
|
||||
book_compactjournal/2,
|
||||
|
@ -144,7 +145,11 @@
|
|||
strip_to_keyseqonly/1,
|
||||
strip_to_seqonly/1,
|
||||
strip_to_statusonly/1,
|
||||
striphead_to_details/1]).
|
||||
strip_to_keyseqstatusonly/1,
|
||||
striphead_to_details/1,
|
||||
key_compare/3,
|
||||
key_dominates/2,
|
||||
print_key/1]).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
|
@ -172,17 +177,20 @@ book_start(Opts) ->
|
|||
gen_server:start(?MODULE, [Opts], []).
|
||||
|
||||
book_riakput(Pid, Object, IndexSpecs) ->
|
||||
PrimaryKey = {o, Object#r_object.bucket, Object#r_object.key},
|
||||
PrimaryKey = {o, Object#r_object.bucket, Object#r_object.key, null},
|
||||
gen_server:call(Pid, {put, PrimaryKey, Object, IndexSpecs}, infinity).
|
||||
|
||||
book_riakget(Pid, Bucket, Key) ->
|
||||
PrimaryKey = {o, Bucket, Key},
|
||||
PrimaryKey = {o, Bucket, Key, null},
|
||||
gen_server:call(Pid, {get, PrimaryKey}, infinity).
|
||||
|
||||
book_riakhead(Pid, Bucket, Key) ->
|
||||
PrimaryKey = {o, Bucket, Key},
|
||||
PrimaryKey = {o, Bucket, Key, null},
|
||||
gen_server:call(Pid, {head, PrimaryKey}, infinity).
|
||||
|
||||
book_returnfolder(Pid, FolderType) ->
|
||||
gen_server:call(Pid, {return_folder, FolderType}, infinity).
|
||||
|
||||
book_snapshotstore(Pid, Requestor, Timeout) ->
|
||||
gen_server:call(Pid, {snapshot, Requestor, store, Timeout}, infinity).
|
||||
|
||||
|
@ -307,6 +315,15 @@ handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
|
|||
null},
|
||||
State}
|
||||
end;
|
||||
handle_call({return_folder, FolderType}, _From, State) ->
|
||||
case FolderType of
|
||||
{bucket_stats, Bucket} ->
|
||||
{reply,
|
||||
bucket_stats(State#state.penciller,
|
||||
State#state.ledger_cache,
|
||||
Bucket),
|
||||
State}
|
||||
end;
|
||||
handle_call({compact_journal, Timeout}, _From, State) ->
|
||||
ok = leveled_inker:ink_compactjournal(State#state.inker,
|
||||
self(),
|
||||
|
@ -343,6 +360,26 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%% Internal functions
|
||||
%%%============================================================================
|
||||
|
||||
bucket_stats(Penciller, LedgerCache, Bucket) ->
|
||||
Folder = fun() ->
|
||||
PCLopts = #penciller_options{start_snapshot=true,
|
||||
source_penciller=Penciller},
|
||||
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
|
||||
Increment = gb_trees:to_list(LedgerCache),
|
||||
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
||||
Increment),
|
||||
StartKey = {o, Bucket, null, null},
|
||||
EndKey = {o, Bucket, null, null},
|
||||
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
||||
StartKey,
|
||||
EndKey,
|
||||
fun accumulate_size/3,
|
||||
{0, 0}),
|
||||
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
||||
Acc
|
||||
end,
|
||||
{async, Folder}.
|
||||
|
||||
shutdown_wait([], _Inker) ->
|
||||
false;
|
||||
shutdown_wait([TopPause|Rest], Inker) ->
|
||||
|
@ -411,12 +448,30 @@ strip_to_keyonly({K, _V}) -> K.
|
|||
|
||||
strip_to_keyseqonly({K, {SeqN, _, _}}) -> {K, SeqN}.
|
||||
|
||||
strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}.
|
||||
|
||||
strip_to_statusonly({_, {_, St, _}}) -> St.
|
||||
|
||||
strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN.
|
||||
|
||||
striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}.
|
||||
|
||||
key_dominates(LeftKey, RightKey) ->
|
||||
case {LeftKey, RightKey} of
|
||||
{{LK, _LVAL}, {RK, _RVAL}} when LK < RK ->
|
||||
left_hand_first;
|
||||
{{LK, _LVAL}, {RK, _RVAL}} when RK < LK ->
|
||||
right_hand_first;
|
||||
{{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}}
|
||||
when LK == RK, LSN >= RSN ->
|
||||
left_hand_dominant;
|
||||
{{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}}
|
||||
when LK == RK, LSN < RSN ->
|
||||
right_hand_dominant
|
||||
end.
|
||||
|
||||
|
||||
|
||||
get_metadatas(#r_object{contents=Contents}) ->
|
||||
[Content#r_content.metadata || Content <- Contents].
|
||||
|
||||
|
@ -435,8 +490,13 @@ hash(Obj=#r_object{}) ->
|
|||
extract_metadata(Obj, Size) ->
|
||||
{get_metadatas(Obj), vclock(Obj), hash(Obj), Size}.
|
||||
|
||||
accumulate_size(_Key, Value, {Size, Count}) ->
|
||||
{_, _, MD} = Value,
|
||||
{_, _, _, ObjSize} = MD,
|
||||
{Size + ObjSize, Count + 1}.
|
||||
|
||||
build_metadata_object(PrimaryKey, Head) ->
|
||||
{o, Bucket, Key} = PrimaryKey,
|
||||
{o, Bucket, Key, null} = PrimaryKey,
|
||||
{MD, VC, _, _} = Head,
|
||||
Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end,
|
||||
[],
|
||||
|
@ -453,12 +513,36 @@ convert_indexspecs(IndexSpecs, SQN, PrimaryKey) ->
|
|||
%% TODO: timestamps for delayed reaping
|
||||
{tomb, infinity}
|
||||
end,
|
||||
{o, B, K} = PrimaryKey,
|
||||
{{i, B, IndexField, IndexValue, K},
|
||||
{o, B, K, _SK} = PrimaryKey,
|
||||
{{i, B, {IndexField, IndexValue}, K},
|
||||
{SQN, Status, null}}
|
||||
end,
|
||||
IndexSpecs).
|
||||
|
||||
% Return a tuple of string to ease the printing of keys to logs
|
||||
print_key(Key) ->
|
||||
case Key of
|
||||
{o, B, K, _SK} ->
|
||||
{"Object", B, K};
|
||||
{i, B, {F, _V}, _K} ->
|
||||
{"Index", B, F}
|
||||
end.
|
||||
|
||||
% Compare a key against a query key, only comparing elements that are non-null
|
||||
% in the Query key
|
||||
key_compare(QueryKey, CheckingKey, gt) ->
|
||||
key_compare(QueryKey, CheckingKey, fun(X,Y) -> X > Y end);
|
||||
key_compare(QueryKey, CheckingKey, lt) ->
|
||||
key_compare(QueryKey, CheckingKey, fun(X,Y) -> X < Y end);
|
||||
key_compare({QK1, null, null, null}, {CK1, _, _, _}, CompareFun) ->
|
||||
CompareFun(QK1, CK1);
|
||||
key_compare({QK1, QK2, null, null}, {CK1, CK2, _, _}, CompareFun) ->
|
||||
CompareFun({QK1, QK2}, {CK1, CK2});
|
||||
key_compare({QK1, QK2, QK3, null}, {CK1, CK2, CK3, _}, CompareFun) ->
|
||||
CompareFun({QK1, QK2, QK3}, {CK1, CK2, CK3});
|
||||
key_compare(QueryKey, CheckingKey, CompareFun) ->
|
||||
CompareFun(QueryKey, CheckingKey).
|
||||
|
||||
|
||||
preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
|
||||
PrimaryChange = {PK,
|
||||
|
@ -628,12 +712,12 @@ indexspecs_test() ->
|
|||
IndexSpecs = [{add, "t1_int", 456},
|
||||
{add, "t1_bin", "adbc123"},
|
||||
{remove, "t1_bin", "abdc456"}],
|
||||
Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2"}),
|
||||
?assertMatch({{i, "Bucket", "t1_int", 456, "Key2"},
|
||||
Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2", null}),
|
||||
?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
|
||||
{1, {active, infinity}, null}}, lists:nth(1, Changes)),
|
||||
?assertMatch({{i, "Bucket", "t1_bin", "adbc123", "Key2"},
|
||||
?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},
|
||||
{1, {active, infinity}, null}}, lists:nth(2, Changes)),
|
||||
?assertMatch({{i, "Bucket", "t1_bin", "abdc456", "Key2"},
|
||||
?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"},
|
||||
{1, {tomb, infinity}, null}}, lists:nth(3, Changes)).
|
||||
|
||||
-endif.
|
Loading…
Add table
Add a link
Reference in a new issue