Merge branch 'master' into mas-i108-cdbtimings

Martin Sumner 2017-11-20 17:34:50 +00:00
commit 5b4bc1ce59
10 changed files with 499 additions and 103 deletions


@@ -653,7 +653,9 @@ loadqueue_ledgercache(Cache) ->
%% Query can be no_lookup, indicating the snapshot will be used for non-specific
%% range queries and not direct fetch requests. {StartKey, EndKey} if the
%% snapshot is to be used for one specific query only (this is much quicker to
%% set up, assuming the range is a small subset of the overall key space).
%% set up, assuming the range is a small subset of the overall key space). If
%% lookup is required but the range isn't defined then 'undefined' should be
%% passed as the query
snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) ->
LedgerCacheReady = readycache_forsnapshot(LedgerCache, Query),
BookiesMem = {LedgerCacheReady#ledger_cache.loader,
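As a hedged illustration of the three Query shapes the comment above describes (this helper is hypothetical and not part of the diff; the SnapType and LongRunning values are only examples):

    query_examples(LedgerCache, Penciller, Inker, StartKey, EndKey) ->
        %% Snapshot for non-specific range queries, no direct fetches
        SnapNoLookup =
            snapshot_store(LedgerCache, Penciller, Inker,
                           store, no_lookup, true),
        %% Snapshot for one bounded query only - quicker to set up
        SnapRange =
            snapshot_store(LedgerCache, Penciller, Inker,
                           store, {StartKey, EndKey}, false),
        %% Lookup required but no range defined - pass 'undefined'
        SnapUndefined =
            snapshot_store(LedgerCache, Penciller, Inker,
                           store, undefined, true),
        {SnapNoLookup, SnapRange, SnapUndefined}.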
@@ -760,10 +762,18 @@ get_runner(State,
leveled_runner:foldheads_allkeys(SnapFun,
Tag, FoldFun,
JournalCheck, SegmentList);
get_runner(State,
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) ->
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold, key_order});
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold, key_order}) ->
SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold),
leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun);
leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order);
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold, sqn_order}) ->
SnapFun = return_snapfun(State, store, undefined, true, SnapPreFold),
leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun, sqn_order);
get_runner(State,
{foldheads_bybucket,
Tag, Bucket, KeyRange,
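A hedged usage sketch for the new sqn_order runner type added above (assumes a running bookie Bookie; o is the standard object tag, and book_returnfolder/2 is the public route to get_runner):

    FoldFun = fun(Bucket, Key, Obj, Acc) -> [{Bucket, Key, Obj}|Acc] end,
    {async, Runner} =
        leveled_bookie:book_returnfolder(Bookie,
                                         {foldobjects_allkeys,
                                          o, FoldFun, true, sqn_order}),
    Objects = Runner().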


@@ -669,19 +669,29 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
{ok, StartPos}
end,
file:position(State#state.handle, StartPos0),
file:advise(State#state.handle,
StartPos0,
EndPos0 - StartPos0,
sequential),
MaybeEnd = (check_last_key(State#state.last_key) == empty) or
(StartPos0 >= (EndPos0 - ?DWORD_SIZE)),
case MaybeEnd of
true ->
{reply, {eof, Acc}, StateName, State};
false ->
{LastPosition, Acc2} = scan_over_file(State#state.handle,
StartPos0,
FilterFun,
Acc,
State#state.last_key),
{reply, {LastPosition, Acc2}, StateName, State}
end;
{LastPosition, Acc2} =
case MaybeEnd of
true ->
{eof, Acc};
false ->
scan_over_file(State#state.handle,
StartPos0,
FilterFun,
Acc,
State#state.last_key)
end,
{ok, LastReadPos} = file:position(State#state.handle, cur),
file:advise(State#state.handle,
StartPos0,
LastReadPos - StartPos0,
dont_need),
{reply, {LastPosition, Acc2}, StateName, State};
handle_sync_event(cdb_lastkey, _From, StateName, State) ->
{reply, State#state.last_key, StateName, State};
handle_sync_event(cdb_firstkey, _From, StateName, State) ->
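The hunk above swaps an up-front advise(sequential) for a post-scan advise(dont_need) over the range actually read. A minimal standalone sketch of this page-cache pattern, assuming a raw file handle (file:position/2 and file:advise/4 are standard OTP calls; scan_and_drop is a hypothetical name):

    scan_and_drop(Handle, StartPos, ScanFun) ->
        {ok, _} = file:position(Handle, StartPos),
        Result = ScanFun(Handle),
        %% Find how far the scan actually read, then tell the OS those
        %% pages will not be re-read, so the kernel may evict them
        {ok, LastReadPos} = file:position(Handle, cur),
        ok = file:advise(Handle, StartPos, LastReadPos - StartPos, dont_need),
        Result.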


@@ -46,6 +46,7 @@
to_ledgerkey/3,
to_ledgerkey/5,
from_ledgerkey/1,
from_ledgerkey/2,
to_inkerkv/3,
to_inkerkv/6,
from_inkerkv/1,
@@ -204,6 +205,19 @@ is_active(Key, Value, Now) ->
false
end.
-spec from_ledgerkey(atom(), tuple()) -> false|tuple().
%% @doc
%% Return the "significant information" from the Ledger Key (normally the
%% {Bucket, Key} pair) if and only if the ExpectedTag matches the tag -
%% otherwise return false
from_ledgerkey(ExpectedTag, {ExpectedTag, Bucket, Key, SubKey}) ->
from_ledgerkey({ExpectedTag, Bucket, Key, SubKey});
from_ledgerkey(_ExpectedTag, _OtherKey) ->
false.
-spec from_ledgerkey(tuple()) -> tuple().
%% @doc
%% Return identifying information from the LedgerKey
from_ledgerkey({?IDX_TAG, ?ALL_BUCKETS, {_IdxFld, IdxVal}, {Bucket, Key}}) ->
{Bucket, Key, IdxVal};
from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) ->
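A hedged example of the new 2-arity form used as a tag filter (keys_for_tag is a hypothetical helper; it assumes, as the runner code later in this commit does, that from_ledgerkey/1 yields a {Bucket, Key} pair for standard object keys):

    keys_for_tag(Tag, LedgerKeys) ->
        lists:filtermap(
            fun(LK) ->
                case leveled_codec:from_ledgerkey(Tag, LK) of
                    false -> false;             % tag did not match
                    {B, K} -> {true, {B, K}}    % significant information
                end
            end,
            LedgerKeys).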


@@ -98,6 +98,7 @@
ink_get/3,
ink_fetch/3,
ink_keycheck/3,
ink_fold/4,
ink_loadpcl/4,
ink_registersnapshot/2,
ink_confirmdelete/2,
@@ -252,8 +253,44 @@ ink_close(Pid) ->
ink_doom(Pid) ->
gen_server:call(Pid, doom, 60000).
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> any().
%% @doc
%% Fold over the journal from a starting sequence number (MinSQN), passing
%% in three functions and an initial accumulator. The fold functions
%% should be:
%% - a FilterFun to accumulate the objects and decide when to stop or loop
%% - an InitAccFun to re-initialise the batch accumulator for each sub-fold
%% - a FoldFun to actually perform the fold
%%
%% The inker fold works in batches, so the FilterFun determines what should
%% go into a batch and when the batch is complete. The FoldFun completes the
%% actual desired outcome by being applied on the batch.
%%
%% The FilterFun should be a five-arity function which takes as inputs:
%% KeyInJournal
%% ValueInJournal
%% Position - the actual position within the CDB file of the object
%% Acc - the batch accumulator
%% ExtractFun - a single-arity function which can be applied to ValueInJournal
%% to extract the actual object and the size of the object
%%
%% The FilterFun should return either:
%% {loop, {MinSQN, MaxSQN, UpdAcc}} or
%% {stop, {MinSQN, MaxSQN, UpdAcc}}
%% The FilterFun is required to return stop when MaxSQN is reached
%%
%% The InitAccFun should return an initial batch accumulator for each sub-fold.
%% It is a two-arity function that takes a filename and a MinSQN as inputs,
%% potentially to be used in logging
%%
%% The BatchFun (the FoldFun of the triple) is a two-arity function that
%% should take as inputs:
%% An overall accumulator
%% The batch accumulator built over the sub-fold
ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
gen_server:call(Pid, {fold, MinSQN, FoldFuns, Acc}, infinity).
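As a hedged illustration of the triple described above (a hypothetical fold that counts Journal objects from SQN 1; the {MinSQN, MaxSQN, BatchAcc} accumulator shape follows the FilterFun contract documented here):

    count_journal_objects(Inker) ->
        FilterFun =
            fun({SQN, _InkTag, _LK}, _Val, _Pos, Acc, _ExtractFun) ->
                {MinSQN, MaxSQN, Count} = Acc,
                if
                    SQN < MinSQN -> {loop, Acc};
                    SQN > MaxSQN -> {stop, Acc};
                    SQN == MaxSQN -> {stop, {MinSQN, MaxSQN, Count + 1}};
                    true -> {loop, {MinSQN, MaxSQN, Count + 1}}
                end
            end,
        InitAccFun = fun(_FN, _StartSQN) -> 0 end,  % fresh batch accumulator
        BatchFun = fun(BatchCount, Total) -> Total + BatchCount end,
        leveled_inker:ink_fold(Inker, 1, {FilterFun, InitAccFun, BatchFun}, 0).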
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
%% @doc
%% Function to prompt load of the Ledger at startup. The Penciller should
%% have determined the lowest SQN not present in the Ledger, and the inker
%% should fold over the Journal from that point, using the function to load
@@ -262,7 +299,21 @@ ink_doom(Pid) ->
%% The load fun should be a five arity function like:
%% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun)
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity).
BatchFun =
fun(BatchAcc, _Acc) ->
push_to_penciller(Penciller, BatchAcc)
end,
InitAccFun =
fun(FN, CurrentMinSQN) ->
leveled_log:log("I0014", [FN, CurrentMinSQN]),
leveled_bookie:empty_ledgercache()
end,
gen_server:call(Pid,
{fold,
MinSQN,
{FilterFun, InitAccFun, BatchFun},
ok},
infinity).
-spec ink_compactjournal(pid(), pid(), integer()) -> ok.
%% @doc
@@ -381,9 +432,16 @@ handle_call({get, Key, SQN}, _From, State) ->
{reply, get_object(Key, SQN, State#state.manifest), State};
handle_call({key_check, Key, SQN}, _From, State) ->
{reply, key_check(Key, SQN, State#state.manifest), State};
handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) ->
handle_call({fold,
StartSQN,
{FilterFun, InitAccFun, FoldFun},
Acc}, _From, State) ->
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest),
Reply =
fold_from_sequence(StartSQN,
{FilterFun, InitAccFun, FoldFun},
Acc,
Manifest),
{reply, Reply, State};
handle_call({register_snapshot, Requestor}, _From , State) ->
Rs = [{Requestor,
@@ -744,75 +802,81 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
{SQN, Filename, PidW, empty}.
%% Scan between sequence numbers applying FilterFun to each entry where
%% FilterFun{K, V, Acc} -> Penciller Key List
%% Load the output for the CDB file into the Penciller.
load_from_sequence(_MinSQN, _FilterFun, _PCL, []) ->
ok;
load_from_sequence(MinSQN, FilterFun, PCL, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
PCL,
Pid,
undefined,
FN,
Rest);
load_from_sequence(MinSQN, FilterFun, PCL, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
case Rest of
[] ->
load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
PCL,
Pid,
undefined,
FN,
Rest);
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
PCL,
Pid,
undefined,
FN,
Rest);
_ ->
load_from_sequence(MinSQN, FilterFun, PCL, Rest)
-spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list())
-> any().
%% @doc
%%
%% Scan from the starting sequence number to the end of the Journal. Apply
%% the FilterFun as it scans over the CDB file to build up a Batch of relevant
%% objects - and then apply the FoldFun to the batch once the batch is
%% complete
%%
%% Inputs - MinSQN, FoldFuns, OverallAccumulator, Inker's Manifest
%%
%% The fold loops over all the CDB files in the Manifest. Each file is looped
%% over in batches using foldfile_between_sequence/7. The batch is a range of
%% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted
%% files
fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
Acc;
fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
Acc0 = foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN),
fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest);
fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
% If this file has a LowSQN less than the minimum, it can be skipped as
% long as the next file's LowSQN is also at or below the minimum
Acc0 =
case Rest of
[] ->
foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN);
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN);
_ ->
Acc
end,
fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest).
foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
Acc, CDBpid, StartPos, FN) ->
{FilterFun, InitAccFun, FoldFun} = FoldFuns,
InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},
case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
{eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
FoldFun(BatchAcc, Acc);
{LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
UpdAcc = FoldFun(BatchAcc, Acc),
NextSQN = MaxSQN + 1,
foldfile_between_sequence(NextSQN,
NextSQN + ?LOADING_BATCH,
FoldFuns,
UpdAcc,
CDBpid,
LastPosition,
FN)
end.
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
CDBpid, StartPos, FN, Rest) ->
leveled_log:log("I0014", [FN, MinSQN]),
InitAcc = {MinSQN, MaxSQN, leveled_bookie:empty_ledgercache()},
Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
{eof, {AccMinSQN, _AccMaxSQN, AccLC}} ->
ok = push_to_penciller(Penciller, AccLC),
{ok, AccMinSQN};
{LastPosition, {_AccMinSQN, _AccMaxSQN, AccLC}} ->
ok = push_to_penciller(Penciller, AccLC),
NextSQN = MaxSQN + 1,
load_between_sequence(NextSQN,
NextSQN + ?LOADING_BATCH,
FilterFun,
Penciller,
CDBpid,
LastPosition,
FN,
Rest)
end,
case Res of
{ok, LMSQN} ->
load_from_sequence(LMSQN, FilterFun, Penciller, Rest);
ok ->
ok
end.
push_to_penciller(Penciller, LedgerCache) ->
% The push to the penciller must start as a tree, to correctly de-duplicate
% the list by order, before becoming a de-duplicated list for loading


@@ -331,7 +331,10 @@
{"CDB19",
{info, "Sample timings in microseconds for sample_count=~w "
++ "with totals of cycle_count=~w "
++ "fetch_time=~w index_time=~w"}}
++ "fetch_time=~w index_time=~w"}},
{"R0001",
{debug, "Object fold to process batch of ~w objects"}}
]).


@@ -1820,7 +1820,8 @@ create_file_test() ->
?assertMatch("hello", binary_to_term(Bin)).
slow_fetch_test() ->
?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)).
?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)),
?assertMatch("value", log_slowfetch(2, "value", "fake", 0, 1)).
checkready(Pid) ->
try


@@ -29,7 +29,7 @@
hashlist_query/3,
tictactree/5,
foldheads_allkeys/5,
foldobjects_allkeys/3,
foldobjects_allkeys/4,
foldheads_bybucket/5,
foldobjects_bybucket/3,
foldobjects_byindex/3
@@ -226,16 +226,100 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
FoldFun,
{true, JournalCheck}, SegmentList).
-spec foldobjects_allkeys(fun(), atom(), fun()) -> {async, fun()}.
-spec foldobjects_allkeys(fun(), atom(), fun(), key_order|sqn_order)
-> {async, fun()}.
%% @doc
%% Fold over all objects for a given tag
foldobjects_allkeys(SnapFun, Tag, FoldFun) ->
foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
false, false).
false, false);
foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
% Fold over the journal in order of receipt
{FoldFun, InitAcc} =
case is_tuple(FoldObjectsFun) of
true ->
% FoldObjectsFun is already a tuple with a Fold function and an
% initial accumulator
FoldObjectsFun;
false ->
% no initial accumulator passed, so the accumulator defaults to a list
{FoldObjectsFun, []}
end,
FilterFun =
fun(JKey, JVal, _Pos, Acc, ExtractFun) ->
{SQN, InkTag, LedgerKey} = JKey,
case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of
{?INKT_STND, {B, K}} ->
% Ignore tombstones, non-matching tags, and key-change
% objects.
{MinSQN, MaxSQN, BatchAcc} = Acc,
case SQN of
SQN when SQN < MinSQN ->
{loop, Acc};
SQN when SQN > MaxSQN ->
{stop, Acc};
_ ->
{VBin, _VSize} = ExtractFun(JVal),
{Obj, _IdxSpecs} = leveled_codec:split_inkvalue(VBin),
ToLoop =
case SQN of
MaxSQN -> stop;
_ -> loop
end,
{ToLoop,
{MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}}
end;
_ ->
{loop, Acc}
end
end,
InitAccFun = fun(_FN, _SQN) -> [] end,
Folder =
fun() ->
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
IsValidFun =
fun(Bucket, Key, SQN) ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
leveled_penciller:pcl_checksequencenumber(LedgerSnapshot,
LedgerKey,
SQN)
end,
BatchFoldFun =
fun(BatchAcc, ObjAcc) ->
ObjFun =
fun({B, K, SQN, Obj}, Acc) ->
case IsValidFun(B, K, SQN) of
true ->
FoldFun(B, K, Obj, Acc);
false ->
Acc
end
end,
leveled_log:log("R0001", [length(BatchAcc)]),
lists:foldr(ObjFun, ObjAcc, BatchAcc)
end,
Acc =
leveled_inker:ink_fold(JournalSnapshot,
0,
{FilterFun, InitAccFun, BatchFoldFun},
InitAcc),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
ok = leveled_inker:ink_close(JournalSnapshot),
Acc
end,
{async, Folder}.
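A hedged usage sketch at the runner level (SnapFun stands for a snapshot fun as built by the bookie's return_snapfun; o is the standard object tag):

    FoldFun = fun(B, K, Obj, Acc) -> [{B, K, Obj}|Acc] end,
    {async, Runner} =
        leveled_runner:foldobjects_allkeys(SnapFun, o, FoldFun, sqn_order),
    %% Objects are accumulated in Journal receipt order, batch by batch
    Objects = Runner().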
-spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) ->
{async, fun()}.
@@ -480,7 +564,7 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
ProxyObj = make_proxy_object(LK, JK,
MD, V,
InkerClone),
FoldObjectsFun(B, K,ProxyObj, Acc);
FoldObjectsFun(B, K, ProxyObj, Acc);
false ->
R = leveled_bookie:fetch_value(InkerClone, JK),
case R of