From c2f19d8825bf4fc1eb3b43144ff8b28f94847712 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 28 Nov 2017 01:19:30 +0000 Subject: [PATCH] Switch to using bloom at penciller Previously the tinybloom was used within the SST file as an extra check to remove false fetches. However the SST already has a low FPR check in the slot_index. If the new ebloom was used (which is no longer per slot, but per sst), this can be shared with the penciller and then the penciller could use it and avoid the message pass. The message pass may be blocked by a 2i query or a slot fetch request for a merge. So this should make performance within the Penciller snappier. This is as a result of taking sst_timings within a volume test - where there was an average of + 100microsecs for each level that was dropped down. Given the bloom/slot checks were < 20 microsecs - there seems to be some further delay. The bloom is a binary of > 64 bytes - so passing it around should not require a copy. --- include/leveled.hrl | 3 +- src/leveled_log.erl | 4 +- src/leveled_pclerk.erl | 70 +++++----- src/leveled_penciller.erl | 57 ++++---- src/leveled_pmanifest.erl | 282 +++++++++++++++++++++++++------------- src/leveled_sst.erl | 243 +++++++++++++++----------------- src/leveled_tinybloom.erl | 278 ------------------------------------- src/leveled_tree.erl | 2 +- 8 files changed, 367 insertions(+), 572 deletions(-) delete mode 100644 src/leveled_tinybloom.erl diff --git a/include/leveled.hrl b/include/leveled.hrl index 5763040..6a8e014 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -33,7 +33,8 @@ {start_key :: tuple() | undefined, end_key :: tuple() | undefined, owner :: pid()|list(), - filename :: string() | undefined}). + filename :: string() | undefined, + bloom :: binary() | none}). 
-record(cdb_options, {max_size :: integer() | undefined, diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 63eb237..b4aa144 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -226,9 +226,9 @@ {"SST12", {info, "SST Timings for sample_count=~w" ++ " at timing points index_query_time=~w" - ++ " tiny_bloom_time=~w slot_index_time=~w slot_fetch_time=~w" + ++ " lookup_cache_time=~w slot_index_time=~w slot_fetch_time=~w" ++ " noncached_block_fetch_time=~w" - ++ " exiting at points tiny_bloom=~w slot_index=~w" + ++ " exiting at points slot_index=~w" ++ " slot_fetch=~w noncached_block_fetch=~w"}}, diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 41fb474..81cc7f2 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -223,12 +223,13 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) -> RP, NewSQN, MaxSQN, CM, Additions); - {ok, Pid, Reply} -> + {ok, Pid, Reply, Bloom} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, Entry = #manifest_entry{start_key=SmallestKey, end_key=HighestKey, owner=Pid, - filename=FileName}, + filename=FileName, + bloom=Bloom}, leveled_log:log_timer("PC015", [], TS1), do_merge(KL1Rem, KL2Rem, SinkLevel, SinkB, @@ -275,40 +276,45 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) -> merge_file_test() -> KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)), - {ok, PidL1_1, _} = leveled_sst:sst_new("../test/", - "KL1_L1.sst", - 1, - KL1_L1, - 999999, - native), + {ok, PidL1_1, _, _} = + leveled_sst:sst_new("../test/", + "KL1_L1.sst", + 1, + KL1_L1, + 999999, + native), KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)), - {ok, PidL2_1, _} = leveled_sst:sst_new("../test/", - "KL1_L2.sst", - 2, - KL1_L2, - 999999, - native), + {ok, PidL2_1, _, _} = + leveled_sst:sst_new("../test/", + "KL1_L2.sst", + 2, + KL1_L2, + 999999, + native), KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)), - {ok, PidL2_2, _} = leveled_sst:sst_new("../test/", - "KL2_L2.sst", - 2, - 
KL2_L2, - 999999, - lz4), + {ok, PidL2_2, _, _} = + leveled_sst:sst_new("../test/", + "KL2_L2.sst", + 2, + KL2_L2, + 999999, + lz4), KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)), - {ok, PidL2_3, _} = leveled_sst:sst_new("../test/", - "KL3_L2.sst", - 2, - KL3_L2, - 999999, - lz4), + {ok, PidL2_3, _, _} = + leveled_sst:sst_new("../test/", + "KL3_L2.sst", + 2, + KL3_L2, + 999999, + lz4), KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)), - {ok, PidL2_4, _} = leveled_sst:sst_new("../test/", - "KL4_L2.sst", - 2, - KL4_L2, - 999999, - lz4), + {ok, PidL2_4, _, _} = + leveled_sst:sst_new("../test/", + "KL4_L2.sst", + 2, + KL4_L2, + 999999, + lz4), E1 = #manifest_entry{owner = PidL1_1, filename = "./KL1_L1.sst", diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 7266e30..7dc00f0 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -181,7 +181,7 @@ pcl_checksequencenumber/3, pcl_workforclerk/1, pcl_manifestchange/2, - pcl_confirml0complete/4, + pcl_confirml0complete/5, pcl_confirmdelete/3, pcl_close/1, pcl_doom/1, @@ -439,14 +439,14 @@ pcl_workforclerk(Pid) -> pcl_manifestchange(Pid, Manifest) -> gen_server:cast(Pid, {manifest_change, Manifest}). --spec pcl_confirml0complete(pid(), string(), tuple(), tuple()) -> ok. +-spec pcl_confirml0complete(pid(), string(), tuple(), tuple(), binary()) -> ok. %% @doc %% Allows a SST writer that has written a L0 file to confirm that the file %% is now complete, so the filename and key ranges can be added to the %% manifest and the file can be used in place of the in-memory levelzero %% cache. -pcl_confirml0complete(Pid, FN, StartKey, EndKey) -> - gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey}). +pcl_confirml0complete(Pid, FN, StartKey, EndKey, Bloom) -> + gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey, Bloom}). -spec pcl_confirmdelete(pid(), string(), pid()) -> ok. 
%% @doc @@ -759,17 +759,18 @@ handle_cast({confirm_delete, Filename, FilePid}, State=#state{is_snapshot=Snap}) % from the Clerk {noreply, State} end; -handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> +handle_cast({levelzero_complete, FN, StartKey, EndKey, Bloom}, State) -> leveled_log:log("P0029", []), ManEntry = #manifest_entry{start_key=StartKey, end_key=EndKey, owner=State#state.levelzero_constructor, - filename=FN}, + filename=FN, + bloom=Bloom}, ManifestSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1, UpdMan = leveled_pmanifest:insert_manifest_entry(State#state.manifest, - ManifestSQN, - 0, - ManEntry), + ManifestSQN, + 0, + ManEntry), % Prompt clerk to ask about work - do this for every L0 roll UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index), ok = leveled_pclerk:clerk_prompt(State#state.clerk), @@ -837,7 +838,7 @@ terminate(Reason, State) -> L0_Left = State#state.levelzero_size > 0, case {State#state.levelzero_pending, L0_Present, L0_Left} of {false, false, true} -> - L0Pid = roll_memory(State, true), + {L0Pid, _L0Bloom} = roll_memory(State, true), ok = leveled_sst:sst_close(L0Pid); StatusTuple -> leveled_log:log("P0010", [StatusTuple]) @@ -911,11 +912,9 @@ start_from_file(PCLopts) -> Manifest0 = leveled_pmanifest:open_manifest(RootPath), OpenFun = fun(FN) -> - {ok, - Pid, - {_FK, _LK}} = - leveled_sst:sst_open(sst_rootpath(RootPath), FN), - Pid + {ok, Pid, {_FK, _LK}, Bloom} = + leveled_sst:sst_open(sst_rootpath(RootPath), FN), + {Pid, Bloom} end, SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1, {MaxSQN, Manifest1, FileList} = @@ -930,12 +929,13 @@ start_from_file(PCLopts) -> true -> leveled_log:log("P0015", [L0FN]), L0Open = leveled_sst:sst_open(sst_rootpath(RootPath), L0FN), - {ok, L0Pid, {L0StartKey, L0EndKey}} = L0Open, + {ok, L0Pid, {L0StartKey, L0EndKey}, Bloom} = L0Open, L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid), L0Entry = #manifest_entry{start_key = L0StartKey, end_key = L0EndKey, 
filename = L0FN, - owner = L0Pid}, + owner = L0Pid, + bloom = Bloom}, Manifest2 = leveled_pmanifest:insert_manifest_entry(Manifest1, ManSQN + 1, 0, @@ -1025,7 +1025,7 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, JitterCheck = RandomFactor or CacheMuchTooBig, case {CacheTooBig, L0Free, JitterCheck, NoPendingManifestChange} of {true, true, true, true} -> - L0Constructor = roll_memory(UpdState, false), + {L0Constructor, none} = roll_memory(UpdState, false), leveled_log:log_timer("P0031", [true, true], SW), UpdState#state{levelzero_pending=true, levelzero_constructor=L0Constructor}; @@ -1063,7 +1063,7 @@ roll_memory(State, false) -> State#state.ledger_sqn, State#state.compression_method), {ok, Constructor, _} = R, - Constructor; + {Constructor, none}; roll_memory(State, true) -> ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1, RootPath = sst_rootpath(State#state.root_path), @@ -1077,8 +1077,8 @@ roll_memory(State, true) -> KVList, State#state.ledger_sqn, State#state.compression_method), - {ok, Constructor, _} = R, - Constructor. + {ok, Constructor, _, Bloom} = R, + {Constructor, Bloom}. timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) -> SW = os:timestamp(), @@ -1107,11 +1107,16 @@ fetch(Key, Hash, Manifest, Level, FetchFun) -> false -> fetch(Key, Hash, Manifest, Level + 1, FetchFun); FP -> - case FetchFun(FP, Key, Hash, Level) of - not_present -> - fetch(Key, Hash, Manifest, Level + 1, FetchFun); - ObjectFound -> - {ObjectFound, Level} + case leveled_pmanifest:check_bloom(Manifest, FP, Hash) of + true -> + case FetchFun(FP, Key, Hash, Level) of + not_present -> + fetch(Key, Hash, Manifest, Level + 1, FetchFun); + ObjectFound -> + {ObjectFound, Level} + end; + false -> + fetch(Key, Hash, Manifest, Level + 1, FetchFun) end end. 
diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl index ba6e9b7..68d1674 100644 --- a/src/leveled_pmanifest.erl +++ b/src/leveled_pmanifest.erl @@ -41,7 +41,8 @@ ready_to_delete/2, check_for_work/2, is_basement/2, - levelzero_present/1 + levelzero_present/1, + check_bloom/3 ]). -export([ @@ -69,8 +70,10 @@ pending_deletes, % OTP16 does not like defining type % a dictionary mapping keys (filenames) to SQN when the % deletion was made, and the original Manifest Entry - basement :: integer() + basement :: integer(), % Currently the lowest level (the largest number) + blooms :: dict:dict() + % A dictionary mapping PIDs to bloom filters }). -type manifest() :: #manifest{}. @@ -100,7 +103,8 @@ new_manifest() -> manifest_sqn = 0, snapshots = [], pending_deletes = dict:new(), - basement = 0 + basement = 0, + blooms = dict:new() }. -spec open_manifest(string()) -> manifest(). @@ -143,18 +147,24 @@ copy_manifest(Manifest) -> %% manifest. The PidFun should be able to return the Pid of a file process %% (having started one). The SQNFun will return the max sequence number %% of that file, if passed the Pid that owns it. -load_manifest(Manifest, PidFun, SQNFun) -> +load_manifest(Manifest, LoadFun, SQNFun) -> UpdateLevelFun = fun(LevelIdx, {AccMaxSQN, AccMan, AccFL}) -> L0 = array:get(LevelIdx, AccMan#manifest.levels), - {L1, SQN1, FileList} = load_level(LevelIdx, L0, PidFun, SQNFun), + {L1, SQN1, FileList, LvlBloom} = + load_level(LevelIdx, L0, LoadFun, SQNFun), UpdLevels = array:set(LevelIdx, L1, AccMan#manifest.levels), + UpdBlooms = + dict:merge(fun(_K, V, V) -> V end, + AccMan#manifest.blooms, + LvlBloom), {max(AccMaxSQN, SQN1), - AccMan#manifest{levels = UpdLevels}, + AccMan#manifest{levels = UpdLevels, blooms = UpdBlooms}, AccFL ++ FileList} end, - lists:foldl(UpdateLevelFun, {0, Manifest, []}, - lists:seq(0, Manifest#manifest.basement)). + lists:foldl(UpdateLevelFun, + {0, Manifest, []}, + lists:seq(0, Manifest#manifest.basement)). 
-spec close_manifest(manifest(), fun()) -> ok. %% @doc @@ -182,7 +192,8 @@ save_manifest(Manifest, RootPath) -> FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest), ManBin = term_to_binary(Manifest#manifest{snapshots = [], pending_deletes = dict:new(), - min_snapshot_sqn = 0}), + min_snapshot_sqn = 0, + blooms = dict:new()}), CRC = erlang:crc32(ManBin), ok = file:write_file(FP, <>). @@ -198,24 +209,29 @@ save_manifest(Manifest, RootPath) -> replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), - UpdLevel = replace_entry(LevelIdx, Level, Removals, Additions), + {UpdBlooms, StrippedAdditions} = + update_blooms(Removals, Additions, Manifest#manifest.blooms), + UpdLevel = replace_entry(LevelIdx, Level, Removals, StrippedAdditions), leveled_log:log("PC019", ["insert", LevelIdx, UpdLevel]), - PendingDeletes = update_pendingdeletes(ManSQN, - Removals, - Manifest#manifest.pending_deletes), + PendingDeletes = + update_pendingdeletes(ManSQN, + Removals, + Manifest#manifest.pending_deletes), UpdLevels = array:set(LevelIdx, UpdLevel, Levels), case is_empty(LevelIdx, UpdLevel) of true -> Manifest#manifest{levels = UpdLevels, basement = get_basement(UpdLevels), manifest_sqn = ManSQN, - pending_deletes = PendingDeletes}; + pending_deletes = PendingDeletes, + blooms = UpdBlooms}; false -> Basement = max(LevelIdx, Manifest#manifest.basement), Manifest#manifest{levels = UpdLevels, basement = Basement, manifest_sqn = ManSQN, - pending_deletes = PendingDeletes} + pending_deletes = PendingDeletes, + blooms = UpdBlooms} end. 
-spec insert_manifest_entry(manifest(), integer(), integer(), @@ -226,12 +242,15 @@ replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) -> insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), - UpdLevel = add_entry(LevelIdx, Level, Entry), + {UpdBlooms, UpdEntry} = + update_blooms([], Entry, Manifest#manifest.blooms), + UpdLevel = add_entry(LevelIdx, Level, UpdEntry), leveled_log:log("PC019", ["insert", LevelIdx, UpdLevel]), Basement = max(LevelIdx, Manifest#manifest.basement), Manifest#manifest{levels = array:set(LevelIdx, UpdLevel, Levels), basement = Basement, - manifest_sqn = ManSQN}. + manifest_sqn = ManSQN, + blooms = UpdBlooms}. -spec remove_manifest_entry(manifest(), integer(), integer(), list()|manifest_entry()) -> manifest(). @@ -240,6 +259,8 @@ insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), + {UpdBlooms, []} = + update_blooms(Entry, [], Manifest#manifest.blooms), UpdLevel = remove_entry(LevelIdx, Level, Entry), leveled_log:log("PC019", ["remove", LevelIdx, UpdLevel]), PendingDeletes = update_pendingdeletes(ManSQN, @@ -251,11 +272,13 @@ remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> Manifest#manifest{levels = UpdLevels, basement = get_basement(UpdLevels), manifest_sqn = ManSQN, - pending_deletes = PendingDeletes}; + pending_deletes = PendingDeletes, + blooms = UpdBlooms}; false -> Manifest#manifest{levels = UpdLevels, manifest_sqn = ManSQN, - pending_deletes = PendingDeletes} + pending_deletes = PendingDeletes, + blooms = UpdBlooms} end. -spec switch_manifest_entry(manifest(), integer(), integer(), @@ -479,6 +502,20 @@ is_basement(Manifest, Level) -> levelzero_present(Manifest) -> not is_empty(0, array:get(0, Manifest#manifest.levels)). 
+ +-spec check_bloom(manifest(), string(), {integer(), integer()}) -> boolean(). +%% @doc +%% Check to see if a hash is present in a manifest entry by using the exported +%% bloom filter +check_bloom(Manifest, FP, Hash) -> + case dict:find(FP, Manifest#manifest.blooms) of + {ok, Bloom} when is_binary(Bloom) -> + leveled_ebloom:check_hash(Hash, Bloom); + _ -> + true + end. + + %%%============================================================================ %%% Internal Functions %%%============================================================================ @@ -489,35 +526,39 @@ levelzero_present(Manifest) -> %% future branches may make lower levels trees or skiplists to improve fetch %% efficiency -load_level(LevelIdx, Level, PidFun, SQNFun) -> +load_level(LevelIdx, Level, LoadFun, SQNFun) -> HigherLevelLoadFun = - fun(ME, {L_Out, L_MaxSQN, FileList}) -> + fun(ME, {L_Out, L_MaxSQN, FileList, BloomD}) -> FN = ME#manifest_entry.filename, - P = PidFun(FN), + {P, Bloom} = LoadFun(FN), SQN = SQNFun(P), {[ME#manifest_entry{owner=P}|L_Out], max(SQN, L_MaxSQN), - [FN|FileList]} + [FN|FileList], + dict:store(FN, Bloom, BloomD)} end, LowerLevelLoadFun = - fun({EK, ME}, {L_Out, L_MaxSQN, FileList}) -> + fun({EK, ME}, {L_Out, L_MaxSQN, FileList, BloomD}) -> FN = ME#manifest_entry.filename, - P = PidFun(FN), + {P, Bloom} = LoadFun(FN), SQN = SQNFun(P), {[{EK, ME#manifest_entry{owner=P}}|L_Out], max(SQN, L_MaxSQN), - [FN|FileList]} + [FN|FileList], + dict:store(FN, Bloom, BloomD)} end, case LevelIdx =< 1 of true -> - lists:foldr(HigherLevelLoadFun, {[], 0, []}, Level); + lists:foldr(HigherLevelLoadFun, {[], 0, [], dict:new()}, Level); false -> - {L0, MaxSQN, Flist} = lists:foldr(LowerLevelLoadFun, - {[], 0, []}, - leveled_tree:to_list(Level)), + {L0, MaxSQN, Flist, UpdBloomD} = + lists:foldr(LowerLevelLoadFun, + {[], 0, [], dict:new()}, + leveled_tree:to_list(Level)), {leveled_tree:from_orderedlist(L0, ?TREE_TYPE, ?TREE_WIDTH), MaxSQN, - Flist} + Flist, + UpdBloomD} end. 
close_level(LevelIdx, Level, CloseEntryFun) when LevelIdx =< 1 -> @@ -567,9 +608,7 @@ add_entry(LevelIdx, Level, Entries) when is_list(Entries) -> leveled_tree:from_orderedlist(lists:append([LHS, Entries0, RHS]), ?TREE_TYPE, ?TREE_WIDTH) - end; -add_entry(LevelIdx, Level, Entry) -> - add_entry(LevelIdx, Level, [Entry]). + end. remove_entry(LevelIdx, Level, Entries) -> % We're assuming we're removing a sorted sublist @@ -608,12 +647,7 @@ replace_entry(LevelIdx, Level, Removals, Additions) when LevelIdx =< 1 -> FirstEntry#manifest_entry.end_key), {LHS, RHS} = lists:splitwith(PredFun, Level), Post = lists:nthtail(SectionLength, RHS), - case is_list(Additions) of - true -> - lists:append([LHS, Additions, Post]); - false -> - lists:append([LHS, [Additions], Post]) - end; + lists:append([LHS, Additions, Post]); replace_entry(LevelIdx, Level, Removals, Additions) -> {SectionLength, FirstEntry} = measure_removals(Removals), PredFun = pred_fun(LevelIdx, @@ -627,21 +661,11 @@ replace_entry(LevelIdx, Level, Removals, Additions) -> _ -> lists:nthtail(SectionLength, RHS) end, - UpdList = - case is_list(Additions) of - true -> - MapFun = - fun(ME) -> - {ME#manifest_entry.end_key, ME} - end, - Additions0 = lists:map(MapFun, Additions), - lists:append([LHS, Additions0, Post]); - false -> - lists:append([LHS, - [{Additions#manifest_entry.end_key, - Additions}], - Post]) + MapFun = + fun(ME) -> + {ME#manifest_entry.end_key, ME} end, + UpdList = lists:append([LHS, lists:map(MapFun, Additions), Post]), leveled_tree:from_orderedlist(UpdList, ?TREE_TYPE, ?TREE_WIDTH). @@ -661,6 +685,46 @@ update_pendingdeletes(ManSQN, Removals, PendingDeletes) -> end, lists:foldl(DelFun, PendingDeletes, Entries). +-spec update_blooms(list()|manifest_entry(), + list()|manifest_entry(), + dict:dict()) + -> {dict:dict(), list()}. +%% @doc +%% +%% The manifest is a Pid-> Bloom mappping for every Pid, and this needs to +%% be updated to represent the changes. 
However, the bloom would bloat out +%% the stored manifest, so the bloom must be stripped from the manifest entry +%% as part of this process +update_blooms(Removals, Additions, Blooms) -> + Additions0 = + case is_list(Additions) of + true -> Additions; + false -> [Additions] + end, + Removals0 = + case is_list(Removals) of + true -> Removals; + false -> [Removals] + end, + + RemFun = + fun(R, BloomD) -> + dict:erase(R#manifest_entry.owner, BloomD) + end, + AddFun = + fun(A, BloomD) -> + dict:store(A#manifest_entry.owner, A#manifest_entry.bloom, BloomD) + end, + StripFun = + fun(A) -> + A#manifest_entry{bloom = none} + end, + + Blooms0 = lists:foldl(RemFun, Blooms, Removals0), + Blooms1 = lists:foldl(AddFun, Blooms0, Additions0), + {Blooms1, lists:map(StripFun, Additions0)}. + + key_lookup_level(LevelIdx, [], _Key) when LevelIdx =< 1 -> false; key_lookup_level(LevelIdx, [Entry|Rest], Key) when LevelIdx =< 1 -> @@ -782,27 +846,33 @@ initial_setup() -> E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, filename="Z1", - owner="pid_z1"}, + owner="pid_z1", + bloom=none}, E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"}, - end_key={o, "Bucket1", "K71", null}, - filename="Z2", - owner="pid_z2"}, + end_key={o, "Bucket1", "K71", null}, + filename="Z2", + owner="pid_z2", + bloom=none}, E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null}, end_key={o, "Bucket1", "K993", null}, filename="Z3", - owner="pid_z3"}, + owner="pid_z3", + bloom=none}, E4 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, end_key={i, "Bucket1", {"Idx1", "Fld7"}, "K93"}, filename="Z4", - owner="pid_z4"}, + owner="pid_z4", + bloom=none}, E5 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld7"}, "K97"}, end_key={o, "Bucket1", "K78", null}, filename="Z5", - owner="pid_z5"}, + owner="pid_z5", + bloom=none}, E6 = #manifest_entry{start_key={o, "Bucket1", "K81", null}, end_key={o, "Bucket1", 
"K996", null}, filename="Z6", - owner="pid_z6"}, + owner="pid_z6", + bloom=none}, Man0 = new_manifest(), @@ -819,32 +889,39 @@ changeup_setup(Man6) -> E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, filename="Z1", - owner="pid_z1"}, + owner="pid_z1", + bloom=none}, E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"}, - end_key={o, "Bucket1", "K71", null}, - filename="Z2", - owner="pid_z2"}, + end_key={o, "Bucket1", "K71", null}, + filename="Z2", + owner="pid_z2", + bloom=none}, E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null}, end_key={o, "Bucket1", "K993", null}, filename="Z3", - owner="pid_z3"}, + owner="pid_z3", + bloom=none}, E1_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld4"}, "K8"}, end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K62"}, owner="pid_y1", - filename="Y1"}, + filename="Y1", + bloom=none}, E2_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K67"}, end_key={o, "Bucket1", "K45", null}, owner="pid_y2", - filename="Y2"}, + filename="Y2", + bloom=none}, E3_2 = #manifest_entry{start_key={o, "Bucket1", "K47", null}, end_key={o, "Bucket1", "K812", null}, owner="pid_y3", - filename="Y3"}, + filename="Y3", + bloom=none}, E4_2 = #manifest_entry{start_key={o, "Bucket1", "K815", null}, end_key={o, "Bucket1", "K998", null}, owner="pid_y4", - filename="Y4"}, + filename="Y4", + bloom=none}, Man7 = remove_manifest_entry(Man6, 2, 1, E1), Man8 = remove_manifest_entry(Man7, 2, 1, E2), @@ -949,32 +1026,39 @@ ext_keylookup_manifest_test() -> E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, filename="Z1", - owner="pid_z1"}, + owner="pid_z1", + bloom=none}, E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"}, - end_key={o, "Bucket1", "K71", null}, - filename="Z2", - owner="pid_z2"}, + end_key={o, "Bucket1", "K71", null}, + filename="Z2", + owner="pid_z2", + 
bloom=none}, E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null}, end_key={o, "Bucket1", "K993", null}, filename="Z3", - owner="pid_z3"}, + owner="pid_z3", + bloom=none}, E1_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld4"}, "K8"}, end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K62"}, owner="pid_y1", - filename="Y1"}, + filename="Y1", + bloom=none}, E2_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K67"}, end_key={o, "Bucket1", "K45", null}, owner="pid_y2", - filename="Y2"}, + filename="Y2", + bloom=none}, E3_2 = #manifest_entry{start_key={o, "Bucket1", "K47", null}, end_key={o, "Bucket1", "K812", null}, owner="pid_y3", - filename="Y3"}, + filename="Y3", + bloom=none}, E4_2 = #manifest_entry{start_key={o, "Bucket1", "K815", null}, end_key={o, "Bucket1", "K998", null}, owner="pid_y4", - filename="Y4"}, + filename="Y4", + bloom=none}, Man8 = replace_manifest_entry(ManOpen2, 2, 1, E1, E1_2), Man9 = remove_manifest_entry(Man8, 2, 1, [E2, E3]), @@ -988,21 +1072,18 @@ ext_keylookup_manifest_test() -> E5 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld7"}, "K97"}, end_key={o, "Bucket1", "K78", null}, filename="Z5", - owner="pid_z5"}, + owner="pid_z5", + bloom=none}, E6 = #manifest_entry{start_key={o, "Bucket1", "K81", null}, end_key={o, "Bucket1", "K996", null}, filename="Z6", - owner="pid_z6"}, + owner="pid_z6", + bloom=none}, Man11 = remove_manifest_entry(Man10, 3, 2, [E5, E6]), ?assertMatch(3, get_manifest_sqn(Man11)), ?assertMatch(false, key_lookup(Man11, 2, LK1_4)), - E2_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K67"}, - end_key={o, "Bucket1", "K45", null}, - owner="pid_y2", - filename="Y2"}, - Man12 = replace_manifest_entry(Man11, 4, 2, E2_2, E5), ?assertMatch(4, get_manifest_sqn(Man12)), ?assertMatch("pid_z5", key_lookup(Man12, 2, LK1_4)). 
@@ -1057,7 +1138,8 @@ levelzero_present_test() -> E0 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, end_key={o, "Bucket1", "Key996", null}, filename="Z0", - owner="pid_z0"}, + owner="pid_z0", + bloom=none}, Man0 = new_manifest(), ?assertMatch(false, levelzero_present(Man0)), @@ -1070,15 +1152,18 @@ snapshot_release_test() -> E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, filename="Z1", - owner="pid_z1"}, + owner="pid_z1", + bloom=none}, E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"}, - end_key={o, "Bucket1", "K71", null}, - filename="Z2", - owner="pid_z2"}, + end_key={o, "Bucket1", "K71", null}, + filename="Z2", + owner="pid_z2", + bloom=none}, E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null}, end_key={o, "Bucket1", "K993", null}, filename="Z3", - owner="pid_z3"}, + owner="pid_z3", + bloom=none}, Man7 = add_snapshot(Man6, pid_a1, 3600), Man8 = remove_manifest_entry(Man7, 2, 1, E1), @@ -1134,18 +1219,18 @@ potential_issue_test() -> {[], [{manifest_entry,{o_rkv,"Bucket","Key10",null}, {o_rkv,"Bucket","Key12949",null}, - "<0.313.0>","./16_1_0.sst"}, + "<0.313.0>","./16_1_0.sst", none}, {manifest_entry,{o_rkv,"Bucket","Key129490",null}, {o_rkv,"Bucket","Key158981",null}, - "<0.315.0>","./16_1_1.sst"}, + "<0.315.0>","./16_1_1.sst", none}, {manifest_entry,{o_rkv,"Bucket","Key158982",null}, {o_rkv,"Bucket","Key188472",null}, - "<0.316.0>","./16_1_2.sst"}], + "<0.316.0>","./16_1_2.sst", none}], {idxt,1, {{[{{o_rkv,"Bucket1","Key1",null}, {manifest_entry,{o_rkv,"Bucket","Key9083",null}, {o_rkv,"Bucket1","Key1",null}, - "<0.320.0>","./16_1_6.sst"}}]}, + "<0.320.0>","./16_1_6.sst", none}}]}, {1,{{o_rkv,"Bucket1","Key1",null},1,nil,nil}}}}, {idxt,0,{{},{0,nil}}}, {idxt,0,{{},{0,nil}}}, @@ -1158,7 +1243,8 @@ potential_issue_test() -> {dict,0,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}, 
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}, - 2}, + 2, + dict:new()}, Range1 = range_lookup(Manifest, 1, {o_rkv, "Bucket", null, null}, diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 164e251..2ca83e6 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -115,8 +115,7 @@ -record(slot_index_value, {slot_id :: integer(), start_position :: integer(), - length :: integer(), - bloom :: binary()}). + length :: integer()}). -record(summary, {first_key :: tuple(), last_key :: tuple(), @@ -148,11 +147,11 @@ -record(sst_timings, {sample_count = 0 :: integer(), index_query_time = 0 :: integer(), - tiny_bloom_time = 0 :: integer(), + lookup_cache_time = 0 :: integer(), slot_index_time = 0 :: integer(), slot_fetch_time = 0 :: integer(), noncached_block_time = 0 :: integer(), - tiny_bloom_count = 0 :: integer(), + lookup_cache_count = 0 :: integer(), slot_index_count = 0 :: integer(), slot_fetch_count = 0 :: integer(), noncached_block_count = 0 :: integer()}). @@ -164,7 +163,8 @@ %%% API %%%============================================================================ --spec sst_open(string(), string()) -> {ok, pid(), {tuple(), tuple()}}. +-spec sst_open(string(), string()) -> + {ok, pid(), {tuple(), tuple()}, binary()}. %% @doc %% Open an SST file at a given path and filename. The first and last keys %% are returned in response to the request - so that those keys can be used @@ -178,13 +178,13 @@ sst_open(RootPath, Filename) -> case gen_fsm:sync_send_event(Pid, {sst_open, RootPath, Filename}, infinity) of - {ok, {SK, EK}} -> - {ok, Pid, {SK, EK}} + {ok, {SK, EK}, Bloom} -> + {ok, Pid, {SK, EK}, Bloom} end. -spec sst_new(string(), string(), integer(), list(), integer(), press_methods()) -> - {ok, pid(), {tuple(), tuple()}}. + {ok, pid(), {tuple(), tuple()}, binary()}. %% @doc %% Start a new SST file at the assigned level passing in a list of Key, Value %% pairs. 
This should not be used for basement levels or unexpanded Key/Value @@ -201,13 +201,13 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) -> MaxSQN, PressMethod}, infinity) of - {ok, {SK, EK}} -> - {ok, Pid, {SK, EK}} + {ok, {SK, EK}, Bloom} -> + {ok, Pid, {SK, EK}, Bloom} end. -spec sst_new(string(), string(), list(), list(), boolean(), integer(), integer(), press_methods()) -> - empty|{ok, pid(), {{list(), list()}, tuple(), tuple()}}. + empty|{ok, pid(), {{list(), list()}, tuple(), tuple()}, binary()}. %% @doc %% Start a new SST file at the assigned level passing in a two lists of %% {Key, Value} pairs to be merged. The merge_lists function will use the @@ -238,15 +238,15 @@ sst_new(RootPath, Filename, MaxSQN, PressMethod}, infinity) of - {ok, {SK, EK}} -> - {ok, Pid, {{Rem1, Rem2}, SK, EK}} + {ok, {SK, EK}, Bloom} -> + {ok, Pid, {{Rem1, Rem2}, SK, EK}, Bloom} end end. -spec sst_newlevelzero(string(), string(), integer(), fun(), pid()|undefined, integer(), press_methods()) -> - {ok, pid(), noreply}. + {ok, pid(), noreply}. %% @doc %% Start a new file at level zero. At this level the file size is not fixed - %% it will be as big as the input. Also the KVList is not passed in, it is @@ -399,10 +399,11 @@ init([]) -> {ok, starting, #state{}}. 
starting({sst_open, RootPath, Filename}, _From, State) -> - UpdState = read_file(Filename, State#state{root_path=RootPath}), + {UpdState, Bloom} = + read_file(Filename, State#state{root_path=RootPath}), Summary = UpdState#state.summary, {reply, - {ok, {Summary#summary.first_key, Summary#summary.last_key}}, + {ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom}, reader, UpdState}; starting({sst_new, @@ -413,24 +414,22 @@ starting({sst_new, {Length, SlotIndex, BlockIndex, - SlotsBin} = build_all_slots(SlotList, PressMethod), - SummaryBin = build_table_summary(SlotIndex, - Level, - FirstKey, - Length, - MaxSQN), + SlotsBin, + Bloom} = build_all_slots(SlotList, PressMethod), + SummaryBin = + build_table_summary(SlotIndex, Level, FirstKey, Length, MaxSQN, Bloom), ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod), YBQ = Level =< 2, - UpdState = read_file(ActualFilename, - State#state{root_path=RootPath, - yield_blockquery=YBQ}), + {UpdState, Bloom} = + read_file(ActualFilename, + State#state{root_path=RootPath, yield_blockquery=YBQ}), Summary = UpdState#state.summary, leveled_log:log_timer("SST08", [ActualFilename, Level, Summary#summary.max_sqn], SW), {reply, - {ok, {Summary#summary.first_key, Summary#summary.last_key}}, + {ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom}, reader, UpdState#state{blockindex_cache = BlockIndex}}. 
@@ -449,23 +448,21 @@ starting({sst_newlevelzero, RootPath, Filename, {SlotCount, SlotIndex, BlockIndex, - SlotsBin} = build_all_slots(SlotList, PressMethod), + SlotsBin, + Bloom} = build_all_slots(SlotList, PressMethod), Time2 = timer:now_diff(os:timestamp(), SW2), SW3 = os:timestamp(), - SummaryBin = build_table_summary(SlotIndex, - 0, - FirstKey, - SlotCount, - MaxSQN), + SummaryBin = + build_table_summary(SlotIndex, 0, FirstKey, SlotCount, MaxSQN, Bloom), Time3 = timer:now_diff(os:timestamp(), SW3), SW4 = os:timestamp(), ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod), - UpdState = read_file(ActualFilename, - State#state{root_path = RootPath, - yield_blockquery = true}), + {UpdState, Bloom} = + read_file(ActualFilename, + State#state{root_path=RootPath, yield_blockquery=true}), Summary = UpdState#state.summary, Time4 = timer:now_diff(os:timestamp(), SW4), @@ -483,7 +480,8 @@ starting({sst_newlevelzero, RootPath, Filename, leveled_penciller:pcl_confirml0complete(Penciller, UpdState#state.filename, Summary#summary.first_key, - Summary#summary.last_key), + Summary#summary.last_key, + Bloom), {next_state, reader, UpdState#state{blockindex_cache = BlockIndex}} @@ -646,68 +644,47 @@ fetch(LedgerKey, Hash, State, Timings0) -> {SW1, Timings1} = update_timings(SW0, Timings0, index_query, true), SlotID = Slot#slot_index_value.slot_id, - Bloom = Slot#slot_index_value.bloom, - case leveled_tinybloom:check_hash(Hash, Bloom) of - false -> - {_SW2, Timings2} = - update_timings(SW1, Timings1, tiny_bloom, false), - {not_present, State, Timings2}; - true -> - {SW2, Timings2} = - update_timings(SW1, Timings1, tiny_bloom, true), - - CachedBlockIdx = array:get(SlotID - 1, - State#state.blockindex_cache), - case CachedBlockIdx of - none -> - SlotBin = read_slot(State#state.handle, Slot), - {Result, BlockLengths, BlockIdx} = - binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod), - BlockIndexCache = array:set(SlotID - 1, - <>, - 
State#state.blockindex_cache), - {_SW3, Timings3} = - update_timings(SW2, Timings2, noncached_block, false), - {Result, - State#state{blockindex_cache = BlockIndexCache}, - Timings3}; - <> -> - PosList = find_pos(BlockIdx, - extra_hash(Hash), - [], - 0), - case PosList of - [] -> - {_SW3, Timings3} = - update_timings(SW2, - Timings2, - slot_index, - false), - {not_present, State, Timings3}; - _ -> - {SW3, Timings3} = - update_timings(SW2, - Timings2, - slot_index, - true), - StartPos = Slot#slot_index_value.start_position, - Result = - check_blocks(PosList, - State#state.handle, - StartPos, - BlockLengths, - LedgerKey, - PressMethod, - not_present), - {_SW4, Timings4} = - update_timings(SW3, - Timings3, - slot_fetch, - false), - {Result, State, Timings4} - end - end + CachedBlockIdx = + array:get(SlotID - 1, State#state.blockindex_cache), + {SW2, Timings2} = update_timings(SW1, Timings1, lookup_cache, true), + + case CachedBlockIdx of + none -> + SlotBin = read_slot(State#state.handle, Slot), + {Result, BlockLengths, BlockIdx} = + binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod), + BlockIndexCache = + array:set(SlotID - 1, + <>, + State#state.blockindex_cache), + {_SW3, Timings3} = + update_timings(SW2, Timings2, noncached_block, false), + {Result, + State#state{blockindex_cache = BlockIndexCache}, + Timings3}; + <> -> + PosList = find_pos(BlockIdx, extra_hash(Hash), [], 0), + case PosList of + [] -> + {_SW3, Timings3} = + update_timings(SW2, Timings2, slot_index, false), + {not_present, State, Timings3}; + _ -> + {SW3, Timings3} = + update_timings(SW2, Timings2, slot_index, true), + StartPos = Slot#slot_index_value.start_position, + Result = + check_blocks(PosList, + State#state.handle, + StartPos, + BlockLengths, + LedgerKey, + PressMethod, + not_present), + {_SW4, Timings4} = + update_timings(SW3, Timings3, slot_fetch, false), + {Result, State, Timings4} + end end. 
@@ -808,7 +785,7 @@ read_file(Filename, State) -> {Handle, FileVersion, SummaryBin} = open_reader(filename:join(State#state.root_path, Filename)), UpdState0 = imp_fileversion(FileVersion, State), - {Summary, SlotList} = read_table_summary(SummaryBin), + {Summary, Bloom, SlotList} = read_table_summary(SummaryBin), BlockIndexCache = array:new([{size, Summary#summary.size}, {default, none}]), UpdState1 = UpdState0#state{blockindex_cache = BlockIndexCache}, @@ -817,9 +794,10 @@ read_file(Filename, State) -> leveled_log:log("SST03", [Filename, Summary#summary.size, Summary#summary.max_sqn]), - UpdState1#state{summary = UpdSummary, + {UpdState1#state{summary = UpdSummary, handle = Handle, - filename = Filename}. + filename = Filename}, + Bloom}. gen_fileversion(PressMethod) -> Bit1 = @@ -848,14 +826,15 @@ open_reader(Filename) -> {ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength), {Handle, FileVersion, SummaryBin}. -build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN) -> +build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN, Bloom) -> [{LastKey, _LastV}|_Rest] = SlotIndex, Summary = #summary{first_key = FirstKey, last_key = LastKey, size = SlotCount, max_sqn = MaxSQN}, - SummBin = term_to_binary({Summary, lists:reverse(SlotIndex)}, - ?BINARY_SETTINGS), + SummBin = + term_to_binary({Summary, Bloom, lists:reverse(SlotIndex)}, + ?BINARY_SETTINGS), SummCRC = erlang:crc32(SummBin), <>. @@ -878,30 +857,31 @@ build_all_slots(SlotList, PressMethod) -> array:new([{size, SlotCount}, {default, none}]), <<>>, + [], PressMethod), - {SlotIndex, BlockIndex, SlotsBin} = BuildResponse, - {SlotCount, SlotIndex, BlockIndex, SlotsBin}. + {SlotIndex, BlockIndex, SlotsBin, HashLists} = BuildResponse, + Bloom = leveled_ebloom:create_bloom(HashLists), + {SlotCount, SlotIndex, BlockIndex, SlotsBin, Bloom}. 
build_all_slots([], _Pos, _SlotID, - SlotIdxAcc, BlockIdxAcc, SlotBinAcc, + SlotIdxAcc, BlockIdxAcc, SlotBinAcc, HashLists, _PressMethod) -> - {SlotIdxAcc, BlockIdxAcc, SlotBinAcc}; + {SlotIdxAcc, BlockIdxAcc, SlotBinAcc, HashLists}; build_all_slots([SlotD|Rest], Pos, SlotID, - SlotIdxAcc, BlockIdxAcc, SlotBinAcc, + SlotIdxAcc, BlockIdxAcc, SlotBinAcc, HashLists, PressMethod) -> {BlockIdx, SlotBin, HashList, LastKey} = SlotD, Length = byte_size(SlotBin), - Bloom = leveled_tinybloom:create_bloom(HashList), SlotIndexV = #slot_index_value{slot_id = SlotID, start_position = Pos, - length = Length, - bloom = Bloom}, + length = Length}, build_all_slots(Rest, Pos + Length, SlotID + 1, [{LastKey, SlotIndexV}|SlotIdxAcc], array:set(SlotID - 1, BlockIdx, BlockIdxAcc), <>, + lists:append(HashLists, HashList), PressMethod). @@ -1828,11 +1808,10 @@ log_timings(no_timing) -> log_timings(Timings) -> leveled_log:log("SST12", [Timings#sst_timings.sample_count, Timings#sst_timings.index_query_time, - Timings#sst_timings.tiny_bloom_time, + Timings#sst_timings.lookup_cache_time, Timings#sst_timings.slot_index_time, Timings#sst_timings.slot_fetch_time, Timings#sst_timings.noncached_block_time, - Timings#sst_timings.tiny_bloom_count, Timings#sst_timings.slot_index_count, Timings#sst_timings.slot_fetch_count, Timings#sst_timings.noncached_block_count]). 
@@ -1847,9 +1826,9 @@ update_timings(SW, Timings, Stage, Continue) -> index_query -> IQT = Timings#sst_timings.index_query_time, Timings#sst_timings{index_query_time = IQT + Timer}; - tiny_bloom -> - TBT = Timings#sst_timings.tiny_bloom_time, - Timings#sst_timings{tiny_bloom_time = TBT + Timer}; + lookup_cache -> + TBT = Timings#sst_timings.lookup_cache_time, + Timings#sst_timings{lookup_cache_time = TBT + Timer}; slot_index -> SIT = Timings#sst_timings.slot_index_time, Timings#sst_timings{slot_index_time = SIT + Timer}; @@ -1866,9 +1845,6 @@ update_timings(SW, Timings, Stage, Continue) -> false -> Timings1 = case Stage of - tiny_bloom -> - TBC = Timings#sst_timings.tiny_bloom_count, - Timings0#sst_timings{tiny_bloom_count = TBC + 1}; slot_index -> SIC = Timings#sst_timings.slot_index_count, Timings0#sst_timings{slot_index_count = SIC + 1}; @@ -2149,9 +2125,9 @@ merge_test() -> KVL2 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 20)), KVL3 = lists:ukeymerge(1, KVL1, KVL2), SW0 = os:timestamp(), - {ok, P1, {FK1, LK1}} = + {ok, P1, {FK1, LK1}, _Bloom1} = sst_new("../test/", "level1_src", 1, KVL1, 6000, native), - {ok, P2, {FK2, LK2}} = + {ok, P2, {FK2, LK2}, _Bloom2} = sst_new("../test/", "level2_src", 2, KVL2, 3000, native), ExpFK1 = element(1, lists:nth(1, KVL1)), ExpLK1 = element(1, lists:last(KVL1)), @@ -2165,7 +2141,7 @@ merge_test() -> ML2 = [{next, #manifest_entry{owner = P2}, FK2}], NewR = sst_new("../test/", "level2_merge", ML1, ML2, false, 2, N * 2, native), - {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = NewR, + {ok, P3, {{Rem1, Rem2}, FK3, LK3}, _Bloom3} = NewR, ?assertMatch([], Rem1), ?assertMatch([], Rem2), ?assertMatch(true, FK3 == min(FK1, FK2)), @@ -2198,7 +2174,7 @@ simple_persisted_range_test() -> KVList1 = lists:ukeysort(1, KVList0), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = lists:last(KVList1), - {ok, Pid, {FirstKey, LastKey}} = + {ok, Pid, {FirstKey, LastKey}, _Bloom} = sst_new(RP, Filename, 1, KVList1, length(KVList1), native), {o, B, K, 
null} = LastKey, @@ -2248,7 +2224,7 @@ additional_range_test() -> [], lists:seq(?NOLOOK_SLOTSIZE + Gap + 1, 2 * ?NOLOOK_SLOTSIZE + Gap)), - {ok, P1, {{Rem1, Rem2}, SK, EK}} = + {ok, P1, {{Rem1, Rem2}, SK, EK}, _Bloom1} = sst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999, native), ?assertMatch([], Rem1), ?assertMatch([], Rem2), @@ -2306,7 +2282,7 @@ simple_persisted_slotsize_test() -> ?LOOK_SLOTSIZE), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = lists:last(KVList1), - {ok, Pid, {FirstKey, LastKey}} = + {ok, Pid, {FirstKey, LastKey}, _Bloom} = sst_new(RP, Filename, 1, KVList1, length(KVList1), native), lists:foreach(fun({K, V}) -> ?assertMatch({K, V}, sst_get(Pid, K)) @@ -2321,7 +2297,7 @@ simple_persisted_test() -> KVList1 = lists:ukeysort(1, KVList0), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = lists:last(KVList1), - {ok, Pid, {FirstKey, LastKey}} = + {ok, Pid, {FirstKey, LastKey}, _Bloom} = sst_new(RP, Filename, 1, KVList1, length(KVList1), native), SW0 = os:timestamp(), lists:foreach(fun({K, V}) -> @@ -2534,16 +2510,15 @@ check_segment_match(PosBinIndex1, KVL, TreeSize) -> timings_test() -> SW = os:timestamp(), timer:sleep(1), - {no_timing, T0} = update_timings(SW, #sst_timings{}, tiny_bloom, false), - {no_timing, T1} = update_timings(SW, T0, slot_index, false), + {no_timing, T1} = update_timings(SW, #sst_timings{}, slot_index, false), {no_timing, T2} = update_timings(SW, T1, slot_fetch, false), {no_timing, T3} = update_timings(SW, T2, noncached_block, false), timer:sleep(1), - {_, T4} = update_timings(SW, T3, tiny_bloom, true), - ?assertMatch(4, T4#sst_timings.sample_count), - ?assertMatch(1, T4#sst_timings.tiny_bloom_count), - ?assertMatch(true, T4#sst_timings.tiny_bloom_time > - T3#sst_timings.tiny_bloom_time). 
+ {_, T4} = update_timings(SW, T3, slot_fetch, true), + ?assertMatch(3, T4#sst_timings.sample_count), + ?assertMatch(1, T4#sst_timings.slot_fetch_count), + ?assertMatch(true, T4#sst_timings.slot_fetch_time > + T3#sst_timings.slot_fetch_time). -endif. diff --git a/src/leveled_tinybloom.erl b/src/leveled_tinybloom.erl deleted file mode 100644 index 88d1e12..0000000 --- a/src/leveled_tinybloom.erl +++ /dev/null @@ -1,278 +0,0 @@ -%% -------- TinyBloom --------- -%% -%% A fixed size bloom that supports 128 keys only, made to try and minimise -%% the cost of producing the bloom -%% - - --module(leveled_tinybloom). - --include("include/leveled.hrl"). - --include_lib("eunit/include/eunit.hrl"). - --export([ - create_bloom/1, - check_hash/2 - ]). - --define(BLOOM_SIZE_BYTES, 16). --define(INTEGER_SIZE, 128). --define(BAND_MASK, ?INTEGER_SIZE - 1). - - -%%%============================================================================ -%%% API -%%%============================================================================ - --spec create_bloom(list(integer())) -> binary(). -%% @doc -%% Create a binary bloom filter from alist of hashes -create_bloom(HashList) -> - case length(HashList) of - 0 -> - <<>>; - L when L > 32 -> - add_hashlist(HashList, - 7, - 0, 0, 0, 0, 0, 0, 0, 0); - L when L > 16 -> - add_hashlist(HashList, 3, 0, 0, 0, 0); - _ -> - add_hashlist(HashList, 1, 0, 0) - end. - --spec check_hash(integer(), binary()) -> boolean(). -%% @doc -%% Check for the presence of a given hash within a bloom -check_hash(_Hash, <<>>) -> - false; -check_hash({_SegHash, Hash}, BloomBin) -> - SlotSplit = (byte_size(BloomBin) div ?BLOOM_SIZE_BYTES) - 1, - {Slot, Hashes} = split_hash(Hash, SlotSplit), - Mask = get_mask(Hashes), - Pos = Slot * ?BLOOM_SIZE_BYTES, - IntSize = ?INTEGER_SIZE, - <<_H:Pos/binary, CheckInt:IntSize/integer, _T/binary>> = BloomBin, - case CheckInt band Mask of - Mask -> - true; - _ -> - false - end. 
- -%%%============================================================================ -%%% Internal Functions -%%%============================================================================ - -split_hash(Hash, SlotSplit) -> - Slot = Hash band SlotSplit, - H0 = (Hash bsr 4) band (?BAND_MASK), - H1 = (Hash bsr 11) band (?BAND_MASK), - H2 = (Hash bsr 18) band (?BAND_MASK), - H3 = (Hash bsr 25) band (?BAND_MASK), - {Slot, [H0, H1, H2, H3]}. - -get_mask([H0, H1, H2, H3]) -> - (1 bsl H0) bor (1 bsl H1) bor (1 bsl H2) bor (1 bsl H3). - - -%% This looks ugly and clunky, but in tests it was quicker than modifying an -%% Erlang term like an array as it is passed around the loop - -add_hashlist([], _S, S0, S1) -> - IntSize = ?INTEGER_SIZE, - <>; -add_hashlist([{_SegHash, TopHash}|T], SlotSplit, S0, S1) -> - {Slot, Hashes} = split_hash(TopHash, SlotSplit), - Mask = get_mask(Hashes), - case Slot of - 0 -> - add_hashlist(T, SlotSplit, S0 bor Mask, S1); - 1 -> - add_hashlist(T, SlotSplit, S0, S1 bor Mask) - end. - -add_hashlist([], _S, S0, S1, S2, S3) -> - IntSize = ?INTEGER_SIZE, - <>; -add_hashlist([{_SegHash, TopHash}|T], SlotSplit, S0, S1, S2, S3) -> - {Slot, Hashes} = split_hash(TopHash, SlotSplit), - Mask = get_mask(Hashes), - case Slot of - 0 -> - add_hashlist(T, SlotSplit, S0 bor Mask, S1, S2, S3); - 1 -> - add_hashlist(T, SlotSplit, S0, S1 bor Mask, S2, S3); - 2 -> - add_hashlist(T, SlotSplit, S0, S1, S2 bor Mask, S3); - 3 -> - add_hashlist(T, SlotSplit, S0, S1, S2, S3 bor Mask) - end. 
- -add_hashlist([], _S, S0, S1, S2, S3, S4, S5, S6, S7) -> - IntSize = ?INTEGER_SIZE, - <>; -add_hashlist([{_SegHash, TopHash}|T], - SlotSplit, - S0, S1, S2, S3, S4, S5, S6, S7) -> - {Slot, Hashes} = split_hash(TopHash, SlotSplit), - Mask = get_mask(Hashes), - case Slot of - 0 -> - add_hashlist(T, - SlotSplit, - S0 bor Mask, S1, S2, S3, S4, S5, S6, S7); - 1 -> - add_hashlist(T, - SlotSplit, - S0, S1 bor Mask, S2, S3, S4, S5, S6, S7); - 2 -> - add_hashlist(T, - SlotSplit, - S0, S1, S2 bor Mask, S3, S4, S5, S6, S7); - 3 -> - add_hashlist(T, - SlotSplit, - S0, S1, S2, S3 bor Mask, S4, S5, S6, S7); - 4 -> - add_hashlist(T, - SlotSplit, - S0, S1, S2, S3, S4 bor Mask, S5, S6, S7); - 5 -> - add_hashlist(T, - SlotSplit, - S0, S1, S2, S3, S4, S5 bor Mask, S6, S7); - 6 -> - add_hashlist(T, - SlotSplit, - S0, S1, S2, S3, S4, S5, S6 bor Mask, S7); - 7 -> - add_hashlist(T, - SlotSplit, - S0, S1, S2, S3, S4, S5, S6, S7 bor Mask) - end. - - -%%%============================================================================ -%%% Test -%%%============================================================================ - --ifdef(TEST). - -generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) -> - generate_randomkeys(Seqn, - Count, - [], - BucketRangeLow, - BucketRangeHigh). - -generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) -> - Acc; -generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) -> - BRand = leveled_rand:uniform(BRange), - BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0), - KNumber = string:right(integer_to_list(leveled_rand:uniform(10000)), 6, $0), - LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o), - Chunk = leveled_rand:rand_bytes(16), - {_B, _K, MV, _H, _LMs} = - leveled_codec:generate_ledgerkv(LK, Seqn, Chunk, 64, infinity), - generate_randomkeys(Seqn + 1, - Count - 1, - [{LK, MV}|Acc], - BucketLow, - BRange). 
- - -get_hashlist(N) -> - KVL0 = lists:ukeysort(1, generate_randomkeys(1, N * 2, 1, 20)), - KVL = lists:sublist(KVL0, N), - HashFun = - fun({K, _V}) -> - leveled_codec:segment_hash(K) - end, - lists:map(HashFun, KVL). - -check_all_hashes(BloomBin, HashList) -> - CheckFun = - fun(Hash) -> - ?assertMatch(true, check_hash(Hash, BloomBin)) - end, - lists:foreach(CheckFun, HashList). - -check_neg_hashes(BloomBin, HashList, Counters) -> - CheckFun = - fun(Hash, {AccT, AccF}) -> - case check_hash(Hash, BloomBin) of - true -> - {AccT + 1, AccF}; - false -> - {AccT, AccF + 1} - end - end, - lists:foldl(CheckFun, Counters, HashList). - - -empty_bloom_test() -> - BloomBin0 = create_bloom([]), - ?assertMatch({0, 4}, - check_neg_hashes(BloomBin0, [0, 10, 100, 100000], {0, 0})). - -bloom_test_() -> - {timeout, 20, fun bloom_test_ranges/0}. - -bloom_test_ranges() -> - test_bloom(128, 256), - test_bloom(64, 100), - test_bloom(32, 100), - test_bloom(16, 100), - test_bloom(8, 100). - -test_bloom(N, Runs) -> - ListOfHashLists = - lists:map(fun(_X) -> get_hashlist(N) end, lists:seq(1, Runs)), - - SWa = os:timestamp(), - ListOfBlooms = - lists:map(fun(HL) -> create_bloom(HL) end, ListOfHashLists), - TSa = timer:now_diff(os:timestamp(), SWa), - - SWb = os:timestamp(), - lists:foreach(fun(Nth) -> - HL = lists:nth(Nth, ListOfHashLists), - BB = lists:nth(Nth, ListOfBlooms), - check_all_hashes(BB, HL) - end, - lists:seq(1, Runs)), - TSb = timer:now_diff(os:timestamp(), SWb), - - HashPool = get_hashlist(N * 2), - ListOfMisses = - lists:map(fun(HL) -> - lists:sublist(lists:subtract(HashPool, HL), N) - end, - ListOfHashLists), - - SWc = os:timestamp(), - {Pos, Neg} = - lists:foldl(fun(Nth, Acc) -> - HL = lists:nth(Nth, ListOfMisses), - BB = lists:nth(Nth, ListOfBlooms), - check_neg_hashes(BB, HL, Acc) - end, - {0, 0}, - lists:seq(1, Runs)), - FPR = Pos / (Pos + Neg), - TSc = timer:now_diff(os:timestamp(), SWc), - - io:format(user, - "Test with size ~w has microsecond timings: -" - ++ " build 
~w check ~w neg_check ~w and fpr ~w~n", - [N, TSa, TSb, TSc, FPR]). - - --endif. diff --git a/src/leveled_tree.erl b/src/leveled_tree.erl index 22d4d22..38df85d 100644 --- a/src/leveled_tree.erl +++ b/src/leveled_tree.erl @@ -879,7 +879,7 @@ search_range_idx_test() -> {{[{{o_rkv,"Bucket1","Key1",null}, {manifest_entry,{o_rkv,"Bucket","Key9083",null}, {o_rkv,"Bucket1","Key1",null}, - "<0.320.0>","./16_1_6.sst"}}]}, + "<0.320.0>","./16_1_6.sst", none}}]}, {1,{{o_rkv,"Bucket1","Key1",null},1,nil,nil}}}}, StartKeyFun = fun(ME) ->