diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 7dc00f0..0f214d1 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -187,7 +187,9 @@ pcl_doom/1, pcl_releasesnapshot/2, pcl_registersnapshot/5, - pcl_getstartupsequencenumber/1]). + pcl_getstartupsequencenumber/1, + pcl_checkbloomtest/2, + pcl_checkforwork/1]). -export([ sst_rootpath/1, @@ -501,6 +503,25 @@ pcl_close(Pid) -> pcl_doom(Pid) -> gen_server:call(Pid, doom, 60000). + +-spec pcl_checkbloomtest(pid(), tuple()) -> boolean(). +%% @doc +%% Function specifically added to help testing. In particular to make sure +%% that blooms are still available after pencllers have been re-loaded from +%% disk. +pcl_checkbloomtest(Pid, Key) -> + Hash = leveled_codec:segment_hash(Key), + if + Hash /= no_lookup -> + gen_server:call(Pid, {checkbloom_fortest, Key, Hash}, 2000) + end. + +-spec pcl_checkforwork(pid()) -> boolean(). +%% @doc +%% Used in test only to confim compaction work complete before closing +pcl_checkforwork(Pid) -> + gen_server:call(Pid, check_for_work, 2000). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -724,7 +745,28 @@ handle_call(doom, _From, State) -> leveled_log:log("P0030", []), ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/", FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/", - {stop, normal, {ok, [ManifestFP, FilesFP]}, State}. + {stop, normal, {ok, [ManifestFP, FilesFP]}, State}; +handle_call({checkbloom_fortest, Key, Hash}, _From, State) -> + Manifest = State#state.manifest, + FoldFun = + fun(Level, Acc) -> + case Acc of + true -> + true; + false -> + case leveled_pmanifest:key_lookup(Manifest, Level, Key) of + false -> + false; + FP -> + leveled_pmanifest:check_bloom(Manifest, FP, Hash) + end + end + end, + {reply, lists:foldl(FoldFun, false, lists:seq(0, ?MAX_LEVELS)), State}; +handle_call(check_for_work, _From, State) -> + {_WL, WC} = leveled_pmanifest:check_for_work(State#state.manifest, + ?LEVEL_SCALEFACTOR), + {reply, WC > 0, State}. handle_cast({manifest_change, NewManifest}, State) -> NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest), @@ -1605,6 +1647,21 @@ archive_files_test() -> ?assertMatch(true, lists:member("test2.bak", AllFiles)), ok = clean_subdir(SSTPath). +shutdown_when_compact(Pid) -> + FoldFun = + fun(_I, Ready) -> + case Ready of + true -> + true; + false -> + timer:sleep(200), + not pcl_checkforwork(Pid) + end + end, + true = lists:foldl(FoldFun, false, lists:seq(1, 100)), + io:format("No outstanding compaction work for ~w~n", [Pid]), + pcl_close(Pid). + simple_server_test() -> RootPath = "../test/ledger", clean_testdir(RootPath), @@ -1643,17 +1700,23 @@ simple_server_test() -> ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key3, pcl_fetch(PCL, {o,"Bucket0003", "Key0003", null})), - timer:sleep(200), - % This sleep should make sure that the merge to L1 has occurred - % This will free up the L0 slot for the remainder to be written in - % shutdown - ok = pcl_close(PCL), + + true = pcl_checkbloomtest(PCL, {o,"Bucket0001", "Key0001", null}), + true = pcl_checkbloomtest(PCL, {o,"Bucket0002", "Key0002", null}), + true = pcl_checkbloomtest(PCL, {o,"Bucket0003", "Key0003", null}), + false = pcl_checkbloomtest(PCL, {o,"Bucket9999", "Key9999", null}), + + ok = shutdown_when_compact(PCL), {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000, compression_method=native}), ?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)), % ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]), + true = pcl_checkbloomtest(PCLr, {o,"Bucket0001", "Key0001", null}), + true = pcl_checkbloomtest(PCLr, {o,"Bucket0002", "Key0002", null}), + true = pcl_checkbloomtest(PCLr, {o,"Bucket0003", "Key0003", null}), + false = pcl_checkbloomtest(PCLr, {o,"Bucket9999", "Key9999", null}), ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002", null})), diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl index 68d1674..1319fed 100644 --- a/src/leveled_pmanifest.erl +++ b/src/leveled_pmanifest.erl @@ -154,10 +154,12 @@ load_manifest(Manifest, LoadFun, SQNFun) -> {L1, SQN1, FileList, LvlBloom} = load_level(LevelIdx, L0, LoadFun, SQNFun), UpdLevels = array:set(LevelIdx, L1, AccMan#manifest.levels), + FoldBloomFun = + fun({P, B}, BAcc) -> + dict:store(P, B, BAcc) + end, UpdBlooms = - dict:merge(fun(_K, V, V) -> V end, - AccMan#manifest.blooms, - LvlBloom), + lists:foldl(FoldBloomFun, AccMan#manifest.blooms, LvlBloom), {max(AccMaxSQN, SQN1), AccMan#manifest{levels = UpdLevels, blooms = UpdBlooms}, AccFL ++ FileList} @@ -503,9 +505,9 @@ levelzero_present(Manifest) -> not is_empty(0, array:get(0, Manifest#manifest.levels)). --spec check_bloom(manifest(), string(), {integer(), integer()}) -> boolean(). +-spec check_bloom(manifest(), pid(), {integer(), integer()}) -> boolean(). %% @doc -%% Check to see if a hahs is present in a manifest entry by using the exported +%% Check to see if a hash is present in a manifest entry by using the exported %% bloom filter check_bloom(Manifest, FP, Hash) -> case dict:find(FP, Manifest#manifest.blooms) of @@ -528,37 +530,37 @@ check_bloom(Manifest, FP, Hash) -> load_level(LevelIdx, Level, LoadFun, SQNFun) -> HigherLevelLoadFun = - fun(ME, {L_Out, L_MaxSQN, FileList, BloomD}) -> + fun(ME, {L_Out, L_MaxSQN, FileList, BloomL}) -> FN = ME#manifest_entry.filename, {P, Bloom} = LoadFun(FN), SQN = SQNFun(P), {[ME#manifest_entry{owner=P}|L_Out], max(SQN, L_MaxSQN), [FN|FileList], - dict:store(FN, Bloom, BloomD)} + [{P, Bloom}|BloomL]} end, LowerLevelLoadFun = - fun({EK, ME}, {L_Out, L_MaxSQN, FileList, BloomD}) -> + fun({EK, ME}, {L_Out, L_MaxSQN, FileList, BloomL}) -> FN = ME#manifest_entry.filename, {P, Bloom} = LoadFun(FN), SQN = SQNFun(P), {[{EK, ME#manifest_entry{owner=P}}|L_Out], max(SQN, L_MaxSQN), [FN|FileList], - dict:store(FN, Bloom, BloomD)} + [{P, Bloom}|BloomL]} end, case LevelIdx =< 1 of true -> - lists:foldr(HigherLevelLoadFun, {[], 0, [], dict:new()}, Level); + lists:foldr(HigherLevelLoadFun, {[], 0, [], []}, Level); false -> - {L0, MaxSQN, Flist, UpdBloomD} = - lists:foldr(LowerLevelLoadFun, - {[], 0, [], dict:new()}, + {L0, MaxSQN, Flist, UpdBloomL} = + lists:foldr(LowerLevelLoadFun, + {[], 0, [], []}, leveled_tree:to_list(Level)), {leveled_tree:from_orderedlist(L0, ?TREE_TYPE, ?TREE_WIDTH), MaxSQN, Flist, - UpdBloomD} + UpdBloomL} end. close_level(LevelIdx, Level, CloseEntryFun) when LevelIdx =< 1 ->