Improve testing of bloom feature

In particular will blooms re-appear following startup
This commit is contained in:
Martin Sumner 2017-11-28 11:43:46 +00:00
parent c2f19d8825
commit 5342e3a94f
2 changed files with 86 additions and 21 deletions

View file

@ -187,7 +187,9 @@
pcl_doom/1,
pcl_releasesnapshot/2,
pcl_registersnapshot/5,
pcl_getstartupsequencenumber/1]).
pcl_getstartupsequencenumber/1,
pcl_checkbloomtest/2,
pcl_checkforwork/1]).
-export([
sst_rootpath/1,
@ -501,6 +503,25 @@ pcl_close(Pid) ->
pcl_doom(Pid) ->
gen_server:call(Pid, doom, 60000).
-spec pcl_checkbloomtest(pid(), tuple()) -> boolean().
%% @doc
%% Function specifically added to help testing. In particular to make sure
%% that blooms are still available after pencllers have been re-loaded from
%% disk.
pcl_checkbloomtest(Pid, Key) ->
Hash = leveled_codec:segment_hash(Key),
if
Hash /= no_lookup ->
gen_server:call(Pid, {checkbloom_fortest, Key, Hash}, 2000)
end.
-spec pcl_checkforwork(pid()) -> boolean().
%% @doc
%% Used in test only to confim compaction work complete before closing
pcl_checkforwork(Pid) ->
gen_server:call(Pid, check_for_work, 2000).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
@ -724,7 +745,28 @@ handle_call(doom, _From, State) ->
leveled_log:log("P0030", []),
ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
{stop, normal, {ok, [ManifestFP, FilesFP]}, State}.
{stop, normal, {ok, [ManifestFP, FilesFP]}, State};
handle_call({checkbloom_fortest, Key, Hash}, _From, State) ->
Manifest = State#state.manifest,
FoldFun =
fun(Level, Acc) ->
case Acc of
true ->
true;
false ->
case leveled_pmanifest:key_lookup(Manifest, Level, Key) of
false ->
false;
FP ->
leveled_pmanifest:check_bloom(Manifest, FP, Hash)
end
end
end,
{reply, lists:foldl(FoldFun, false, lists:seq(0, ?MAX_LEVELS)), State};
handle_call(check_for_work, _From, State) ->
{_WL, WC} = leveled_pmanifest:check_for_work(State#state.manifest,
?LEVEL_SCALEFACTOR),
{reply, WC > 0, State}.
handle_cast({manifest_change, NewManifest}, State) ->
NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest),
@ -1605,6 +1647,21 @@ archive_files_test() ->
?assertMatch(true, lists:member("test2.bak", AllFiles)),
ok = clean_subdir(SSTPath).
shutdown_when_compact(Pid) ->
FoldFun =
fun(_I, Ready) ->
case Ready of
true ->
true;
false ->
timer:sleep(200),
not pcl_checkforwork(Pid)
end
end,
true = lists:foldl(FoldFun, false, lists:seq(1, 100)),
io:format("No outstanding compaction work for ~w~n", [Pid]),
pcl_close(Pid).
simple_server_test() ->
RootPath = "../test/ledger",
clean_testdir(RootPath),
@ -1643,17 +1700,23 @@ simple_server_test() ->
?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})),
?assertMatch(Key3, pcl_fetch(PCL, {o,"Bucket0003", "Key0003", null})),
timer:sleep(200),
% This sleep should make sure that the merge to L1 has occurred
% This will free up the L0 slot for the remainder to be written in
% shutdown
ok = pcl_close(PCL),
true = pcl_checkbloomtest(PCL, {o,"Bucket0001", "Key0001", null}),
true = pcl_checkbloomtest(PCL, {o,"Bucket0002", "Key0002", null}),
true = pcl_checkbloomtest(PCL, {o,"Bucket0003", "Key0003", null}),
false = pcl_checkbloomtest(PCL, {o,"Bucket9999", "Key9999", null}),
ok = shutdown_when_compact(PCL),
{ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000,
compression_method=native}),
?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)),
% ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]),
true = pcl_checkbloomtest(PCLr, {o,"Bucket0001", "Key0001", null}),
true = pcl_checkbloomtest(PCLr, {o,"Bucket0002", "Key0002", null}),
true = pcl_checkbloomtest(PCLr, {o,"Bucket0003", "Key0003", null}),
false = pcl_checkbloomtest(PCLr, {o,"Bucket9999", "Key9999", null}),
?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001", null})),
?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002", null})),

View file

@ -154,10 +154,12 @@ load_manifest(Manifest, LoadFun, SQNFun) ->
{L1, SQN1, FileList, LvlBloom} =
load_level(LevelIdx, L0, LoadFun, SQNFun),
UpdLevels = array:set(LevelIdx, L1, AccMan#manifest.levels),
FoldBloomFun =
fun({P, B}, BAcc) ->
dict:store(P, B, BAcc)
end,
UpdBlooms =
dict:merge(fun(_K, V, V) -> V end,
AccMan#manifest.blooms,
LvlBloom),
lists:foldl(FoldBloomFun, AccMan#manifest.blooms, LvlBloom),
{max(AccMaxSQN, SQN1),
AccMan#manifest{levels = UpdLevels, blooms = UpdBlooms},
AccFL ++ FileList}
@ -503,9 +505,9 @@ levelzero_present(Manifest) ->
not is_empty(0, array:get(0, Manifest#manifest.levels)).
-spec check_bloom(manifest(), string(), {integer(), integer()}) -> boolean().
-spec check_bloom(manifest(), pid(), {integer(), integer()}) -> boolean().
%% @doc
%% Check to see if a hahs is present in a manifest entry by using the exported
%% Check to see if a hash is present in a manifest entry by using the exported
%% bloom filter
check_bloom(Manifest, FP, Hash) ->
case dict:find(FP, Manifest#manifest.blooms) of
@ -528,37 +530,37 @@ check_bloom(Manifest, FP, Hash) ->
load_level(LevelIdx, Level, LoadFun, SQNFun) ->
HigherLevelLoadFun =
fun(ME, {L_Out, L_MaxSQN, FileList, BloomD}) ->
fun(ME, {L_Out, L_MaxSQN, FileList, BloomL}) ->
FN = ME#manifest_entry.filename,
{P, Bloom} = LoadFun(FN),
SQN = SQNFun(P),
{[ME#manifest_entry{owner=P}|L_Out],
max(SQN, L_MaxSQN),
[FN|FileList],
dict:store(FN, Bloom, BloomD)}
[{P, Bloom}|BloomL]}
end,
LowerLevelLoadFun =
fun({EK, ME}, {L_Out, L_MaxSQN, FileList, BloomD}) ->
fun({EK, ME}, {L_Out, L_MaxSQN, FileList, BloomL}) ->
FN = ME#manifest_entry.filename,
{P, Bloom} = LoadFun(FN),
SQN = SQNFun(P),
{[{EK, ME#manifest_entry{owner=P}}|L_Out],
max(SQN, L_MaxSQN),
[FN|FileList],
dict:store(FN, Bloom, BloomD)}
[{P, Bloom}|BloomL]}
end,
case LevelIdx =< 1 of
true ->
lists:foldr(HigherLevelLoadFun, {[], 0, [], dict:new()}, Level);
lists:foldr(HigherLevelLoadFun, {[], 0, [], []}, Level);
false ->
{L0, MaxSQN, Flist, UpdBloomD} =
lists:foldr(LowerLevelLoadFun,
{[], 0, [], dict:new()},
{L0, MaxSQN, Flist, UpdBloomL} =
lists:foldr(LowerLevelLoadFun,
{[], 0, [], []},
leveled_tree:to_list(Level)),
{leveled_tree:from_orderedlist(L0, ?TREE_TYPE, ?TREE_WIDTH),
MaxSQN,
Flist,
UpdBloomD}
UpdBloomL}
end.
close_level(LevelIdx, Level, CloseEntryFun) when LevelIdx =< 1 ->