Add pre-filter for 2i queries

martinsumner 2017-03-02 17:49:43 +00:00
parent 84d53295cc
commit b01f7d23df
2 changed files with 175 additions and 38 deletions

View file

@@ -85,7 +85,7 @@
-record(ledger_cache, {mem :: ets:tab(),
loader = leveled_tree:empty(?CACHE_TYPE) :: tuple(),
load_queue = [] :: list(),
- index = leveled_pmem:new_index(), % array
index = leveled_pmem:new_index(), % array or empty_index
min_sqn = infinity :: integer()|infinity,
max_sqn = 0 :: integer()}).
@@ -502,9 +502,17 @@ handle_call({return_folder, FolderType}, _From, State) ->
{FoldKeysFun, Acc},
{IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}} ->
{Bucket, StartObjKey} =
case Constraint of
{B, SK} ->
{B, SK};
B ->
{B, null}
end,
{reply,
index_query(State,
- Constraint,
Bucket,
StartObjKey,
{FoldKeysFun, Acc},
{IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}),
@@ -579,6 +587,10 @@ code_change(_OldVsn, State, _Extra) ->
%%% External functions
%%%============================================================================
%% @doc Load a snapshot of the penciller with the contents of the ledger cache.
%% If the snapshot is to be loaded for a query then #ledger_cache.index may be
%% empty_index (as there is no need to update lookups), and #ledger_cache.loader
%% may be empty_cache if there are no relevant results in the LedgerCache
load_snapshot(LedgerSnapshot, LedgerCache) ->
CacheToLoad = {LedgerCache#ledger_cache.loader,
LedgerCache#ledger_cache.index,
@@ -586,9 +598,14 @@ load_snapshot(LedgerSnapshot, LedgerCache) ->
LedgerCache#ledger_cache.max_sqn},
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
%% @doc Empty the ledger cache table following a push
empty_ledgercache() ->
#ledger_cache{mem = ets:new(empty, [ordered_set])}.
%% @doc Push the ledgercache to the Penciller, which should respond ok or
%% returned. If the response is ok the cache can be flushed; if the response
%% is returned the cache should continue to build and another flush should be
%% attempted later
push_ledgercache(Penciller, Cache) ->
CacheToLoad = {Cache#ledger_cache.loader,
Cache#ledger_cache.index,
@@ -596,6 +613,11 @@ push_ledgercache(Penciller, Cache) ->
Cache#ledger_cache.max_sqn},
leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
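The ok/returned contract described above is easiest to see from the calling side. A minimal sketch of a hypothetical caller (not part of this commit), assuming the bookie holds its current cache in Cache0:

case push_ledgercache(Penciller, Cache0) of
    ok ->
        %% The Penciller accepted the push, so a fresh cache can be started
        empty_ledgercache();
    returned ->
        %% The Penciller could not accept the push yet - keep building this
        %% cache and attempt the push again later
        Cache0
end.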
%% @doc The ledger cache can be built from a queue, for example when loading
%% the ledger from the head of the journal on startup
%%
%% The queue should be built using [NewKey|Acc] so that the most recent key
%% is kept in the sort
loadqueue_ledgercache(Cache) ->
SL = lists:ukeysort(1, Cache#ledger_cache.load_queue),
T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
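The reliance on prepending new keys deserves a short illustration. A minimal sketch with hypothetical keys and metadata: lists:ukeysort/2 keeps the first of any tuples whose keys compare equal, so the most recent update (pushed to the head of the load queue) is the one that survives the sort.

Q0 = [{key_a, old_metadata}],
Q1 = [{key_b, metadata_b}|Q0],
Q2 = [{key_a, new_metadata}|Q1],   % a later update to key_a is prepended
%% The sort keeps the head-most (most recent) entry for key_a
[{key_a, new_metadata}, {key_b, metadata_b}] = lists:ukeysort(1, Q2).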
@@ -605,8 +627,6 @@ loadqueue_ledgercache(Cache) ->
%%% Internal functions
%%%============================================================================
maybe_longrunning(SW, Aspect) ->
case timer:now_diff(os:timestamp(), SW) of
N when N > ?LONG_RUNNING ->
@@ -615,6 +635,8 @@ maybe_longrunning(SW, Aspect) ->
ok
end.
cache_size(empty_cache) ->
0;
cache_size(LedgerCache) ->
ets:info(LedgerCache#ledger_cache.mem, size).
@@ -684,33 +706,27 @@ get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) ->
index_query(State,
- Constraint,
Bucket,
StartObjKey,
{FoldKeysFun, InitAcc},
{IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}) ->
StartKey = leveled_codec:to_ledgerkey(Bucket,
StartObjKey,
?IDX_TAG,
IdxField,
StartValue),
EndKey = leveled_codec:to_ledgerkey(Bucket,
null,
?IDX_TAG,
IdxField,
EndValue),
{ok,
{LedgerSnapshot, LedgerCache},
- _JournalSnapshot} = snapshot_store(State, ledger),
_JournalSnapshot} = snapshot_store(State, ledger, {StartKey, EndKey}),
- {Bucket, StartObjKey} =
- case Constraint of
- {B, SK} ->
- {B, SK};
- B ->
- {B, null}
- end,
Folder = fun() ->
leveled_log:log("B0004", [cache_size(LedgerCache)]),
load_snapshot(LedgerSnapshot, LedgerCache),
- StartKey = leveled_codec:to_ledgerkey(Bucket,
- StartObjKey,
- ?IDX_TAG,
- IdxField,
- StartValue),
- EndKey = leveled_codec:to_ledgerkey(Bucket,
- null,
- ?IDX_TAG,
- IdxField,
- EndValue),
AddFun = case ReturnTerms of
true ->
fun add_terms/2;
@@ -830,10 +846,13 @@ allkey_query(State, Tag, {FoldKeysFun, InitAcc}) ->
snapshot_store(State, SnapType) ->
snapshot_store(State, SnapType, undefined).
snapshot_store(State, SnapType, Query) ->
PCLopts = #penciller_options{start_snapshot=true,
source_penciller=State#state.penciller},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
- LedgerCache = readycache_forsnapshot(State#state.ledger_cache),
LedgerCache = readycache_forsnapshot(State#state.ledger_cache, Query),
case SnapType of
store ->
InkerOpts = #inker_options{start_snapshot=true,
@@ -844,14 +863,63 @@ snapshot_store(State, SnapType) ->
{ok, {LedgerSnapshot, LedgerCache}, null}
end.
- readycache_forsnapshot(LedgerCache) ->
readycache_forsnapshot(LedgerCache, undefined) ->
% Need to convert the Ledger Cache away from using the ETS table
Tree = leveled_tree:from_orderedset(LedgerCache#ledger_cache.mem,
?CACHE_TYPE),
- Idx = LedgerCache#ledger_cache.index,
- MinSQN = LedgerCache#ledger_cache.min_sqn,
- MaxSQN = LedgerCache#ledger_cache.max_sqn,
- #ledger_cache{loader=Tree, index=Idx, min_sqn=MinSQN, max_sqn=MaxSQN}.
case leveled_tree:tsize(Tree) of
0 ->
#ledger_cache{loader=empty_cache,
index=empty_index,
min_sqn=LedgerCache#ledger_cache.min_sqn,
max_sqn=LedgerCache#ledger_cache.max_sqn};
_ ->
#ledger_cache{loader=Tree,
index=LedgerCache#ledger_cache.index,
min_sqn=LedgerCache#ledger_cache.min_sqn,
max_sqn=LedgerCache#ledger_cache.max_sqn}
end;
readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
{KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem,
StartKey,
EndKey),
case KL of
[] ->
#ledger_cache{loader=empty_cache,
index=empty_index,
min_sqn=MinSQN,
max_sqn=MaxSQN};
_ ->
#ledger_cache{loader=leveled_tree:from_orderedlist(KL,
?CACHE_TYPE),
index=empty_index,
min_sqn=MinSQN,
max_sqn=MaxSQN}
end.
scan_table(Table, StartKey, EndKey) ->
scan_table(Table, StartKey, EndKey, [], infinity, 0).
scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) ->
case ets:next(Table, StartKey) of
'$end_of_table' ->
{lists:reverse(Acc), MinSQN, MaxSQN};
NextKey ->
case leveled_codec:endkey_passed(EndKey, NextKey) of
true ->
{lists:reverse(Acc), MinSQN, MaxSQN};
false ->
[{NextKey, NextVal}] = ets:lookup(Table, NextKey),
SQN = leveled_codec:strip_to_seqonly({NextKey, NextVal}),
scan_table(Table,
NextKey,
EndKey,
[{NextKey, NextVal}|Acc],
min(MinSQN, SQN),
max(MaxSQN, SQN))
end
end.
set_options(Opts) ->
MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
@@ -1402,6 +1470,63 @@ foldobjects_vs_hashtree_test() ->
ok = book_close(Bookie1),
reset_filestructure().
scan_table_test() ->
K1 = leveled_codec:to_ledgerkey(<<"B1">>,
<<"K1">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AA1">>),
K2 = leveled_codec:to_ledgerkey(<<"B1">>,
<<"K2">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AA1">>),
K3 = leveled_codec:to_ledgerkey(<<"B1">>,
<<"K3">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AB1">>),
K4 = leveled_codec:to_ledgerkey(<<"B1">>,
<<"K4">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AA2">>),
K5 = leveled_codec:to_ledgerkey(<<"B2">>,
<<"K5">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AA2">>),
Tab0 = ets:new(mem, [ordered_set]),
SK_A0 = leveled_codec:to_ledgerkey(<<"B1">>,
null,
?IDX_TAG,
<<"F1-bin">>,
<<"AA0">>),
EK_A9 = leveled_codec:to_ledgerkey(<<"B1">>,
null,
?IDX_TAG,
<<"F1-bin">>,
<<"AA9">>),
Empty = {[], infinity, 0},
?assertMatch(Empty,
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K1, {1, active, no_lookup, null}}]),
?assertMatch({[{K1, _}], 1, 1},
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K2, {2, active, no_lookup, null}}]),
?assertMatch({[{K1, _}, {K2, _}], 1, 2},
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K3, {3, active, no_lookup, null}}]),
?assertMatch({[{K1, _}, {K2, _}], 1, 2},
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K4, {4, active, no_lookup, null}}]),
?assertMatch({[{K1, _}, {K2, _}, {K4, _}], 1, 4},
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K5, {5, active, no_lookup, null}}]),
?assertMatch({[{K1, _}, {K2, _}, {K4, _}], 1, 4},
scan_table(Tab0, SK_A0, EK_A9)).
longrunning_test() ->
SW = os:timestamp(),
timer:sleep(100),

View file

@@ -312,7 +312,6 @@ pcl_releasesnapshot(Pid, Snapshot) ->
pcl_loadsnapshot(Pid, Increment) ->
gen_server:call(Pid, {load_snapshot, Increment}, infinity).
pcl_close(Pid) ->
gen_server:call(Pid, close, 60000).
@@ -437,14 +436,27 @@ handle_call({register_snapshot, Snapshot}, _From, State) ->
{reply, {ok, State}, State#state{manifest = Manifest0}};
handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
_From, State) ->
- L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
- {BookieIncrTree, MinSQN, MaxSQN},
- State#state.ledger_sqn,
- State#state.levelzero_cache),
- {LedgerSQN, L0Size, L0Cache} = L0D,
- L0Index = leveled_pmem:add_to_index(BookieIdx,
- State#state.levelzero_index,
- length(L0Cache)),
{LedgerSQN, L0Size, L0Cache} =
case BookieIncrTree of
empty_cache ->
{State#state.ledger_sqn,
State#state.levelzero_size,
State#state.levelzero_cache};
_ ->
leveled_pmem:add_to_cache(State#state.levelzero_size,
{BookieIncrTree, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache)
end,
L0Index =
case BookieIdx of
empty_index ->
State#state.levelzero_index;
_ ->
leveled_pmem:add_to_index(BookieIdx,
State#state.levelzero_index,
length(L0Cache))
end,
{reply, ok, State#state{levelzero_cache=L0Cache,
levelzero_size=L0Size,
levelzero_index=L0Index,
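To make the new empty markers concrete, here is a minimal sketch (variable names hypothetical) of the two shapes a penciller snapshot can now be handed through pcl_loadsnapshot: a populated ledger cache, or a pre-filtered query snapshot that found nothing relevant, in which case the snapshot's level-zero cache and index are left untouched.

%% Populated cache - the loader tree and index are added to level zero
ok = leveled_penciller:pcl_loadsnapshot(SnapPid,
                                        {LoaderTree, L0Idx, MinSQN, MaxSQN}),
%% Pre-filtered query snapshot with no relevant keys in the ledger cache -
%% the SQNs are ignored here as neither the cache nor the index is updated
ok = leveled_penciller:pcl_loadsnapshot(SnapPid,
                                        {empty_cache, empty_index, MinSQN, MaxSQN}).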