Refine query to accept fold functions
Need to be able to pass external fold functions into different queries, to work as a Riak backend
This commit is contained in:
parent
ac223ced68
commit
6684e8e1d3
6 changed files with 68 additions and 34 deletions
|
@ -336,24 +336,24 @@ handle_call({return_folder, FolderType}, _From, State) ->
|
||||||
bucket_stats(State, Bucket, ?RIAK_TAG),
|
bucket_stats(State, Bucket, ?RIAK_TAG),
|
||||||
State};
|
State};
|
||||||
{index_query,
|
{index_query,
|
||||||
Bucket,
|
Constraint,
|
||||||
{FoldKeysFun, Acc},
|
{FoldKeysFun, Acc},
|
||||||
{IdxField, StartValue, EndValue},
|
{IdxField, StartValue, EndValue},
|
||||||
{ReturnTerms, TermRegex}} ->
|
{ReturnTerms, TermRegex}} ->
|
||||||
{reply,
|
{reply,
|
||||||
index_query(State,
|
index_query(State,
|
||||||
Bucket,
|
Constraint,
|
||||||
{FoldKeysFun, Acc},
|
{FoldKeysFun, Acc},
|
||||||
{IdxField, StartValue, EndValue},
|
{IdxField, StartValue, EndValue},
|
||||||
{ReturnTerms, TermRegex}),
|
{ReturnTerms, TermRegex}),
|
||||||
State};
|
State};
|
||||||
{keylist, Tag} ->
|
{keylist, Tag, {FoldKeysFun, Acc}} ->
|
||||||
{reply,
|
{reply,
|
||||||
allkey_query(State, Tag),
|
allkey_query(State, Tag, {FoldKeysFun, Acc}),
|
||||||
State};
|
State};
|
||||||
{keylist, Tag, Bucket} ->
|
{keylist, Tag, Bucket, {FoldKeysFun, Acc}} ->
|
||||||
{reply,
|
{reply,
|
||||||
bucketkey_query(State, Tag, Bucket),
|
bucketkey_query(State, Tag, Bucket, {FoldKeysFun, Acc}),
|
||||||
State};
|
State};
|
||||||
{hashtree_query, Tag, JournalCheck} ->
|
{hashtree_query, Tag, JournalCheck} ->
|
||||||
{reply,
|
{reply,
|
||||||
|
@ -431,21 +431,34 @@ bucket_stats(State, Bucket, Tag) ->
|
||||||
{async, Folder}.
|
{async, Folder}.
|
||||||
|
|
||||||
index_query(State,
|
index_query(State,
|
||||||
Bucket,
|
Constraint,
|
||||||
{FoldKeysFun, InitAcc},
|
{FoldKeysFun, InitAcc},
|
||||||
{IdxField, StartValue, EndValue},
|
{IdxField, StartValue, EndValue},
|
||||||
{ReturnTerms, TermRegex}) ->
|
{ReturnTerms, TermRegex}) ->
|
||||||
{ok,
|
{ok,
|
||||||
{LedgerSnapshot, LedgerCache},
|
{LedgerSnapshot, LedgerCache},
|
||||||
_JournalSnapshot} = snapshot_store(State, ledger),
|
_JournalSnapshot} = snapshot_store(State, ledger),
|
||||||
|
{Bucket, StartObjKey} =
|
||||||
|
case Constraint of
|
||||||
|
{B, SK} ->
|
||||||
|
{B, SK};
|
||||||
|
B ->
|
||||||
|
{B, null}
|
||||||
|
end,
|
||||||
Folder = fun() ->
|
Folder = fun() ->
|
||||||
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
|
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
|
||||||
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
||||||
LedgerCache),
|
LedgerCache),
|
||||||
StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
|
StartKey = leveled_codec:to_ledgerkey(Bucket,
|
||||||
IdxField, StartValue),
|
StartObjKey,
|
||||||
EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
|
?IDX_TAG,
|
||||||
IdxField, EndValue),
|
IdxField,
|
||||||
|
StartValue),
|
||||||
|
EndKey = leveled_codec:to_ledgerkey(Bucket,
|
||||||
|
null,
|
||||||
|
?IDX_TAG,
|
||||||
|
IdxField,
|
||||||
|
EndValue),
|
||||||
AddFun = case ReturnTerms of
|
AddFun = case ReturnTerms of
|
||||||
true ->
|
true ->
|
||||||
fun add_terms/2;
|
fun add_terms/2;
|
||||||
|
@ -542,7 +555,7 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
|
||||||
{async, Folder}.
|
{async, Folder}.
|
||||||
|
|
||||||
|
|
||||||
bucketkey_query(State, Tag, Bucket) ->
|
bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
|
||||||
{ok,
|
{ok,
|
||||||
{LedgerSnapshot, LedgerCache},
|
{LedgerSnapshot, LedgerCache},
|
||||||
_JournalSnapshot} = snapshot_store(State, ledger),
|
_JournalSnapshot} = snapshot_store(State, ledger),
|
||||||
|
@ -552,19 +565,19 @@ bucketkey_query(State, Tag, Bucket) ->
|
||||||
LedgerCache),
|
LedgerCache),
|
||||||
SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
||||||
EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
||||||
AccFun = accumulate_keys(),
|
AccFun = accumulate_keys(FoldKeysFun),
|
||||||
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
||||||
SK,
|
SK,
|
||||||
EK,
|
EK,
|
||||||
AccFun,
|
AccFun,
|
||||||
[]),
|
InitAcc),
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
||||||
lists:reverse(Acc)
|
lists:reverse(Acc)
|
||||||
end,
|
end,
|
||||||
{async, Folder}.
|
{async, Folder}.
|
||||||
|
|
||||||
allkey_query(State, Tag) ->
|
allkey_query(State, Tag, {FoldKeysFun, InitAcc}) ->
|
||||||
bucketkey_query(State, Tag, null).
|
bucketkey_query(State, Tag, null, {FoldKeysFun, InitAcc}).
|
||||||
|
|
||||||
|
|
||||||
snapshot_store(State, SnapType) ->
|
snapshot_store(State, SnapType) ->
|
||||||
|
@ -715,14 +728,15 @@ check_presence(Key, Value, InkerClone) ->
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
accumulate_keys() ->
|
accumulate_keys(FoldKeysFun) ->
|
||||||
Now = leveled_codec:integer_now(),
|
Now = leveled_codec:integer_now(),
|
||||||
AccFun = fun(Key, Value, KeyList) ->
|
AccFun = fun(Key, Value, Acc) ->
|
||||||
case leveled_codec:is_active(Key, Value, Now) of
|
case leveled_codec:is_active(Key, Value, Now) of
|
||||||
true ->
|
true ->
|
||||||
[leveled_codec:from_ledgerkey(Key)|KeyList];
|
{B, K} = leveled_codec:from_ledgerkey(Key),
|
||||||
|
FoldKeysFun(B, K, Acc);
|
||||||
false ->
|
false ->
|
||||||
KeyList
|
Acc
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
AccFun.
|
AccFun.
|
||||||
|
|
|
@ -69,8 +69,9 @@
|
||||||
%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
|
%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
|
||||||
generate_uuid() ->
|
generate_uuid() ->
|
||||||
<<A:32, B:16, C:16, D:16, E:48>> = crypto:rand_bytes(16),
|
<<A:32, B:16, C:16, D:16, E:48>> = crypto:rand_bytes(16),
|
||||||
io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
|
L = io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
|
||||||
[A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]).
|
[A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]),
|
||||||
|
binary_to_list(list_to_binary(L)).
|
||||||
|
|
||||||
inker_reload_strategy(AltList) ->
|
inker_reload_strategy(AltList) ->
|
||||||
ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
|
ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
|
||||||
|
|
|
@ -459,7 +459,8 @@ space_clear_ondelete(_Config) ->
|
||||||
no_check,
|
no_check,
|
||||||
G2),
|
G2),
|
||||||
|
|
||||||
{async, F1} = leveled_bookie:book_returnfolder(Book1, {keylist, o_rkv}),
|
AllKeyQuery = {keylist, o_rkv, {fun testutil:foldkeysfun/3, []}},
|
||||||
|
{async, F1} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery),
|
||||||
SW1 = os:timestamp(),
|
SW1 = os:timestamp(),
|
||||||
KL1 = F1(),
|
KL1 = F1(),
|
||||||
ok = case length(KL1) of
|
ok = case length(KL1) of
|
||||||
|
@ -525,7 +526,7 @@ space_clear_ondelete(_Config) ->
|
||||||
"after deletes~n",
|
"after deletes~n",
|
||||||
[PointB_Journals, length(FNsB_L)]),
|
[PointB_Journals, length(FNsB_L)]),
|
||||||
|
|
||||||
{async, F2} = leveled_bookie:book_returnfolder(Book1, {keylist, o_rkv}),
|
{async, F2} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery),
|
||||||
SW3 = os:timestamp(),
|
SW3 = os:timestamp(),
|
||||||
KL2 = F2(),
|
KL2 = F2(),
|
||||||
ok = case length(KL2) of
|
ok = case length(KL2) of
|
||||||
|
@ -537,7 +538,7 @@ space_clear_ondelete(_Config) ->
|
||||||
ok = leveled_bookie:book_close(Book1),
|
ok = leveled_bookie:book_close(Book1),
|
||||||
|
|
||||||
{ok, Book2} = leveled_bookie:book_start(StartOpts1),
|
{ok, Book2} = leveled_bookie:book_start(StartOpts1),
|
||||||
{async, F3} = leveled_bookie:book_returnfolder(Book2, {keylist, o_rkv}),
|
{async, F3} = leveled_bookie:book_returnfolder(Book2, AllKeyQuery),
|
||||||
SW4 = os:timestamp(),
|
SW4 = os:timestamp(),
|
||||||
KL3 = F3(),
|
KL3 = F3(),
|
||||||
ok = case length(KL3) of
|
ok = case length(KL3) of
|
||||||
|
|
|
@ -11,9 +11,9 @@
|
||||||
rotating_objects/1]).
|
rotating_objects/1]).
|
||||||
|
|
||||||
all() -> [
|
all() -> [
|
||||||
small_load_with2i,
|
small_load_with2i %,
|
||||||
query_count,
|
% query_count,
|
||||||
rotating_objects
|
% rotating_objects
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
||||||
|
@ -40,6 +40,26 @@ small_load_with2i(_Config) ->
|
||||||
testutil:check_forlist(Bookie1, ChkList1),
|
testutil:check_forlist(Bookie1, ChkList1),
|
||||||
testutil:check_forobject(Bookie1, TestObject),
|
testutil:check_forobject(Bookie1, TestObject),
|
||||||
|
|
||||||
|
% Find all keys index, and then just the last key
|
||||||
|
IdxQ1 = {index_query,
|
||||||
|
"Bucket",
|
||||||
|
{fun testutil:foldkeysfun/3, []},
|
||||||
|
{"idx1_bin", "#", "~"},
|
||||||
|
{true, undefined}},
|
||||||
|
{async, IdxFolder} = leveled_bookie:book_returnfolder(Bookie1, IdxQ1),
|
||||||
|
KeyList1 = lists:usort(IdxFolder()),
|
||||||
|
true = 10000 == length(KeyList1),
|
||||||
|
{LastTerm, LastKey} = lists:last(KeyList1),
|
||||||
|
IdxQ2 = {index_query,
|
||||||
|
{"Bucket", LastKey},
|
||||||
|
{fun testutil:foldkeysfun/3, []},
|
||||||
|
{"idx1_bin", LastTerm, "~"},
|
||||||
|
{false, undefined}},
|
||||||
|
{async, IdxFolderLK} = leveled_bookie:book_returnfolder(Bookie1, IdxQ2),
|
||||||
|
KeyList2 = lists:usort(IdxFolderLK()),
|
||||||
|
io:format("List should be last key ~w ~w~n", [LastKey, KeyList2]),
|
||||||
|
true = 1 == length(KeyList2),
|
||||||
|
|
||||||
%% Delete the objects from the ChkList removing the indexes
|
%% Delete the objects from the ChkList removing the indexes
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
lists:foreach(fun({_RN, Obj, Spc}) ->
|
||||||
DSpc = lists:map(fun({add, F, T}) -> {remove, F, T}
|
DSpc = lists:map(fun({add, F, T}) -> {remove, F, T}
|
||||||
|
|
|
@ -105,8 +105,8 @@ aae_bustedjournal(_Config) ->
|
||||||
testutil:corrupt_journal(RootPath, HeadF, 1000, 2048, 1000),
|
testutil:corrupt_journal(RootPath, HeadF, 1000, 2048, 1000),
|
||||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts),
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts),
|
||||||
|
|
||||||
{async, KeyF} = leveled_bookie:book_returnfolder(Bookie2,
|
AllKeyQuery = {keylist, o_rkv, {fun testutil:foldkeysfun/3, []}},
|
||||||
{keylist, ?RIAK_TAG}),
|
{async, KeyF} = leveled_bookie:book_returnfolder(Bookie2, AllKeyQuery),
|
||||||
KeyList = KeyF(),
|
KeyList = KeyF(),
|
||||||
20001 = length(KeyList),
|
20001 = length(KeyList),
|
||||||
HeadCount = lists:foldl(fun({B, K}, Acc) ->
|
HeadCount = lists:foldl(fun({B, K}, Acc) ->
|
||||||
|
|
|
@ -432,10 +432,8 @@ rotating_object_check(RootPath, B, NumberOfObjects) ->
|
||||||
ok = testutil:check_indexed_objects(Book2, B, KSpcL3, V3),
|
ok = testutil:check_indexed_objects(Book2, B, KSpcL3, V3),
|
||||||
{KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2, B, KSpcL3),
|
{KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2, B, KSpcL3),
|
||||||
ok = testutil:check_indexed_objects(Book2, B, KSpcL4, V4),
|
ok = testutil:check_indexed_objects(Book2, B, KSpcL4, V4),
|
||||||
{async, BList} = leveled_bookie:book_returnfolder(Book2,
|
Query = {keylist, ?RIAK_TAG, B, {fun foldkeysfun/3, []}},
|
||||||
{keylist,
|
{async, BList} = leveled_bookie:book_returnfolder(Book2, Query),
|
||||||
?RIAK_TAG,
|
|
||||||
B}),
|
|
||||||
true = NumberOfObjects == length(BList()),
|
true = NumberOfObjects == length(BList()),
|
||||||
ok = leveled_bookie:book_close(Book2),
|
ok = leveled_bookie:book_close(Book2),
|
||||||
ok.
|
ok.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue