diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 5d441da..b450bcc 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1181,7 +1181,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), if TimeToPush -> - CacheToLoad = {leveled_tree:from_orderedset(Tab, ?CACHE_TYPE), + CacheToLoad = {Tab, Cache#ledger_cache.index, Cache#ledger_cache.min_sqn, Cache#ledger_cache.max_sqn}, diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index ad25c58..0e5d17f 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -61,7 +61,8 @@ generate_uuid/0, integer_now/0, riak_extract_metadata/2, - magic_hash/1]). + magic_hash/1, + to_lookup/1]). -define(V1_VERS, 1). -define(MAGIC, 53). % riak_kv -> riak_object @@ -73,6 +74,14 @@ %% what they are - %% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function +to_lookup(Key) -> + case element(1, Key) of + ?IDX_TAG -> + no_lookup; + _ -> + lookup + end. + magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) -> magic_hash({Bucket, Key}); magic_hash({?STD_TAG, Bucket, Key, _SubKey}) -> @@ -198,32 +207,41 @@ compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) -> compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) -> skip; compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) -> - {Tag, _, _, _} = LK, - {Tag, TagStrat} = lists:keyfind(Tag, 1, Strategy), - case TagStrat of + case get_tagstrategy(LK, Strategy) of + skip -> + skip; retain -> {retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}}; TagStrat -> {TagStrat, null} end; compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) -> - {Tag, _, _, _} = LK, - case lists:keyfind(Tag, 1, Strategy) of - {Tag, TagStrat} -> - case TagStrat of - retain -> - {_V, KeyDeltas} = split_inkvalue(V), - {TagStrat, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}}; - TagStrat -> - {TagStrat, null} - end; - false -> - leveled_log:log("IC012", [Tag, Strategy]), - skip + case get_tagstrategy(LK, Strategy) of + skip -> + skip; + retain -> + {_V, KeyDeltas} = split_inkvalue(V), + {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}}; + TagStrat -> + {TagStrat, null} end; compact_inkerkvc(_KVC, _Strategy) -> skip. +get_tagstrategy(LK, Strategy) -> + case LK of + {Tag, _, _, _} -> + case lists:keyfind(Tag, 1, Strategy) of + {Tag, TagStrat} -> + TagStrat; + false -> + leveled_log:log("IC012", [Tag, Strategy]), + skip + end; + _ -> + skip + end. + split_inkvalue(VBin) -> case is_binary(VBin) of true -> @@ -429,6 +447,73 @@ endkey_passed_test() -> ?assertMatch(true, endkey_passed(TestKey, K2)). +corrupted_ledgerkey_test() -> + % When testing for compacted journal which has been corrupted, there may + % be a corruptes ledger key. Always skip these keys + % Key has become a 3-tuple not a 4-tuple + TagStrat1 = compact_inkerkvc({{1, + ?INKT_STND, + {?STD_TAG, "B1", "K1andSK"}}, + {}, + true}, + [{?STD_TAG, retain}]), + ?assertMatch(skip, TagStrat1), + TagStrat2 = compact_inkerkvc({{1, + ?INKT_KEYD, + {?STD_TAG, "B1", "K1andSK"}}, + {}, + true}, + [{?STD_TAG, retain}]), + ?assertMatch(skip, TagStrat2). + +general_skip_strategy_test() -> + % Confirm that we will skip if the strategy says so + TagStrat1 = compact_inkerkvc({{1, + ?INKT_STND, + {?STD_TAG, "B1", "K1andSK"}}, + {}, + true}, + [{?STD_TAG, skip}]), + ?assertMatch(skip, TagStrat1), + TagStrat2 = compact_inkerkvc({{1, + ?INKT_KEYD, + {?STD_TAG, "B1", "K1andSK"}}, + {}, + true}, + [{?STD_TAG, skip}]), + ?assertMatch(skip, TagStrat2), + TagStrat3 = compact_inkerkvc({{1, + ?INKT_KEYD, + {?IDX_TAG, "B1", "K1", "SK"}}, + {}, + true}, + [{?STD_TAG, skip}]), + ?assertMatch(skip, TagStrat3), + TagStrat4 = compact_inkerkvc({{1, + ?INKT_KEYD, + {?IDX_TAG, "B1", "K1", "SK"}}, + {}, + true}, + [{?STD_TAG, skip}, {?IDX_TAG, recalc}]), + ?assertMatch({recalc, null}, TagStrat4), + TagStrat5 = compact_inkerkvc({{1, + ?INKT_TOMB, + {?IDX_TAG, "B1", "K1", "SK"}}, + {}, + true}, + [{?STD_TAG, skip}, {?IDX_TAG, recalc}]), + ?assertMatch(skip, TagStrat5). + +corrupted_inker_tag_test() -> + % Confirm that we will skip on unknown inker tag + TagStrat1 = compact_inkerkvc({{1, + foo, + {?STD_TAG, "B1", "K1andSK"}}, + {}, + true}, + [{?STD_TAG, retain}]), + ?assertMatch(skip, TagStrat1). + %% Test below proved that the overhead of performing hashes was trivial %% Maybe 5 microseconds per hash diff --git a/src/leveled_log.erl b/src/leveled_log.erl index c89fbea..0d2de8c 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -166,7 +166,8 @@ {"PC015", {info, "File created"}}, {"PC016", - {info, "Slow fetch from SFT ~w of ~w microseconds with result ~w"}}, + {info, "Slow fetch from SFT ~w of ~w microseconds at level ~w " + ++ "with result ~w"}}, {"PC017", {info, "Notified clerk of manifest change"}}, {"PC018", @@ -259,14 +260,14 @@ {error, "False result returned from SST with filename ~s as " ++ "slot ~w has failed crc check"}}, {"SST03", - {info, "Opening SST file with filename ~s keys ~w slots ~w and" + {info, "Opening SST file with filename ~s slots ~w and" ++ " max sqn ~w"}}, {"SST04", {info, "Exit called for reason ~w on filename ~s"}}, {"SST05", {warn, "Rename rogue filename ~s to ~s"}}, {"SST06", - {info, "File ~s has been set for delete"}}, + {debug, "File ~s has been set for delete"}}, {"SST07", {info, "Exit called and now clearing ~s"}}, {"SST08", diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 2f5c1a5..fd242f1 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -341,7 +341,7 @@ init([PCLopts]) -> end. -handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}}, +handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}}, From, State=#state{is_snapshot=Snap}) when Snap == false -> % The push_mem process is as follows: @@ -370,7 +370,16 @@ handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}}, {reply, returned, State}; false -> leveled_log:log("P0018", [ok, false, false]), - gen_server:reply(From, ok), + PushedTree = + case is_tuple(LedgerTable) of + true -> + LedgerTable; + false -> + leveled_tree:from_orderedset(LedgerTable, + ?CACHE_TYPE) + end, + % Reply must happen after the table has been converted + gen_server:reply(From, ok), {noreply, update_levelzero(State#state.levelzero_size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, @@ -853,7 +862,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache), case L0Check of {false, not_found} -> - fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3); + fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4); {true, KV} -> {KV, 0} end. @@ -865,7 +874,7 @@ fetch(Key, Hash, Manifest, Level, FetchFun) -> false -> fetch(Key, Hash, Manifest, Level + 1, FetchFun); FP -> - case FetchFun(FP, Key, Hash) of + case FetchFun(FP, Key, Hash, Level) of not_present -> fetch(Key, Hash, Manifest, Level + 1, FetchFun); ObjectFound -> @@ -873,21 +882,21 @@ fetch(Key, Hash, Manifest, Level, FetchFun) -> end end. -timed_sst_get(PID, Key, Hash) -> +timed_sst_get(PID, Key, Hash, Level) -> SW = os:timestamp(), R = leveled_sst:sst_get(PID, Key, Hash), T0 = timer:now_diff(os:timestamp(), SW), - log_slowfetch(T0, R, PID, ?SLOW_FETCH). + log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH). -log_slowfetch(T0, R, PID, FetchTolerance) -> +log_slowfetch(T0, R, PID, Level, FetchTolerance) -> case {T0, R} of {T, R} when T < FetchTolerance -> R; {T, not_present} -> - leveled_log:log("PC016", [PID, T, not_present]), + leveled_log:log("PC016", [PID, T, Level, not_present]), not_present; {T, R} -> - leveled_log:log("PC016", [PID, T, found]), + leveled_log:log("PC016", [PID, T, Level, found]), R end. @@ -1498,7 +1507,7 @@ create_file_test() -> ?assertMatch("hello", binary_to_term(Bin)). slow_fetch_test() -> - ?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 1)). + ?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)). checkready(Pid) -> try diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index c89f149..489715e 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -66,11 +66,12 @@ -define(MAX_SLOTS, 256). -define(SLOT_SIZE, 128). % This is not configurable +-define(NOLOOK_MULT, 2). % How much bigger is a slot/block with no lookups +-define(NOLOOK_SLOTSIZE, ?SLOT_SIZE * ?NOLOOK_MULT). -define(COMPRESSION_LEVEL, 1). -define(BINARY_SETTINGS, [{compressed, ?COMPRESSION_LEVEL}]). % -define(LEVEL_BLOOM_BITS, [{0, 8}, {1, 10}, {2, 8}, {default, 6}]). --define(MERGE_SCANWIDTH, 16). --define(INDEX_MARKER_WIDTH, 16). +-define(MERGE_SCANWIDTH, 8). -define(DISCARD_EXT, ".discarded"). -define(DELETE_TIMEOUT, 10000). -define(TREE_TYPE, idxt). @@ -143,21 +144,22 @@ sst_open(RootPath, Filename) -> sst_new(RootPath, Filename, Level, KVList, MaxSQN) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), + {[], [], SlotList, FK} = merge_lists(KVList), case gen_fsm:sync_send_event(Pid, {sst_new, RootPath, Filename, Level, - KVList, + {SlotList, FK}, MaxSQN}, infinity) of {ok, {SK, EK}} -> {ok, Pid, {SK, EK}} end. -sst_new(RootPath, Filename, KL1, KL2, IsBasement, Level, MaxSQN) -> - {{Rem1, Rem2}, MergedList} = merge_lists(KL1, KL2, {IsBasement, Level}), - case MergedList of +sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN) -> + {Rem1, Rem2, SlotList, FK} = merge_lists(KVL1, KVL2, {IsBasement, Level}), + case SlotList of [] -> empty; _ -> @@ -167,7 +169,7 @@ sst_new(RootPath, Filename, KL1, KL2, IsBasement, Level, MaxSQN) -> RootPath, Filename, Level, - MergedList, + {SlotList, FK}, MaxSQN}, infinity) of {ok, {SK, EK}} -> @@ -241,13 +243,13 @@ starting({sst_open, RootPath, Filename}, _From, State) -> {ok, {Summary#summary.first_key, Summary#summary.last_key}}, reader, UpdState}; -starting({sst_new, RootPath, Filename, Level, KVList, MaxSQN}, _From, State) -> +starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN}, + _From, State) -> SW = os:timestamp(), - {FirstKey, - Length, + {Length, SlotIndex, BlockIndex, - SlotsBin} = build_all_slots(KVList), + SlotsBin} = build_all_slots(SlotList), SummaryBin = build_table_summary(SlotIndex, Level, FirstKey, @@ -268,15 +270,15 @@ starting({sst_newlevelzero, RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN}, State) -> SW = os:timestamp(), KVList = leveled_pmem:to_list(Slots, FetchFun), - {FirstKey, - Length, + {[], [], SlotList, FirstKey} = merge_lists(KVList), + {SlotCount, SlotIndex, BlockIndex, - SlotsBin} = build_all_slots(KVList), + SlotsBin} = build_all_slots(SlotList), SummaryBin = build_table_summary(SlotIndex, 0, FirstKey, - Length, + SlotCount, MaxSQN), ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin), UpdState = read_file(ActualFilename, State#state{root_path=RootPath}), @@ -317,7 +319,7 @@ reader(get_maxsequencenumber, _From, State) -> Summary = State#state.summary, {reply, Summary#summary.max_sqn, reader, State}; reader(print_timings, _From, State) -> - io:format(user, "Timings of ~w~n", [State#state.sst_timings]), + io:format(user, "~nTimings of ~w~n", [State#state.sst_timings]), {reply, ok, reader, State#state{sst_timings = undefined}}; reader({set_for_delete, Penciller}, _From, State) -> leveled_log:log("SST06", [State#state.filename]), @@ -415,19 +417,19 @@ fetch(LedgerKey, Hash, State) -> case CachedBlockIdx of none -> SlotBin = read_slot(State#state.handle, Slot), - {Result, BlockIdx} = binaryslot_get(SlotBin, - LedgerKey, - Hash, - none), + {Result, + BlockLengths, + BlockIdx} = binaryslot_get(SlotBin, LedgerKey, Hash), BlockIndexCache = array:set(SlotID - 1, - BlockIdx, + <>, State#state.blockindex_cache), {Result, slot_fetch, Slot#slot_index_value.slot_id, State#state{blockindex_cache = BlockIndexCache}}; - _ -> - PosList = find_pos(CachedBlockIdx, + <> -> + PosList = find_pos(BlockIdx, double_hash(Hash, LedgerKey), [], 0), @@ -435,12 +437,12 @@ fetch(LedgerKey, Hash, State) -> [] -> {not_present, slot_bloom, SlotID, State}; _ -> - SlotBin = read_slot(State#state.handle, Slot), - Result = binaryslot_get(SlotBin, - LedgerKey, - Hash, - {true, PosList}), - {element(1, Result), slot_fetch, SlotID, State} + Result = check_blocks(PosList, + State#state.handle, + Slot, + BlockLengths, + LedgerKey), + {Result, slot_fetch, SlotID, State} end end end. @@ -452,10 +454,9 @@ fetch_range(StartKey, EndKey, ScanWidth, State) -> {Slots, RTrim} = lookup_slots(StartKey, EndKey, Summary#summary.index), Self = self(), SL = length(Slots), + ExpandedSlots = case SL of - 0 -> - []; 1 -> [Slot] = Slots, case RTrim of @@ -534,14 +535,13 @@ read_file(Filename, State) -> {Handle, SummaryBin} = open_reader(filename:join(State#state.root_path, Filename)), {Summary, SlotList} = read_table_summary(SummaryBin), - SlotCount = length(SlotList), - BlockIndexCache = array:new([{size, SlotCount}, {default, none}]), + BlockIndexCache = array:new([{size, Summary#summary.size}, + {default, none}]), UpdState = State#state{blockindex_cache = BlockIndexCache}, SlotIndex = from_list(SlotList), UpdSummary = Summary#summary{index = SlotIndex}, leveled_log:log("SST03", [Filename, Summary#summary.size, - SlotCount, Summary#summary.max_sqn]), UpdState#state{summary = UpdSummary, handle = Handle, @@ -554,13 +554,13 @@ open_reader(Filename) -> {ok, SummaryBin} = file:pread(Handle, SlotsLength + 8, SummaryLength), {Handle, SummaryBin}. -build_table_summary(SlotList, _Level, FirstKey, L, MaxSQN) -> - [{LastKey, _LastV}|_Rest] = SlotList, +build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN) -> + [{LastKey, _LastV}|_Rest] = SlotIndex, Summary = #summary{first_key = FirstKey, last_key = LastKey, - size = L, + size = SlotCount, max_sqn = MaxSQN}, - SummBin = term_to_binary({Summary, lists:reverse(SlotList)}, + SummBin = term_to_binary({Summary, lists:reverse(SlotIndex)}, ?BINARY_SETTINGS), SummCRC = erlang:crc32(SummBin), <>. @@ -574,15 +574,10 @@ read_table_summary(BinWithCheck) -> binary_to_term(SummBin) end. -build_all_slots(KVList) -> - L = length(KVList), - % The length is not a constant time command and the list may be large, - % but otherwise length must be called each iteration to avoid exception - % on split or sublist - [{FirstKey, _FirstV}|_Rest] = KVList, - SlotCount = L div ?SLOT_SIZE + 1, - BuildResponse = build_all_slots(KVList, - SlotCount, + +build_all_slots(SlotList) -> + SlotCount = length(SlotList), + BuildResponse = build_all_slots(SlotList, 8, 1, [], @@ -590,73 +585,27 @@ build_all_slots(KVList) -> {default, none}]), <<>>), {SlotIndex, BlockIndex, SlotsBin} = BuildResponse, - {FirstKey, L, SlotIndex, BlockIndex, SlotsBin}. + {SlotCount, SlotIndex, BlockIndex, SlotsBin}. -build_all_slots([], _SC, _Pos, _SlotID, SlotIdx, BlockIdxA, SlotsBin) -> - {SlotIdx, BlockIdxA, SlotsBin}; -build_all_slots(KVL, SC, Pos, SlotID, SlotIdx, BlockIdxA, SlotsBin) -> - {SlotList, KVRem} = - case SC of - 1 -> - {lists:sublist(KVL, ?SLOT_SIZE), []}; - _N -> - lists:split(?SLOT_SIZE, KVL) - end, - {LastKey, _V} = lists:last(SlotList), - {BlockIndex, SlotBin, HashList} = generate_binary_slot(SlotList), +build_all_slots([], _Pos, _SlotID, + SlotIdxAcc, BlockIdxAcc, SlotBinAcc) -> + {SlotIdxAcc, BlockIdxAcc, SlotBinAcc}; +build_all_slots([SlotD|Rest], Pos, SlotID, + SlotIdxAcc, BlockIdxAcc, SlotBinAcc) -> + {BlockIdx, SlotBin, HashList, LastKey} = SlotD, Length = byte_size(SlotBin), Bloom = leveled_tinybloom:create_bloom(HashList), SlotIndexV = #slot_index_value{slot_id = SlotID, start_position = Pos, length = Length, bloom = Bloom}, - build_all_slots(KVRem, - SC - 1, + build_all_slots(Rest, Pos + Length, SlotID + 1, - [{LastKey, SlotIndexV}|SlotIdx], - array:set(SlotID - 1, BlockIndex, BlockIdxA), - <>). + [{LastKey, SlotIndexV}|SlotIdxAcc], + array:set(SlotID - 1, BlockIdx, BlockIdxAcc), + <>). -read_slot(Handle, Slot) -> - {ok, SlotBin} = file:pread(Handle, - Slot#slot_index_value.start_position, - Slot#slot_index_value.length), - SlotBin. - -read_slots(Handle, SlotList) -> - PointerMapFun = - fun(Pointer) -> - {Slot, SK, EK} = - case Pointer of - {pointer, _Pid, Slot0, SK0, EK0} -> - {Slot0, SK0, EK0}; - {pointer, Slot0, SK0, EK0} -> - {Slot0, SK0, EK0} - end, - - {Slot#slot_index_value.start_position, - Slot#slot_index_value.length, - SK, - EK} - end, - - LengthList = lists:map(PointerMapFun, SlotList), - StartPos = element(1, lists:nth(1, LengthList)), - EndPos = element(1, lists:last(LengthList)) - + element(2, lists:last(LengthList)), - {ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos), - - BinSplitMapFun = - fun({SP, L, SK, EK}) -> - Start = SP - StartPos, - <<_Pre:Start/binary, - SlotBin:L/binary, - _Post/binary>> = MultiSlotBin, - {SlotBin, SK, EK} - end, - - lists:map(BinSplitMapFun, LengthList). generate_filenames(RootFilename) -> Ext = filename:extension(RootFilename), @@ -706,12 +655,7 @@ lookup_slots(StartKey, EndKey, Tree) -> end, SlotList = leveled_tree:search_range(StartKey, EndKey, Tree, StartKeyFun), {EK, _EndSlot} = lists:last(SlotList), - case EK of - EndKey -> - {lists:map(MapFun, SlotList), false}; - _ -> - {lists:map(MapFun, SlotList), true} - end. + {lists:map(MapFun, SlotList), not leveled_codec:endkey_passed(EK, EndKey)}. %%%============================================================================ @@ -739,7 +683,7 @@ lookup_slots(StartKey, EndKey, Tree) -> %% based on a 17-bit hash (so 0.0039 fpr). -generate_binary_slot(KVL) -> +generate_binary_slot(Lookup, KVL) -> HashFoldFun = fun({K, V}, {PosBinAcc, NoHashCount, HashAcc}) -> @@ -772,85 +716,169 @@ generate_binary_slot(KVL) -> end end, - - {PosBinIndex0, NHC, HashL} = lists:foldr(HashFoldFun, {<<>>, 0, []}, KVL), - PosBinIndex1 = - case NHC of - 0 -> - PosBinIndex0; - _ -> - N = NHC - 1, - <<0:1/integer, N:7/integer, PosBinIndex0/binary>> + + {HashL, PosBinIndex} = + case Lookup of + lookup -> + {PosBinIndex0, + NHC, + HashL0} = lists:foldr(HashFoldFun, {<<>>, 0, []}, KVL), + PosBinIndex1 = + case NHC of + 0 -> + PosBinIndex0; + _ -> + N = NHC - 1, + <<0:1/integer, N:7/integer, PosBinIndex0/binary>> + end, + {HashL0, PosBinIndex1}; + no_lookup -> + {[], <<0:1/integer, 127:7/integer>>} end, + BlockSize = + case Lookup of + lookup -> + ?SLOT_SIZE div 4; + no_lookup -> + ?NOLOOK_SLOTSIZE div 4 + end, + {B1, B2, B3, B4} = case length(KVL) of - L when L =< 32 -> + L when L =< BlockSize -> {term_to_binary(KVL, ?BINARY_SETTINGS), <<0:0>>, <<0:0>>, <<0:0>>}; - L when L =< 64 -> - {KVLA_32, KVLB_32} = lists:split(32, KVL), - {term_to_binary(KVLA_32, ?BINARY_SETTINGS), - term_to_binary(KVLB_32, ?BINARY_SETTINGS), + L when L =< 2 * BlockSize -> + {KVLA, KVLB} = lists:split(BlockSize, KVL), + {term_to_binary(KVLA, ?BINARY_SETTINGS), + term_to_binary(KVLB, ?BINARY_SETTINGS), <<0:0>>, <<0:0>>}; - L when L =< 96 -> - {KVLA_32, KVLB_64} = lists:split(32, KVL), - {KVLB_32, KVLC_32} = lists:split(32, KVLB_64), - {term_to_binary(KVLA_32, ?BINARY_SETTINGS), - term_to_binary(KVLB_32, ?BINARY_SETTINGS), - term_to_binary(KVLC_32, ?BINARY_SETTINGS), + L when L =< 3 * BlockSize -> + {KVLA, KVLB_Rest} = lists:split(BlockSize, KVL), + {KVLB, KVLC} = lists:split(BlockSize, KVLB_Rest), + {term_to_binary(KVLA, ?BINARY_SETTINGS), + term_to_binary(KVLB, ?BINARY_SETTINGS), + term_to_binary(KVLC, ?BINARY_SETTINGS), <<0:0>>}; - L when L =< 128 -> - {KVLA_32, KVLB_96} = lists:split(32, KVL), - {KVLB_32, KVLC_64} = lists:split(32, KVLB_96), - {KVLC_32, KVLD_32} = lists:split(32, KVLC_64), - {term_to_binary(KVLA_32, ?BINARY_SETTINGS), - term_to_binary(KVLB_32, ?BINARY_SETTINGS), - term_to_binary(KVLC_32, ?BINARY_SETTINGS), - term_to_binary(KVLD_32, ?BINARY_SETTINGS)} + L when L =< 4 * BlockSize -> + {KVLA, KVLB_Rest} = lists:split(BlockSize, KVL), + {KVLB, KVLC_Rest} = lists:split(BlockSize, KVLB_Rest), + {KVLC, KVLD} = lists:split(BlockSize, KVLC_Rest), + {term_to_binary(KVLA, ?BINARY_SETTINGS), + term_to_binary(KVLB, ?BINARY_SETTINGS), + term_to_binary(KVLC, ?BINARY_SETTINGS), + term_to_binary(KVLD, ?BINARY_SETTINGS)} end, - B1P = byte_size(PosBinIndex1), + B1P = byte_size(PosBinIndex), B1L = byte_size(B1), B2L = byte_size(B2), B3L = byte_size(B3), B4L = byte_size(B4), - Lengths = <>, SlotBin = <>, CRC32 = erlang:crc32(SlotBin), FullBin = <>, + + {LastKey, _LV} = lists:last(KVL), - {PosBinIndex1, FullBin, HashL}. + {<>, FullBin, HashL, LastKey}. -binaryslot_get(FullBin, Key, Hash, CachedPosLookup) -> +check_blocks([], _Handle, _Slot, _BlockLengths, _LedgerKey) -> + not_present; +check_blocks([Pos|Rest], Handle, Slot, BlockLengths, LedgerKey) -> + {BlockNumber, BlockPos} = revert_position(Pos), + BlockBin = read_block(Handle, Slot, BlockLengths, BlockNumber), + BlockL = binary_to_term(BlockBin), + {K, V} = lists:nth(BlockPos, BlockL), + case K of + LedgerKey -> + {K, V}; + _ -> + check_blocks(Rest, Handle, Slot, BlockLengths, LedgerKey) + end. + + +read_block(Handle, Slot, BlockLengths, BlockID) -> + {BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockID), + {ok, BlockBin} = file:pread(Handle, + Slot#slot_index_value.start_position + + BlockPos + + Offset + + 24, + % 4-byte CRC, 4 byte pos, 4x4 byte lengths + Length), + BlockBin. + +read_slot(Handle, Slot) -> + {ok, SlotBin} = file:pread(Handle, + Slot#slot_index_value.start_position, + Slot#slot_index_value.length), + SlotBin. + +read_slots(Handle, SlotList) -> + PointerMapFun = + fun(Pointer) -> + {Slot, SK, EK} = + case Pointer of + {pointer, _Pid, Slot0, SK0, EK0} -> + {Slot0, SK0, EK0}; + {pointer, Slot0, SK0, EK0} -> + {Slot0, SK0, EK0} + end, + + {Slot#slot_index_value.start_position, + Slot#slot_index_value.length, + SK, + EK} + end, + + LengthList = lists:map(PointerMapFun, SlotList), + StartPos = element(1, lists:nth(1, LengthList)), + EndPos = element(1, lists:last(LengthList)) + + element(2, lists:last(LengthList)), + {ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos), + + BinSplitMapFun = + fun({SP, L, SK, EK}) -> + Start = SP - StartPos, + <<_Pre:Start/binary, + SlotBin:L/binary, + _Post/binary>> = MultiSlotBin, + {SlotBin, SK, EK} + end, + + lists:map(BinSplitMapFun, LengthList). + + +binaryslot_get(FullBin, Key, Hash) -> case crc_check_slot(FullBin) of - {Lengths, Rest} -> - B1P = element(1, Lengths), - case CachedPosLookup of - {true, PosList} -> - <<_PosBinIndex:B1P/binary, Blocks/binary>> = Rest, - {fetch_value(PosList, Lengths, Blocks, Key), none}; - none -> - <> = Rest, - PosList = find_pos(PosBinIndex, - double_hash(Hash, Key), - [], - 0), - {fetch_value(PosList, Lengths, Blocks, Key), PosBinIndex} - end; + {BlockLengths, Rest} -> + <> = BlockLengths, + <> = Rest, + PosList = find_pos(PosBinIndex, + double_hash(Hash, Key), + [], + 0), + {fetch_value(PosList, BlockLengths, Blocks, Key), + BlockLengths, + PosBinIndex}; crc_wonky -> - {not_present, none} + {not_present, + none, + none} end. binaryslot_tolist(FullBin) -> @@ -867,8 +895,12 @@ binaryslot_tolist(FullBin) -> {Out, _Rem} = case crc_check_slot(FullBin) of - {Lengths, RestBin} -> - {B1P, B1L, B2L, B3L, B4L} = Lengths, + {BlockLengths, RestBin} -> + <> = BlockLengths, <<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin, lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L]); crc_wonky -> @@ -919,8 +951,12 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey) -> {Out, _Rem} = case crc_check_slot(FullBin) of - {Lengths, RestBin} -> - {B1P, B1L, B2L, B3L, B4L} = Lengths, + {BlockLengths, RestBin} -> + <> = BlockLengths, <<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin, lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L]); crc_wonky -> @@ -968,65 +1004,67 @@ trim_booleans(FirstKey, LastKey, StartKey, EndKey) -> end. - - crc_check_slot(FullBin) -> <> = FullBin, case erlang:crc32(SlotBin) of CRC32 -> - <> = SlotBin, - Lengths = {B1P, B1L, B2L, B3L, B4L}, - {Lengths, Rest}; + <> = SlotBin, + {BlockLengths, Rest}; _ -> leveled_log:log("SST09", []), crc_wonky end. +block_offsetandlength(BlockLengths, BlockID) -> + <> = BlockLengths, + case BlockID of + 1 -> + <> = BlockLengths0, + {BlocksPos, 0, B1L}; + 2 -> + <> = BlockLengths0, + {BlocksPos, B1L, B2L}; + 3 -> + <> = BlockLengths0, + {BlocksPos, B1L + B2L, B3L}; + 4 -> + <> = BlockLengths0, + {BlocksPos, B1L + B2L + B3L, B4L} + end. + double_hash(Hash, Key) -> H2 = erlang:phash2(Key), (Hash bxor H2) band 32767. -fetch_value([], _Lengths, _Blocks, _Key) -> +fetch_value([], _BlockLengths, _Blocks, _Key) -> not_present; -fetch_value([Pos|Rest], Lengths, Blocks, Key) -> - BlockNumber = (Pos div 32) + 1, - BlockPos = (Pos rem 32) + 1, - BlockL = - case BlockNumber of - 1 -> - B1L = element(2, Lengths), - <> = Blocks, - binary_to_term(Block); - 2 -> - B1L = element(2, Lengths), - B2L = element(3, Lengths), - <<_Pass:B1L/binary, Block:B2L/binary, _Rest/binary>> = Blocks, - binary_to_term(Block); - 3 -> - PreL = element(2, Lengths) + element(3, Lengths), - B3L = element(4, Lengths), - <<_Pass:PreL/binary, Block:B3L/binary, _Rest/binary>> = Blocks, - binary_to_term(Block); - 4 -> - {_B1P, B1L, B2L, B3L, B4L} = Lengths, - PreL = B1L + B2L + B3L, - <<_Pass:PreL/binary, Block:B4L/binary>> = Blocks, - binary_to_term(Block) - end, - +fetch_value([Pos|Rest], BlockLengths, Blocks, Key) -> + {BlockNumber, BlockPos} = revert_position(Pos), + {_BlockPos, + Offset, + Length} = block_offsetandlength(BlockLengths, BlockNumber), + <<_Pre:Offset/binary, Block:Length/binary, _Rest/binary>> = Blocks, + BlockL = binary_to_term(Block), {K, V} = lists:nth(BlockPos, BlockL), case K of Key -> {K, V}; _ -> - fetch_value(Rest, Lengths, Blocks, Key) + fetch_value(Rest, BlockLengths, Blocks, Key) end. + +revert_position(Pos) -> + BlockNumber = (Pos div 32) + 1, + BlockPos = (Pos rem 32) + 1, + {BlockNumber, BlockPos}. + find_pos(<<>>, _Hash, PosList, _Count) -> PosList; find_pos(<<1:1/integer, Hash:15/integer, T/binary>>, Hash, PosList, Count) -> @@ -1042,8 +1080,25 @@ find_pos(<<0:1/integer, NHC:7/integer, T/binary>>, Hash, PosList, Count) -> %%% Merge Functions %%%============================================================================ -%% functions for merging two KV lists with pointers - +%% The source lists are merged into lists of slots before the file is created +%% At Level zero, there will be a single source list - and this will always be +%% split into standard size slots +%% +%% At lower levels there will be two source lists and they will need to be +%% merged to ensure that the best conflicting answer survives and compactable +%% KV pairs are discarded. +%% +%% At lower levels slots can be larger if there are no lookup keys present in +%% the slot. This is to slow the growth of the manifest/number-of-files when +%% large numbers of index keys are present - as well as improving compression +%% ratios in the Ledger. +%% +%% The outcome of merge_lists/1 and merge_lists/3 should be an list of slots. +%% Each slot should be ordered by Key and be of the form {Flag, KVList}, where +%% Flag can either be lookup or no-lookup. The list of slots should also be +%% ordered by Key (i.e. the first key in the slot) +%% +%% For merging ... %% Compare the keys at the head of the list, and either skip that "best" key or %% identify as the next key. %% @@ -1054,23 +1109,117 @@ find_pos(<<0:1/integer, NHC:7/integer, T/binary>>, Hash, PosList, Count) -> %% there are matching keys then the highest sequence number must be chosen and %% any lower sequence numbers should be compacted out of existence -merge_lists(KeyList1, KeyList2, LevelInfo) -> - merge_lists(KeyList1, KeyList2, LevelInfo, [], ?MAX_SLOTS * ?SLOT_SIZE). +merge_lists(KVList1) -> + SlotCount = length(KVList1) div ?SLOT_SIZE, + {[], + [], + split_lists(KVList1, [], SlotCount), + element(1, lists:nth(1, KVList1))}. -merge_lists([], [], _LevelR, MergedList, _MaxSize) -> - {{[], []}, lists:reverse(MergedList)}; -merge_lists(Rem1, Rem2, _LevelR, MergedList, 0) -> - {{Rem1, Rem2}, lists:reverse(MergedList)}; -merge_lists(KeyList1, KeyList2, {IsBasement, TS}, MergedList, MaxSize) -> - case key_dominates(KeyList1, KeyList2, {IsBasement, TS}) of - {{next_key, TopKey}, Rem1, Rem2} -> - merge_lists(Rem1, +split_lists([], SlotLists, 0) -> + lists:reverse(SlotLists); +split_lists(LastPuff, SlotLists, 0) -> + SlotD = generate_binary_slot(lookup, LastPuff), + lists:reverse([SlotD|SlotLists]); +split_lists(KVList1, SlotLists, N) -> + {Slot, KVListRem} = lists:split(?SLOT_SIZE, KVList1), + SlotD = generate_binary_slot(lookup, Slot), + split_lists(KVListRem, [SlotD|SlotLists], N - 1). + +merge_lists(KVList1, KVList2, LevelInfo) -> + merge_lists(KVList1, KVList2, LevelInfo, [], null, 0). + +merge_lists(KVList1, KVList2, _LI, SlotList, FirstKey, ?MAX_SLOTS) -> + {KVList1, KVList2, lists:reverse(SlotList), FirstKey}; +merge_lists([], [], _LI, SlotList, FirstKey, _SlotCount) -> + {[], [], lists:reverse(SlotList), FirstKey}; +merge_lists(KVList1, KVList2, LI, SlotList, FirstKey, SlotCount) -> + {KVRem1, KVRem2, Slot, FK0} = + form_slot(KVList1, KVList2, LI, no_lookup, 0, [], FirstKey), + case Slot of + {_, []} -> + merge_lists(KVRem1, + KVRem2, + LI, + SlotList, + FK0, + SlotCount); + {Lookup, KVL} -> + SlotD = generate_binary_slot(Lookup, KVL), + merge_lists(KVRem1, + KVRem2, + LI, + [SlotD|SlotList], + FK0, + SlotCount + 1) + end. + +form_slot([], [], _LI, Type, _Size, Slot, FK) -> + {[], [], {Type, lists:reverse(Slot)}, FK}; +form_slot(KVList1, KVList2, _LI, lookup, ?SLOT_SIZE, Slot, FK) -> + {KVList1, KVList2, {lookup, lists:reverse(Slot)}, FK}; +form_slot(KVList1, KVList2, _LI, no_lookup, ?NOLOOK_SLOTSIZE, Slot, FK) -> + {KVList1, KVList2, {no_lookup, lists:reverse(Slot)}, FK}; +form_slot(KVList1, KVList2, {IsBasement, TS}, lookup, Size, Slot, FK) -> + case {key_dominates(KVList1, KVList2, {IsBasement, TS}), FK} of + {{{next_key, TopKV}, Rem1, Rem2}, null} -> + {TopK, _TopV} = TopKV, + form_slot(Rem1, Rem2, {IsBasement, TS}, - [TopKey|MergedList], - MaxSize - 1); + lookup, + Size + 1, + [TopKV|Slot], + TopK); + {{{next_key, TopKV}, Rem1, Rem2}, _} -> + form_slot(Rem1, + Rem2, + {IsBasement, TS}, + lookup, + Size + 1, + [TopKV|Slot], + FK); + {{skipped_key, Rem1, Rem2}, _} -> + form_slot(Rem1, Rem2, {IsBasement, TS}, lookup, Size, Slot, FK) + end; +form_slot(KVList1, KVList2, {IsBasement, TS}, no_lookup, Size, Slot, FK) -> + case key_dominates(KVList1, KVList2, {IsBasement, TS}) of + {{next_key, {TopK, TopV}}, Rem1, Rem2} -> + FK0 = + case FK of + null -> + TopK; + _ -> + FK + end, + case leveled_codec:to_lookup(TopK) of + no_lookup -> + form_slot(Rem1, + Rem2, + {IsBasement, TS}, + no_lookup, + Size + 1, + [{TopK, TopV}|Slot], + FK0); + lookup -> + case Size >= ?SLOT_SIZE of + true -> + {KVList1, + KVList2, + {no_lookup, lists:reverse(Slot)}, + FK}; + false -> + form_slot(Rem1, + Rem2, + {IsBasement, TS}, + lookup, + Size + 1, + [{TopK, TopV}|Slot], + FK0) + end + end; {skipped_key, Rem1, Rem2} -> - merge_lists(Rem1, Rem2, {IsBasement, TS}, MergedList, MaxSize) + form_slot(Rem1, Rem2, {IsBasement, TS}, no_lookup, Size, Slot, FK) end. key_dominates(KL1, KL2, Level) -> @@ -1202,14 +1351,41 @@ generate_indexkeys(Count) -> generate_indexkeys(0, IndexList) -> IndexList; generate_indexkeys(Count, IndexList) -> - IndexSpecs = [{add, "t1_int", random:uniform(80000)}], - Changes = leveled_codec:convert_indexspecs(IndexSpecs, - "Bucket", - "Key" ++ integer_to_list(Count), - Count, - infinity), + Changes = generate_indexkey(random:uniform(8000), Count), generate_indexkeys(Count - 1, IndexList ++ Changes). +generate_indexkey(Term, Count) -> + IndexSpecs = [{add, "t1_int", Term}], + leveled_codec:convert_indexspecs(IndexSpecs, + "Bucket", + "Key" ++ integer_to_list(Count), + Count, + infinity). + +form_slot_test() -> + % If a skip key happens, mustn't switch to loookup by accident as could be + % over the expected size + SkippingKV = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}}, + Slot = [{{o, "B1", "K5", null}, {5, active, 99234567, {}}}], + R1 = form_slot([SkippingKV], [], + {true, 99999999}, + no_lookup, + ?SLOT_SIZE + 1, + Slot, + {o, "B1", "K5", null}), + ?assertMatch({[], [], {no_lookup, Slot}, {o, "B1", "K5", null}}, R1). + +merge_tombstonelist_test() -> + % Merge lists wiht nothing but tombstones + SkippingKV1 = {{o, "B1", "K9995", null}, {9995, tomb, 1234567, {}}}, + SkippingKV2 = {{o, "B1", "K9996", null}, {9996, tomb, 1234567, {}}}, + SkippingKV3 = {{o, "B1", "K9997", null}, {9997, tomb, 1234567, {}}}, + SkippingKV4 = {{o, "B1", "K9998", null}, {9998, tomb, 1234567, {}}}, + SkippingKV5 = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}}, + R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5], + [SkippingKV2, SkippingKV4], + {true, 9999999}), + ?assertMatch({[], [], [], null}, R). indexed_list_test() -> io:format(user, "~nIndexed list timing test:~n", []), @@ -1219,7 +1395,7 @@ indexed_list_test() -> SW0 = os:timestamp(), - {_PosBinIndex1, FullBin, _HL} = generate_binary_slot(KVL1), + {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, KVL1), io:format(user, "Indexed list created slot in ~w microseconds of size ~w~n", [timer:now_diff(os:timestamp(), SW0), byte_size(FullBin)]), @@ -1247,7 +1423,7 @@ indexed_list_mixedkeys_test() -> KVL1 = lists:sublist(KVL0, 33), Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1), - {_PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys), + {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys), {TestK1, TestV1} = lists:nth(4, KVL1), MH1 = leveled_codec:magic_hash(TestK1), @@ -1273,7 +1449,7 @@ indexed_list_mixedkeys2_test() -> IdxKeys2 = lists:ukeysort(1, generate_indexkeys(30)), % this isn't actually ordered correctly Keys = IdxKeys1 ++ KVL1 ++ IdxKeys2, - {_PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys), + {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys), lists:foreach(fun({K, V}) -> MH = leveled_codec:magic_hash(K), test_binary_slot(FullBin, K, MH, {K, V}) @@ -1282,8 +1458,8 @@ indexed_list_mixedkeys2_test() -> indexed_list_allindexkeys_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), 128), - {PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys), - ?assertMatch(<<127:8/integer>>, PosBinIndex1), + {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys), + ?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1), % SW = os:timestamp(), BinToList = binaryslot_tolist(FullBin), % io:format(user, @@ -1292,11 +1468,23 @@ indexed_list_allindexkeys_test() -> ?assertMatch(Keys, BinToList), ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all)). +indexed_list_allindexkeys_nolookup_test() -> + Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(1000)), + 128 * ?NOLOOK_MULT), + {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(no_lookup, Keys), + ?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1), + % SW = os:timestamp(), + BinToList = binaryslot_tolist(FullBin), + % io:format(user, + % "Indexed list flattened in ~w microseconds ~n", + % [timer:now_diff(os:timestamp(), SW)]), + ?assertMatch(Keys, BinToList), + ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all)). indexed_list_allindexkeys_trimmed_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), 128), - {PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys), - ?assertMatch(<<127:8/integer>>, PosBinIndex1), + {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys), + ?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1), ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, {i, "Bucket", @@ -1333,7 +1521,8 @@ indexed_list_mixedkeys_bitflip_test() -> KVL0 = lists:ukeysort(1, generate_randomkeys(1, 50, 1, 4)), KVL1 = lists:sublist(KVL0, 33), Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1), - {_PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys), + {_PosBinIndex1, FullBin, _HL, LK} = generate_binary_slot(lookup, Keys), + ?assertMatch(LK, element(1, lists:last(Keys))), L = byte_size(FullBin), Byte1 = random:uniform(L), <> = FullBin, @@ -1362,7 +1551,7 @@ indexed_list_mixedkeys_bitflip_test() -> test_binary_slot(FullBin, Key, Hash, ExpectedValue) -> % SW = os:timestamp(), - {ReturnedValue, _} = binaryslot_get(FullBin, Key, Hash, none), + {ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash), ?assertMatch(ExpectedValue, ReturnedValue). % io:format(user, "Fetch success in ~w microseconds ~n", % [timer:now_diff(os:timestamp(), SW)]). @@ -1387,16 +1576,12 @@ merge_test() -> ?assertMatch(ExpLK2, LK2), ML1 = [{next, #manifest_entry{owner = P1}, FK1}], ML2 = [{next, #manifest_entry{owner = P2}, FK2}], - {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = sst_new("../test/", - "level2_merge", - ML1, - ML2, - false, - 2, - N * 2), + NewR = sst_new("../test/", "level2_merge", ML1, ML2, false, 2, N * 2), + {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = NewR, ?assertMatch([], Rem1), ?assertMatch([], Rem2), ?assertMatch(true, FK3 == min(FK1, FK2)), + io:format("LK1 ~w LK2 ~w LK3 ~w~n", [LK1, LK2, LK3]), ?assertMatch(true, LK3 == max(LK1, LK2)), io:format(user, "Created and merged two files of size ~w in ~w microseconds~n", @@ -1459,6 +1644,77 @@ simple_persisted_range_test() -> TL4 = lists:map(fun(EK) -> {SK4, EK} end, [EK2, EK3, EK4, EK5]), TL5 = lists:map(fun(EK) -> {SK5, EK} end, [EK2, EK3, EK4, EK5]), lists:foreach(TestFun, TL2 ++ TL3 ++ TL4 ++ TL5). + +additional_range_test() -> + % Test fetching ranges that fall into odd situations with regards to the + % summayr index + % - ranges which fall between entries in summary + % - ranges which go beyond the end of the range of the sst + % - ranges which match to an end key in the summary index + IK1 = lists:foldl(fun(X, Acc) -> + Acc ++ generate_indexkey(X, X) + end, + [], + lists:seq(1, ?NOLOOK_SLOTSIZE)), + Gap = 2, + IK2 = lists:foldl(fun(X, Acc) -> + Acc ++ generate_indexkey(X, X) + end, + [], + lists:seq(?NOLOOK_SLOTSIZE + Gap + 1, + 2 * ?NOLOOK_SLOTSIZE + Gap)), + {ok, + P1, + {{Rem1, Rem2}, + SK, + EK}} = sst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999), + ?assertMatch([], Rem1), + ?assertMatch([], Rem2), + ?assertMatch(SK, element(1, lists:nth(1, IK1))), + ?assertMatch(EK, element(1, lists:last(IK2))), + + % Basic test - checking scanwidth + R1 = sst_getkvrange(P1, SK, EK, 1), + ?assertMatch(?NOLOOK_SLOTSIZE + 1, length(R1)), + QR1 = lists:sublist(R1, ?NOLOOK_SLOTSIZE), + ?assertMatch(IK1, QR1), + R2 = sst_getkvrange(P1, SK, EK, 2), + ?assertMatch(?NOLOOK_SLOTSIZE * 2, length(R2)), + QR2 = lists:sublist(R2, ?NOLOOK_SLOTSIZE), + QR3 = lists:sublist(R2, ?NOLOOK_SLOTSIZE + 1, 2 * ?NOLOOK_SLOTSIZE), + ?assertMatch(IK1, QR2), + ?assertMatch(IK2, QR3), + + % Testing the gap + [GapSKV] = generate_indexkey(?NOLOOK_SLOTSIZE + 1, ?NOLOOK_SLOTSIZE + 1), + [GapEKV] = generate_indexkey(?NOLOOK_SLOTSIZE + 2, ?NOLOOK_SLOTSIZE + 2), + R3 = sst_getkvrange(P1, element(1, GapSKV), element(1, GapEKV), 1), + ?assertMatch([], R3), + + % Testing beyond the range + [PastEKV] = generate_indexkey(2 * ?NOLOOK_SLOTSIZE + Gap + 1, + 2 * ?NOLOOK_SLOTSIZE + Gap + 1), + R4 = sst_getkvrange(P1, element(1, GapSKV), element(1, PastEKV), 2), + ?assertMatch(IK2, R4), + R5 = sst_getkvrange(P1, SK, element(1, PastEKV), 2), + IKAll = IK1 ++ IK2, + ?assertMatch(IKAll, R5), + [MidREKV] = generate_indexkey(?NOLOOK_SLOTSIZE + Gap + 2, + ?NOLOOK_SLOTSIZE + Gap + 2), + io:format(user, "Mid second range to past range test~n", []), + R6 = sst_getkvrange(P1, element(1, MidREKV), element(1, PastEKV), 2), + Exp6 = lists:sublist(IK2, 2, length(IK2)), + ?assertMatch(Exp6, R6), + + % Testing at a slot end + Slot1EK = element(1, lists:last(IK1)), + R7 = sst_getkvrange(P1, SK, Slot1EK, 2), + ?assertMatch(IK1, R7). + + % Testing beyond end (should never happen if manifest behaves) + % Test blows up anyway + % R8 = sst_getkvrange(P1, element(1, PastEKV), element(1, PastEKV), 2), + % ?assertMatch([], R8).