Add merging of files

Add first function of worker - the ability to merge multiple files
together from different levels
martinsumner 2016-07-22 16:57:28 +01:00
parent d96ac87fb5
commit a07ea27dd8
2 changed files with 422 additions and 27 deletions

src/leveled_sft.erl

@ -142,13 +142,24 @@
-module(leveled_sft).
-export([create_file/1,
generate_segment_filter/1,
serialise_segment_filter/1,
check_for_segments/3,
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
speedtest_check_forsegment/4,
generate_randomsegfilter/1,
create_slot/3]).
generate_randomkeys/1,
strip_to_keyonly/1,
sft_new/4,
sft_open/1,
sft_get/2,
sft_getkeyrange/4,
sft_close/1,
sft_clear/1]).
-include_lib("eunit/include/eunit.hrl").
@ -166,6 +177,7 @@
-define(COMPRESSION_LEVEL, 1).
-define(HEADER_LEN, 56).
-define(ITERATOR_SCANWIDTH, 1).
-define(MERGE_SCANWIDTH, 8).
-record(state, {version = ?CURRENT_VERSION :: tuple(),
@ -180,19 +192,168 @@
filter_pointer :: integer(),
summ_pointer :: integer(),
summ_length :: integer(),
filename :: string()}).
filename :: string(),
handle :: file:fd()}).
%%%============================================================================
%%% API
%%%============================================================================
sft_new(Filename, KL1, KL2, Level) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
Reply = gen_server:call(Pid, {sft_new, Filename, KL1, KL2, Level}, infinity),
{ok, Pid, Reply}.
sft_open(Filename) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
case gen_server:call(Pid, {sft_open, Filename}, infinity) of
ok ->
{ok, Pid};
Error ->
Error
end.
sft_get(Pid, Key) ->
file_request(Pid, {get_kv, Key}).
sft_getkeyrange(Pid, StartKey, EndKey, ScanWidth) ->
file_request(Pid, {get_keyrange, StartKey, EndKey, ScanWidth}).
sft_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
file_request(Pid, {get_kvrange, StartKey, EndKey, ScanWidth}).
sft_close(Pid) ->
file_request(Pid, close).
sft_clear(Pid) ->
file_request(Pid, clear).
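A minimal usage sketch of this API (hypothetical filename and key list; entries follow the {Key, SeqN, Status, Value} shape used in the tests below):

    KL1 = [{{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}],
    {ok, Pid, {_Remainders, _SmallestKey, _HighestKey}} =
        leveled_sft:sft_new("../test/example.sft", KL1, [], 1),
    _KV = leveled_sft:sft_get(Pid, {o, "Bucket1", "Key1"}),
    true = leveled_sft:sft_clear(Pid).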
%%%============================================================================
%%% API helper functions
%%%============================================================================
%% This safety measure of checking the Pid is alive before performing any ops
%% is copied from the bitcask source code.
%%
%% It is not clear at present if this is necessary.
file_request(Pid, Request) ->
case check_pid(Pid) of
ok ->
try
gen_server:call(Pid, Request, infinity)
catch
exit:{normal,_} when Request == close ->
%% Honest race condition in bitcask_eqc PULSE test.
ok;
exit:{noproc,_} when Request == close ->
%% Honest race condition in bitcask_eqc PULSE test.
ok;
X1:X2 ->
exit({file_request_error, self(), Request, X1, X2})
end;
Error ->
Error
end.
check_pid(Pid) ->
IsPid = is_pid(Pid),
IsAlive = IsPid andalso is_process_alive(Pid),
case {IsAlive, IsPid} of
{true, _} ->
ok;
{false, true} ->
%% Same result as `file' module when accessing closed FD
{error, einval};
_ ->
%% Same result as `file' module when providing wrong arg
{error, badarg}
end.
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
init([]) ->
{ok, #state{}}.
handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) ->
case create_file(Filename) of
{error, Reason} ->
{reply, {error, Reason}, State};
{Handle, FileMD} ->
io:format("Creating file with inputs of size ~w ~w~n",
[length(KL1), length(KL2)]),
{ReadHandle, UpdFileMD, KeyRemainders} = complete_file(Handle,
FileMD,
KL1, KL2,
Level),
{KL1Rem, KL2Rem} = KeyRemainders,
io:format("File created with remainders of size ~w ~w~n",
[length(KL1Rem), length(KL2Rem)]),
{reply, {KeyRemainders,
UpdFileMD#state.smallest_key,
UpdFileMD#state.highest_key},
UpdFileMD#state{handle=ReadHandle}}
end;
handle_call({sft_open, Filename}, _From, _State) ->
{_Handle, FileMD} = open_file(Filename),
{reply, {FileMD#state.smallest_key, FileMD#state.highest_key}, FileMD};
handle_call({get_kv, Key}, _From, State) ->
Reply = fetch_keyvalue(State#state.handle, State, Key),
{reply, Reply, State};
handle_call({get_keyrange, StartKey, EndKey, ScanWidth}, _From, State) ->
Reply = fetch_range_keysonly(State#state.handle, State,
StartKey, EndKey,
ScanWidth),
{reply, Reply, State};
handle_call({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
Reply = fetch_range_kv(State#state.handle, State,
StartKey, EndKey,
ScanWidth),
{reply, Reply, State};
handle_call(close, _From, State) ->
{reply, true, State};
handle_call(clear, _From, State) ->
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename),
{reply, true, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%============================================================================
%%% Internal functions
%%%============================================================================
%% Start a bare file with an initial header and no further details
%% Return the {Handle, metadata record}
create_file(FileName) when is_list(FileName) ->
{ok, Handle} = file:open(FileName, [binary, raw, read, write]),
Header = create_header(initial),
{ok, _} = file:position(Handle, bof),
ok = file:write(Handle, Header),
{ok, StartPos} = file:position(Handle, cur),
FileMD = #state{next_position=StartPos, filename=FileName},
{Handle, FileMD}.
io:format("Opening file with filename ~s~n", [FileName]),
case file:open(FileName, [binary, raw, read, write]) of
{ok, Handle} ->
Header = create_header(initial),
{ok, _} = file:position(Handle, bof),
ok = file:write(Handle, Header),
{ok, StartPos} = file:position(Handle, cur),
FileMD = #state{next_position=StartPos, filename=FileName},
{Handle, FileMD};
{error, Reason} ->
io:format("Error opening filename ~s with reason ~s", [FileName, Reason]),
{error, Reason}
end.
create_header(initial) ->
@ -234,14 +395,17 @@ open_file(FileMD) ->
index_pointer=?HEADER_LEN + Blen,
filter_pointer=?HEADER_LEN + Blen + Ilen,
summ_pointer=?HEADER_LEN + Blen + Ilen + Flen,
summ_length=Slen}}.
summ_length=Slen,
handle=Handle}}.
%% Take a file handle with a previously created header and complete it based on
%% the two key lists KL1 and KL2
complete_file(Handle, FileMD, KL1, KL2, Level) ->
{ok, KeyRemainders} = write_keys(Handle,
KL1, KL2, [], <<>>,
maybe_expand_pointer(KL1),
maybe_expand_pointer(KL2),
[], <<>>,
Level,
fun sftwrite_function/2),
{ReadHandle, UpdFileMD} = open_file(FileMD),
@ -283,6 +447,10 @@ fetch_range_keysonly(Handle, FileMD, StartKey, EndKey) ->
fetch_range_keysonly(Handle, FileMD, StartKey, EndKey, ScanWidth) ->
fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_keysonly/2, ScanWidth).
%% Fetches a range of keys returning the full tuple, including value
fetch_range_kv(Handle, FileMD, StartKey, EndKey, ScanWidth) ->
fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_kv/2, ScanWidth).
acc_list_keysonly(null, empty) ->
[];
acc_list_keysonly(null, RList) ->
@ -290,6 +458,13 @@ acc_list_keysonly(null, RList) ->
acc_list_keysonly(R, RList) ->
lists:append(RList, [strip_to_key_seqn_only(R)]).
acc_list_kv(null, empty) ->
[];
acc_list_kv(null, RList) ->
RList;
acc_list_kv(R, RList) ->
lists:append(RList, [R]).
%% Iterate keys, returning a batch of keys & values in a range
%% - the iterator can have a ScanWidth which is how many slots should be
%% scanned by the iterator before returning a result
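A caller therefore has to handle two result shapes, depending on whether the scan covered the whole range within its ScanWidth. A sketch with hypothetical bindings (the {complete, Acc} and {partial, Acc, NextStartKey} forms match those consumed by pointer_append_queryresults below):

    case fetch_range_keysonly(Handle, FileMD, StartKey, EndKey, ScanWidth) of
        {complete, KeyList} ->
            %% The whole range was covered within ScanWidth slots
            KeyList;
        {partial, KeyList, NextStartKey} ->
            %% Resume later from NextStartKey, e.g. via a
            %% {next, Pid, NextStartKey} pointer during a merge
            {KeyList, NextStartKey}
    end.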
@ -351,9 +526,9 @@ scan_block([], StartKey, _EndKey, _FunList, _AccFun, Acc) ->
scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) ->
K = strip_to_keyonly(HeadKV),
case K of
K when K < StartKey ->
K when K < StartKey, StartKey /= all ->
scan_block(T, StartKey, EndKey, FunList, AccFun, Acc);
K when K > EndKey ->
K when K > EndKey, EndKey /= all ->
{complete, Acc};
_ ->
case applyfuns(FunList, HeadKV) of
@ -367,6 +542,7 @@ scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) ->
end
end.
applyfuns([], _KV) ->
include;
applyfuns([HeadFun|OtherFuns], KV) ->
@ -396,6 +572,16 @@ fetch_block(Handle, LengthList, BlockNumber, StartOfSlot) ->
binary_to_term(BlockToCheckBin).
%% Need to deal with either Key or {next, Key}
get_nearestkey(KVList, all) ->
case KVList of
[] ->
not_found;
[H|_Tail] ->
H;
_ ->
io:format("KVList issue ~w~n", [KVList]),
error
end;
get_nearestkey(KVList, Key) ->
case Key of
{first, K} ->
@ -486,7 +672,14 @@ write_keys(Handle, KL1, KL2, {SlotCount, SlotTotal},
full ->
write_keys(Handle, KL1rem, KL2rem, {SlotCount + 1, SlotTotal + 1},
UpdSlotIndex, UpdSlots,
SNExtremes, FirstKey, FinalKey, Level, WriteFun)
SNExtremes, FirstKey, FinalKey, Level, WriteFun);
complete ->
UpdHandle = WriteFun(slots, {Handle, UpdSlots}),
{complete_keywrite(UpdHandle,
UpdSlotIndex,
SNExtremes, {FirstKey, FinalKey},
WriteFun),
{KL1rem, KL2rem}}
end.
@ -712,6 +905,7 @@ serialise_block(BlockKeyList) ->
%% there are matching keys then the highest sequence number must be chosen and
%% any lower sequence numbers should be compacted out of existence
key_dominates([H1|T1], [], Level) ->
{_, _, St1, _} = H1,
case maybe_reap_expiredkey(St1, Level) of
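For illustration (hypothetical entries, using the {Key, SeqN, Status, Value} shape seen in the tests): if both inputs hold an entry for the same key, the higher sequence number dominates:

    %% SeqN=5 beats SeqN=2, so the merge should emit only the first entry;
    %% the SeqN=2 version is compacted out of existence
    KL1 = [{{o, "B1", "K1"}, 5, {active, infinity}, null}],
    KL2 = [{{o, "B1", "K1"}, 2, {active, infinity}, null}].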
@ -763,10 +957,35 @@ maybe_reap_expiredkey({_, TS}, {basement, CurrTS}) when CurrTS > TS ->
maybe_reap_expiredkey(_, _) ->
false.
%% Not worked out pointers yet
maybe_expand_pointer(Tail) ->
Tail.
%% When a list is provided it may include a pointer to gain another batch of
%% entries from the same file, or a new batch of entries from another file
%%
%% The resultant list should retain the Tail, with any new pointer produced
%% by a partial scan appended at the end of the expanded batch, ahead of it
maybe_expand_pointer([]) ->
[];
maybe_expand_pointer([H|Tail]) ->
case H of
{next, SFTPid, StartKey} ->
io:format("Scanning further on PID ~w ~w~n", [SFTPid, StartKey]),
QResult = sft_getkvrange(SFTPid, StartKey, all, ?MERGE_SCANWIDTH),
Acc = pointer_append_queryresults(QResult, SFTPid),
lists:append(Acc, Tail);
_ ->
[H|Tail]
end.
pointer_append_queryresults(Results, QueryPid) ->
case Results of
{complete, Acc} ->
Acc;
{partial, Acc, StartKey} ->
lists:append(Acc, [{next, QueryPid, StartKey}])
end.
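A runnable sketch of the pointer mechanics (hypothetical keys; self() stands in for a leveled_sft process): a partial query result re-appends a pointer so the next expansion resumes where the scan stopped, while a complete result expands to the batch alone:

    Batch = [{{o, "B1", "K1"}, 2, {active, infinity}, null}],
    QueryPid = self(),
    [_KV, {next, QueryPid, {o, "B1", "K3"}}] =
        pointer_append_queryresults({partial, Batch, {o, "B1", "K3"}}, QueryPid),
    Batch = pointer_append_queryresults({complete, Batch}, QueryPid).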
%% Update the sequence numbers
update_sequencenumbers({_, SN, _, _}, 0, 0) ->
{SN, SN};
@ -1020,11 +1239,9 @@ findremainder(BitStr, Factor) ->
%%%%%%%%%%%%%%%%
% T E S T
%%%%%%%%%%%%%%%
%%%============================================================================
%%% Test
%%%============================================================================
speedtest_check_forsegment(_, 0, _, _) ->
true;
@ -1288,7 +1505,8 @@ testwrite_function(finalise, {Handle, C_SlotIndex, SNExtremes, KeyExtremes}) ->
writekeys_stage1_test() ->
{KL1, KL2} = sample_keylist(),
{FunOut, {_KL1Rem, _KL2Rem}} = write_keys([], KL1, KL2, [], <<>>, 1, fun testwrite_function/2),
{FunOut, {_KL1Rem, _KL2Rem}} = write_keys([], KL1, KL2, [], <<>>, 1,
fun testwrite_function/2),
{Handle, {_, PointerIndex}, SNExtremes, KeyExtremes} = FunOut,
?assertMatch(SNExtremes, {1,3}),
?assertMatch(KeyExtremes, {{o, "Bucket1", "Key1"},
@ -1377,6 +1595,17 @@ initial_iterator_test() ->
{{o, "Bucket1", "Key9"}, 1},
{{o, "Bucket1", "Key9a"}, 1},
{{o, "Bucket1", "Key9b"}, 1}]}),
Result3 = fetch_range_keysonly(UpdHandle, UpdFileMD,
{o, "Bucket3", "Key4"},
all),
{partial, RL3, _} = Result3,
?assertMatch(RL3, [{{o, "Bucket3", "Key4"}, 3},
{{o, "Bucket3", "Key5"}, 1},
{{o, "Bucket3", "Key6"}, 2},
{{o, "Bucket3", "Key7"}, 1},
{{o, "Bucket3", "Key8"}, 1},
{{o, "Bucket3", "Key9"}, 1},
{{o, "Bucket4", "Key1"}, 1}]),
ok = file:close(UpdHandle),
ok = file:delete(Filename).
@ -1399,6 +1628,11 @@ big_iterator_test() ->
32),
NumFoundKeys2 = length(Result2),
?assertMatch(NumFoundKeys2, 32 * 128),
{partial, Result3, _} = fetch_range_keysonly(Handle, FileMD, {o, "Bucket0000", "Key0000"},
{o, "Bucket9999", "Key9999"},
4),
NumFoundKeys3 = length(Result3),
?assertMatch(NumFoundKeys3, 4 * 128),
ok = file:close(Handle),
ok = file:delete(Filename).

src/leveled_worker.erl

@ -0,0 +1,161 @@
%% Controlling asynchronous work in leveleddb to manage compaction within a
%% level and cleaning out of old files across a level
-module(leveled_worker).
-export([merge_file/3, perform_merge/3]).
-include_lib("eunit/include/eunit.hrl").
merge_file(_FileToMerge, _ManifestMgr, _Level) ->
%% CandidateList = leveled_manifest:get_manifest_atlevel(ManifestMgr, Level),
%% [Adds, Removes] = perform_merge(FileToMerge, CandidateList, Level),
%%leveled_manifest:update_manifest_atlevel(ManifestMgr, Level, Adds, Removes),
ok.
%% Assumption is that there is a single SFT from a higher level that needs
%% to be merged into multiple SFTs at a lower level. This should create an
%% entirely new set of SFTs, and the calling process can then update the
%% manifest.
%%
%% Once the FileToMerge has been emptied, the remainder of the candidate list
%% needs to be placed in a remainder SFT that may be of a sub-optimal (small)
%% size. This avoids the need to perpetually roll over the whole level if the
%% level consists of already-full files. Some smartness may be required when
%% selecting the candidate list, so that small files just outside the
%% candidate list can be included to avoid a proliferation of small files.
%%
%% FileToMerge should be a tuple of {FileName, Pid} where the Pid is the Pid of
%% the gen_server leveled_sft process representing the file.
%%
%% CandidateList should be a list of {StartKey, EndKey, Pid} tuples
%% representing different gen_server leveled_sft processes, sorted by StartKey.
%%
%% The level is the level which the new files should be created at.
perform_merge(FileToMerge, CandidateList, Level) ->
{Filename, UpperSFTPid} = FileToMerge,
MergeID = generate_merge_id(Filename, Level),
io:format("Merge to be commenced for FileToMerge=~s with MergeID=~s~n",
[Filename, MergeID]),
PointerList = lists:map(fun(P) -> {next, P, all} end, CandidateList),
do_merge([{next, UpperSFTPid, all}],
PointerList, Level, MergeID, 0, []).
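%% do_merge/6 rolls the merge forward one output file at a time: each call to
%% sft_new consumes up to a full file's worth of keys from the two inputs
%% (expanding any pointers met along the way) and hands back the remainders,
%% which are fed back in until both inputs are exhausted.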
do_merge([], [], Level, MergeID, FileCounter, OutList) ->
io:format("Merge completed with MergeID=~s Level=~w and FileCounter=~w~n",
[MergeID, Level, FileCounter]),
OutList;
do_merge(KL1, KL2, Level, MergeID, FileCounter, OutList) ->
FileName = lists:flatten(io_lib:format("../test/~s_~w.sft", [MergeID, FileCounter])),
io:format("File to be created as part of MergeID=~s Filename=~s~n", [MergeID, FileName]),
case leveled_sft:sft_new(FileName, KL1, KL2, Level) of
{ok, _Pid, {error, Reason}} ->
io:format("Exiting due to error~w~n", [Reason]);
{ok, Pid, Reply} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
do_merge(KL1Rem, KL2Rem, Level, MergeID, FileCounter + 1,
lists:append(OutList, [{SmallestKey, HighestKey, Pid}]))
end.
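%% Create a UUID v4-style identifier for the merge, e.g. something of the
%% form <<"3f2a9c1b-1a02-4c3d-9f4e-0123456789ab">> (example only); the second
%% segment embeds a hash of the filename plus the level, the rest is random.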
generate_merge_id(Filename, Level) ->
<<A:32, C:16, D:16, E:48>> = crypto:rand_bytes(14),
FileID = erlang:phash2(Filename, 256),
B = FileID * 256 + Level,
Str = io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
[A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]),
list_to_binary(Str).
%%%============================================================================
%%% Test
%%%============================================================================
generate_randomkeys(Count, BucketRangeLow, BucketRangeHigh) ->
generate_randomkeys(Count, [], BucketRangeLow, BucketRangeHigh).
generate_randomkeys(0, Acc, _BucketLow, _BucketHigh) ->
Acc;
generate_randomkeys(Count, Acc, BucketLow, BRange) ->
BNumber = string:right(integer_to_list(BucketLow + random:uniform(BRange)),
4, $0),
KNumber = string:right(integer_to_list(random:uniform(1000)), 4, $0),
RandKey = {{o,
"Bucket" ++ BNumber,
"Key" ++ KNumber},
Count + 1,
{active, infinity}, null},
generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange).
choose_pid_toquery([{StartKey, EndKey, Pid}|_T], Key) when Key >= StartKey,
EndKey >= Key ->
Pid;
choose_pid_toquery([_H|T], Key) ->
choose_pid_toquery(T, Key).
find_randomkeys(_FList, 0, _Source) ->
ok;
find_randomkeys(FList, Count, Source) ->
K1 = leveled_sft:strip_to_keyonly(lists:nth(random:uniform(length(Source)),
Source)),
P1 = choose_pid_toquery(FList, K1),
FoundKV = leveled_sft:sft_get(P1, K1),
case FoundKV of
not_present ->
io:format("Failed to find ~w in ~w~n", [K1, P1]),
?assertMatch(true, false);
_ ->
Found = leveled_sft:strip_to_keyonly(FoundKV),
io:format("success finding ~w in ~w~n", [K1, P1]),
?assertMatch(K1, Found)
end,
find_randomkeys(FList, Count - 1, Source).
merge_file_test() ->
KL1_L1 = lists:sort(generate_randomkeys(16000, 0, 1000)),
{ok, PidL1_1, _} = leveled_sft:sft_new("../test/KL1_L1.sft",
KL1_L1, [], 1),
KL1_L2 = lists:sort(generate_randomkeys(16000, 0, 250)),
{ok, PidL2_1, _} = leveled_sft:sft_new("../test/KL1_L2.sft",
KL1_L2, [], 2),
KL2_L2 = lists:sort(generate_randomkeys(16000, 250, 250)),
{ok, PidL2_2, _} = leveled_sft:sft_new("../test/KL2_L2.sft",
KL2_L2, [], 2),
KL3_L2 = lists:sort(generate_randomkeys(16000, 500, 250)),
{ok, PidL2_3, _} = leveled_sft:sft_new("../test/KL3_L2.sft",
KL3_L2, [], 2),
KL4_L2 = lists:sort(generate_randomkeys(16000, 750, 250)),
{ok, PidL2_4, _} = leveled_sft:sft_new("../test/KL4_L2.sft",
KL4_L2, [], 2),
Result = perform_merge({"../test/KL1_L1.sft", PidL1_1},
[PidL2_1, PidL2_2, PidL2_3, PidL2_4],
2),
lists:foreach(fun({{o, B1, K1}, {o, B2, K2}, R}) ->
io:format("Result of ~s ~s and ~s ~s with Pid ~w~n",
[B1, K1, B2, K2, R]) end,
Result),
io:format("Finding keys in KL1_L1~n"),
ok = find_randomkeys(Result, 50, KL1_L1),
io:format("Finding keys in KL1_L2~n"),
ok = find_randomkeys(Result, 50, KL1_L2),
io:format("Finding keys in KL2_L2~n"),
ok = find_randomkeys(Result, 50, KL2_L2),
io:format("Finding keys in KL3_L2~n"),
ok = find_randomkeys(Result, 50, KL3_L2),
io:format("Finding keys in KL4_L2~n"),
ok = find_randomkeys(Result, 50, KL4_L2),
leveled_sft:sft_clear(PidL1_1),
leveled_sft:sft_clear(PidL2_1),
leveled_sft:sft_clear(PidL2_2),
leveled_sft:sft_clear(PidL2_3),
leveled_sft:sft_clear(PidL2_4),
lists:foreach(fun({_StK, _EndK, Pid}) -> leveled_sft:sft_clear(Pid) end, Result).