Merge pull request #149 from martinsumner/mas-kvi-missinghead

Mas kvi missinghead
This commit is contained in:
Martin Sumner 2018-06-23 17:21:43 +01:00 committed by GitHub
commit 92bd2513c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 26 deletions

View file

@ -766,7 +766,10 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
when State#state.head_lookup == true ->
SWp = os:timestamp(),
LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of
case fetch_head(LK,
State#state.penciller,
State#state.ledger_cache,
State#state.head_only) of
not_present ->
{reply, not_found, State};
Head ->
@ -1369,6 +1372,13 @@ scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) ->
%% ledger cache if it has just been updated). not_present is returned if the
%% Key is not found
fetch_head(Key, Penciller, LedgerCache) ->
fetch_head(Key, Penciller, LedgerCache, false).
-spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache(), boolean())
-> not_present|leveled_codec:ledger_value().
%% doc
%% The L0Index needs to be bypassed when running head_only
fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
SW = os:timestamp(),
CacheResult =
case LedgerCache#ledger_cache.mem of
@ -1382,7 +1392,10 @@ fetch_head(Key, Penciller, LedgerCache) ->
Head;
[] ->
Hash = leveled_codec:segment_hash(Key),
case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
UseL0Idx = not HeadOnly,
% don't use the L0Index in head only mode. Object specs don't
% get an addition on the L0 index
case leveled_penciller:pcl_fetch(Penciller, Key, Hash, UseL0Idx) of
{Key, Head} ->
maybe_longrunning(SW, pcl_head),
Head;

View file

@ -173,8 +173,7 @@
pcl_start/1,
pcl_pushmem/2,
pcl_fetchlevelzero/2,
pcl_fetch/2,
pcl_fetch/3,
pcl_fetch/4,
pcl_fetchkeys/5,
pcl_fetchkeysbysegment/6,
pcl_fetchnextkey/5,
@ -362,20 +361,20 @@ pcl_fetch(Pid, Key) ->
Hash = leveled_codec:segment_hash(Key),
if
Hash /= no_lookup ->
gen_server:call(Pid, {fetch, Key, Hash}, infinity)
gen_server:call(Pid, {fetch, Key, Hash, true}, infinity)
end.
-spec pcl_fetch(pid(),
leveled_codec:ledger_key(),
leveled_codec:segment_hash())
-> leveled_codec:ledger_kv()|not_present.
leveled_codec:segment_hash(),
boolean()) -> leveled_codec:ledger_kv()|not_present.
%% @doc
%% Fetch a key, return the first (highest SQN) occurrence of that Key along
%% with the value.
%%
%% Hash should be result of leveled_codec:segment_hash(Key)
pcl_fetch(Pid, Key, Hash) ->
gen_server:call(Pid, {fetch, Key, Hash}, infinity).
pcl_fetch(Pid, Key, Hash, UseL0Index) ->
gen_server:call(Pid, {fetch, Key, Hash, UseL0Index}, infinity).
-spec pcl_fetchkeys(pid(),
leveled_codec:ledger_key(),
@ -636,12 +635,19 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
State#state.levelzero_cache,
State)}
end;
handle_call({fetch, Key, Hash}, _From, State) ->
handle_call({fetch, Key, Hash, UseL0Index}, _From, State) ->
L0Idx =
case UseL0Index of
true ->
State#state.levelzero_index;
false ->
none
end,
{R, UpdTimings} = timed_fetch_mem(Key,
Hash,
State#state.manifest,
State#state.levelzero_cache,
State#state.levelzero_index,
L0Idx,
State#state.timings),
{UpdTimings0, CountDown} =
update_statetimings(UpdTimings, State#state.timings_countdown),
@ -1233,7 +1239,13 @@ plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
element(1, R).
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
PosList = leveled_pmem:check_index(Hash, L0Index),
PosList =
case L0Index of
none ->
lists:seq(1, length(L0Cache));
_ ->
leveled_pmem:check_index(Hash, L0Index)
end,
L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
case L0Check of
{false, not_found} ->

View file

@ -44,7 +44,7 @@
-include_lib("eunit/include/eunit.hrl").
% -type index_array() :: array:array().
-type index_array() :: any(). % To live with OTP16
-type index_array() :: any()|none. % To live with OTP16
-export_type([index_array/0]).
@ -214,20 +214,20 @@ split_hash({SegmentID, ExtraHash}) ->
check_slotlist(Key, _Hash, CheckList, TreeList) ->
SlotCheckFun =
fun(SlotToCheck, {Found, KV}) ->
case Found of
true ->
fun(SlotToCheck, {Found, KV}) ->
case Found of
true ->
{Found, KV};
false ->
CheckTree = lists:nth(SlotToCheck, TreeList),
case leveled_tree:match(Key, CheckTree) of
none ->
{Found, KV};
false ->
CheckTree = lists:nth(SlotToCheck, TreeList),
case leveled_tree:match(Key, CheckTree) of
none ->
{Found, KV};
{value, Value} ->
{true, {Key, Value}}
end
{value, Value} ->
{true, {Key, Value}}
end
end,
end
end,
lists:foldl(SlotCheckFun,
{false, not_found},
lists:reverse(CheckList)).

View file

@ -1108,7 +1108,13 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) ->
leveled_bookie:book_head(Bookie1,
SegmentID0,
{Bucket0, Key0},
h);
h),
CheckHeadFun =
fun({add, SegID, B, K, H}) ->
{ok, H} =
leveled_bookie:book_head(Bookie1, SegID, {B, K}, h)
end,
lists:foreach(CheckHeadFun, ObjectSpecL);
no_lookup ->
{unsupported_message, head} =
leveled_bookie:book_head(Bookie1,