Split code for folders - introduce runner actor

Introduce a dedicated module for all the different fold types. Also simplify the list of folders by deprecating those folds that should be achievable by fold_heads/fold_objects type folds, but with smarter functions. Make sure that the fold functions also have better spec coverage, and are dialyzer checked.
parent d0b8e47f77
commit bfaed921e6

7 changed files with 817 additions and 735 deletions
File diff suppressed because it is too large
@@ -839,7 +839,8 @@ filepath(CompactFilePath, NewSQN, compact_journal) ->
 
 
 initiate_penciller_snapshot(Bookie) ->
-    {ok, LedgerSnap, _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
+    {ok, LedgerSnap, _} =
+        leveled_bookie:book_snapshot(Bookie, ledger, undefined, true),
     MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
     {LedgerSnap, MaxSQN}.
 
@@ -1457,7 +1457,8 @@ simple_server_test() ->
                                         PCLr,
                                         null,
                                         ledger,
-                                        undefined),
+                                        undefined,
+                                        false),
 
     ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
     ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
@@ -1510,7 +1511,8 @@ simple_server_test() ->
                                         PCLr,
                                         null,
                                         ledger,
-                                        undefined),
+                                        undefined,
+                                        false),
 
     ?assertMatch(false, pcl_checksequencenumber(PclSnap2,
                                                 {o,

543 src/leveled_runner.erl Normal file

@@ -0,0 +1,543 @@
%% -------- RUNNER ---------
%%
%% A bookie's runner would traditionally allow remote actors to place bets
%% via the runner. In this case the runner will allow a remote actor to
%% have query access to the ledger or journal. Runners provide a snapshot of
%% the book for querying the backend.
%%
%% Runners implement the {async, Folder} within Riak backends - returning an
%% {async, Runner}, where the Runner() can be called as a function. The
%% Runner may make the snapshot at the point it is called, or the snapshot can
%% be generated and encapsulated within the function (known as snap_prefold).
%%
%% Runners which view only the Ledger (the Penciller view of the state) may
%% have a CheckPresence option - which causes the function to perform a basic
%% check that the item is available in the Journal via the Inker as part of
%% the fold. This may be useful for anti-entropy folds.
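%%
%% Illustrative usage (editor's sketch, not part of the original module):
%% whichever fold is requested, the caller receives {async, Runner} and
%% applies the zero-arity Runner to perform it, e.g.
%%   {async, Runner} = leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun),
%%   Acc = Runner().  % snapshot taken here via SnapFun(), unless the supplied
%%                    % SnapFun already encapsulates one (snap_prefold)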

-module(leveled_runner).

-include("include/leveled.hrl").

-export([
            bucket_sizestats/3,
            binary_bucketlist/4,
            index_query/3,
            bucketkey_query/4,
            hashlist_query/3,
            tictactree/5,
            foldheads_allkeys/4,
            foldobjects_allkeys/3,
            foldheads_bybucket/4,
            foldobjects_bybucket/3,
            foldobjects_byindex/3
        ]).

-include_lib("eunit/include/eunit.hrl").

-define(CHECKJOURNAL_PROB, 0.2).

%%%============================================================================
%%% External functions
%%%============================================================================


-spec bucket_sizestats(fun(), any(), atom()) -> {async, fun()}.
%% @doc
%% Fold over a bucket accumulating the count of objects and their total sizes
bucket_sizestats(SnapFun, Bucket, Tag) ->
    StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
    EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
    AccFun = accumulate_size(),
    Runner =
        fun() ->
            {ok, LedgerSnap, _JournalSnap} = SnapFun(),
            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnap,
                                                    StartKey,
                                                    EndKey,
                                                    AccFun,
                                                    {0, 0}),
            ok = leveled_penciller:pcl_close(LedgerSnap),
            Acc
        end,
    {async, Runner}.
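%% Editor's sketch (not in the original source) of driving the size fold:
%%   {async, SizeRunner} = leveled_runner:bucket_sizestats(SnapFun, B, Tag),
%%   {TotalSize, ObjectCount} = SizeRunner().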

-spec binary_bucketlist(fun(), atom(), fun(), any()) -> {async, fun()}.
%% @doc
%% List buckets for tag, assuming bucket names are all binary type
binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc) ->
    Runner =
        fun() ->
            {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
            BucketAcc = get_nextbucket(null, null, Tag, LedgerSnapshot, []),
            ok = leveled_penciller:pcl_close(LedgerSnapshot),
            lists:foldl(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end,
                        InitAcc,
                        BucketAcc)
        end,
    {async, Runner}.

-spec index_query(fun(), tuple(), tuple()) -> {async, fun()}.
%% @doc
%% Secondary index query
index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
    {FoldKeysFun, InitAcc} = FoldAccT,
    {ReturnTerms, TermRegex} = TermHandling,
    AddFun =
        case ReturnTerms of
            true ->
                fun add_terms/2;
            _ ->
                fun add_keys/2
        end,
    AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun),
    Runner =
        fun() ->
            {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                    StartKey,
                                                    EndKey,
                                                    AccFun,
                                                    InitAcc),
            ok = leveled_penciller:pcl_close(LedgerSnapshot),
            Acc
        end,
    {async, Runner}.
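%% Editor's sketch (not in the original source): with ReturnTerms set true the
%% accumulator receives {IdxValue, ObjKey} pairs, optionally regex-filtered:
%%   FoldAccT = {fun(_B, TermKey, Acc) -> [TermKey|Acc] end, []},
%%   {async, R} = leveled_runner:index_query(SnapFun,
%%                                           {SK, EK, {true, undefined}},
%%                                           FoldAccT),
%%   TermKeyPairs = R().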

-spec bucketkey_query(fun(), atom(), any(), tuple()) -> {async, fun()}.
%% @doc
%% Fold over all keys under tag (potentially restricted to a given bucket)
bucketkey_query(SnapFun, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
    SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
    EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
    AccFun = accumulate_keys(FoldKeysFun),
    Runner =
        fun() ->
            {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                    SK,
                                                    EK,
                                                    AccFun,
                                                    InitAcc),
            ok = leveled_penciller:pcl_close(LedgerSnapshot),
            Acc
        end,
    {async, Runner}.

-spec hashlist_query(fun(), atom(), boolean()) -> {async, fun()}.
%% @doc
%% Fold over the keys accumulating the hashes
hashlist_query(SnapFun, Tag, JournalCheck) ->
    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
    Runner =
        fun() ->
            {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
            AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                    StartKey,
                                                    EndKey,
                                                    AccFun,
                                                    []),
            ok = leveled_penciller:pcl_close(LedgerSnapshot),
            case JournalCheck of
                false ->
                    ok;
                true ->
                    leveled_inker:ink_close(JournalSnapshot)
            end,
            Acc
        end,
    {async, Runner}.

-spec tictactree(fun(), {atom(), any(), tuple()}, boolean(), atom(), fun())
                                                        -> {async, fun()}.
%% @doc
%% Return a merkle tree from the fold, directly accessing hashes cached in the
%% metadata
tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
    % Journal check can be used for object key folds to confirm that the
    % object is still indexed within the journal
    Tree = leveled_tictac:new_tree(temp, TreeSize),
    Runner =
        fun() ->
            {ok, LedgerSnap, JournalSnap} = SnapFun(),
            % The start key and end key will vary depending on whether the
            % fold is to fold over an index or a key range
            {StartKey, EndKey, ExtractFun} =
                case Tag of
                    ?IDX_TAG ->
                        {IdxFld, StartIdx, EndIdx} = Query,
                        KeyDefFun = fun leveled_codec:to_ledgerkey/5,
                        {KeyDefFun(Bucket, null, ?IDX_TAG, IdxFld, StartIdx),
                            KeyDefFun(Bucket, null, ?IDX_TAG, IdxFld, EndIdx),
                            fun(K, T) -> {K, T} end};
                    _ ->
                        {StartOKey, EndOKey} = Query,
                        {leveled_codec:to_ledgerkey(Bucket, StartOKey, Tag),
                            leveled_codec:to_ledgerkey(Bucket, EndOKey, Tag),
                            fun(K, H) -> {K, {is_hash, H}} end}
                end,
            AccFun =
                accumulate_tree(Filter, JournalCheck, JournalSnap, ExtractFun),
            Acc =
                leveled_penciller:pcl_fetchkeys(LedgerSnap,
                                                StartKey, EndKey,
                                                AccFun, Tree),

            % Close down snapshot when complete so as not to hold removed
            % files open
            ok = leveled_penciller:pcl_close(LedgerSnap),
            case JournalCheck of
                false ->
                    ok;
                true ->
                    leveled_inker:ink_close(JournalSnap)
            end,
            Acc
        end,
    {async, Runner}.

-spec foldheads_allkeys(fun(), atom(), fun(), boolean()) -> {async, fun()}.
%% @doc
%% Fold over all heads in the store for a given tag - applying the passed
%% function to each proxy object
foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck) ->
    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}).
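%% Editor's note: {true, JournalCheck} above is the DeferredFetch argument
%% documented on foldobjects/6 below - a heads fold receives proxy objects
%% rather than full objects, optionally checking the Journal for presence.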

-spec foldobjects_allkeys(fun(), atom(), fun()) -> {async, fun()}.
%% @doc
%% Fold over all objects for a given tag
foldobjects_allkeys(SnapFun, Tag, FoldFun) ->
    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).

-spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) ->
                                                        {async, fun()}.
%% @doc
%% Fold over all objects within a given key range in a bucket
foldobjects_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun) ->
    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).

-spec foldheads_bybucket(fun(), {atom(), any(), any()}, fun(), boolean()) ->
                                                        {async, fun()}.
%% @doc
%% Fold over all object metadata within a given key range in a bucket
foldheads_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun, JournalCheck) ->
    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}).

-spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}.
%% @doc
%% Folds over an index, fetching the objects associated with the keys returned
%% and passing those objects into the fold function
foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
    StartKey =
        leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm),
    EndKey =
        leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).


%%%============================================================================
%%% Internal functions
%%%============================================================================

get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
    Now = leveled_codec:integer_now(),
    StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag),
    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
    ExtractFun =
        fun(LK, V, _Acc) ->
            {leveled_codec:from_ledgerkey(LK), V}
        end,
    R = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
                                            StartKey,
                                            EndKey,
                                            ExtractFun,
                                            null),
    case R of
        null ->
            leveled_log:log("B0008", []),
            BKList;
        {{B, K}, V} when is_binary(B), is_binary(K) ->
            case leveled_codec:is_active({B, K}, V, Now) of
                true ->
                    leveled_log:log("B0009", [B]),
                    get_nextbucket(<<B/binary, 0>>,
                                    null,
                                    Tag,
                                    LedgerSnapshot,
                                    [{B, K}|BKList]);
                false ->
                    get_nextbucket(B,
                                    <<K/binary, 0>>,
                                    Tag,
                                    LedgerSnapshot,
                                    BKList)
            end;
        {NB, _V} ->
            leveled_log:log("B0010", [NB]),
            []
    end.
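%% Editor's note: appending a zero byte gives the smallest binary that sorts
%% after the current one, so <<B/binary, 0>> jumps the fold to the first key
%% of the next bucket, and <<K/binary, 0>> to the next key within the same
%% bucket, without re-reading the keys in between.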


-spec foldobjects(fun(), atom(), tuple(), tuple(), fun(),
                                false|{true, boolean()}) -> {async, fun()}.
%% @doc
%% The object folder should be passed DeferredFetch.
%% DeferredFetch can either be false (which will return to the fold function
%% the full object), or {true, CheckPresence} - in which case a proxy object
%% will be created that if understood by the fold function will allow the fold
%% function to work on the head of the object, and defer fetching the body in
%% case such a fetch is unnecessary.
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
    {FoldFun, InitAcc} =
        case is_tuple(FoldObjectsFun) of
            true ->
                % FoldObjectsFun is already a tuple with a Fold function and an
                % initial accumulator
                FoldObjectsFun;
            false ->
                % no initial accumulator passed, and so should be just a list
                {FoldObjectsFun, []}
        end,

    Folder =
        fun() ->
            {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),

            AccFun = accumulate_objects(FoldFun,
                                        JournalSnapshot,
                                        Tag,
                                        DeferredFetch),
            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                    StartKey,
                                                    EndKey,
                                                    AccFun,
                                                    InitAcc),
            ok = leveled_penciller:pcl_close(LedgerSnapshot),
            case DeferredFetch of
                {true, false} ->
                    ok;
                _ ->
                    ok = leveled_inker:ink_close(JournalSnapshot)
            end,
            Acc
        end,
    {async, Folder}.
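%% Editor's sketch (not in the original source): the two DeferredFetch modes
%% from a caller's perspective -
%%   foldobjects(SnapFun, Tag, SK, EK, FoldFun, false),
%%       % FoldFun receives full object values
%%   foldobjects(SnapFun, Tag, SK, EK, FoldFun, {true, true}),
%%       % FoldFun receives proxy objects, with Journal presence checked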


accumulate_size() ->
    Now = leveled_codec:integer_now(),
    AccFun = fun(Key, Value, {Size, Count}) ->
                    case leveled_codec:is_active(Key, Value, Now) of
                        true ->
                            {Size + leveled_codec:get_size(Key, Value),
                                Count + 1};
                        false ->
                            {Size, Count}
                    end
                end,
    AccFun.

accumulate_hashes(JournalCheck, InkerClone) ->
    AddKeyFun =
        fun(B, K, H, Acc) ->
            [{B, K, H}|Acc]
        end,
    get_hashaccumulator(JournalCheck,
                        InkerClone,
                        AddKeyFun).

accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) ->
    AddKeyFun =
        fun(B, K, H, Tree) ->
            case FilterFun(B, K) of
                accumulate ->
                    leveled_tictac:add_kv(Tree, K, H, HashFun, false);
                pass ->
                    Tree
            end
        end,
    get_hashaccumulator(JournalCheck,
                        InkerClone,
                        AddKeyFun).

get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
    Now = leveled_codec:integer_now(),
    AccFun =
        fun(LK, V, Acc) ->
            case leveled_codec:is_active(LK, V, Now) of
                true ->
                    {B, K, H} = leveled_codec:get_keyandobjhash(LK, V),
                    Check = leveled_rand:uniform() < ?CHECKJOURNAL_PROB,
                    case {JournalCheck, Check} of
                        {true, true} ->
                            case check_presence(LK, V, InkerClone) of
                                true ->
                                    AddKeyFun(B, K, H, Acc);
                                false ->
                                    Acc
                            end;
                        _ ->
                            AddKeyFun(B, K, H, Acc)
                    end;
                false ->
                    Acc
            end
        end,
    AccFun.


accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
    Now = leveled_codec:integer_now(),
    AccFun =
        fun(LK, V, Acc) ->
            % The function takes the Ledger Key and the value from the
            % ledger (with the value being the object metadata)
            %
            % Need to check if this is an active object (so TTL has not
            % expired).
            % If this is a deferred_fetch (i.e. the fold is a fold_heads not
            % a fold_objects), then a metadata object needs to be built to be
            % returned - but a quick check that Key is present in the Journal
            % is made first
            case leveled_codec:is_active(LK, V, Now) of
                true ->
                    {SQN, _St, _MH, MD} =
                        leveled_codec:striphead_to_details(V),
                    {B, K} =
                        case leveled_codec:from_ledgerkey(LK) of
                            {B0, K0} ->
                                {B0, K0};
                            {B0, K0, _T0} ->
                                {B0, K0}
                        end,
                    JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
                    case DeferredFetch of
                        {true, true} ->
                            InJournal =
                                leveled_inker:ink_keycheck(InkerClone,
                                                            LK,
                                                            SQN),
                            case InJournal of
                                probably ->
                                    ProxyObj = make_proxy_object(LK, JK,
                                                                    MD, V,
                                                                    InkerClone),
                                    FoldObjectsFun(B, K, ProxyObj, Acc);
                                missing ->
                                    Acc
                            end;
                        {true, false} ->
                            ProxyObj = make_proxy_object(LK, JK,
                                                            MD, V,
                                                            InkerClone),
                            FoldObjectsFun(B, K, ProxyObj, Acc);
                        false ->
                            R = leveled_bookie:fetch_value(InkerClone, JK),
                            case R of
                                not_present ->
                                    Acc;
                                Value ->
                                    FoldObjectsFun(B, K, Value, Acc)
                            end
                    end;
                false ->
                    Acc
            end
        end,
    AccFun.

make_proxy_object(LK, JK, MD, V, InkerClone) ->
    Size = leveled_codec:get_size(LK, V),
    MDBin = leveled_codec:build_metadata_object(LK, MD),
    term_to_binary({proxy_object,
                    MDBin,
                    Size,
                    {fun leveled_bookie:fetch_value/2, InkerClone, JK}}).
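%% Editor's sketch (not in the original source): a fold function can unpack
%% the proxy to read the head cheaply and fetch the body only when needed -
%%   {proxy_object, MDBin, Size, {FetchFun, Clone, JK}} = binary_to_term(Bin),
%%   Obj = FetchFun(Clone, JK).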

check_presence(Key, Value, InkerClone) ->
    {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
    case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
        probably ->
            true;
        missing ->
            false
    end.

accumulate_keys(FoldKeysFun) ->
    Now = leveled_codec:integer_now(),
    AccFun = fun(Key, Value, Acc) ->
                    case leveled_codec:is_active(Key, Value, Now) of
                        true ->
                            {B, K} = leveled_codec:from_ledgerkey(Key),
                            FoldKeysFun(B, K, Acc);
                        false ->
                            Acc
                    end
                end,
    AccFun.

add_keys(ObjKey, _IdxValue) ->
    ObjKey.

add_terms(ObjKey, IdxValue) ->
    {IdxValue, ObjKey}.

accumulate_index(TermRe, AddFun, FoldKeysFun) ->
    Now = leveled_codec:integer_now(),
    case TermRe of
        undefined ->
            fun(Key, Value, Acc) ->
                case leveled_codec:is_active(Key, Value, Now) of
                    true ->
                        {Bucket,
                            ObjKey,
                            IdxValue} = leveled_codec:from_ledgerkey(Key),
                        FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc);
                    false ->
                        Acc
                end end;
        TermRe ->
            fun(Key, Value, Acc) ->
                case leveled_codec:is_active(Key, Value, Now) of
                    true ->
                        {Bucket,
                            ObjKey,
                            IdxValue} = leveled_codec:from_ledgerkey(Key),
                        case re:run(IdxValue, TermRe) of
                            nomatch ->
                                Acc;
                            _ ->
                                FoldKeysFun(Bucket,
                                            AddFun(ObjKey, IdxValue),
                                            Acc)
                        end;
                    false ->
                        Acc
                end end
    end.


%%%============================================================================
%%% Test
%%%============================================================================

-ifdef(TEST).


-endif.

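The test-suite hunks that follow update fold requests to match the new runner signatures. A representative before/after, drawn from the hunks themselves (illustrative pairing only; the extra boolean arguments are taken verbatim from the hunks below):

    % before
    {foldobjects_allkeys, ?RIAK_TAG, FoldObjectsFun}
    {hashlist_query, ?RIAK_TAG, check_presence}
    % after
    {foldobjects_allkeys, ?RIAK_TAG, FoldObjectsFun, false}
    {hashlist_query, ?RIAK_TAG, true}
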
@@ -156,11 +156,8 @@ journal_compaction(_Config) ->
 
     testutil:wait_for_compaction(Bookie1),
     % Start snapshot - should not stop deletions
-    {ok,
-        PclClone,
-        InkClone} = leveled_bookie:book_snapshotstore(Bookie1,
-                                                        self(),
-                                                        300000),
+    {ok, PclClone, InkClone} =
+        leveled_bookie:book_snapshot(Bookie1, store, undefined, false),
     % Wait 2 seconds for files to be deleted
     WasteFP = RootPath ++ "/journal/journal_files/waste",
     lists:foldl(fun(X, Found) ->
@@ -500,7 +497,8 @@ space_clear_ondelete(_Config) ->
     {async, HTreeF1} = leveled_bookie:book_returnfolder(Book1,
                                                         {foldobjects_allkeys,
                                                             ?RIAK_TAG,
-                                                            FoldObjectsFun}),
+                                                            FoldObjectsFun,
+                                                            false}),
     {async, KF1} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery),
 
     % Delete the keys
@@ -534,8 +532,8 @@ space_clear_ondelete(_Config) ->
     io:format("Waiting for journal deletes - blocked~n"),
     timer:sleep(20000),
 
-    % for this query snapshot is made at fold time - EDIT now uses a snapshot!
-    true = length(HTreeF1()) == 80000,
+    % for this query snapshot is made at fold time
+    true = length(HTreeF1()) == 0,
     % This query uses a genuine async fold on a snapshot made at request time
     true = length(KF1()) == 80000,
 
@@ -128,13 +128,16 @@ small_load_with2i(_Config) ->
     {async, HTreeF1} = leveled_bookie:book_returnfolder(Bookie1,
                                                         {foldobjects_allkeys,
                                                             ?RIAK_TAG,
-                                                            FoldObjectsFun}),
+                                                            FoldObjectsFun,
+                                                            false}),
     KeyHashList1 = HTreeF1(),
     {async, HTreeF2} = leveled_bookie:book_returnfolder(Bookie1,
                                                         {foldobjects_bybucket,
                                                             ?RIAK_TAG,
                                                             "Bucket",
-                                                            FoldObjectsFun}),
+                                                            all,
+                                                            FoldObjectsFun,
+                                                            false}),
     KeyHashList2 = HTreeF2(),
     {async, HTreeF3} = leveled_bookie:book_returnfolder(Bookie1,
                                                         {foldobjects_byindex,
@@ -142,7 +145,8 @@ small_load_with2i(_Config) ->
                                                             "Bucket",
                                                             {"idx1_bin",
                                                                 "#", "|"},
-                                                            FoldObjectsFun}),
+                                                            FoldObjectsFun,
+                                                            false}),
     KeyHashList3 = HTreeF3(),
     true = 9901 == length(KeyHashList1), % also includes the test object
     true = 9900 == length(KeyHashList2),
@@ -152,7 +156,8 @@ small_load_with2i(_Config) ->
                        {I, _Bin} = testutil:get_value(Obj),
                        Acc + I
                        end,
-    BucketObjQ = {foldobjects_bybucket, ?RIAK_TAG, "Bucket", {SumIntFun, 0}},
+    BucketObjQ =
+        {foldobjects_bybucket, ?RIAK_TAG, "Bucket", all, {SumIntFun, 0}, true},
     {async, Sum1} = leveled_bookie:book_returnfolder(Bookie1, BucketObjQ),
     Total1 = Sum1(),
     true = Total1 > 100000,
@@ -202,7 +202,7 @@ aae_bustedjournal(_Config) ->
     {async, HashTreeF2} = leveled_bookie:book_returnfolder(Bookie2,
                                                            {hashlist_query,
                                                                ?RIAK_TAG,
-                                                               check_presence}),
+                                                               true}),
     KeyHashList2 = HashTreeF2(),
     % The file is still there, and the hashtree is not corrupted
     KeyHashList2 = KeyHashList1,
@@ -219,7 +219,8 @@ aae_bustedjournal(_Config) ->
     {async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2,
                                                            {foldobjects_allkeys,
                                                                ?RIAK_TAG,
-                                                               FoldObjectsFun}),
+                                                               FoldObjectsFun,
+                                                               false}),
     KeyHashList3 = HashTreeF3(),
 
     true = length(KeyHashList3) > 19000,
@@ -241,7 +242,8 @@ aae_bustedjournal(_Config) ->
     {async, HashTreeF4} = leveled_bookie:book_returnfolder(Bookie3,
                                                            {foldobjects_allkeys,
                                                                ?RIAK_TAG,
-                                                               FoldObjectsFun}),
+                                                               FoldObjectsFun,
+                                                               false}),
     KeyHashList4 = HashTreeF4(),
 
     true = length(KeyHashList4) == 20001,
@@ -258,7 +260,8 @@ aae_bustedjournal(_Config) ->
     {async, HashTreeF5} = leveled_bookie:book_returnfolder(Bookie4,
                                                            {foldobjects_allkeys,
                                                                ?RIAK_TAG,
-                                                               FoldObjectsFun}),
+                                                               FoldObjectsFun,
+                                                               false}),
     KeyHashList5 = HashTreeF5(),
 
     true = length(KeyHashList5) > 19000,
@@ -274,7 +277,7 @@ aae_bustedjournal(_Config) ->
     {async, HashTreeF6} = leveled_bookie:book_returnfolder(Bookie4,
                                                            {hashlist_query,
                                                                ?RIAK_TAG,
-                                                               check_presence}),
+                                                               true}),
     KeyHashList6 = HashTreeF6(),
     true = length(KeyHashList6) > 19000,
     true = length(KeyHashList6) < HeadCount,
@@ -289,7 +292,8 @@ aae_bustedjournal(_Config) ->
     {async, HashTreeF7} = leveled_bookie:book_returnfolder(Bookie5,
                                                            {foldobjects_allkeys,
                                                                ?RIAK_TAG,
-                                                               FoldObjectsFun}),
+                                                               FoldObjectsFun,
+                                                               false}),
     KeyHashList7 = HashTreeF7(),
 
     true = length(KeyHashList7) == 20001,