diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 1a64c94..fe7fe68 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -124,6 +124,8 @@ -export([tune_seglist/1, extract_hash/1, member_check/2]). +-export([in_range/3]). + -record(slot_index_value, {slot_id :: integer(), start_position :: integer(), length :: integer()}). @@ -1674,7 +1676,7 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache}, % the hash value this will fial unexpectedly. BinMapFun = fun(Pointer, Acc) -> - {SP, _L, ID, _SK, _EK} = pointer_mapfun(Pointer), + {SP, _L, ID, SK, EK} = pointer_mapfun(Pointer), CachedHeader = array:get(ID - 1, BlockIndexCache), case extract_header(CachedHeader, IdxModDate) of none -> @@ -1708,7 +1710,7 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache}, % Need to find just the right keys PositionList = find_pos(BlockIdx, SegList, [], 0), - Acc ++ + KVL = check_blocks(PositionList, {Handle, SP}, BlockLengths, @@ -1716,7 +1718,10 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache}, false, PressMethod, IdxModDate, - []) + []), + FilterFun = + fun(KV) -> in_range(KV, SK, EK) end, + Acc ++ lists:filter(FilterFun, KVL) % Note check_blocks shouldreturn [] if % PositionList is empty end @@ -1726,6 +1731,16 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache}, lists:foldl(BinMapFun, [], SlotList). +-spec in_range(leveled_codec:ledger_kv(), + range_endpoint(), range_endpoint()) -> boolean(). +in_range({_LK, _LV}, all, all) -> + true; +in_range({LK, _LV}, all, EK) -> + not leveled_codec:endkey_passed(EK, LK); +in_range({LK, LV}, SK, EK) -> + (LK >= SK) and in_range({LK, LV}, all, EK). + + read_slotlist(SlotList, Handle) -> LengthList = lists:map(fun pointer_mapfun/1, SlotList), {MultiSlotBin, StartPos} = read_length_list(Handle, LengthList), @@ -2862,6 +2877,85 @@ simple_persisted_range_tester(SSTNewFun) -> TL5 = lists:map(fun(EK) -> {SK5, EK} end, [EK2, EK3, EK4, EK5]), lists:foreach(TestFun, TL2 ++ TL3 ++ TL4 ++ TL5). + +simple_persisted_rangesegfilter_test() -> + simple_persisted_rangesegfilter_tester(fun testsst_new/6), + simple_persisted_rangesegfilter_tester(fun sst_new/6). + +simple_persisted_rangesegfilter_tester(SSTNewFun) -> + {RP, Filename} = {"../test/", "range_segfilter_test"}, + KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 16, 1, 20), + KVList1 = lists:ukeysort(1, KVList0), + [{FirstKey, _FV}|_Rest] = KVList1, + {LastKey, _LV} = lists:last(KVList1), + {ok, Pid, {FirstKey, LastKey}, _Bloom} = + SSTNewFun(RP, Filename, 1, KVList1, length(KVList1), native), + + SK1 = element(1, lists:nth(124, KVList1)), + SK2 = element(1, lists:nth(126, KVList1)), + SK3 = element(1, lists:nth(128, KVList1)), + SK4 = element(1, lists:nth(130, KVList1)), + SK5 = element(1, lists:nth(132, KVList1)), + + EK1 = element(1, lists:nth(252, KVList1)), + EK2 = element(1, lists:nth(254, KVList1)), + EK3 = element(1, lists:nth(256, KVList1)), + EK4 = element(1, lists:nth(258, KVList1)), + EK5 = element(1, lists:nth(260, KVList1)), + + GetSegFun = + fun(LK) -> + extract_hash( + leveled_codec:strip_to_segmentonly( + lists:keyfind(LK, 1, KVList1))) + end, + SegList = + lists:map(GetSegFun, + [SK1, SK2, SK3, SK4, SK5, EK1, EK2, EK3, EK4, EK5]), + + TestFun = + fun(StartKey, EndKey, OutList) -> + RangeKVL = + sst_getfilteredrange(Pid, StartKey, EndKey, 4, SegList, 0), + RangeKL = lists:map(fun({LK0, _LV0}) -> LK0 end, RangeKVL), + ?assertMatch(true, lists:member(StartKey, RangeKL)), + ?assertMatch(true, lists:member(EndKey, RangeKL)), + CheckOutFun = + fun(OutKey) -> + ?assertMatch(false, lists:member(OutKey, RangeKL)) + end, + lists:foreach(CheckOutFun, OutList) + end, + + lists:foldl(fun(SK0, Acc) -> + TestFun(SK0, EK1, [EK2, EK3, EK4, EK5] ++ Acc), + [SK0|Acc] + end, + [], + [SK1, SK2, SK3, SK4, SK5]), + lists:foldl(fun(SK0, Acc) -> + TestFun(SK0, EK2, [EK3, EK4, EK5] ++ Acc), + [SK0|Acc] + end, + [], + [SK1, SK2, SK3, SK4, SK5]), + lists:foldl(fun(SK0, Acc) -> + TestFun(SK0, EK3, [EK4, EK5] ++ Acc), + [SK0|Acc] + end, + [], + [SK1, SK2, SK3, SK4, SK5]), + lists:foldl(fun(SK0, Acc) -> + TestFun(SK0, EK4, [EK5] ++ Acc), + [SK0|Acc] + end, + [], + [SK1, SK2, SK3, SK4, SK5]), + + ok = sst_clear(Pid). + + + additional_range_test() -> % Test fetching ranges that fall into odd situations with regards to the % summayr index