Merge pull request #39 from martinsumner/mas-refine2i-34

Mas refine2i 34
This commit is contained in:
Martin Sumner 2017-03-02 22:18:46 +00:00 committed by GitHub
commit 47060740b6
2 changed files with 186 additions and 43 deletions

src/leveled_bookie.erl

@@ -83,9 +83,10 @@
-define(LONG_RUNNING, 80000).
-record(ledger_cache, {mem :: ets:tab(),
loader = leveled_tree:empty(?CACHE_TYPE) :: tuple(),
loader = leveled_tree:empty(?CACHE_TYPE)
:: tuple()|empty_cache,
load_queue = [] :: list(),
index = leveled_pmem:new_index(), % array
index = leveled_pmem:new_index(), % array or empty_index
min_sqn = infinity :: integer()|infinity,
max_sqn = 0 :: integer()}).
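The record change above widens loader and index so that a snapshot prepared for a query can carry the placeholder atoms empty_cache and empty_index rather than materialised structures. A minimal sketch of the two shapes a snapshot cache can now take (function names are hypothetical; the record and ?CACHE_TYPE macro are the ones defined above):

empty_snapshot_cache() ->
    %% No relevant results for the query - nothing is materialised
    #ledger_cache{loader = empty_cache,
                  index = empty_index,
                  min_sqn = infinity,
                  max_sqn = 0}.

loaded_snapshot_cache(KL, MinSQN, MaxSQN) ->
    %% Relevant results - build a real tree from the ordered key list
    #ledger_cache{loader = leveled_tree:from_orderedlist(KL, ?CACHE_TYPE),
                  index = empty_index, % queries do not need the index
                  min_sqn = MinSQN,
                  max_sqn = MaxSQN}.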
@@ -502,9 +503,17 @@ handle_call({return_folder, FolderType}, _From, State) ->
{FoldKeysFun, Acc},
{IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}} ->
{Bucket, StartObjKey} =
case Constraint of
{B, SK} ->
{B, SK};
B ->
{B, null}
end,
{reply,
index_query(State,
Constraint,
Bucket,
StartObjKey,
{FoldKeysFun, Acc},
{IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}),
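The added case normalises Constraint, which may arrive either as a {Bucket, StartObjKey} pair or as a bare Bucket. A sketch of the same normalisation as a standalone function (to_bucket_startkey/1 is a hypothetical name):

to_bucket_startkey({B, SK}) ->
    {B, SK};   % an explicit object key to start the fold from
to_bucket_startkey(B) ->
    {B, null}. % bucket only - start from the beginning of the bucket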
@@ -579,6 +588,10 @@ code_change(_OldVsn, State, _Extra) ->
%%% External functions
%%%============================================================================
%% @doc Load a snapshot of the penciller with the contents of the ledger cache
%% If the snapshot is to be loaded for a query then #ledger_cache.index may
%% be empty_index (as there is no need to update lookups), and
%% #ledger_cache.loader may be empty_cache if there are no relevant results
%% in the LedgerCache
load_snapshot(LedgerSnapshot, LedgerCache) ->
CacheToLoad = {LedgerCache#ledger_cache.loader,
LedgerCache#ledger_cache.index,
@@ -586,9 +599,14 @@ load_snapshot(LedgerSnapshot, LedgerCache) ->
LedgerCache#ledger_cache.max_sqn},
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
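For a query snapshot with no relevant cache content every element of the tuple may therefore be a placeholder, which the Penciller treats as a no-op (see the leveled_penciller change below). An illustrative call under that assumption:

%% ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
%%                                         {empty_cache, empty_index,
%%                                          infinity, 0}).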
%% @doc Empty the ledger cache table following a push
empty_ledgercache() ->
#ledger_cache{mem = ets:new(empty, [ordered_set])}.
%% @doc Push the ledgercache to the Penciller - which should respond ok or
%% returned. If the response is ok the cache can be flushed, but if the
%% response is returned the cache should continue to build and the flush
%% should be attempted again at a later date
push_ledgercache(Penciller, Cache) ->
CacheToLoad = {Cache#ledger_cache.loader,
Cache#ledger_cache.index,
@@ -596,6 +614,11 @@ push_ledgercache(Penciller, Cache) ->
Cache#ledger_cache.max_sqn},
leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
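A sketch of a caller honouring the ok | returned protocol described in the comment above (maybe_push/2 is a hypothetical helper, not part of this module):

maybe_push(Penciller, Cache) ->
    case push_ledgercache(Penciller, Cache) of
        ok ->
            {ok, empty_ledgercache()}; % flushed - start a fresh cache
        returned ->
            {returned, Cache}          % keep building and retry the flush later
    end.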
%% @doc The ledger cache can be built from a queue, for example when
%% loading the ledger from the head of the journal on startup
%%
%% The queue should be built using [NewKey|Acc] so that the most recent
%% key is kept in the sort
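%%
%% (lists:ukeysort/2 keeps the first element for each key, so prepending the
%% newest update means it is the one that survives, e.g.
%% lists:ukeysort(1, [{k, new}, {k, old}]) =:= [{k, new}])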
loadqueue_ledgercache(Cache) ->
SL = lists:ukeysort(1, Cache#ledger_cache.load_queue),
T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
@@ -605,8 +628,6 @@ loadqueue_ledgercache(Cache) ->
%%% Internal functions
%%%============================================================================
maybe_longrunning(SW, Aspect) ->
case timer:now_diff(os:timestamp(), SW) of
N when N > ?LONG_RUNNING ->
@@ -621,7 +642,7 @@ cache_size(LedgerCache) ->
bucket_stats(State, Bucket, Tag) ->
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger),
_JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
Folder = fun() ->
leveled_log:log("B0004", [cache_size(LedgerCache)]),
load_snapshot(LedgerSnapshot, LedgerCache),
@@ -643,7 +664,7 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
% List buckets for tag, assuming bucket names are all binary type
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger),
_JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
Folder = fun() ->
leveled_log:log("B0004", [cache_size(LedgerCache)]),
load_snapshot(LedgerSnapshot, LedgerCache),
@@ -684,23 +705,11 @@ get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) ->
index_query(State,
Constraint,
Bucket,
StartObjKey,
{FoldKeysFun, InitAcc},
{IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}) ->
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger),
{Bucket, StartObjKey} =
case Constraint of
{B, SK} ->
{B, SK};
B ->
{B, null}
end,
Folder = fun() ->
leveled_log:log("B0004", [cache_size(LedgerCache)]),
load_snapshot(LedgerSnapshot, LedgerCache),
StartKey = leveled_codec:to_ledgerkey(Bucket,
StartObjKey,
?IDX_TAG,
@@ -711,6 +720,12 @@ index_query(State,
?IDX_TAG,
IdxField,
EndValue),
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger, {StartKey, EndKey}),
Folder = fun() ->
leveled_log:log("B0004", [cache_size(LedgerCache)]),
load_snapshot(LedgerSnapshot, LedgerCache),
AddFun = case ReturnTerms of
true ->
fun add_terms/2;
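Building StartKey and EndKey before taking the snapshot (rather than after, as previously) allows the {StartKey, EndKey} range to be passed to snapshot_store/3, so the snapshot's ledger cache can be trimmed to just the queried range.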
@@ -738,7 +753,7 @@ hashtree_query(State, Tag, JournalCheck) ->
end,
{ok,
{LedgerSnapshot, LedgerCache},
JournalSnapshot} = snapshot_store(State, SnapType),
JournalSnapshot} = snapshot_store(State, SnapType, no_lookup),
Folder = fun() ->
leveled_log:log("B0004", [cache_size(LedgerCache)]),
load_snapshot(LedgerSnapshot, LedgerCache),
@@ -808,7 +823,7 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger),
_JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
Folder = fun() ->
leveled_log:log("B0004", [cache_size(LedgerCache)]),
load_snapshot(LedgerSnapshot, LedgerCache),
@@ -830,10 +845,13 @@ allkey_query(State, Tag, {FoldKeysFun, InitAcc}) ->
snapshot_store(State, SnapType) ->
snapshot_store(State, SnapType, undefined).
snapshot_store(State, SnapType, Query) ->
PCLopts = #penciller_options{start_snapshot=true,
source_penciller=State#state.penciller},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
LedgerCache = readycache_forsnapshot(State#state.ledger_cache),
LedgerCache = readycache_forsnapshot(State#state.ledger_cache, Query),
case SnapType of
store ->
InkerOpts = #inker_options{start_snapshot=true,
@@ -844,14 +862,70 @@ snapshot_store(State, SnapType) ->
{ok, {LedgerSnapshot, LedgerCache}, null}
end.
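snapshot_store/3 threads the Query argument (no_lookup, or a {StartKey, EndKey} pair) through to readycache_forsnapshot/2 below. Illustrative call shapes for a ledger-only snapshot (the bindings are hypothetical):

%% {ok, {LedgerSnapshot, SnapCache}, null} =
%%     snapshot_store(State, ledger, no_lookup),
%% {ok, {LedgerSnapshot, SnapCache}, null} =
%%     snapshot_store(State, ledger, {StartKey, EndKey}).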
readycache_forsnapshot(LedgerCache) ->
readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
{KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem,
StartKey,
EndKey),
case KL of
[] ->
#ledger_cache{loader=empty_cache,
index=empty_index,
min_sqn=MinSQN,
max_sqn=MaxSQN};
_ ->
#ledger_cache{loader=leveled_tree:from_orderedlist(KL,
?CACHE_TYPE),
index=empty_index,
min_sqn=MinSQN,
max_sqn=MaxSQN}
end;
readycache_forsnapshot(LedgerCache, Query) ->
% Need to convert the Ledger Cache away from using the ETS table
Tree = leveled_tree:from_orderedset(LedgerCache#ledger_cache.mem,
?CACHE_TYPE),
Idx = LedgerCache#ledger_cache.index,
MinSQN = LedgerCache#ledger_cache.min_sqn,
MaxSQN = LedgerCache#ledger_cache.max_sqn,
#ledger_cache{loader=Tree, index=Idx, min_sqn=MinSQN, max_sqn=MaxSQN}.
case leveled_tree:tsize(Tree) of
0 ->
#ledger_cache{loader=empty_cache,
index=empty_index,
min_sqn=LedgerCache#ledger_cache.min_sqn,
max_sqn=LedgerCache#ledger_cache.max_sqn};
_ ->
Idx =
case Query of
no_lookup ->
empty_index;
_ ->
LedgerCache#ledger_cache.index
end,
#ledger_cache{loader=Tree,
index=Idx,
min_sqn=LedgerCache#ledger_cache.min_sqn,
max_sqn=LedgerCache#ledger_cache.max_sqn}
end.
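%% Scan the ordered_set ETS table for keys greater than StartKey and not
%% past EndKey, accumulating the key/value pairs found and tracking the
%% minimum and maximum SQN seen ({[], infinity, 0} if the range is empty)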
scan_table(Table, StartKey, EndKey) ->
scan_table(Table, StartKey, EndKey, [], infinity, 0).
scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) ->
case ets:next(Table, StartKey) of
'$end_of_table' ->
{lists:reverse(Acc), MinSQN, MaxSQN};
NextKey ->
case leveled_codec:endkey_passed(EndKey, NextKey) of
true ->
{lists:reverse(Acc), MinSQN, MaxSQN};
false ->
[{NextKey, NextVal}] = ets:lookup(Table, NextKey),
SQN = leveled_codec:strip_to_seqonly({NextKey, NextVal}),
scan_table(Table,
NextKey,
EndKey,
[{NextKey, NextVal}|Acc],
min(MinSQN, SQN),
max(MaxSQN, SQN))
end
end.
set_options(Opts) ->
MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
@@ -1402,6 +1476,63 @@ foldobjects_vs_hashtree_test() ->
ok = book_close(Bookie1),
reset_filestructure().
scan_table_test() ->
K1 = leveled_codec:to_ledgerkey(<<"B1">>,
<<"K1">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AA1">>),
K2 = leveled_codec:to_ledgerkey(<<"B1">>,
<<"K2">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AA1">>),
K3 = leveled_codec:to_ledgerkey(<<"B1">>,
<<"K3">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AB1">>),
K4 = leveled_codec:to_ledgerkey(<<"B1">>,
<<"K4">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AA2">>),
K5 = leveled_codec:to_ledgerkey(<<"B2">>,
<<"K5">>,
?IDX_TAG,
<<"F1-bin">>,
<<"AA2">>),
Tab0 = ets:new(mem, [ordered_set]),
SK_A0 = leveled_codec:to_ledgerkey(<<"B1">>,
null,
?IDX_TAG,
<<"F1-bin">>,
<<"AA0">>),
EK_A9 = leveled_codec:to_ledgerkey(<<"B1">>,
null,
?IDX_TAG,
<<"F1-bin">>,
<<"AA9">>),
Empty = {[], infinity, 0},
?assertMatch(Empty,
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K1, {1, active, no_lookup, null}}]),
?assertMatch({[{K1, _}], 1, 1},
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K2, {2, active, no_lookup, null}}]),
?assertMatch({[{K1, _}, {K2, _}], 1, 2},
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K3, {3, active, no_lookup, null}}]),
?assertMatch({[{K1, _}, {K2, _}], 1, 2},
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K4, {4, active, no_lookup, null}}]),
?assertMatch({[{K1, _}, {K2, _}, {K4, _}], 1, 4},
scan_table(Tab0, SK_A0, EK_A9)),
ets:insert(Tab0, [{K5, {5, active, no_lookup, null}}]),
?assertMatch({[{K1, _}, {K2, _}, {K4, _}], 1, 4},
scan_table(Tab0, SK_A0, EK_A9)).
longrunning_test() ->
SW = os:timestamp(),
timer:sleep(100),

src/leveled_penciller.erl

@@ -312,7 +312,6 @@ pcl_releasesnapshot(Pid, Snapshot) ->
pcl_loadsnapshot(Pid, Increment) ->
gen_server:call(Pid, {load_snapshot, Increment}, infinity).
pcl_close(Pid) ->
gen_server:call(Pid, close, 60000).
@@ -437,14 +436,27 @@ handle_call({register_snapshot, Snapshot}, _From, State) ->
{reply, {ok, State}, State#state{manifest = Manifest0}};
handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
_From, State) ->
L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
{LedgerSQN, L0Size, L0Cache} =
case BookieIncrTree of
empty_cache ->
{State#state.ledger_sqn,
State#state.levelzero_size,
State#state.levelzero_cache};
_ ->
leveled_pmem:add_to_cache(State#state.levelzero_size,
{BookieIncrTree, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache),
{LedgerSQN, L0Size, L0Cache} = L0D,
L0Index = leveled_pmem:add_to_index(BookieIdx,
State#state.levelzero_cache)
end,
L0Index =
case BookieIdx of
empty_index ->
State#state.levelzero_index;
_ ->
leveled_pmem:add_to_index(BookieIdx,
State#state.levelzero_index,
length(L0Cache)),
length(L0Cache))
end,
{reply, ok, State#state{levelzero_cache=L0Cache,
levelzero_size=L0Size,
levelzero_index=L0Index,