Merge pull request #193 from martinsumner/mas-192-journalcheck4head
Mas 192 journalcheck4head
This commit is contained in:
commit
cc676190c4
1 changed files with 150 additions and 34 deletions
|
@ -89,7 +89,8 @@
|
|||
loadqueue_ledgercache/1,
|
||||
push_ledgercache/2,
|
||||
snapshot_store/6,
|
||||
fetch_value/2]).
|
||||
fetch_value/2,
|
||||
journal_notfound/4]).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
|
@ -109,6 +110,8 @@
|
|||
-define(TIMING_SAMPLESIZE, 100).
|
||||
-define(TIMING_SAMPLECOUNTDOWN, 10000).
|
||||
-define(DUMMY, dummy). % Dummy key used for mput operations
|
||||
-define(MAX_KEYCHECK_FREQUENCY, 100).
|
||||
-define(MIN_KEYCHECK_FREQUENCY, 1).
|
||||
-define(OPTION_DEFAULTS,
|
||||
[{root_path, undefined},
|
||||
{snapshot_bookie, undefined},
|
||||
|
@ -145,6 +148,8 @@
|
|||
head_only = false :: boolean(),
|
||||
head_lookup = true :: boolean(),
|
||||
|
||||
ink_checking = ?MAX_KEYCHECK_FREQUENCY :: integer(),
|
||||
|
||||
put_countdown = 0 :: integer(),
|
||||
get_countdown = 0 :: integer(),
|
||||
fold_countdown = 0 :: integer(),
|
||||
|
@ -1170,39 +1175,63 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
|
|||
when State#state.head_lookup == true ->
|
||||
SWp = os:timestamp(),
|
||||
LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
|
||||
case fetch_head(LK,
|
||||
State#state.penciller,
|
||||
State#state.ledger_cache,
|
||||
State#state.head_only) of
|
||||
not_present ->
|
||||
{reply, not_found, State};
|
||||
Head ->
|
||||
case leveled_codec:striphead_to_details(Head) of
|
||||
{_SeqN, tomb, _MH, _MD} ->
|
||||
{reply, not_found, State};
|
||||
{_SeqN, {active, TS}, _MH, MD} ->
|
||||
case TS >= leveled_util:integer_now() of
|
||||
true ->
|
||||
{SWr, UpdTimingsP} =
|
||||
update_timings(SWp,
|
||||
{head, pcl},
|
||||
State#state.head_timings),
|
||||
OMD = leveled_codec:build_metadata_object(LK, MD),
|
||||
{_SW, UpdTimingsR} =
|
||||
update_timings(SWr, {head, rsp}, UpdTimingsP),
|
||||
{UpdTimings, CountDown} =
|
||||
update_statetimings(head,
|
||||
UpdTimingsR,
|
||||
State#state.head_countdown),
|
||||
{reply,
|
||||
{ok, OMD},
|
||||
State#state{head_timings = UpdTimings,
|
||||
head_countdown = CountDown}};
|
||||
false ->
|
||||
{reply, not_found, State}
|
||||
end
|
||||
end
|
||||
end;
|
||||
Head = fetch_head(LK,
|
||||
State#state.penciller,
|
||||
State#state.ledger_cache,
|
||||
State#state.head_only),
|
||||
{SWr, UpdTimingsP} =
|
||||
update_timings(SWp, {head, pcl}, State#state.head_timings),
|
||||
{LedgerMD, JournalCheckFrequency} =
|
||||
case Head of
|
||||
not_present ->
|
||||
{not_found, State#state.ink_checking};
|
||||
Head ->
|
||||
case leveled_codec:striphead_to_details(Head) of
|
||||
{_SeqN, tomb, _MH, _MD} ->
|
||||
{not_found, State#state.ink_checking};
|
||||
{SeqN, {active, TS}, _MH, MD} ->
|
||||
case TS >= leveled_util:integer_now() of
|
||||
true ->
|
||||
CheckFrequency =
|
||||
case State#state.head_only of
|
||||
true ->
|
||||
0;
|
||||
false ->
|
||||
State#state.ink_checking
|
||||
end,
|
||||
case journal_notfound(CheckFrequency,
|
||||
State#state.inker,
|
||||
LK,
|
||||
SeqN) of
|
||||
{true, UppedFrequency} ->
|
||||
{not_found, UppedFrequency};
|
||||
{false, ReducedFrequency} ->
|
||||
{MD, ReducedFrequency}
|
||||
end;
|
||||
false ->
|
||||
{not_found, State#state.ink_checking}
|
||||
end
|
||||
end
|
||||
end,
|
||||
Reply =
|
||||
case LedgerMD of
|
||||
not_found ->
|
||||
not_found;
|
||||
_ ->
|
||||
{ok, leveled_codec:build_metadata_object(LK, LedgerMD)}
|
||||
end,
|
||||
{_SW, UpdTimingsR} =
|
||||
update_timings(SWr, {head, rsp}, UpdTimingsP),
|
||||
{UpdTimings, CountDown} =
|
||||
update_statetimings(head,
|
||||
UpdTimingsR,
|
||||
State#state.head_countdown),
|
||||
|
||||
{reply,
|
||||
Reply,
|
||||
State#state{head_timings = UpdTimings,
|
||||
head_countdown = CountDown,
|
||||
ink_checking = JournalCheckFrequency}};
|
||||
handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
|
||||
% Snapshot the store, specifying if the snapshot should be long running
|
||||
% (i.e. will the snapshot be queued or be required for an extended period
|
||||
|
@ -1841,6 +1870,36 @@ fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
|
|||
end.
|
||||
|
||||
|
||||
-spec journal_notfound(integer(), pid(), leveled_codec:ledger_key(), integer())
|
||||
-> {boolean(), integer()}.
|
||||
%% @doc Check to see if the item is not_found in the journal. If it is found
|
||||
%% return false, and drop the counter that represents the frequency this check
|
||||
%% should be made. If it is not_found, this is not expected so up the check
|
||||
%% frequency to the maximum value
|
||||
journal_notfound(CheckFrequency, Inker, LK, SQN) ->
|
||||
check_notfound(CheckFrequency,
|
||||
fun() ->
|
||||
leveled_inker:ink_keycheck(Inker, LK, SQN)
|
||||
end).
|
||||
|
||||
|
||||
-spec check_notfound(integer(), fun(() -> probably|missing)) ->
|
||||
{boolean(), integer()}.
|
||||
%% @doc Use a function to check if an item is found
|
||||
check_notfound(CheckFrequency, CheckFun) ->
|
||||
case leveled_rand:uniform(?MAX_KEYCHECK_FREQUENCY) of
|
||||
X when X =< CheckFrequency ->
|
||||
case CheckFun() of
|
||||
probably ->
|
||||
{false, max(?MIN_KEYCHECK_FREQUENCY, CheckFrequency - 1)};
|
||||
missing ->
|
||||
{true, ?MAX_KEYCHECK_FREQUENCY}
|
||||
end;
|
||||
_X ->
|
||||
{false, CheckFrequency}
|
||||
end.
|
||||
|
||||
|
||||
-spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null,
|
||||
leveled_codec:ledger_key()|?DUMMY,
|
||||
integer(), any(), integer(),
|
||||
|
@ -2742,5 +2801,62 @@ coverage_cheat_test() ->
|
|||
{ok, _State1} = code_change(null, #state{}, null),
|
||||
{noreply, _State2} = handle_cast(null, #state{}).
|
||||
|
||||
erase_journal_test() ->
|
||||
RootPath = reset_filestructure(),
|
||||
{ok, Bookie1} = book_start([{root_path, RootPath},
|
||||
{max_journalsize, 50000},
|
||||
{cache_size, 100}]),
|
||||
ObjL1 = generate_multiple_objects(500, 1),
|
||||
% Put in all the objects with a TTL in the future
|
||||
lists:foreach(fun({K, V, S}) -> ok = book_put(Bookie1,
|
||||
"Bucket", K, V, S,
|
||||
?STD_TAG) end,
|
||||
ObjL1),
|
||||
lists:foreach(fun({K, V, _S}) ->
|
||||
{ok, V} = book_get(Bookie1, "Bucket", K, ?STD_TAG)
|
||||
end,
|
||||
ObjL1),
|
||||
|
||||
CheckHeadFun =
|
||||
fun(Book) ->
|
||||
fun({K, _V, _S}, Acc) ->
|
||||
case book_head(Book, "Bucket", K, ?STD_TAG) of
|
||||
{ok, _Head} -> Acc;
|
||||
not_found -> Acc + 1
|
||||
end
|
||||
end
|
||||
end,
|
||||
HeadsNotFound1 = lists:foldl(CheckHeadFun(Bookie1), 0, ObjL1),
|
||||
?assertMatch(0, HeadsNotFound1),
|
||||
|
||||
ok = book_close(Bookie1),
|
||||
io:format("Bookie closed - clearing Journal~n"),
|
||||
leveled_inker:clean_testdir(RootPath ++ "/" ++ ?JOURNAL_FP),
|
||||
{ok, Bookie2} = book_start([{root_path, RootPath},
|
||||
{max_journalsize, 5000},
|
||||
{cache_size, 100}]),
|
||||
HeadsNotFound2 = lists:foldl(CheckHeadFun(Bookie2), 0, ObjL1),
|
||||
?assertMatch(500, HeadsNotFound2),
|
||||
ok = book_destroy(Bookie2).
|
||||
|
||||
check_notfound_test() ->
|
||||
ProbablyFun = fun() -> probably end,
|
||||
MissingFun = fun() -> missing end,
|
||||
MinFreq = lists:foldl(fun(_I, Freq) ->
|
||||
{false, Freq0} =
|
||||
check_notfound(Freq, ProbablyFun),
|
||||
Freq0
|
||||
end,
|
||||
100,
|
||||
lists:seq(1, 5000)),
|
||||
% 5000 as needs to be a lot as doesn't decrement
|
||||
% when random interval is not hit
|
||||
?assertMatch(?MIN_KEYCHECK_FREQUENCY, MinFreq),
|
||||
|
||||
?assertMatch({true, ?MAX_KEYCHECK_FREQUENCY},
|
||||
check_notfound(?MAX_KEYCHECK_FREQUENCY, MissingFun)),
|
||||
|
||||
?assertMatch({false, 0}, check_notfound(0, MissingFun)).
|
||||
|
||||
|
||||
-endif.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue