diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index d5b70d1..82dc432 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -736,7 +736,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, none) -> L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache), case L0Check of {false, not_found} -> - fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2); + fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/3); {true, KV} -> KV end; @@ -745,12 +745,12 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> true -> fetch_mem(Key, Hash, Manifest, L0Cache, none); false -> - fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2) + fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/3) end. -fetch(_Key, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> +fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> not_present; -fetch(Key, Manifest, Level, FetchFun) -> +fetch(Key, Hash, Manifest, Level, FetchFun) -> LevelManifest = get_item(Level, Manifest, []), case lists:foldl(fun(File, Acc) -> case Acc of @@ -764,11 +764,11 @@ fetch(Key, Manifest, Level, FetchFun) -> not_present, LevelManifest) of not_present -> - fetch(Key, Manifest, Level + 1, FetchFun); + fetch(Key, Hash, Manifest, Level + 1, FetchFun); FileToCheck -> - case FetchFun(FileToCheck, Key) of + case FetchFun(FileToCheck, Key, Hash) of not_present -> - fetch(Key, Manifest, Level + 1, FetchFun); + fetch(Key, Hash, Manifest, Level + 1, FetchFun); ObjectFound -> ObjectFound end diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 6c71cb6..57e7a63 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -161,6 +161,7 @@ sft_newfroml0cache/4, sft_open/1, sft_get/2, + sft_get/3, sft_getkvrange/4, sft_close/1, sft_clear/1, @@ -212,7 +213,8 @@ handle :: file:fd(), background_complete = false :: boolean(), oversized_file = false :: boolean(), - penciller :: pid()}). + penciller :: pid(), + bloom}). %% Helper object when writing a file to keep track of various accumulators -record(writer, {slot_index = [] :: list(), @@ -276,8 +278,11 @@ sft_open(Filename) -> sft_setfordelete(Pid, Penciller) -> gen_fsm:sync_send_event(Pid, {set_for_delete, Penciller}, infinity). +sft_get(Pid, Key, Hash) -> + gen_fsm:sync_send_event(Pid, {get_kv, Key, Hash}, infinity). + sft_get(Pid, Key) -> - gen_fsm:sync_send_event(Pid, {get_kv, Key}, infinity). + sft_get(Pid, Key, leveled_codec:magic_hash(Key)). sft_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> gen_fsm:sync_send_event(Pid, @@ -355,8 +360,14 @@ starting({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) -> end. -reader({get_kv, Key}, _From, State) -> - Reply = fetch_keyvalue(State#state.handle, State, Key), +reader({get_kv, Key, Hash}, _From, State) -> + Reply = + case leveled_tinybloom:check({hash, Hash}, State#state.bloom) of + false -> + not_present; + true -> + fetch_keyvalue(State#state.handle, State, Key) + end, {reply, Reply, reader, State}; reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> Reply = pointer_append_queryresults(fetch_range_kv(State#state.handle, @@ -516,9 +527,13 @@ open_file(FileMD) -> Ilen:32/integer, Flen:32/integer, Slen:32/integer>> = HeaderLengths, - {ok, SummaryBin} = file:pread(Handle, - ?HEADER_LEN + Blen + Ilen + Flen, Slen), - {{LowSQN, HighSQN}, {LowKey, HighKey}, _Bloom} = binary_to_term(SummaryBin), + {ok, <>} = + file:pread(Handle, ?HEADER_LEN + Blen + Ilen + Flen, Slen), + {{LowSQN, HighSQN}, {LowKey, HighKey}, Bloom} = + case erlang:crc32(SummaryBin) of + SummaryCRC -> + binary_to_term(SummaryBin) + end, {ok, SlotIndexBin} = file:pread(Handle, ?HEADER_LEN + Blen, Ilen), SlotIndex = binary_to_term(SlotIndexBin), {Handle, FileMD#state{slot_index=SlotIndex, @@ -531,7 +546,8 @@ open_file(FileMD) -> filter_pointer=?HEADER_LEN + Blen + Ilen, summ_pointer=?HEADER_LEN + Blen + Ilen + Flen, summ_length=Slen, - handle=Handle}}. + handle=Handle, + bloom=Bloom}}. %% Take a file handle with a previously created header and complete it based on %% the two key lists KL1 and KL2 @@ -866,10 +882,12 @@ sftwrite_function(finalise, IndexLength = byte_size(Index), FilterLength = byte_size(SlotFilters), Summary = term_to_binary({SNExtremes, KeyExtremes, Bloom}), - SummaryLength = byte_size(Summary), + SummaryCRC = erlang:crc32(Summary), + SummaryLength = byte_size(Summary) + 4, %% Write Index, Filter and Summary ok = file:write(Handle, <>), %% Write Lengths into header ok = file:pwrite(Handle, 12, < "Bucket", "Key8400" ++ integer_to_list(X), null}, - Value = {X, {active, infinity}, 0, null}, + Value = {X, + {active, infinity}, + leveled_codec:magic_hash(Key), + null}, Acc ++ [{Key, Value}] end, [], lists:seq(10,98)), - KeyListToUse = [{Key1, {1, {active, infinity}, 0, null}}|KeyList] - ++ [{Key99, {99, {active, infinity}, 0, null}}], + KeyListToUse = [{Key1, + {1, + {active, infinity}, + leveled_codec:magic_hash(Key1), + null}}|KeyList] + ++ [{Key99, + {99, + {active, infinity}, + leveled_codec:magic_hash(Key99), + null}}], {InitHandle, InitFileMD} = create_file(Filename), {Handle, _FileMD, _Rem} = complete_file(InitHandle, InitFileMD, KeyListToUse, [], #level{level=1}), ok = file:close(Handle), {ok, SFTr, _KeyExtremes} = sft_open(Filename), - ?assertMatch({Key1, {1, {active, infinity}, 0, null}}, + ?assertMatch({Key1, + {1, {active, infinity}, _, null}}, sft_get(SFTr, Key1)), - ?assertMatch({Key99, {99, {active, infinity}, 0, null}}, + ?assertMatch({Key99, + {99, {active, infinity}, _, null}}, sft_get(SFTr, Key99)), ?assertMatch(not_present, sft_get(SFTr, KeyNF)),