Infrequent check of Journal for key presence

Check the Journal for key presence only infrequently, with the check frequency decreasing further after repeated successful checks.
This commit is contained in:
Martin Sumner 2018-09-28 15:46:43 +01:00
parent 8be873f776
commit 575397229e

View file

@ -89,7 +89,8 @@
loadqueue_ledgercache/1, loadqueue_ledgercache/1,
push_ledgercache/2, push_ledgercache/2,
snapshot_store/6, snapshot_store/6,
fetch_value/2]). fetch_value/2,
journal_notfound/4]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -109,6 +110,8 @@
-define(TIMING_SAMPLESIZE, 100). -define(TIMING_SAMPLESIZE, 100).
-define(TIMING_SAMPLECOUNTDOWN, 10000). -define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(DUMMY, dummy). % Dummy key used for mput operations -define(DUMMY, dummy). % Dummy key used for mput operations
-define(MAX_KEYCHECK_FREQUENCY, 100).
-define(MIN_KEYCHECK_FREQUENCY, 1).
-define(OPTION_DEFAULTS, -define(OPTION_DEFAULTS,
[{root_path, undefined}, [{root_path, undefined},
{snapshot_bookie, undefined}, {snapshot_bookie, undefined},
@ -145,6 +148,8 @@
head_only = false :: boolean(), head_only = false :: boolean(),
head_lookup = true :: boolean(), head_lookup = true :: boolean(),
ink_checking = ?MAX_KEYCHECK_FREQUENCY :: integer(),
put_countdown = 0 :: integer(), put_countdown = 0 :: integer(),
get_countdown = 0 :: integer(), get_countdown = 0 :: integer(),
fold_countdown = 0 :: integer(), fold_countdown = 0 :: integer(),
@ -1170,39 +1175,56 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
when State#state.head_lookup == true -> when State#state.head_lookup == true ->
SWp = os:timestamp(), SWp = os:timestamp(),
LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag), LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
case fetch_head(LK, Head = fetch_head(LK,
State#state.penciller, State#state.penciller,
State#state.ledger_cache, State#state.ledger_cache,
State#state.head_only) of State#state.head_only),
not_present -> {SWr, UpdTimingsP} =
{reply, not_found, State}; update_timings(SWp, {head, pcl}, State#state.head_timings),
Head -> {LedgerMD, JournalCheckFrequency} =
case leveled_codec:striphead_to_details(Head) of case Head of
{_SeqN, tomb, _MH, _MD} -> not_present ->
{reply, not_found, State}; {not_found, State#state.ink_checking};
{_SeqN, {active, TS}, _MH, MD} -> Head ->
case TS >= leveled_util:integer_now() of case leveled_codec:striphead_to_details(Head) of
true -> {_SeqN, tomb, _MH, _MD} ->
{SWr, UpdTimingsP} = {not_found, State#state.ink_checking};
update_timings(SWp, {SeqN, {active, TS}, _MH, MD} ->
{head, pcl}, case TS >= leveled_util:integer_now() of
State#state.head_timings), true ->
OMD = leveled_codec:build_metadata_object(LK, MD), case journal_notfound(State#state.ink_checking,
{_SW, UpdTimingsR} = State#state.inker,
update_timings(SWr, {head, rsp}, UpdTimingsP), LK,
{UpdTimings, CountDown} = SeqN) of
update_statetimings(head, {true, UppedFrequency} ->
UpdTimingsR, {not_found, UppedFrequency};
State#state.head_countdown), {false, ReducedFrequency} ->
{reply, {MD, ReducedFrequency}
{ok, OMD}, end;
State#state{head_timings = UpdTimings, false ->
head_countdown = CountDown}}; {not_found, State#state.ink_checking}
false -> end
{reply, not_found, State} end
end end,
end Reply =
end; case LedgerMD of
not_found ->
not_found;
_ ->
{ok, leveled_codec:build_metadata_object(LK, LedgerMD)}
end,
{_SW, UpdTimingsR} =
update_timings(SWr, {head, rsp}, UpdTimingsP),
{UpdTimings, CountDown} =
update_statetimings(head,
UpdTimingsR,
State#state.head_countdown),
{reply,
Reply,
State#state{head_timings = UpdTimings,
head_countdown = CountDown,
ink_checking = JournalCheckFrequency}};
handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) -> handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
% Snapshot the store, specifying if the snapshot should be long running % Snapshot the store, specifying if the snapshot should be long running
% (i.e. will the snapshot be queued or be required for an extended period % (i.e. will the snapshot be queued or be required for an extended period
@ -1841,6 +1863,28 @@ fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
end. end.
-spec journal_notfound(integer(), pid(), leveled_codec:ledger_key(), integer())
        -> {boolean(), integer()}.
%% @doc Probabilistically check whether the item is not_found in the Journal.
%% The check only actually runs when a random roll (out of the maximum
%% frequency) falls within the current CheckFrequency.  If the key is found
%% (the expected outcome) return false and back off the frequency of future
%% checks; if it is unexpectedly missing return true and reset the check
%% frequency to the maximum.
journal_notfound(CheckFrequency, Inker, LK, SQN) ->
    Roll = leveled_rand:uniform(?MAX_KEYCHECK_FREQUENCY),
    case Roll =< CheckFrequency of
        true ->
            case leveled_inker:ink_keycheck(Inker, LK, SQN) of
                probably ->
                    % Key present as expected - reduce the check frequency,
                    % but never below the minimum
                    {false, max(?MIN_KEYCHECK_FREQUENCY, CheckFrequency - 1)};
                missing ->
                    % Unexpected absence - restore checking to the maximum
                    % frequency
                    {true, ?MAX_KEYCHECK_FREQUENCY}
            end;
        false ->
            % Roll fell outside the check window - skip the Journal check
            % and leave the frequency unchanged
            {false, CheckFrequency}
    end.
-spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null, -spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null,
leveled_codec:ledger_key()|?DUMMY, leveled_codec:ledger_key()|?DUMMY,
integer(), any(), integer(), integer(), any(), integer(),
@ -2742,5 +2786,46 @@ coverage_cheat_test() ->
{ok, _State1} = code_change(null, #state{}, null), {ok, _State1} = code_change(null, #state{}, null),
{noreply, _State2} = handle_cast(null, #state{}). {noreply, _State2} = handle_cast(null, #state{}).
erase_journal_test() ->
    % Load objects into a store, confirm they can be fetched, then erase the
    % Journal on disk and restart - HEAD requests should now report all of
    % the objects as not_found
    RootPath = reset_filestructure(),
    {ok, Bookie1} = book_start([{root_path, RootPath},
                                {max_journalsize, 50000},
                                {cache_size, 100}]),
    ObjL1 = generate_multiple_objects(500, 1),
    % Put in all the objects with a TTL in the future
    PutFun =
        fun({K, V, S}) ->
            ok = book_put(Bookie1, "Bucket", K, V, S, ?STD_TAG)
        end,
    lists:foreach(PutFun, ObjL1),
    % All objects should be retrievable by GET
    GetFun =
        fun({K, V, _S}) ->
            {ok, V} = book_get(Bookie1, "Bucket", K, ?STD_TAG)
        end,
    lists:foreach(GetFun, ObjL1),
    % Folds over the object list counting HEAD requests that miss
    CheckHeadFun =
        fun(Book) ->
            fun({K, _V, _S}, Misses) ->
                case book_head(Book, "Bucket", K, ?STD_TAG) of
                    {ok, _Head} -> Misses;
                    not_found -> Misses + 1
                end
            end
        end,
    ?assertMatch(0, lists:foldl(CheckHeadFun(Bookie1), 0, ObjL1)),
    ok = book_close(Bookie1),
    io:format("Bookie closed - clearing Journal~n"),
    leveled_inker:clean_testdir(RootPath ++ "/" ++ ?JOURNAL_FP),
    {ok, Bookie2} = book_start([{root_path, RootPath},
                                {max_journalsize, 5000},
                                {cache_size, 100}]),
    % With the Journal erased, every HEAD request should now miss
    ?assertMatch(500, lists:foldl(CheckHeadFun(Bookie2), 0, ObjL1)),
    ok = book_destroy(Bookie2).
-endif. -endif.