Iterator support added

Initial support for iterators
This commit is contained in:
martinsumner 2016-07-12 19:42:50 +01:00
parent 9dae893958
commit 45c10613e7

View file

@ -165,6 +165,7 @@
-define(DIVISOR, 8092).
-define(COMPRESSION_LEVEL, 1).
-define(HEADER_LEN, 56).
-define(ITERATOR_SCANWIDTH, 1).
-record(state, {version = ?CURRENT_VERSION :: tuple(),
@ -254,38 +255,132 @@ complete_file(Handle, FileMD, KL1, KL2, Level) ->
fetch_keyvalue(Handle, FileMD, Key) ->
{_NearestKey, {FilterLen, PointerF},
{LengthList, PointerB}} = get_nearestkey(FileMD#state.slot_index, Key),
{ok, SegFilter} = file:pread(Handle,
PointerF + FileMD#state.filter_pointer,
FilterLen),
{ok, SegFilter} = file:pread(Handle,
PointerF + FileMD#state.filter_pointer,
FilterLen),
SegID = hash_for_segmentid({keyonly, Key}),
case check_for_segments(SegFilter, [SegID], true) of
{maybe_present, BlockList} ->
io:format("Filter has returned maybe~n"),
fetch_keyvalue_fromblock(BlockList,
Key,
LengthList,
Handle,
PointerB + FileMD#state.slots_pointer);
not_present ->
io:format("Filter has returned no~n"),
not_present;
error_so_maybe_present ->
io:format("Filter has returned error~n"),
fetch_keyvalue_fromblock(lists:seq(0,3),
fetch_keyvalue_fromblock(lists:seq(0,length(LengthList)),
Key,
LengthList,
Handle,
PointerB + FileMD#state.slots_pointer)
end.
%% Fetches a range of keys returning a list of {Key, SeqN} tuples
fetch_range_keysonly(Handle, FileMD, StartKey, EndKey) ->
fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_keysonly/2).
fetch_range_keysonly(Handle, FileMD, StartKey, EndKey, ScanWidth) ->
fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_keysonly/2, ScanWidth).
acc_list_keysonly(null, empty) ->
[];
acc_list_keysonly(null, RList) ->
RList;
acc_list_keysonly(R, RList) ->
lists:append(RList, [strip_to_key_seqn_only(R)]).
%% Iterate keys, returning a batch of keys & values in a range
%% - the iterator can have a ScanWidth which is how many slots should be
%% scanned by the iterator before returning a result
%% - batches can be ended with a pointer to indicate there are potentially
%% further values in the range
%% - a list of functions can be provided, which should either return true
%% or false, and these can be used to filter the results from the query,
%% for example to ignore keys above a certain sequence number, to ignore
%% keys not matching a certain regular expression, or to ignore keys not
%% a member of a particular partition
%% - An Accumulator and an Accumulator function can be passed. The function
%% needs to handle being passed (KV, Acc) to add the current result to the
%% Accumulator. The functional should handle KV=null, Acc=empty to initiate
%% the accumulator, and KV=null to leave the Accumulator unchanged.
%% Flexibility with accumulators is such that keys-only can be returned rather
%% than keys and values, or other entirely different accumulators can be
%% used - e.g. counters, hash-lists to build bloom filters etc
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun) ->
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ?ITERATOR_SCANWIDTH).
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth) ->
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, empty).
fetch_range(_Handle, _FileMD, StartKey, _EndKey, _FunList, _AccFun, 0, Acc) ->
{partial, Acc, StartKey};
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, Acc) ->
%% get_nearestkey gets the last key in the index <= StartKey, or the next
%% key along if {next, StartKey} is passed
case get_nearestkey(FileMD#state.slot_index, StartKey) of
{NearestKey, _Filter, {LengthList, PointerB}} ->
fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth,
LengthList, 0, PointerB + FileMD#state.slots_pointer,
AccFun(null, Acc));
not_found ->
{complete, Acc}
end.
fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth,
LengthList, BlockNumber, _Pointer, Acc)
when length(LengthList) == BlockNumber ->
%% Reached the end of the slot. Move the start key on one to scan a new slot
fetch_range(Handle, FileMD, {next, NearestKey}, EndKey, FunList, AccFun, ScanWidth - 1, Acc);
fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth,
LengthList, BlockNumber, Pointer, Acc) ->
Block = fetch_block(Handle, LengthList, BlockNumber, Pointer),
Results = scan_block(Block, StartKey, EndKey, FunList, AccFun, Acc),
case Results of
{partial, Acc1, StartKey} ->
%% Move on to the next block
fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth,
LengthList, BlockNumber + 1, Pointer, Acc1);
{complete, Acc1} ->
{complete, Acc1}
end.
scan_block([], StartKey, _EndKey, _FunList, _AccFun, Acc) ->
{partial, Acc, StartKey};
scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) ->
K = strip_to_keyonly(HeadKV),
case K of
K when K < StartKey ->
scan_block(T, StartKey, EndKey, FunList, AccFun, Acc);
K when K > EndKey ->
{complete, Acc};
_ ->
case applyfuns(FunList, HeadKV) of
include ->
%% Add result to the accumulator
scan_block(T, StartKey, EndKey, FunList,
AccFun, AccFun(HeadKV, Acc));
skip ->
scan_block(T, StartKey, EndKey, FunList,
AccFun, Acc)
end
end.
applyfuns([], _KV) ->
include;
applyfuns([HeadFun|OtherFuns], KV) ->
case HeadFun(KV) of
true ->
applyfuns(OtherFuns, KV);
false ->
skip
end.
fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) ->
not_present;
fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot) ->
Start = lists:sum(lists:sublist(LengthList, BlockNumber)),
Length = lists:nth(BlockNumber + 1, LengthList),
{ok, BlockToCheckBin} = file:pread(Handle, Start + StartOfSlot, Length),
BlockToCheck = binary_to_term(BlockToCheckBin),
BlockToCheck = fetch_block(Handle, LengthList, BlockNumber, StartOfSlot),
Result = lists:keyfind(Key, 1, BlockToCheck),
case Result of
false ->
@ -294,17 +389,48 @@ fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot)
KV
end.
fetch_block(Handle, LengthList, BlockNumber, StartOfSlot) ->
Start = lists:sum(lists:sublist(LengthList, BlockNumber)),
Length = lists:nth(BlockNumber + 1, LengthList),
{ok, BlockToCheckBin} = file:pread(Handle, Start + StartOfSlot, Length),
binary_to_term(BlockToCheckBin).
%% Need to deal with either Key or {next, Key}
get_nearestkey(KVList, Key) ->
get_nearestkey(KVList, Key, not_found).
get_nearestkey([], _KeyToFind, PrevV) ->
case Key of
{first, K} ->
get_firstkeytomatch(KVList, K, not_found);
{next, K} ->
get_nextkeyaftermatch(KVList, K, not_found);
_ ->
get_firstkeytomatch(KVList, Key, not_found)
end.
get_firstkeytomatch([], _KeyToFind, PrevV) ->
PrevV;
get_nearestkey([{K, _FilterInfo, _SlotInfo}|_T], KeyToFind, PrevV)
get_firstkeytomatch([{K, FilterInfo, SlotInfo}|_T], KeyToFind, PrevV)
when K > KeyToFind ->
PrevV;
get_nearestkey([Result|T], KeyToFind, _) ->
get_nearestkey(T, KeyToFind, Result).
case PrevV of
not_found ->
{K, FilterInfo, SlotInfo};
_ ->
PrevV
end;
get_firstkeytomatch([{K, FilterInfo, SlotInfo}|T], KeyToFind, _PrevV) ->
get_firstkeytomatch(T, KeyToFind, {K, FilterInfo, SlotInfo}).
get_nextkeyaftermatch([], _KeyToFind, _PrevV) ->
not_found;
get_nextkeyaftermatch([{K, FilterInfo, SlotInfo}|T], KeyToFind, PrevV)
when K >= KeyToFind ->
case PrevV of
not_found ->
get_nextkeyaftermatch(T, KeyToFind, next);
next ->
{K, FilterInfo, SlotInfo}
end;
get_nextkeyaftermatch([_KTuple|T], KeyToFind, PrevV) ->
get_nextkeyaftermatch(T, KeyToFind, PrevV).
%% Take a file handle at the sart position (after creating the header) and then
@ -570,6 +696,8 @@ last(E, [], _) -> E.
strip_to_keyonly({keyonly, K}) -> K;
strip_to_keyonly({K, _, _, _}) -> K.
strip_to_key_seqn_only({K, SeqN, _, _}) -> {K, SeqN}.
serialise_block(BlockKeyList) ->
term_to_binary(BlockKeyList, [{compressed, ?COMPRESSION_LEVEL}]).
@ -933,7 +1061,7 @@ generate_randomkeys(Count, Acc) ->
RandKey = {{o,
lists:concat(["Bucket", random:uniform(1024)]),
lists:concat(["Key", random:uniform(1024)])},
random:uniform(1024*1024),
Count + 1,
{active, infinity}, null},
generate_randomkeys(Count - 1, [RandKey|Acc]).
@ -1195,7 +1323,7 @@ initial_create_file_test() ->
big_create_file_test() ->
Filename = "../test/bigtest1.sft",
{KL1, KL2} = {lists:sort(generate_randomkeys(50000)),
{KL1, KL2} = {lists:sort(generate_randomkeys(2000)),
lists:sort(generate_randomkeys(50000))},
{InitHandle, InitFileMD} = create_file(Filename),
{Handle, FileMD, {_KL1Rem, _KL2Rem}} = complete_file(InitHandle,
@ -1207,18 +1335,68 @@ big_create_file_test() ->
Result2 = fetch_keyvalue(Handle, FileMD, K2),
?assertMatch(Result1, {K1, Sq1, St1, V1}),
?assertMatch(Result2, {K2, Sq2, St2, V2}),
SubList = lists:sublist(KL2, 1000),
FailedFinds = lists:foldl(fun(K, Acc) ->
{Kn, _, _, _} = K,
Rn = fetch_keyvalue(Handle, FileMD, Kn),
case Rn of
{Kn, _, _, _} -> Acc;
_ -> Acc + 1
{Kn, _, _, _} ->
Acc;
_ ->
Acc + 1
end
end,
0,
lists:sublist(KL2, 1000)),
SubList),
io:format("FailedFinds of ~w~n", [FailedFinds]),
?assertMatch(FailedFinds, 0),
Result3 = fetch_keyvalue(Handle, FileMD, {o, "Bucket1024", "Key1024Alt"}),
?assertMatch(Result3, not_present),
ok = file:close(Handle),
ok = file:delete(Filename).
initial_iterator_test() ->
Filename = "../test/test2.sft",
{KL1, KL2} = sample_keylist(),
{Handle, FileMD} = create_file(Filename),
{UpdHandle, UpdFileMD, {[], []}} = complete_file(Handle, FileMD, KL1, KL2, 1),
Result1 = fetch_range_keysonly(UpdHandle, UpdFileMD,
{o, "Bucket1", "Key8"},
{o, "Bucket1", "Key9d"}),
io:format("Result returned of ~w~n", [Result1]),
?assertMatch(Result1, {complete, [{{o, "Bucket1", "Key8"}, 1},
{{o, "Bucket1", "Key9"}, 1},
{{o, "Bucket1", "Key9a"}, 1},
{{o, "Bucket1", "Key9b"}, 1},
{{o, "Bucket1", "Key9c"}, 1},
{{o, "Bucket1", "Key9d"}, 1}]}),
Result2 = fetch_range_keysonly(UpdHandle, UpdFileMD,
{o, "Bucket1", "Key8"},
{o, "Bucket1", "Key9b"}),
?assertMatch(Result2, {complete, [{{o, "Bucket1", "Key8"}, 1},
{{o, "Bucket1", "Key9"}, 1},
{{o, "Bucket1", "Key9a"}, 1},
{{o, "Bucket1", "Key9b"}, 1}]}),
ok = file:close(UpdHandle),
ok = file:delete(Filename).
big_iterator_test() ->
Filename = "../test/bigtest1.sft",
{KL1, KL2} = {lists:sort(generate_randomkeys(10000)), []},
{InitHandle, InitFileMD} = create_file(Filename),
{Handle, FileMD, {KL1Rem, KL2Rem}} = complete_file(InitHandle,
InitFileMD,
KL1, KL2, 1),
io:format("Remainder lengths are ~w and ~w ~n", [length(KL1Rem), length(KL2Rem)]),
{complete, Result1} = fetch_range_keysonly(Handle, FileMD, {o, "Bucket0000", "Key0000"},
{o, "Bucket9999", "Key9999"},
256),
NumFoundKeys1 = length(Result1),
NumAddedKeys = 10000 - length(KL1Rem),
?assertMatch(NumFoundKeys1, NumAddedKeys),
{partial, Result2, _} = fetch_range_keysonly(Handle, FileMD, {o, "Bucket0000", "Key0000"},
{o, "Bucket9999", "Key9999"},
32),
NumFoundKeys2 = length(Result2),
?assertMatch(NumFoundKeys2, 32 * 128).