diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 39183d8..ef0377e 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -165,6 +165,7 @@ -define(DIVISOR, 8092). -define(COMPRESSION_LEVEL, 1). -define(HEADER_LEN, 56). +-define(ITERATOR_SCANWIDTH, 1). -record(state, {version = ?CURRENT_VERSION :: tuple(), @@ -254,38 +255,132 @@ complete_file(Handle, FileMD, KL1, KL2, Level) -> fetch_keyvalue(Handle, FileMD, Key) -> {_NearestKey, {FilterLen, PointerF}, {LengthList, PointerB}} = get_nearestkey(FileMD#state.slot_index, Key), - {ok, SegFilter} = file:pread(Handle, - PointerF + FileMD#state.filter_pointer, - FilterLen), + {ok, SegFilter} = file:pread(Handle, + PointerF + FileMD#state.filter_pointer, + FilterLen), SegID = hash_for_segmentid({keyonly, Key}), case check_for_segments(SegFilter, [SegID], true) of {maybe_present, BlockList} -> - io:format("Filter has returned maybe~n"), fetch_keyvalue_fromblock(BlockList, Key, LengthList, Handle, PointerB + FileMD#state.slots_pointer); not_present -> - io:format("Filter has returned no~n"), not_present; error_so_maybe_present -> - io:format("Filter has returned error~n"), - fetch_keyvalue_fromblock(lists:seq(0,3), + fetch_keyvalue_fromblock(lists:seq(0,length(LengthList)), Key, LengthList, Handle, PointerB + FileMD#state.slots_pointer) end. +%% Fetches a range of keys returning a list of {Key, SeqN} tuples +fetch_range_keysonly(Handle, FileMD, StartKey, EndKey) -> + fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_keysonly/2). + +fetch_range_keysonly(Handle, FileMD, StartKey, EndKey, ScanWidth) -> + fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_keysonly/2, ScanWidth). + +acc_list_keysonly(null, empty) -> + []; +acc_list_keysonly(null, RList) -> + RList; +acc_list_keysonly(R, RList) -> + lists:append(RList, [strip_to_key_seqn_only(R)]). + +%% Iterate keys, returning a batch of keys & values in a range +%% - the iterator can have a ScanWidth which is how many slots should be +%% scanned by the iterator before returning a result +%% - batches can be ended with a pointer to indicate there are potentially +%% further values in the range +%% - a list of functions can be provided, which should either return true +%% or false, and these can be used to filter the results from the query, +%% for example to ignore keys above a certain sequence number, to ignore +%% keys not matching a certain regular expression, or to ignore keys not +%% a member of a particular partition +%% - An Accumulator and an Accumulator function can be passed. The function +%% needs to handle being passed (KV, Acc) to add the current result to the +%% Accumulator. The functional should handle KV=null, Acc=empty to initiate +%% the accumulator, and KV=null to leave the Accumulator unchanged. +%% Flexibility with accumulators is such that keys-only can be returned rather +%% than keys and values, or other entirely different accumulators can be +%% used - e.g. counters, hash-lists to build bloom filters etc + +fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun) -> + fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ?ITERATOR_SCANWIDTH). + +fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth) -> + fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, empty). + +fetch_range(_Handle, _FileMD, StartKey, _EndKey, _FunList, _AccFun, 0, Acc) -> + {partial, Acc, StartKey}; +fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, Acc) -> + %% get_nearestkey gets the last key in the index <= StartKey, or the next + %% key along if {next, StartKey} is passed + case get_nearestkey(FileMD#state.slot_index, StartKey) of + {NearestKey, _Filter, {LengthList, PointerB}} -> + fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, + LengthList, 0, PointerB + FileMD#state.slots_pointer, + AccFun(null, Acc)); + not_found -> + {complete, Acc} + end. + +fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, + LengthList, BlockNumber, _Pointer, Acc) + when length(LengthList) == BlockNumber -> + %% Reached the end of the slot. Move the start key on one to scan a new slot + fetch_range(Handle, FileMD, {next, NearestKey}, EndKey, FunList, AccFun, ScanWidth - 1, Acc); +fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, + LengthList, BlockNumber, Pointer, Acc) -> + Block = fetch_block(Handle, LengthList, BlockNumber, Pointer), + Results = scan_block(Block, StartKey, EndKey, FunList, AccFun, Acc), + case Results of + {partial, Acc1, StartKey} -> + %% Move on to the next block + fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, + LengthList, BlockNumber + 1, Pointer, Acc1); + {complete, Acc1} -> + {complete, Acc1} + end. + +scan_block([], StartKey, _EndKey, _FunList, _AccFun, Acc) -> + {partial, Acc, StartKey}; +scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) -> + K = strip_to_keyonly(HeadKV), + case K of + K when K < StartKey -> + scan_block(T, StartKey, EndKey, FunList, AccFun, Acc); + K when K > EndKey -> + {complete, Acc}; + _ -> + case applyfuns(FunList, HeadKV) of + include -> + %% Add result to the accumulator + scan_block(T, StartKey, EndKey, FunList, + AccFun, AccFun(HeadKV, Acc)); + skip -> + scan_block(T, StartKey, EndKey, FunList, + AccFun, Acc) + end + end. + +applyfuns([], _KV) -> + include; +applyfuns([HeadFun|OtherFuns], KV) -> + case HeadFun(KV) of + true -> + applyfuns(OtherFuns, KV); + false -> + skip + end. fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) -> not_present; fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot) -> - Start = lists:sum(lists:sublist(LengthList, BlockNumber)), - Length = lists:nth(BlockNumber + 1, LengthList), - {ok, BlockToCheckBin} = file:pread(Handle, Start + StartOfSlot, Length), - BlockToCheck = binary_to_term(BlockToCheckBin), + BlockToCheck = fetch_block(Handle, LengthList, BlockNumber, StartOfSlot), Result = lists:keyfind(Key, 1, BlockToCheck), case Result of false -> @@ -294,17 +389,48 @@ fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot) KV end. - +fetch_block(Handle, LengthList, BlockNumber, StartOfSlot) -> + Start = lists:sum(lists:sublist(LengthList, BlockNumber)), + Length = lists:nth(BlockNumber + 1, LengthList), + {ok, BlockToCheckBin} = file:pread(Handle, Start + StartOfSlot, Length), + binary_to_term(BlockToCheckBin). + +%% Need to deal with either Key or {next, Key} get_nearestkey(KVList, Key) -> - get_nearestkey(KVList, Key, not_found). - -get_nearestkey([], _KeyToFind, PrevV) -> + case Key of + {first, K} -> + get_firstkeytomatch(KVList, K, not_found); + {next, K} -> + get_nextkeyaftermatch(KVList, K, not_found); + _ -> + get_firstkeytomatch(KVList, Key, not_found) + end. + +get_firstkeytomatch([], _KeyToFind, PrevV) -> PrevV; -get_nearestkey([{K, _FilterInfo, _SlotInfo}|_T], KeyToFind, PrevV) +get_firstkeytomatch([{K, FilterInfo, SlotInfo}|_T], KeyToFind, PrevV) when K > KeyToFind -> - PrevV; -get_nearestkey([Result|T], KeyToFind, _) -> - get_nearestkey(T, KeyToFind, Result). + case PrevV of + not_found -> + {K, FilterInfo, SlotInfo}; + _ -> + PrevV + end; +get_firstkeytomatch([{K, FilterInfo, SlotInfo}|T], KeyToFind, _PrevV) -> + get_firstkeytomatch(T, KeyToFind, {K, FilterInfo, SlotInfo}). + +get_nextkeyaftermatch([], _KeyToFind, _PrevV) -> + not_found; +get_nextkeyaftermatch([{K, FilterInfo, SlotInfo}|T], KeyToFind, PrevV) + when K >= KeyToFind -> + case PrevV of + not_found -> + get_nextkeyaftermatch(T, KeyToFind, next); + next -> + {K, FilterInfo, SlotInfo} + end; +get_nextkeyaftermatch([_KTuple|T], KeyToFind, PrevV) -> + get_nextkeyaftermatch(T, KeyToFind, PrevV). %% Take a file handle at the sart position (after creating the header) and then @@ -570,6 +696,8 @@ last(E, [], _) -> E. strip_to_keyonly({keyonly, K}) -> K; strip_to_keyonly({K, _, _, _}) -> K. +strip_to_key_seqn_only({K, SeqN, _, _}) -> {K, SeqN}. + serialise_block(BlockKeyList) -> term_to_binary(BlockKeyList, [{compressed, ?COMPRESSION_LEVEL}]). @@ -933,7 +1061,7 @@ generate_randomkeys(Count, Acc) -> RandKey = {{o, lists:concat(["Bucket", random:uniform(1024)]), lists:concat(["Key", random:uniform(1024)])}, - random:uniform(1024*1024), + Count + 1, {active, infinity}, null}, generate_randomkeys(Count - 1, [RandKey|Acc]). @@ -1195,7 +1323,7 @@ initial_create_file_test() -> big_create_file_test() -> Filename = "../test/bigtest1.sft", - {KL1, KL2} = {lists:sort(generate_randomkeys(50000)), + {KL1, KL2} = {lists:sort(generate_randomkeys(2000)), lists:sort(generate_randomkeys(50000))}, {InitHandle, InitFileMD} = create_file(Filename), {Handle, FileMD, {_KL1Rem, _KL2Rem}} = complete_file(InitHandle, @@ -1207,18 +1335,68 @@ big_create_file_test() -> Result2 = fetch_keyvalue(Handle, FileMD, K2), ?assertMatch(Result1, {K1, Sq1, St1, V1}), ?assertMatch(Result2, {K2, Sq2, St2, V2}), + SubList = lists:sublist(KL2, 1000), FailedFinds = lists:foldl(fun(K, Acc) -> {Kn, _, _, _} = K, Rn = fetch_keyvalue(Handle, FileMD, Kn), case Rn of - {Kn, _, _, _} -> Acc; - _ -> Acc + 1 + {Kn, _, _, _} -> + Acc; + _ -> + Acc + 1 end end, 0, - lists:sublist(KL2, 1000)), + SubList), + io:format("FailedFinds of ~w~n", [FailedFinds]), ?assertMatch(FailedFinds, 0), Result3 = fetch_keyvalue(Handle, FileMD, {o, "Bucket1024", "Key1024Alt"}), ?assertMatch(Result3, not_present), ok = file:close(Handle), ok = file:delete(Filename). + +initial_iterator_test() -> + Filename = "../test/test2.sft", + {KL1, KL2} = sample_keylist(), + {Handle, FileMD} = create_file(Filename), + {UpdHandle, UpdFileMD, {[], []}} = complete_file(Handle, FileMD, KL1, KL2, 1), + Result1 = fetch_range_keysonly(UpdHandle, UpdFileMD, + {o, "Bucket1", "Key8"}, + {o, "Bucket1", "Key9d"}), + io:format("Result returned of ~w~n", [Result1]), + ?assertMatch(Result1, {complete, [{{o, "Bucket1", "Key8"}, 1}, + {{o, "Bucket1", "Key9"}, 1}, + {{o, "Bucket1", "Key9a"}, 1}, + {{o, "Bucket1", "Key9b"}, 1}, + {{o, "Bucket1", "Key9c"}, 1}, + {{o, "Bucket1", "Key9d"}, 1}]}), + Result2 = fetch_range_keysonly(UpdHandle, UpdFileMD, + {o, "Bucket1", "Key8"}, + {o, "Bucket1", "Key9b"}), + ?assertMatch(Result2, {complete, [{{o, "Bucket1", "Key8"}, 1}, + {{o, "Bucket1", "Key9"}, 1}, + {{o, "Bucket1", "Key9a"}, 1}, + {{o, "Bucket1", "Key9b"}, 1}]}), + ok = file:close(UpdHandle), + ok = file:delete(Filename). + +big_iterator_test() -> + Filename = "../test/bigtest1.sft", + {KL1, KL2} = {lists:sort(generate_randomkeys(10000)), []}, + {InitHandle, InitFileMD} = create_file(Filename), + {Handle, FileMD, {KL1Rem, KL2Rem}} = complete_file(InitHandle, + InitFileMD, + KL1, KL2, 1), + io:format("Remainder lengths are ~w and ~w ~n", [length(KL1Rem), length(KL2Rem)]), + {complete, Result1} = fetch_range_keysonly(Handle, FileMD, {o, "Bucket0000", "Key0000"}, + {o, "Bucket9999", "Key9999"}, + 256), + NumFoundKeys1 = length(Result1), + NumAddedKeys = 10000 - length(KL1Rem), + ?assertMatch(NumFoundKeys1, NumAddedKeys), + {partial, Result2, _} = fetch_range_keysonly(Handle, FileMD, {o, "Bucket0000", "Key0000"}, + {o, "Bucket9999", "Key9999"}, + 32), + NumFoundKeys2 = length(Result2), + ?assertMatch(NumFoundKeys2, 32 * 128). + \ No newline at end of file