Add extra bloom check
Add extra bloom check - but get the SFT process to perform not the chekc not the Penciller. This avoids complexity of negotiating the transfer of the bloom to the Penciller - but doesn't avoid the potentially unecessary message pass between processes.
This commit is contained in:
parent
1f56501499
commit
cf6a1eb513
2 changed files with 52 additions and 21 deletions
|
@ -736,7 +736,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
|
|||
L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
|
||||
case L0Check of
|
||||
{false, not_found} ->
|
||||
fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2);
|
||||
fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/3);
|
||||
{true, KV} ->
|
||||
KV
|
||||
end;
|
||||
|
@ -745,12 +745,12 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
|
|||
true ->
|
||||
fetch_mem(Key, Hash, Manifest, L0Cache, none);
|
||||
false ->
|
||||
fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2)
|
||||
fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/3)
|
||||
end.
|
||||
|
||||
fetch(_Key, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
|
||||
fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
|
||||
not_present;
|
||||
fetch(Key, Manifest, Level, FetchFun) ->
|
||||
fetch(Key, Hash, Manifest, Level, FetchFun) ->
|
||||
LevelManifest = get_item(Level, Manifest, []),
|
||||
case lists:foldl(fun(File, Acc) ->
|
||||
case Acc of
|
||||
|
@ -764,11 +764,11 @@ fetch(Key, Manifest, Level, FetchFun) ->
|
|||
not_present,
|
||||
LevelManifest) of
|
||||
not_present ->
|
||||
fetch(Key, Manifest, Level + 1, FetchFun);
|
||||
fetch(Key, Hash, Manifest, Level + 1, FetchFun);
|
||||
FileToCheck ->
|
||||
case FetchFun(FileToCheck, Key) of
|
||||
case FetchFun(FileToCheck, Key, Hash) of
|
||||
not_present ->
|
||||
fetch(Key, Manifest, Level + 1, FetchFun);
|
||||
fetch(Key, Hash, Manifest, Level + 1, FetchFun);
|
||||
ObjectFound ->
|
||||
ObjectFound
|
||||
end
|
||||
|
|
|
@ -161,6 +161,7 @@
|
|||
sft_newfroml0cache/4,
|
||||
sft_open/1,
|
||||
sft_get/2,
|
||||
sft_get/3,
|
||||
sft_getkvrange/4,
|
||||
sft_close/1,
|
||||
sft_clear/1,
|
||||
|
@ -212,7 +213,8 @@
|
|||
handle :: file:fd(),
|
||||
background_complete = false :: boolean(),
|
||||
oversized_file = false :: boolean(),
|
||||
penciller :: pid()}).
|
||||
penciller :: pid(),
|
||||
bloom}).
|
||||
|
||||
%% Helper object when writing a file to keep track of various accumulators
|
||||
-record(writer, {slot_index = [] :: list(),
|
||||
|
@ -276,8 +278,11 @@ sft_open(Filename) ->
|
|||
sft_setfordelete(Pid, Penciller) ->
|
||||
gen_fsm:sync_send_event(Pid, {set_for_delete, Penciller}, infinity).
|
||||
|
||||
sft_get(Pid, Key, Hash) ->
|
||||
gen_fsm:sync_send_event(Pid, {get_kv, Key, Hash}, infinity).
|
||||
|
||||
sft_get(Pid, Key) ->
|
||||
gen_fsm:sync_send_event(Pid, {get_kv, Key}, infinity).
|
||||
sft_get(Pid, Key, leveled_codec:magic_hash(Key)).
|
||||
|
||||
sft_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
|
||||
gen_fsm:sync_send_event(Pid,
|
||||
|
@ -355,8 +360,14 @@ starting({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) ->
|
|||
end.
|
||||
|
||||
|
||||
reader({get_kv, Key}, _From, State) ->
|
||||
Reply = fetch_keyvalue(State#state.handle, State, Key),
|
||||
reader({get_kv, Key, Hash}, _From, State) ->
|
||||
Reply =
|
||||
case leveled_tinybloom:check({hash, Hash}, State#state.bloom) of
|
||||
false ->
|
||||
not_present;
|
||||
true ->
|
||||
fetch_keyvalue(State#state.handle, State, Key)
|
||||
end,
|
||||
{reply, Reply, reader, State};
|
||||
reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
|
||||
Reply = pointer_append_queryresults(fetch_range_kv(State#state.handle,
|
||||
|
@ -516,9 +527,13 @@ open_file(FileMD) ->
|
|||
Ilen:32/integer,
|
||||
Flen:32/integer,
|
||||
Slen:32/integer>> = HeaderLengths,
|
||||
{ok, SummaryBin} = file:pread(Handle,
|
||||
?HEADER_LEN + Blen + Ilen + Flen, Slen),
|
||||
{{LowSQN, HighSQN}, {LowKey, HighKey}, _Bloom} = binary_to_term(SummaryBin),
|
||||
{ok, <<SummaryCRC:32/integer, SummaryBin/binary>>} =
|
||||
file:pread(Handle, ?HEADER_LEN + Blen + Ilen + Flen, Slen),
|
||||
{{LowSQN, HighSQN}, {LowKey, HighKey}, Bloom} =
|
||||
case erlang:crc32(SummaryBin) of
|
||||
SummaryCRC ->
|
||||
binary_to_term(SummaryBin)
|
||||
end,
|
||||
{ok, SlotIndexBin} = file:pread(Handle, ?HEADER_LEN + Blen, Ilen),
|
||||
SlotIndex = binary_to_term(SlotIndexBin),
|
||||
{Handle, FileMD#state{slot_index=SlotIndex,
|
||||
|
@ -531,7 +546,8 @@ open_file(FileMD) ->
|
|||
filter_pointer=?HEADER_LEN + Blen + Ilen,
|
||||
summ_pointer=?HEADER_LEN + Blen + Ilen + Flen,
|
||||
summ_length=Slen,
|
||||
handle=Handle}}.
|
||||
handle=Handle,
|
||||
bloom=Bloom}}.
|
||||
|
||||
%% Take a file handle with a previously created header and complete it based on
|
||||
%% the two key lists KL1 and KL2
|
||||
|
@ -866,10 +882,12 @@ sftwrite_function(finalise,
|
|||
IndexLength = byte_size(Index),
|
||||
FilterLength = byte_size(SlotFilters),
|
||||
Summary = term_to_binary({SNExtremes, KeyExtremes, Bloom}),
|
||||
SummaryLength = byte_size(Summary),
|
||||
SummaryCRC = erlang:crc32(Summary),
|
||||
SummaryLength = byte_size(Summary) + 4,
|
||||
%% Write Index, Filter and Summary
|
||||
ok = file:write(Handle, <<Index/binary,
|
||||
SlotFilters/binary,
|
||||
SummaryCRC:32/integer,
|
||||
Summary/binary>>),
|
||||
%% Write Lengths into header
|
||||
ok = file:pwrite(Handle, 12, <<BlocksLength:32/integer,
|
||||
|
@ -1901,21 +1919,34 @@ hashclash_test() ->
|
|||
"Bucket",
|
||||
"Key8400" ++ integer_to_list(X),
|
||||
null},
|
||||
Value = {X, {active, infinity}, 0, null},
|
||||
Value = {X,
|
||||
{active, infinity},
|
||||
leveled_codec:magic_hash(Key),
|
||||
null},
|
||||
Acc ++ [{Key, Value}] end,
|
||||
[],
|
||||
lists:seq(10,98)),
|
||||
KeyListToUse = [{Key1, {1, {active, infinity}, 0, null}}|KeyList]
|
||||
++ [{Key99, {99, {active, infinity}, 0, null}}],
|
||||
KeyListToUse = [{Key1,
|
||||
{1,
|
||||
{active, infinity},
|
||||
leveled_codec:magic_hash(Key1),
|
||||
null}}|KeyList]
|
||||
++ [{Key99,
|
||||
{99,
|
||||
{active, infinity},
|
||||
leveled_codec:magic_hash(Key99),
|
||||
null}}],
|
||||
{InitHandle, InitFileMD} = create_file(Filename),
|
||||
{Handle, _FileMD, _Rem} = complete_file(InitHandle, InitFileMD,
|
||||
KeyListToUse, [],
|
||||
#level{level=1}),
|
||||
ok = file:close(Handle),
|
||||
{ok, SFTr, _KeyExtremes} = sft_open(Filename),
|
||||
?assertMatch({Key1, {1, {active, infinity}, 0, null}},
|
||||
?assertMatch({Key1,
|
||||
{1, {active, infinity}, _, null}},
|
||||
sft_get(SFTr, Key1)),
|
||||
?assertMatch({Key99, {99, {active, infinity}, 0, null}},
|
||||
?assertMatch({Key99,
|
||||
{99, {active, infinity}, _, null}},
|
||||
sft_get(SFTr, Key99)),
|
||||
?assertMatch(not_present,
|
||||
sft_get(SFTr, KeyNF)),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue