Merge pull request #14 from martinsumner/mas-altsst

Mas altsst
This commit is contained in:
martinsumner 2017-01-04 23:30:42 +00:00 committed by GitHub
commit c43014a0ee
9 changed files with 2110 additions and 2386 deletions

View file

@ -1186,7 +1186,10 @@ hashtree_query_test() ->
{hashtree_query, {hashtree_query,
?STD_TAG, ?STD_TAG,
false}), false}),
?assertMatch(KeyHashList, HTFolder2()), L0 = length(KeyHashList),
HTR2 = HTFolder2(),
?assertMatch(L0, length(HTR2)),
?assertMatch(KeyHashList, HTR2),
ok = book_close(Bookie2), ok = book_close(Bookie2),
reset_filestructure(). reset_filestructure().

View file

@ -37,7 +37,6 @@
strip_to_keyonly/1, strip_to_keyonly/1,
strip_to_seqonly/1, strip_to_seqonly/1,
strip_to_statusonly/1, strip_to_statusonly/1,
strip_to_keyseqstatusonly/1,
strip_to_keyseqonly/1, strip_to_keyseqonly/1,
strip_to_seqnhashonly/1, strip_to_seqnhashonly/1,
striphead_to_details/1, striphead_to_details/1,
@ -80,8 +79,6 @@ magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) ->
magic_hash({Bucket, Key}); magic_hash({Bucket, Key});
magic_hash({?STD_TAG, Bucket, Key, _SubKey}) -> magic_hash({?STD_TAG, Bucket, Key, _SubKey}) ->
magic_hash({Bucket, Key}); magic_hash({Bucket, Key});
magic_hash({?IDX_TAG, _B, _Idx, _Key}) ->
no_lookup;
magic_hash(AnyKey) -> magic_hash(AnyKey) ->
BK = term_to_binary(AnyKey), BK = term_to_binary(AnyKey),
H = 5381, H = 5381,
@ -111,11 +108,8 @@ inker_reload_strategy(AltList) ->
ReloadStrategy0, ReloadStrategy0,
AltList). AltList).
strip_to_keyonly({keyonly, K}) -> K;
strip_to_keyonly({K, _V}) -> K. strip_to_keyonly({K, _V}) -> K.
strip_to_keyseqstatusonly({K, {SeqN, St, _, _MD}}) -> {K, SeqN, St}.
strip_to_statusonly({_, {_, St, _, _}}) -> St. strip_to_statusonly({_, {_, St, _, _}}) -> St.
strip_to_seqonly({_, {SeqN, _, _, _}}) -> SeqN. strip_to_seqonly({_, {SeqN, _, _, _}}) -> SeqN.

View file

@ -11,13 +11,15 @@
log_timer/3, log_timer/3,
put_timing/4, put_timing/4,
head_timing/4, head_timing/4,
get_timing/3]). get_timing/3,
sst_timing/3]).
-define(PUT_TIMING_LOGPOINT, 20000). -define(PUT_LOGPOINT, 20000).
-define(HEAD_TIMING_LOGPOINT, 160000). -define(HEAD_LOGPOINT, 160000).
-define(GET_TIMING_LOGPOINT, 160000). -define(GET_LOGPOINT, 160000).
-define(SST_LOGPOINT, 20000).
-define(LOG_LEVEL, [info, warn, error, critical]). -define(LOG_LEVEL, [info, warn, error, critical]).
-define(SAMPLE_RATE, 16#F). -define(SAMPLE_RATE, 16).
-define(LOGBASE, dict:from_list([ -define(LOGBASE, dict:from_list([
@ -94,7 +96,7 @@
{info, "Response to push_mem of ~w with " {info, "Response to push_mem of ~w with "
++ "L0 pending ~w and merge backlog ~w"}}, ++ "L0 pending ~w and merge backlog ~w"}},
{"P0019", {"P0019",
{info, "Rolling level zero to filename ~s"}}, {info, "Rolling level zero to filename ~s at ledger sqn ~w"}},
{"P0020", {"P0020",
{info, "Work at Level ~w to be scheduled for ~w with ~w " {info, "Work at Level ~w to be scheduled for ~w with ~w "
++ "queue items outstanding at all levels"}}, ++ "queue items outstanding at all levels"}},
@ -150,8 +152,6 @@
{info, "File to be created as part of MSN=~w Filename=~s"}}, {info, "File to be created as part of MSN=~w Filename=~s"}},
{"PC013", {"PC013",
{warn, "Merge resulted in empty file ~s"}}, {warn, "Merge resulted in empty file ~s"}},
{"PC014",
{info, "Empty file ~s to be cleared"}},
{"PC015", {"PC015",
{info, "File created"}}, {info, "File created"}},
{"PC016", {"PC016",
@ -230,35 +230,26 @@
{"PM002", {"PM002",
{info, "Completed dump of L0 cache to list of size ~w"}}, {info, "Completed dump of L0 cache to list of size ~w"}},
{"SST01",
{"SFT01", {info, "SST timing for result ~w is sample ~w total ~w and max ~w"}},
{info, "Opened filename with name ~s"}}, {"SST02",
{"SFT02", {error, "False result returned from SST with filename ~s as "
{info, "File ~s has been set for delete"}}, ++ "slot ~w has failed crc check"}},
{"SFT03", {"SST03",
{info, "File creation of L0 file ~s"}}, {info, "Opening SST file with filename ~s keys ~w slots ~w and"
{"SFT04", ++ " max sqn ~w"}},
{debug, "File ~s prompting for delete status check"}}, {"SST04",
{"SFT05",
{info, "Exit called for reason ~w on filename ~s"}}, {info, "Exit called for reason ~w on filename ~s"}},
{"SFT06", {"SST05",
{info, "Exit called and now clearing ~s"}},
{"SFT07",
{info, "Creating file with input of size ~w"}},
{"SFT08",
{info, "Renaming file from ~s to ~s"}},
{"SFT09",
{warn, "Filename ~s already exists"}},
{"SFT10",
{warn, "Rename rogue filename ~s to ~s"}}, {warn, "Rename rogue filename ~s to ~s"}},
{"SFT11", {"SST06",
{error, "Segment filter failed due to ~s"}}, {info, "File ~s has been set for delete"}},
{"SFT12", {"SST07",
{error, "Segment filter failed due to CRC check ~w did not match ~w"}}, {info, "Exit called and now clearing ~s"}},
{"SFT13", {"SST08",
{error, "Segment filter failed due to ~s"}}, {info, "Completed creation of ~s at level ~w with max sqn ~w"}},
{"SFT14", {"SST09",
{debug, "Range fetch from SFT PID ~w"}}, {warn, "Read request exposes slot with bad CRC"}},
{"CDB01", {"CDB01",
{info, "Opening file for writing with filename ~s"}}, {info, "Opening file for writing with filename ~s"}},
@ -333,14 +324,13 @@ log_timer(LogReference, Subs, StartTime) ->
end. end.
%% Make a log of put timings split out by actor - one log for every %% Make a log of put timings split out by actor - one log for every
%% PUT_TIMING_LOGPOINT puts %% PUT_LOGPOINT puts
put_timing(_Actor, undefined, T0, T1) -> put_timing(_Actor, undefined, T0, T1) ->
{1, {T0, T1}, {T0, T1}}; {1, {T0, T1}, {T0, T1}};
put_timing(Actor, {?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, put_timing(Actor, {?PUT_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
T0, T1) -> RN = random:uniform(?HEAD_LOGPOINT),
RN = random:uniform(?HEAD_TIMING_LOGPOINT), case RN > ?HEAD_LOGPOINT div 2 of
case RN > ?HEAD_TIMING_LOGPOINT div 2 of
true -> true ->
% log at the timing point less than half the time % log at the timing point less than half the time
LogRef = LogRef =
@ -349,7 +339,7 @@ put_timing(Actor, {?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}},
inker -> "I0019"; inker -> "I0019";
journal -> "CDB17" journal -> "CDB17"
end, end,
log(LogRef, [?PUT_TIMING_LOGPOINT, Total0, Total1, Max0, Max1]), log(LogRef, [?PUT_LOGPOINT, Total0, Total1, Max0, Max1]),
put_timing(Actor, undefined, T0, T1); put_timing(Actor, undefined, T0, T1);
false -> false ->
% Log some other random time % Log some other random time
@ -359,13 +349,13 @@ put_timing(_Actor, {N, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
{N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}. {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}.
%% Make a log of penciller head timings split out by level and result - one %% Make a log of penciller head timings split out by level and result - one
%% log for every HEAD_TIMING_LOGPOINT puts %% log for every HEAD_LOGPOINT puts
%% Returns a tuple of {Count, TimingDict} to be stored on the process state %% Returns a tuple of {Count, TimingDict} to be stored on the process state
head_timing(undefined, SW, Level, R) -> head_timing(undefined, SW, Level, R) ->
T0 = timer:now_diff(os:timestamp(), SW), T0 = timer:now_diff(os:timestamp(), SW),
head_timing_int(undefined, T0, Level, R); head_timing_int(undefined, T0, Level, R);
head_timing({N, HeadTimingD}, SW, Level, R) -> head_timing({N, HeadTimingD}, SW, Level, R) ->
case N band ?SAMPLE_RATE of case N band (?SAMPLE_RATE - 1) of
0 -> 0 ->
T0 = timer:now_diff(os:timestamp(), SW), T0 = timer:now_diff(os:timestamp(), SW),
head_timing_int({N, HeadTimingD}, T0, Level, R); head_timing_int({N, HeadTimingD}, T0, Level, R);
@ -384,9 +374,9 @@ head_timing_int(undefined, T0, Level, R) ->
dict:store(K, [0, 0, 0], Acc) dict:store(K, [0, 0, 0], Acc)
end end, end end,
{1, lists:foldl(NewDFun, dict:new(), head_keylist())}; {1, lists:foldl(NewDFun, dict:new(), head_keylist())};
head_timing_int({?HEAD_TIMING_LOGPOINT, HeadTimingD}, T0, Level, R) -> head_timing_int({?HEAD_LOGPOINT, HeadTimingD}, T0, Level, R) ->
RN = random:uniform(?HEAD_TIMING_LOGPOINT), RN = random:uniform(?HEAD_LOGPOINT),
case RN > ?HEAD_TIMING_LOGPOINT div 2 of case RN > ?HEAD_LOGPOINT div 2 of
true -> true ->
% log at the timing point less than half the time % log at the timing point less than half the time
LogFun = fun(K) -> log("P0032", [K|dict:fetch(K, HeadTimingD)]) end, LogFun = fun(K) -> log("P0032", [K|dict:fetch(K, HeadTimingD)]) end,
@ -419,21 +409,61 @@ head_keylist() ->
[not_present, found_lower, found_0, found_1, found_2]. [not_present, found_lower, found_0, found_1, found_2].
sst_timing(undefined, SW, TimerType) ->
T0 = timer:now_diff(os:timestamp(), SW),
gen_timing_int(undefined,
T0,
TimerType,
fun sst_keylist/0,
?SST_LOGPOINT,
"SST01");
sst_timing({N, SSTTimerD}, SW, TimerType) ->
case N band (?SAMPLE_RATE - 1) of
0 ->
T0 = timer:now_diff(os:timestamp(), SW),
gen_timing_int({N, SSTTimerD},
T0,
TimerType,
fun sst_keylist/0,
?SST_LOGPOINT,
"SST01");
_ ->
% Not to be sampled this time
{N + 1, SSTTimerD}
end.
sst_keylist() ->
[slot_bloom, slot_fetch].
get_timing(undefined, SW, TimerType) -> get_timing(undefined, SW, TimerType) ->
T0 = timer:now_diff(os:timestamp(), SW), T0 = timer:now_diff(os:timestamp(), SW),
get_timing_int(undefined, T0, TimerType); gen_timing_int(undefined,
T0,
TimerType,
fun get_keylist/0,
?GET_LOGPOINT,
"B0014");
get_timing({N, GetTimerD}, SW, TimerType) -> get_timing({N, GetTimerD}, SW, TimerType) ->
case N band ?SAMPLE_RATE of case N band (?SAMPLE_RATE - 1) of
0 -> 0 ->
T0 = timer:now_diff(os:timestamp(), SW), T0 = timer:now_diff(os:timestamp(), SW),
get_timing_int({N, GetTimerD}, T0, TimerType); gen_timing_int({N, GetTimerD},
T0,
TimerType,
fun get_keylist/0,
?GET_LOGPOINT,
"B0014");
_ -> _ ->
% Not to be sampled this time % Not to be sampled this time
{N + 1, GetTimerD} {N + 1, GetTimerD}
end. end.
get_timing_int(undefined, T0, TimerType) -> get_keylist() ->
[head_not_present, head_found, fetch].
gen_timing_int(undefined, T0, TimerType, KeyListFun, _LogPoint, _LogRef) ->
NewDFun = fun(K, Acc) -> NewDFun = fun(K, Acc) ->
case K of case K of
TimerType -> TimerType ->
@ -441,31 +471,32 @@ get_timing_int(undefined, T0, TimerType) ->
_ -> _ ->
dict:store(K, [0, 0, 0], Acc) dict:store(K, [0, 0, 0], Acc)
end end, end end,
{1, lists:foldl(NewDFun, dict:new(), get_keylist())}; {1, lists:foldl(NewDFun, dict:new(), KeyListFun())};
get_timing_int({?GET_TIMING_LOGPOINT, GetTimerD}, T0, TimerType) -> gen_timing_int({LogPoint, TimerD}, T0, TimerType, KeyListFun, LogPoint,
RN = random:uniform(?GET_TIMING_LOGPOINT), LogRef) ->
case RN > ?GET_TIMING_LOGPOINT div 2 of RN = random:uniform(LogPoint),
case RN > LogPoint div 2 of
true -> true ->
% log at the timing point less than half the time % log at the timing point less than half the time
LogFun = fun(K) -> log("B0014", [K|dict:fetch(K, GetTimerD)]) end, LogFun = fun(K) -> log(LogRef, [K|dict:fetch(K, TimerD)]) end,
lists:foreach(LogFun, get_keylist()), lists:foreach(LogFun, KeyListFun()),
get_timing_int(undefined, T0, TimerType); gen_timing_int(undefined, T0, TimerType,
KeyListFun, LogPoint, LogRef);
false -> false ->
% Log some other time - reset to RN not 0 to stagger logs out over % Log some other time - reset to RN not 0 to stagger logs out over
% time between the vnodes % time between the vnodes
get_timing_int({RN, GetTimerD}, T0, TimerType) gen_timing_int({RN, TimerD}, T0, TimerType,
KeyListFun, LogPoint, LogRef)
end; end;
get_timing_int({N, GetTimerD}, T0, TimerType) -> gen_timing_int({N, TimerD}, T0, TimerType, _KeyListFun, _LogPoint, _LogRef) ->
[Count0, Total0, Max0] = dict:fetch(TimerType, GetTimerD), [Count0, Total0, Max0] = dict:fetch(TimerType, TimerD),
{N + 1, {N + 1,
dict:store(TimerType, dict:store(TimerType,
[Count0 + 1, Total0 + T0, max(Max0, T0)], [Count0 + 1, Total0 + T0, max(Max0, T0)],
GetTimerD)}. TimerD)}.
get_keylist() ->
[head_not_present, head_found, fetch].
%%%============================================================================ %%%============================================================================
%%% Test %%% Test

View file

@ -9,7 +9,7 @@
%% %%
%% -------- COMMITTING MANIFEST CHANGES --------- %% -------- COMMITTING MANIFEST CHANGES ---------
%% %%
%% Once the Penciller has taken a manifest change, the SFT file owners which no %% Once the Penciller has taken a manifest change, the SST file owners which no
%% longer form part of the manifest will be marked for delete. By marking for %% longer form part of the manifest will be marked for delete. By marking for
%% deletion, the owners will poll to confirm when it is safe for them to be %% deletion, the owners will poll to confirm when it is safe for them to be
%% deleted. %% deleted.
@ -225,7 +225,7 @@ merge(WI) ->
mark_for_delete([], _Penciller) -> mark_for_delete([], _Penciller) ->
ok; ok;
mark_for_delete([Head|Tail], Penciller) -> mark_for_delete([Head|Tail], Penciller) ->
ok = leveled_sft:sft_setfordelete(Head#manifest_entry.owner, Penciller), ok = leveled_sst:sst_setfordelete(Head#manifest_entry.owner, Penciller),
mark_for_delete(Tail, Penciller). mark_for_delete(Tail, Penciller).
@ -268,13 +268,13 @@ select_filetomerge(SrcLevel, Manifest) ->
%% Assumption is that there is a single SFT from a higher level that needs %% Assumption is that there is a single SST from a higher level that needs
%% to be merged into multiple SFTs at a lower level. This should create an %% to be merged into multiple SSTs at a lower level. This should create an
%% entirely new set of SFTs, and the calling process can then update the %% entirely new set of SSTs, and the calling process can then update the
%% manifest. %% manifest.
%% %%
%% Once the FileToMerge has been emptied, the remainder of the candidate list %% Once the FileToMerge has been emptied, the remainder of the candidate list
%% needs to be placed in a remainder SFT that may be of a sub-optimal (small) %% needs to be placed in a remainder SST that may be of a sub-optimal (small)
%% size. This stops the need to perpetually roll over the whole level if the %% size. This stops the need to perpetually roll over the whole level if the
%% level consists of already full files. Some smartness may be required when %% level consists of already full files. Some smartness may be required when
%% selecting the candidate list so that small files just outside the candidate %% selecting the candidate list so that small files just outside the candidate
@ -293,40 +293,31 @@ perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) ->
PointerList = lists:map(fun(P) -> PointerList = lists:map(fun(P) ->
{next, P#manifest_entry.owner, all} end, {next, P#manifest_entry.owner, all} end,
CandidateList), CandidateList),
MaxSQN = leveled_sst:sst_getmaxsequencenumber(SrcPid),
do_merge([{next, SrcPid, all}], do_merge([{next, SrcPid, all}],
PointerList, PointerList,
LevelInfo, LevelInfo,
{Filepath, MSN}, {Filepath, MSN},
MaxSQN,
0, 0,
[]). []).
do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, FileCounter, OutList) -> do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, _MaxSQN,
FileCounter, OutList) ->
leveled_log:log("PC011", [MSN, SrcLevel, FileCounter]), leveled_log:log("PC011", [MSN, SrcLevel, FileCounter]),
OutList; OutList;
do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, MaxSQN,
FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft", FileCounter, OutList) ->
FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sst",
[SrcLevel + 1, FileCounter])), [SrcLevel + 1, FileCounter])),
leveled_log:log("PC012", [MSN, FileName]), leveled_log:log("PC012", [MSN, FileName]),
TS1 = os:timestamp(), TS1 = os:timestamp(),
LevelR = case IsB of case leveled_sst:sst_new(FileName, KL1, KL2, IsB, SrcLevel + 1, MaxSQN) of
true -> empty ->
#level{level = SrcLevel + 1,
is_basement = true,
timestamp = leveled_codec:integer_now()};
false ->
SrcLevel + 1
end,
{ok, Pid, Reply} = leveled_sft:sft_new(FileName,
KL1,
KL2,
LevelR),
case Reply of
{{[], []}, null, _} ->
leveled_log:log("PC013", [FileName]), leveled_log:log("PC013", [FileName]),
leveled_log:log("PC014", [FileName]),
ok = leveled_sft:sft_clear(Pid),
OutList; OutList;
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} -> {ok, Pid, Reply} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
ExtMan = lists:append(OutList, ExtMan = lists:append(OutList,
[#manifest_entry{start_key=SmallestKey, [#manifest_entry{start_key=SmallestKey,
end_key=HighestKey, end_key=HighestKey,
@ -334,7 +325,7 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) ->
filename=FileName}]), filename=FileName}]),
leveled_log:log_timer("PC015", [], TS1), leveled_log:log_timer("PC015", [], TS1),
do_merge(KL1Rem, KL2Rem, do_merge(KL1Rem, KL2Rem,
{SrcLevel, IsB}, {Filepath, MSN}, {SrcLevel, IsB}, {Filepath, MSN}, MaxSQN,
FileCounter + 1, ExtMan) FileCounter + 1, ExtMan)
end. end.
@ -384,7 +375,7 @@ find_randomkeys(FList, Count, Source) ->
KV1 = lists:nth(random:uniform(length(Source)), Source), KV1 = lists:nth(random:uniform(length(Source)), Source),
K1 = leveled_codec:strip_to_keyonly(KV1), K1 = leveled_codec:strip_to_keyonly(KV1),
P1 = choose_pid_toquery(FList, K1), P1 = choose_pid_toquery(FList, K1),
FoundKV = leveled_sft:sft_get(P1, K1), FoundKV = leveled_sst:sst_get(P1, K1),
Found = leveled_codec:strip_to_keyonly(FoundKV), Found = leveled_codec:strip_to_keyonly(FoundKV),
io:format("success finding ~w in ~w~n", [K1, P1]), io:format("success finding ~w in ~w~n", [K1, P1]),
?assertMatch(K1, Found), ?assertMatch(K1, Found),
@ -393,21 +384,31 @@ find_randomkeys(FList, Count, Source) ->
merge_file_test() -> merge_file_test() ->
KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)), KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)),
{ok, PidL1_1, _} = leveled_sft:sft_new("../test/KL1_L1.sft", {ok, PidL1_1, _} = leveled_sst:sst_new("../test/KL1_L1.sst",
KL1_L1, [], 1), 1,
KL1_L1,
undefined),
KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)), KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
{ok, PidL2_1, _} = leveled_sft:sft_new("../test/KL1_L2.sft", {ok, PidL2_1, _} = leveled_sst:sst_new("../test/KL1_L2.sst",
KL1_L2, [], 2), 2,
KL1_L2,
undefined),
KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)), KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
{ok, PidL2_2, _} = leveled_sft:sft_new("../test/KL2_L2.sft", {ok, PidL2_2, _} = leveled_sst:sst_new("../test/KL2_L2.sst",
KL2_L2, [], 2), 2,
KL2_L2,
undefined),
KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)), KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
{ok, PidL2_3, _} = leveled_sft:sft_new("../test/KL3_L2.sft", {ok, PidL2_3, _} = leveled_sst:sst_new("../test/KL3_L2.sst",
KL3_L2, [], 2), 2,
KL3_L2,
undefined),
KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)), KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
{ok, PidL2_4, _} = leveled_sft:sft_new("../test/KL4_L2.sft", {ok, PidL2_4, _} = leveled_sst:sst_new("../test/KL4_L2.sst",
KL4_L2, [], 2), 2,
Result = perform_merge({PidL1_1, "../test/KL1_L1.sft"}, KL4_L2,
undefined),
Result = perform_merge({PidL1_1, "../test/KL1_L1.sst"},
[#manifest_entry{owner=PidL2_1}, [#manifest_entry{owner=PidL2_1},
#manifest_entry{owner=PidL2_2}, #manifest_entry{owner=PidL2_2},
#manifest_entry{owner=PidL2_3}, #manifest_entry{owner=PidL2_3},
@ -429,13 +430,13 @@ merge_file_test() ->
ok = find_randomkeys(Result, 50, KL3_L2), ok = find_randomkeys(Result, 50, KL3_L2),
io:format("Finding keys in KL4_L2~n"), io:format("Finding keys in KL4_L2~n"),
ok = find_randomkeys(Result, 50, KL4_L2), ok = find_randomkeys(Result, 50, KL4_L2),
leveled_sft:sft_clear(PidL1_1), leveled_sst:sst_clear(PidL1_1),
leveled_sft:sft_clear(PidL2_1), leveled_sst:sst_clear(PidL2_1),
leveled_sft:sft_clear(PidL2_2), leveled_sst:sst_clear(PidL2_2),
leveled_sft:sft_clear(PidL2_3), leveled_sst:sst_clear(PidL2_3),
leveled_sft:sft_clear(PidL2_4), leveled_sst:sst_clear(PidL2_4),
lists:foreach(fun(ManEntry) -> lists:foreach(fun(ManEntry) ->
leveled_sft:sft_clear(ManEntry#manifest_entry.owner) end, leveled_sst:sst_clear(ManEntry#manifest_entry.owner) end,
Result). Result).
select_merge_candidates_test() -> select_merge_candidates_test() ->

View file

@ -22,17 +22,17 @@
%% %%
%% The Ledger is divided into many levels %% The Ledger is divided into many levels
%% - L0: New keys are received from the Bookie and merged into a single %% - L0: New keys are received from the Bookie and merged into a single
%% gb_tree, until that tree is the size of a SFT file, and it is then persisted %% gb_tree, until that tree is the size of a SST file, and it is then persisted
%% as a SFT file at this level. L0 SFT files can be larger than the normal %% as a SST file at this level. L0 SST files can be larger than the normal
%% maximum size - so we don't have to consider problems of either having more %% maximum size - so we don't have to consider problems of either having more
%% than one L0 file (and handling what happens on a crash between writing the %% than one L0 file (and handling what happens on a crash between writing the
%% files when the second may have overlapping sequence numbers), or having a %% files when the second may have overlapping sequence numbers), or having a
%% remainder with overlapping in sequence numbers in memory after the file is %% remainder with overlapping in sequence numbers in memory after the file is
%% written. Once the persistence is completed, the L0 tree can be erased. %% written. Once the persistence is completed, the L0 tree can be erased.
%% There can be only one SFT file at Level 0, so the work to merge that file %% There can be only one SST file at Level 0, so the work to merge that file
%% to the lower level must be the highest priority, as otherwise writes to the %% to the lower level must be the highest priority, as otherwise writes to the
%% ledger will stall, when there is next a need to persist. %% ledger will stall, when there is next a need to persist.
%% - L1 TO L7: May contain multiple processes managing non-overlapping sft %% - L1 TO L7: May contain multiple processes managing non-overlapping SST
%% files. Compaction work should be sheduled if the number of files exceeds %% files. Compaction work should be sheduled if the number of files exceeds
%% the target size of the level, where the target size is 8 ^ n. %% the target size of the level, where the target size is 8 ^ n.
%% %%
@ -67,14 +67,14 @@
%% completed to merge the tree into the L0 tree. %% completed to merge the tree into the L0 tree.
%% %%
%% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the %% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the
%% conversion of the current L0 tree into a SFT file, but not completed this %% conversion of the current L0 tree into a SST file, but not completed this
%% change. The Penciller in this case returns the push, and the Bookie should %% change. The Penciller in this case returns the push, and the Bookie should
%% continue to grow the cache before trying again. %% continue to grow the cache before trying again.
%% %%
%% ---------- FETCH ---------- %% ---------- FETCH ----------
%% %%
%% On request to fetch a key the Penciller should look first in the in-memory %% On request to fetch a key the Penciller should look first in the in-memory
%% L0 tree, then look in the SFT files Level by Level (including level 0), %% L0 tree, then look in the SST files Level by Level (including level 0),
%% consulting the Manifest to determine which file should be checked at each %% consulting the Manifest to determine which file should be checked at each
%% level. %% level.
%% %%
@ -82,16 +82,16 @@
%% %%
%% Iterators may request a snapshot of the database. A snapshot is a cloned %% Iterators may request a snapshot of the database. A snapshot is a cloned
%% Penciller seeded not from disk, but by the in-memory L0 gb_tree and the %% Penciller seeded not from disk, but by the in-memory L0 gb_tree and the
%% in-memory manifest, allowing for direct reference for the SFT file processes. %% in-memory manifest, allowing for direct reference for the SST file processes.
%% %%
%% Clones formed to support snapshots are registered by the Penciller, so that %% Clones formed to support snapshots are registered by the Penciller, so that
%% SFT files valid at the point of the snapshot until either the iterator is %% SST files valid at the point of the snapshot until either the iterator is
%% completed or has timed out. %% completed or has timed out.
%% %%
%% ---------- ON STARTUP ---------- %% ---------- ON STARTUP ----------
%% %%
%% On Startup the Bookie with ask the Penciller to initiate the Ledger first. %% On Startup the Bookie with ask the Penciller to initiate the Ledger first.
%% To initiate the Ledger the must consult the manifest, and then start a SFT %% To initiate the Ledger the must consult the manifest, and then start a SST
%% management process for each file in the manifest. %% management process for each file in the manifest.
%% %%
%% The penciller should then try and read any Level 0 file which has the %% The penciller should then try and read any Level 0 file which has the
@ -103,14 +103,14 @@
%% ---------- ON SHUTDOWN ---------- %% ---------- ON SHUTDOWN ----------
%% %%
%% On a controlled shutdown the Penciller should attempt to write any in-memory %% On a controlled shutdown the Penciller should attempt to write any in-memory
%% ETS table to a L0 SFT file, assuming one is nto already pending. If one is %% ETS table to a L0 SST file, assuming one is nto already pending. If one is
%% already pending then the Penciller will not persist this part of the Ledger. %% already pending then the Penciller will not persist this part of the Ledger.
%% %%
%% ---------- FOLDER STRUCTURE ---------- %% ---------- FOLDER STRUCTURE ----------
%% %%
%% The following folders are used by the Penciller %% The following folders are used by the Penciller
%% $ROOT/ledger/ledger_manifest/ - used for keeping manifest files %% $ROOT/ledger/ledger_manifest/ - used for keeping manifest files
%% $ROOT/ledger/ledger_files/ - containing individual SFT files %% $ROOT/ledger/ledger_files/ - containing individual SST files
%% %%
%% In larger stores there could be a large number of files in the ledger_file %% In larger stores there could be a large number of files in the ledger_file
%% folder - perhaps o(1000). It is assumed that modern file systems should %% folder - perhaps o(1000). It is assumed that modern file systems should
@ -120,7 +120,7 @@
%% %%
%% The Penciller can have one and only one Clerk for performing compaction %% The Penciller can have one and only one Clerk for performing compaction
%% work. When the Clerk has requested and taken work, it should perform the %% work. When the Clerk has requested and taken work, it should perform the
%5 compaction work starting the new SFT process to manage the new Ledger state %5 compaction work starting the new SST process to manage the new Ledger state
%% and then write a new manifest file that represents that state with using %% and then write a new manifest file that represents that state with using
%% the next Manifest sequence number as the filename: %% the next Manifest sequence number as the filename:
%% - nonzero_<ManifestSQN#>.pnd %% - nonzero_<ManifestSQN#>.pnd
@ -130,14 +130,14 @@
%% %%
%% On startup, the Penciller should look for the nonzero_*.crr file with the %% On startup, the Penciller should look for the nonzero_*.crr file with the
%% highest such manifest sequence number. This will be started as the %% highest such manifest sequence number. This will be started as the
%% manifest, together with any _0_0.sft file found at that Manifest SQN. %% manifest, together with any _0_0.sst file found at that Manifest SQN.
%% Level zero files are not kept in the persisted manifest, and adding a L0 %% Level zero files are not kept in the persisted manifest, and adding a L0
%% file does not advanced the Manifest SQN. %% file does not advanced the Manifest SQN.
%% %%
%% The pace at which the store can accept updates will be dependent on the %% The pace at which the store can accept updates will be dependent on the
%% speed at which the Penciller's Clerk can merge files at lower levels plus %% speed at which the Penciller's Clerk can merge files at lower levels plus
%% the time it takes to merge from Level 0. As if a clerk has commenced %% the time it takes to merge from Level 0. As if a clerk has commenced
%% compaction work at a lower level and then immediately a L0 SFT file is %% compaction work at a lower level and then immediately a L0 SST file is
%% written the Penciller will need to wait for this compaction work to %% written the Penciller will need to wait for this compaction work to
%% complete and the L0 file to be compacted before the ETS table can be %% complete and the L0 file to be compacted before the ETS table can be
%% allowed to again reach capacity %% allowed to again reach capacity
@ -145,7 +145,7 @@
%% The writing of L0 files do not require the involvement of the clerk. %% The writing of L0 files do not require the involvement of the clerk.
%% The L0 files are prompted directly by the penciller when the in-memory tree %% The L0 files are prompted directly by the penciller when the in-memory tree
%% has reached capacity. This places the penciller in a levelzero_pending %% has reached capacity. This places the penciller in a levelzero_pending
%% state, and in this state it must return new pushes. Once the SFT file has %% state, and in this state it must return new pushes. Once the SST file has
%% been completed it will confirm completion to the penciller which can then %% been completed it will confirm completion to the penciller which can then
%% revert the levelzero_pending state, add the file to the manifest and clear %% revert the levelzero_pending state, add the file to the manifest and clear
%% the current level zero in-memory view. %% the current level zero in-memory view.
@ -172,7 +172,6 @@
pcl_fetchkeys/5, pcl_fetchkeys/5,
pcl_fetchnextkey/5, pcl_fetchnextkey/5,
pcl_checksequencenumber/3, pcl_checksequencenumber/3,
pcl_checksequencenumber/4,
pcl_workforclerk/1, pcl_workforclerk/1,
pcl_promptmanifestchange/2, pcl_promptmanifestchange/2,
pcl_confirml0complete/4, pcl_confirml0complete/4,
@ -203,6 +202,7 @@
-define(WORKQUEUE_BACKLOG_TOLERANCE, 4). -define(WORKQUEUE_BACKLOG_TOLERANCE, 4).
-define(COIN_SIDECOUNT, 5). -define(COIN_SIDECOUNT, 5).
-define(SLOW_FETCH, 20000). -define(SLOW_FETCH, 20000).
-define(ITERATOR_SCANWIDTH, 4).
-record(state, {manifest = [] :: list(), -record(state, {manifest = [] :: list(),
manifest_sqn = 0 :: integer(), manifest_sqn = 0 :: integer(),
@ -280,9 +280,6 @@ pcl_checksequencenumber(Pid, Key, SQN) ->
gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity) gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity)
end. end.
pcl_checksequencenumber(Pid, Key, Hash, SQN) ->
gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity).
pcl_workforclerk(Pid) -> pcl_workforclerk(Pid) ->
gen_server:call(Pid, work_for_clerk, infinity). gen_server:call(Pid, work_for_clerk, infinity).
@ -399,10 +396,10 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
List -> List ->
List List
end, end,
SFTiter = initiate_rangequery_frommanifest(StartKey, SSTiter = initiate_rangequery_frommanifest(StartKey,
EndKey, EndKey,
State#state.manifest), State#state.manifest),
Acc = keyfolder({L0AsList, SFTiter}, Acc = keyfolder({L0AsList, SSTiter},
{StartKey, EndKey}, {StartKey, EndKey},
{AccFun, InitAcc}, {AccFun, InitAcc},
MaxKeys), MaxKeys),
@ -456,7 +453,7 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap})
{true, Pid} -> {true, Pid} ->
UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
leveled_log:log("P0005", [FileName]), leveled_log:log("P0005", [FileName]),
ok = leveled_sft:sft_deleteconfirmed(Pid), ok = leveled_sst:sst_deleteconfirmed(Pid),
{noreply, State#state{unreferenced_files=UF1}}; {noreply, State#state{unreferenced_files=UF1}};
_ -> _ ->
{noreply, State} {noreply, State}
@ -525,7 +522,7 @@ terminate(Reason, State) ->
leveled_log:log("P0009", []); leveled_log:log("P0009", []);
{false, [], _N} -> {false, [], _N} ->
L0Pid = roll_memory(UpdState, true), L0Pid = roll_memory(UpdState, true),
ok = leveled_sft:sft_close(L0Pid); ok = leveled_sst:sst_close(L0Pid);
StatusTuple -> StatusTuple ->
leveled_log:log("P0010", [StatusTuple]) leveled_log:log("P0010", [StatusTuple])
end, end,
@ -533,7 +530,7 @@ terminate(Reason, State) ->
% Tidy shutdown of individual files % Tidy shutdown of individual files
ok = close_files(0, UpdState#state.manifest), ok = close_files(0, UpdState#state.manifest),
lists:foreach(fun({_FN, Pid, _SN}) -> lists:foreach(fun({_FN, Pid, _SN}) ->
ok = leveled_sft:sft_close(Pid) end, ok = leveled_sst:sst_close(Pid) end,
UpdState#state.unreferenced_files), UpdState#state.unreferenced_files),
leveled_log:log("P0011", []), leveled_log:log("P0011", []),
ok. ok.
@ -571,8 +568,11 @@ start_from_file(PCLopts) ->
levelzero_index=leveled_pmem:new_index()}, levelzero_index=leveled_pmem:new_index()},
%% Open manifest %% Open manifest
ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/", ManifestPath = filepath(InitState#state.root_path, manifest) ++ "/",
SSTPath = filepath(InitState#state.root_path, files) ++ "/",
ok = filelib:ensure_dir(ManifestPath), ok = filelib:ensure_dir(ManifestPath),
ok = filelib:ensure_dir(SSTPath),
{ok, Filenames} = file:list_dir(ManifestPath), {ok, Filenames} = file:list_dir(ManifestPath),
CurrRegex = "nonzero_(?<MSN>[0-9]+)\\." ++ ?CURRENT_FILEX, CurrRegex = "nonzero_(?<MSN>[0-9]+)\\." ++ ?CURRENT_FILEX,
ValidManSQNs = lists:foldl(fun(FN, Acc) -> ValidManSQNs = lists:foldl(fun(FN, Acc) ->
@ -608,14 +608,14 @@ start_from_file(PCLopts) ->
leveled_log:log("P0014", [MaxSQN]), leveled_log:log("P0014", [MaxSQN]),
%% Find any L0 files %% Find any L0 files
L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sft", L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sst",
case filelib:is_file(L0FN) of case filelib:is_file(L0FN) of
true -> true ->
leveled_log:log("P0015", [L0FN]), leveled_log:log("P0015", [L0FN]),
{ok, {ok,
L0Pid, L0Pid,
{L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN), {L0StartKey, L0EndKey}} = leveled_sst:sst_open(L0FN),
L0SQN = leveled_sft:sft_getmaxsequencenumber(L0Pid), L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid),
ManifestEntry = #manifest_entry{start_key=L0StartKey, ManifestEntry = #manifest_entry{start_key=L0StartKey,
end_key=L0EndKey, end_key=L0EndKey,
owner=L0Pid, owner=L0Pid,
@ -682,13 +682,7 @@ update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
_ -> _ ->
leveled_log:log_timer("P0031", [], SW), leveled_log:log_timer("P0031", [], SW),
UpdState UpdState
end; end
NewL0Size == L0Size ->
leveled_log:log_timer("P0031", [], SW),
State#state{levelzero_cache=L0Cache,
levelzero_size=L0Size,
ledger_sqn=LedgerSQN}
end. end.
@ -696,7 +690,7 @@ update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
%% to an immediate return as expected. With 32K keys in the TreeList it could %% to an immediate return as expected. With 32K keys in the TreeList it could
%% take around 35-40ms. %% take around 35-40ms.
%% %%
%% To avoid blocking this gen_server, the SFT file can request each item of the %% To avoid blocking this gen_server, the SST file can request each item of the
%% cache one at a time. %% cache one at a time.
%% %%
%% The Wait is set to false to use a cast when calling this in normal operation %% The Wait is set to false to use a cast when calling this in normal operation
@ -704,25 +698,22 @@ update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
roll_memory(State, false) -> roll_memory(State, false) ->
FileName = levelzero_filename(State), FileName = levelzero_filename(State),
leveled_log:log("P0019", [FileName]), leveled_log:log("P0019", [FileName, State#state.ledger_sqn]),
Opts = #sft_options{wait=false, penciller=self()},
PCL = self(), PCL = self(),
FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
% FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end, R = leveled_sst:sst_newlevelzero(FileName,
R = leveled_sft:sft_newfroml0cache(FileName,
length(State#state.levelzero_cache), length(State#state.levelzero_cache),
FetchFun, FetchFun,
Opts), PCL,
State#state.ledger_sqn),
{ok, Constructor, _} = R, {ok, Constructor, _} = R,
Constructor; Constructor;
roll_memory(State, true) -> roll_memory(State, true) ->
FileName = levelzero_filename(State), FileName = levelzero_filename(State),
Opts = #sft_options{wait=true},
FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end, FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end,
R = leveled_sft:sft_newfroml0cache(FileName, KVList = leveled_pmem:to_list(length(State#state.levelzero_cache),
length(State#state.levelzero_cache), FetchFun),
FetchFun, R = leveled_sst:sst_new(FileName, 0, KVList, State#state.ledger_sqn),
Opts),
{ok, Constructor, _} = R, {ok, Constructor, _} = R,
Constructor. Constructor.
@ -753,7 +744,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache), L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
case L0Check of case L0Check of
{false, not_found} -> {false, not_found} ->
fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3); fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3);
{true, KV} -> {true, KV} ->
{KV, 0} {KV, 0}
end; end;
@ -762,7 +753,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
true -> true ->
fetch_mem(Key, Hash, Manifest, L0Cache, none); fetch_mem(Key, Hash, Manifest, L0Cache, none);
false -> false ->
fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3) fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3)
end. end.
fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
@ -791,9 +782,9 @@ fetch(Key, Hash, Manifest, Level, FetchFun) ->
end end
end. end.
timed_sft_get(PID, Key, Hash) -> timed_sst_get(PID, Key, Hash) ->
SW = os:timestamp(), SW = os:timestamp(),
R = leveled_sft:sft_get(PID, Key, Hash), R = leveled_sst:sst_get(PID, Key, Hash),
T0 = timer:now_diff(os:timestamp(), SW), T0 = timer:now_diff(os:timestamp(), SW),
case {T0, R} of case {T0, R} of
{T, R} when T < ?SLOW_FETCH -> {T, R} when T < ?SLOW_FETCH ->
@ -880,7 +871,7 @@ close_files(?MAX_LEVELS - 1, _Manifest) ->
close_files(Level, Manifest) -> close_files(Level, Manifest) ->
LevelList = get_item(Level, Manifest, []), LevelList = get_item(Level, Manifest, []),
lists:foreach(fun(F) -> lists:foreach(fun(F) ->
ok = leveled_sft:sft_close(F#manifest_entry.owner) end, ok = leveled_sst:sst_close(F#manifest_entry.owner) end,
LevelList), LevelList),
close_files(Level + 1, Manifest). close_files(Level + 1, Manifest).
@ -897,8 +888,8 @@ open_all_filesinmanifest({Manifest, TopSQN}, Level) ->
%5 replace them %5 replace them
LvlR = lists:foldl(fun(F, {FL, FL_SQN}) -> LvlR = lists:foldl(fun(F, {FL, FL_SQN}) ->
FN = F#manifest_entry.filename, FN = F#manifest_entry.filename,
{ok, P, _Keys} = leveled_sft:sft_open(FN), {ok, P, _Keys} = leveled_sst:sst_open(FN),
F_SQN = leveled_sft:sft_getmaxsequencenumber(P), F_SQN = leveled_sst:sst_getmaxsequencenumber(P),
{lists:append(FL, {lists:append(FL,
[F#manifest_entry{owner = P}]), [F#manifest_entry{owner = P}]),
max(FL_SQN, F_SQN)} max(FL_SQN, F_SQN)}
@ -932,12 +923,13 @@ initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) ->
C2 = leveled_codec:endkey_passed(EndKey, C2 = leveled_codec:endkey_passed(EndKey,
M#manifest_entry.start_key), M#manifest_entry.start_key),
not (C1 or C2) end, not (C1 or C2) end,
lists:foldl(fun(L, AccL) -> FoldFun =
fun(L, AccL) ->
Level = get_item(L, Manifest, []), Level = get_item(L, Manifest, []),
FL = lists:foldl(fun(M, Acc) -> FL = lists:foldl(fun(M, Acc) ->
case CompareFun(M) of case CompareFun(M) of
true -> true ->
Acc ++ [{next_file, M}]; Acc ++ [{next, M, StartKey}];
false -> false ->
Acc Acc
end end, end end,
@ -948,8 +940,7 @@ initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) ->
FL -> AccL ++ [{L, FL}] FL -> AccL ++ [{L, FL}]
end end
end, end,
[], lists:foldl(FoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)).
lists:seq(0, ?MAX_LEVELS - 1)).
%% Looks to find the best choice for the next key across the levels (other %% Looks to find the best choice for the next key across the levels (other
%% than in-memory table) %% than in-memory table)
@ -960,22 +951,25 @@ find_nextkey(QueryArray, StartKey, EndKey) ->
find_nextkey(QueryArray, find_nextkey(QueryArray,
0, 0,
{null, null}, {null, null},
{fun leveled_sft:sft_getkvrange/4, StartKey, EndKey, 1}). StartKey,
EndKey,
?ITERATOR_SCANWIDTH).
find_nextkey(_QueryArray, LCnt, {null, null}, _QueryFunT) find_nextkey(_QueryArray, LCnt, {null, null}, _StartKey, _EndKey, _Width)
when LCnt > ?MAX_LEVELS -> when LCnt > ?MAX_LEVELS ->
% The array has been scanned wihtout finding a best key - must be % The array has been scanned wihtout finding a best key - must be
% exhausted - respond to indicate no more keys to be found by the % exhausted - respond to indicate no more keys to be found by the
% iterator % iterator
no_more_keys; no_more_keys;
find_nextkey(QueryArray, LCnt, {BKL, BestKV}, _QueryFunT) find_nextkey(QueryArray, LCnt, {BKL, BestKV}, _StartKey, _EndKey, _Width)
when LCnt > ?MAX_LEVELS -> when LCnt > ?MAX_LEVELS ->
% All levels have been scanned, so need to remove the best result from % All levels have been scanned, so need to remove the best result from
% the array, and return that array along with the best key/sqn/status % the array, and return that array along with the best key/sqn/status
% combination % combination
{BKL, [BestKV|Tail]} = lists:keyfind(BKL, 1, QueryArray), {BKL, [BestKV|Tail]} = lists:keyfind(BKL, 1, QueryArray),
{lists:keyreplace(BKL, 1, QueryArray, {BKL, Tail}), BestKV}; {lists:keyreplace(BKL, 1, QueryArray, {BKL, Tail}), BestKV};
find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) -> find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
StartKey, EndKey, Width) ->
% Get the next key at this level % Get the next key at this level
{NextKey, RestOfKeys} = case lists:keyfind(LCnt, 1, QueryArray) of {NextKey, RestOfKeys} = case lists:keyfind(LCnt, 1, QueryArray) of
false -> false ->
@ -989,39 +983,46 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
case {NextKey, BestKeyLevel, BestKV} of case {NextKey, BestKeyLevel, BestKV} of
{null, BKL, BKV} -> {null, BKL, BKV} ->
% There is no key at this level - go to the next level % There is no key at this level - go to the next level
find_nextkey(QueryArray, LCnt + 1, {BKL, BKV}, QueryFunT); find_nextkey(QueryArray,
{{next_file, ManifestEntry}, BKL, BKV} -> LCnt + 1,
{BKL, BKV},
StartKey, EndKey, Width);
{{next, ManifestEntry, _SK}, BKL, BKV} ->
% The first key at this level is pointer to a file - need to query % The first key at this level is pointer to a file - need to query
% the file to expand this level out before proceeding % the file to expand this level out before proceeding
Owner = ManifestEntry#manifest_entry.owner, Owner = ManifestEntry#manifest_entry.owner,
{QueryFun, StartKey, EndKey, ScanSize} = QueryFunT, Pointer = {next, Owner, StartKey, EndKey},
QueryResult = QueryFun(Owner, StartKey, EndKey, ScanSize), UpdList = leveled_sst:expand_list_by_pointer(Pointer,
NewEntry = {LCnt, QueryResult ++ RestOfKeys}, RestOfKeys,
Width),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet % Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level % examined a real key at this level
find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry), find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
LCnt, LCnt,
{BKL, BKV}, {BKL, BKV},
QueryFunT); StartKey, EndKey, Width);
{{next, SFTpid, NewStartKey}, BKL, BKV} -> {{pointer, SSTPid, Slot, PSK, PEK}, BKL, BKV} ->
% The first key at this level is pointer within a file - need to % The first key at this level is pointer within a file - need to
% query the file to expand this level out before proceeding % query the file to expand this level out before proceeding
{QueryFun, _StartKey, EndKey, ScanSize} = QueryFunT, Pointer = {pointer, SSTPid, Slot, PSK, PEK},
QueryResult = QueryFun(SFTpid, NewStartKey, EndKey, ScanSize), UpdList = leveled_sst:expand_list_by_pointer(Pointer,
NewEntry = {LCnt, QueryResult ++ RestOfKeys}, RestOfKeys,
Width),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet % Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level % examined a real key at this level
find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry), find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
LCnt, LCnt,
{BKL, BKV}, {BKL, BKV},
QueryFunT); StartKey, EndKey, Width);
{{Key, Val}, null, null} -> {{Key, Val}, null, null} ->
% No best key set - so can assume that this key is the best key, % No best key set - so can assume that this key is the best key,
% and check the lower levels % and check the lower levels
find_nextkey(QueryArray, find_nextkey(QueryArray,
LCnt + 1, LCnt + 1,
{LCnt, {Key, Val}}, {LCnt, {Key, Val}},
QueryFunT); StartKey, EndKey, Width);
{{Key, Val}, _BKL, {BestKey, _BestVal}} when Key < BestKey -> {{Key, Val}, _BKL, {BestKey, _BestVal}} when Key < BestKey ->
% There is a real key and a best key to compare, and the real key % There is a real key and a best key to compare, and the real key
% at this level is before the best key, and so is now the new best % at this level is before the best key, and so is now the new best
@ -1030,7 +1031,7 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
find_nextkey(QueryArray, find_nextkey(QueryArray,
LCnt + 1, LCnt + 1,
{LCnt, {Key, Val}}, {LCnt, {Key, Val}},
QueryFunT); StartKey, EndKey, Width);
{{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey -> {{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey ->
SQN = leveled_codec:strip_to_seqonly({Key, Val}), SQN = leveled_codec:strip_to_seqonly({Key, Val}),
BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}), BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}),
@ -1041,7 +1042,7 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry), find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
LCnt + 1, LCnt + 1,
{BKL, {BestKey, BestVal}}, {BKL, {BestKey, BestVal}},
QueryFunT); StartKey, EndKey, Width);
SQN > BestSQN -> SQN > BestSQN ->
% There is a real key at the front of this level and it has % There is a real key at the front of this level and it has
% a higher SQN than the best key, so we should use this as % a higher SQN than the best key, so we should use this as
@ -1056,29 +1057,32 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
{BKL, BestTail}), {BKL, BestTail}),
LCnt + 1, LCnt + 1,
{LCnt, {Key, Val}}, {LCnt, {Key, Val}},
QueryFunT) StartKey, EndKey, Width)
end; end;
{_, BKL, BKV} -> {_, BKL, BKV} ->
% This is not the best key % This is not the best key
find_nextkey(QueryArray, LCnt + 1, {BKL, BKV}, QueryFunT) find_nextkey(QueryArray,
LCnt + 1,
{BKL, BKV},
StartKey, EndKey, Width)
end. end.
keyfolder(IMMiter, SFTiter, StartKey, EndKey, {AccFun, Acc}) -> keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder({IMMiter, SFTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1). keyfolder({IMMiter, SSTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1).
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 -> keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 ->
Acc; Acc;
keyfolder({[], SFTiter}, KeyRange, {AccFun, Acc}, MaxKeys) -> keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange, {StartKey, EndKey} = KeyRange,
case find_nextkey(SFTiter, StartKey, EndKey) of case find_nextkey(SSTiter, StartKey, EndKey) of
no_more_keys -> no_more_keys ->
Acc; Acc;
{NxSFTiter, {SFTKey, SFTVal}} -> {NxSSTiter, {SSTKey, SSTVal}} ->
Acc1 = AccFun(SFTKey, SFTVal, Acc), Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[], NxSFTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1) keyfolder({[], NxSSTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1)
end; end;
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange, keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange,
{AccFun, Acc}, MaxKeys) -> {AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange, {StartKey, EndKey} = KeyRange,
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
@ -1087,7 +1091,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange,
% Normally everything is pre-filterd, but the IMM iterator can % Normally everything is pre-filterd, but the IMM iterator can
% be re-used and so may be behind the StartKey if the StartKey has % be re-used and so may be behind the StartKey if the StartKey has
% advanced from the previous use % advanced from the previous use
keyfolder({NxIMMiterator, SFTiterator}, keyfolder({NxIMMiterator, SSTiterator},
KeyRange, KeyRange,
{AccFun, Acc}, {AccFun, Acc},
MaxKeys); MaxKeys);
@ -1095,44 +1099,44 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange,
% There are no more keys in-range in the in-memory % There are no more keys in-range in the in-memory
% iterator, so take action as if this iterator is empty % iterator, so take action as if this iterator is empty
% (see above) % (see above)
keyfolder({[], SFTiterator}, keyfolder({[], SSTiterator},
KeyRange, KeyRange,
{AccFun, Acc}, {AccFun, Acc},
MaxKeys); MaxKeys);
{false, false} -> {false, false} ->
case find_nextkey(SFTiterator, StartKey, EndKey) of case find_nextkey(SSTiterator, StartKey, EndKey) of
no_more_keys -> no_more_keys ->
% No more keys in range in the persisted store, so use the % No more keys in range in the persisted store, so use the
% in-memory KV as the next % in-memory KV as the next
Acc1 = AccFun(IMMKey, IMMVal, Acc), Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, SFTiterator}, keyfolder({NxIMMiterator, SSTiterator},
KeyRange, KeyRange,
{AccFun, Acc1}, {AccFun, Acc1},
MaxKeys - 1); MaxKeys - 1);
{NxSFTiterator, {SFTKey, SFTVal}} -> {NxSSTiterator, {SSTKey, SSTVal}} ->
% There is a next key, so need to know which is the % There is a next key, so need to know which is the
% next key between the two (and handle two keys % next key between the two (and handle two keys
% with different sequence numbers). % with different sequence numbers).
case leveled_codec:key_dominates({IMMKey, case leveled_codec:key_dominates({IMMKey,
IMMVal}, IMMVal},
{SFTKey, {SSTKey,
SFTVal}) of SSTVal}) of
left_hand_first -> left_hand_first ->
Acc1 = AccFun(IMMKey, IMMVal, Acc), Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, SFTiterator}, keyfolder({NxIMMiterator, SSTiterator},
KeyRange, KeyRange,
{AccFun, Acc1}, {AccFun, Acc1},
MaxKeys - 1); MaxKeys - 1);
right_hand_first -> right_hand_first ->
Acc1 = AccFun(SFTKey, SFTVal, Acc), Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
NxSFTiterator}, NxSSTiterator},
KeyRange, KeyRange,
{AccFun, Acc1}, {AccFun, Acc1},
MaxKeys - 1); MaxKeys - 1);
left_hand_dominant -> left_hand_dominant ->
Acc1 = AccFun(IMMKey, IMMVal, Acc), Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, NxSFTiterator}, keyfolder({NxIMMiterator, NxSSTiterator},
KeyRange, KeyRange,
{AccFun, Acc1}, {AccFun, Acc1},
MaxKeys - 1) MaxKeys - 1)
@ -1286,6 +1290,27 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
-ifdef(TEST). -ifdef(TEST).
generate_randomkeys({Count, StartSQN}) ->
generate_randomkeys(Count, StartSQN, []);
generate_randomkeys(Count) ->
generate_randomkeys(Count, 0, []).
generate_randomkeys(0, _SQN, Acc) ->
lists:reverse(Acc);
generate_randomkeys(Count, SQN, Acc) ->
K = {o,
lists:concat(["Bucket", random:uniform(1024)]),
lists:concat(["Key", random:uniform(1024)]),
null},
RandKey = {K,
{SQN,
{active, infinity},
leveled_codec:magic_hash(K),
null}},
generate_randomkeys(Count - 1, SQN + 1, [RandKey|Acc]).
clean_testdir(RootPath) -> clean_testdir(RootPath) ->
clean_subdir(filepath(RootPath, manifest)), clean_subdir(filepath(RootPath, manifest)),
clean_subdir(filepath(RootPath, files)). clean_subdir(filepath(RootPath, files)).
@ -1332,8 +1357,8 @@ compaction_work_assessment_test() ->
?assertMatch([{1, Manifest3, 1}], WorkQ3). ?assertMatch([{1, Manifest3, 1}], WorkQ3).
confirm_delete_test() -> confirm_delete_test() ->
Filename = 'test.sft', Filename = 'test.sst',
UnreferencedFiles = [{'other.sft', dummy_owner, 15}, UnreferencedFiles = [{'other.sst', dummy_owner, 15},
{Filename, dummy_owner, 10}], {Filename, dummy_owner, 10}],
RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}], RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}],
R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1), R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1),
@ -1376,20 +1401,20 @@ simple_server_test() ->
Key1_Pre = {{o,"Bucket0001", "Key0001", null}, Key1_Pre = {{o,"Bucket0001", "Key0001", null},
{1, {active, infinity}, null}}, {1, {active, infinity}, null}},
Key1 = add_missing_hash(Key1_Pre), Key1 = add_missing_hash(Key1_Pre),
KL1 = leveled_sft:generate_randomkeys({1000, 2}), KL1 = generate_randomkeys({1000, 2}),
Key2_Pre = {{o,"Bucket0002", "Key0002", null}, Key2_Pre = {{o,"Bucket0002", "Key0002", null},
{1002, {active, infinity}, null}}, {1002, {active, infinity}, null}},
Key2 = add_missing_hash(Key2_Pre), Key2 = add_missing_hash(Key2_Pre),
KL2 = leveled_sft:generate_randomkeys({900, 1003}), KL2 = generate_randomkeys({900, 1003}),
% Keep below the max table size by having 900 not 1000 % Keep below the max table size by having 900 not 1000
Key3_Pre = {{o,"Bucket0003", "Key0003", null}, Key3_Pre = {{o,"Bucket0003", "Key0003", null},
{2003, {active, infinity}, null}}, {2003, {active, infinity}, null}},
Key3 = add_missing_hash(Key3_Pre), Key3 = add_missing_hash(Key3_Pre),
KL3 = leveled_sft:generate_randomkeys({1000, 2004}), KL3 = generate_randomkeys({1000, 2004}),
Key4_Pre = {{o,"Bucket0004", "Key0004", null}, Key4_Pre = {{o,"Bucket0004", "Key0004", null},
{3004, {active, infinity}, null}}, {3004, {active, infinity}, null}},
Key4 = add_missing_hash(Key4_Pre), Key4 = add_missing_hash(Key4_Pre),
KL4 = leveled_sft:generate_randomkeys({1000, 3005}), KL4 = generate_randomkeys({1000, 3005}),
ok = maybe_pause_push(PCL, [Key1]), ok = maybe_pause_push(PCL, [Key1]),
?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
ok = maybe_pause_push(PCL, KL1), ok = maybe_pause_push(PCL, KL1),
@ -1464,7 +1489,7 @@ simple_server_test() ->
Key1A_Pre = {{o,"Bucket0001", "Key0001", null}, Key1A_Pre = {{o,"Bucket0001", "Key0001", null},
{4005, {active, infinity}, null}}, {4005, {active, infinity}, null}},
Key1A = add_missing_hash(Key1A_Pre), Key1A = add_missing_hash(Key1A_Pre),
KL1A = leveled_sft:generate_randomkeys({2000, 4006}), KL1A = generate_randomkeys({2000, 4006}),
ok = maybe_pause_push(PCLr, [Key1A]), ok = maybe_pause_push(PCLr, [Key1A]),
ok = maybe_pause_push(PCLr, KL1A), ok = maybe_pause_push(PCLr, KL1A),
?assertMatch(true, pcl_checksequencenumber(PclSnap, ?assertMatch(true, pcl_checksequencenumber(PclSnap,
@ -1528,17 +1553,16 @@ rangequery_manifest_test() ->
end_key={o, "Bucket1", "K996", null}, end_key={o, "Bucket1", "K996", null},
filename="Z6"}}, filename="Z6"}},
Man = [{1, [E1, E2, E3]}, {2, [E4, E5, E6]}], Man = [{1, [E1, E2, E3]}, {2, [E4, E5, E6]}],
R1 = initiate_rangequery_frommanifest({o, "Bucket1", "K711", null}, SK1 = {o, "Bucket1", "K711", null},
{o, "Bucket1", "K999", null}, EK1 = {o, "Bucket1", "K999", null},
Man), R1 = initiate_rangequery_frommanifest(SK1, EK1, Man),
?assertMatch([{1, [{next_file, E3}]}, ?assertMatch([{1, [{next, E3, SK1}]},
{2, [{next_file, E5}, {next_file, E6}]}], {2, [{next, E5, SK1}, {next, E6, SK1}]}],
R1), R1),
R2 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx1", "Fld8"}, null}, SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
{i, "Bucket1", {"Idx1", "Fld8"}, null}, EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
Man), R2 = initiate_rangequery_frommanifest(SK2, EK2, Man),
?assertMatch([{1, [{next_file, E1}]}, {2, [{next_file, E5}]}], ?assertMatch([{1, [{next, E1, SK2}]}, {2, [{next, E5, SK2}]}], R2),
R2),
R3 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx0", "Fld8"}, null}, R3 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx0", "Fld8"}, null},
{i, "Bucket1", {"Idx0", "Fld9"}, null}, {i, "Bucket1", {"Idx0", "Fld9"}, null},
Man), Man),
@ -1693,17 +1717,18 @@ foldwithimm_simple_test() ->
{{o, "Bucket1", "Key6"}, 7}], AccB). {{o, "Bucket1", "Key6"}, 7}], AccB).
create_file_test() -> create_file_test() ->
Filename = "../test/new_file.sft", Filename = "../test/new_file.sst",
ok = file:write_file(Filename, term_to_binary("hello")), ok = file:write_file(Filename, term_to_binary("hello")),
KVL = lists:usort(leveled_sft:generate_randomkeys(10000)), KVL = lists:usort(generate_randomkeys(10000)),
Tree = leveled_skiplist:from_list(KVL), Tree = leveled_skiplist:from_list(KVL),
FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
{ok, {ok,
SP, SP,
noreply} = leveled_sft:sft_newfroml0cache(Filename, noreply} = leveled_sst:sst_newlevelzero(Filename,
1, 1,
FetchFun, FetchFun,
#sft_options{wait=false}), undefined,
10000),
lists:foreach(fun(X) -> lists:foreach(fun(X) ->
case checkready(SP) of case checkready(SP) of
timeout -> timeout ->
@ -1716,9 +1741,9 @@ create_file_test() ->
io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]), io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]),
?assertMatch({o, _, _, _}, StartKey), ?assertMatch({o, _, _, _}, StartKey),
?assertMatch({o, _, _, _}, EndKey), ?assertMatch({o, _, _, _}, EndKey),
?assertMatch("../test/new_file.sft", SrcFN), ?assertMatch("../test/new_file.sst", SrcFN),
ok = leveled_sft:sft_clear(SP), ok = leveled_sst:sst_clear(SP),
{ok, Bin} = file:read_file("../test/new_file.sft.discarded"), {ok, Bin} = file:read_file("../test/new_file.sst.discarded"),
?assertMatch("hello", binary_to_term(Bin)). ?assertMatch("hello", binary_to_term(Bin)).
commit_manifest_test() -> commit_manifest_test() ->
@ -1735,14 +1760,14 @@ commit_manifest_test() ->
ok = file:write_file(ManifestFP ++ "nonzero_1.pnd", ok = file:write_file(ManifestFP ++ "nonzero_1.pnd",
term_to_binary("dummy data")), term_to_binary("dummy data")),
L1_0 = [{1, [#manifest_entry{filename="1.sft"}]}], L1_0 = [{1, [#manifest_entry{filename="1.sst"}]}],
Resp_WI0 = Resp_WI#penciller_work{new_manifest=L1_0, Resp_WI0 = Resp_WI#penciller_work{new_manifest=L1_0,
unreferenced_files=[]}, unreferenced_files=[]},
{ok, State0} = commit_manifest_change(Resp_WI0, State), {ok, State0} = commit_manifest_change(Resp_WI0, State),
?assertMatch(1, State0#state.manifest_sqn), ?assertMatch(1, State0#state.manifest_sqn),
?assertMatch([], get_item(0, State0#state.manifest, [])), ?assertMatch([], get_item(0, State0#state.manifest, [])),
L0Entry = [#manifest_entry{filename="0.sft"}], L0Entry = [#manifest_entry{filename="0.sst"}],
ManifestPlus = [{0, L0Entry}|State0#state.manifest], ManifestPlus = [{0, L0Entry}|State0#state.manifest],
NxtSent_WI = #penciller_work{next_sqn=2, NxtSent_WI = #penciller_work{next_sqn=2,
@ -1756,7 +1781,7 @@ commit_manifest_test() ->
ok = file:write_file(ManifestFP ++ "nonzero_2.pnd", ok = file:write_file(ManifestFP ++ "nonzero_2.pnd",
term_to_binary("dummy data")), term_to_binary("dummy data")),
L2_0 = [#manifest_entry{filename="2.sft"}], L2_0 = [#manifest_entry{filename="2.sst"}],
NxtResp_WI0 = NxtResp_WI#penciller_work{new_manifest=[{2, L2_0}], NxtResp_WI0 = NxtResp_WI#penciller_work{new_manifest=[{2, L2_0}],
unreferenced_files=[]}, unreferenced_files=[]},
{ok, State2} = commit_manifest_change(NxtResp_WI0, State1), {ok, State2} = commit_manifest_change(NxtResp_WI0, State1),
@ -1777,7 +1802,7 @@ badmanifest_test() ->
Key1_pre = {{o,"Bucket0001", "Key0001", null}, Key1_pre = {{o,"Bucket0001", "Key0001", null},
{1001, {active, infinity}, null}}, {1001, {active, infinity}, null}},
Key1 = add_missing_hash(Key1_pre), Key1 = add_missing_hash(Key1_pre),
KL1 = leveled_sft:generate_randomkeys({1000, 1}), KL1 = generate_randomkeys({1000, 1}),
ok = maybe_pause_push(PCL, KL1 ++ [Key1]), ok = maybe_pause_push(PCL, KL1 ++ [Key1]),
%% Added together, as split apart there will be a race between the close %% Added together, as split apart there will be a race between the close
@ -1798,7 +1823,7 @@ badmanifest_test() ->
checkready(Pid) -> checkready(Pid) ->
try try
leveled_sft:sft_checkready(Pid) leveled_sst:sst_checkready(Pid)
catch catch
exit:{timeout, _} -> exit:{timeout, _} ->
timeout timeout

File diff suppressed because it is too large Load diff

View file

@ -262,79 +262,60 @@ to_list(SkipList, Level) ->
[], [],
SkipList). SkipList).
to_range(SkipList, Start, End, 1) ->
R = lists:foldl(fun({Mark, SL}, {PassedStart, PassedEnd, Acc, PrevList}) ->
case {PassedStart, PassedEnd} of to_range(SkipList, StartKey, EndKey, ListHeight) ->
{true, true} -> to_range(SkipList, StartKey, EndKey, ListHeight, [], true).
{true, true, Acc, null};
{false, false} -> to_range(SkipList, StartKey, EndKey, ListHeight, Acc, StartIncl) ->
case Start > Mark of SL = sublist_above(SkipList, StartKey, ListHeight, StartIncl),
true -> case SL of
{false, false, Acc, SL}; [] ->
Acc;
_ ->
{LK, _LV} = lists:last(SL),
case leveled_codec:endkey_passed(EndKey, LK) of
false -> false ->
RHS = splitlist_start(Start, PrevList ++ SL), to_range(SkipList,
case leveled_codec:endkey_passed(End, Mark) of LK,
EndKey,
ListHeight,
Acc ++ SL,
false);
true -> true ->
EL = splitlist_end(End, RHS), SplitFun =
{true, true, EL, null}; fun({K, _V}) ->
false -> not leveled_codec:endkey_passed(EndKey, K) end,
{true, false, RHS, null} LHS = lists:takewhile(SplitFun, SL),
Acc ++ LHS
end end
end; end.
{true, false} ->
case leveled_codec:endkey_passed(End, Mark) of sublist_above(SkipList, StartKey, 0, StartIncl) ->
TestFun =
fun({K, _V}) ->
case StartIncl of
true -> true ->
EL = splitlist_end(End, SL), K < StartKey;
{true, true, Acc ++ EL, null};
false -> false ->
{true, false, Acc ++ SL, null} K =< StartKey
end
end end, end end,
lists:dropwhile(TestFun, SkipList);
{false, false, [], []}, sublist_above(SkipList, StartKey, Level, StartIncl) ->
SkipList), TestFun =
{_Bool1, _Bool2, SubList, _PrevList} = R, fun({K, _SL}) ->
SubList; case StartIncl of
to_range(SkipList, Start, End, Level) ->
R = lists:foldl(fun({Mark, SL}, {PassedStart, PassedEnd, Acc, PrevList}) ->
case {PassedStart, PassedEnd} of
{true, true} ->
{true, true, Acc, null};
{false, false} ->
case Start > Mark of
true -> true ->
{false, false, Acc, SL}; K < StartKey;
false -> false ->
SkipLRange = to_range(PrevList, K =< StartKey
Start, End,
Level - 1) ++
to_range(SL,
Start, End,
Level - 1),
case leveled_codec:endkey_passed(End, Mark) of
true ->
{true, true, SkipLRange, null};
false ->
{true, false, SkipLRange, null}
end
end;
{true, false} ->
SkipLRange = to_range(SL, Start, End, Level - 1),
case leveled_codec:endkey_passed(End, Mark) of
true ->
{true, true, Acc ++ SkipLRange, null};
false ->
{true, false, Acc ++ SkipLRange, null}
end
end end, end end,
RHS = lists:dropwhile(TestFun, SkipList),
{false, false, [], []}, case RHS of
SkipList), [] ->
{_Bool1, _Bool2, SubList, _PrevList} = R, [];
SubList. [{_K, SL}|_Rest] ->
sublist_above(SL, StartKey, Level - 1, StartIncl)
end.
empty(SkipList, 1) -> empty(SkipList, 1) ->
[{?INFINITY_KEY, SkipList}]; [{?INFINITY_KEY, SkipList}];
@ -385,17 +366,6 @@ get_sublist(Key, SkipList) ->
null, null,
SkipList). SkipList).
splitlist_start(StartKey, SL) ->
{_LHS, RHS} = lists:splitwith(fun({K, _V}) -> K < StartKey end, SL),
RHS.
splitlist_end(EndKey, SL) ->
{LHS, _RHS} = lists:splitwith(fun({K, _V}) ->
not leveled_codec:endkey_passed(EndKey, K)
end,
SL),
LHS.
%%%============================================================================ %%%============================================================================
%%% Test %%% Test
%%%============================================================================ %%%============================================================================
@ -645,6 +615,33 @@ skiplist_nolookup_test() ->
KL), KL),
?assertMatch(KLSorted, to_list(SkipList)). ?assertMatch(KLSorted, to_list(SkipList)).
skiplist_range_test() ->
N = 150,
KL = generate_randomkeys(1, N, 1, N div 5),
KLSL1 = lists:sublist(lists:ukeysort(1, KL), 128),
SkipList1 = from_list(KLSL1),
{LastK1, V1} = lists:last(KLSL1),
R1 = to_range(SkipList1, LastK1, LastK1),
?assertMatch([{LastK1, V1}], R1),
KLSL2 = lists:sublist(lists:ukeysort(1, KL), 127),
SkipList2 = from_list(KLSL2),
{LastK2, V2} = lists:last(KLSL2),
R2 = to_range(SkipList2, LastK2, LastK2),
?assertMatch([{LastK2, V2}], R2),
KLSL3 = lists:sublist(lists:ukeysort(1, KL), 129),
SkipList3 = from_list(KLSL3),
{LastK3, V3} = lists:last(KLSL3),
R3 = to_range(SkipList3, LastK3, LastK3),
?assertMatch([{LastK3, V3}], R3),
{FirstK4, V4} = lists:nth(1, KLSL3),
R4 = to_range(SkipList3, FirstK4, FirstK4),
?assertMatch([{FirstK4, V4}], R4).
empty_skiplist_size_test() -> empty_skiplist_size_test() ->
?assertMatch(0, leveled_skiplist:size(empty(false))), ?assertMatch(0, leveled_skiplist:size(empty(false))),
?assertMatch(0, leveled_skiplist:size(empty(true))). ?assertMatch(0, leveled_skiplist:size(empty(true))).

1691
src/leveled_sst.erl Normal file

File diff suppressed because it is too large Load diff

View file

@ -2,7 +2,7 @@
%% %%
%% For sheltering relatively expensive lookups with a probabilistic check %% For sheltering relatively expensive lookups with a probabilistic check
%% %%
%% Uses multiple 256 byte blooms. Can sensibly hold up to 1000 keys per array. %% Uses multiple 512 byte blooms. Can sensibly hold up to 1000 keys per array.
%% Even at 1000 keys should still offer only a 20% false positive %% Even at 1000 keys should still offer only a 20% false positive
%% %%
%% Restricted to no more than 256 arrays - so can't handle more than 250K keys %% Restricted to no more than 256 arrays - so can't handle more than 250K keys
@ -22,13 +22,13 @@
empty/1 empty/1
]). ]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
%%%============================================================================ %%%============================================================================
%%% Bloom API %%% Bloom API
%%%============================================================================ %%%============================================================================
empty(Width) when Width =< 256 -> empty(Width) when Width =< 256 ->
FoldFun = fun(X, Acc) -> dict:store(X, <<0:4096>>, Acc) end, FoldFun = fun(X, Acc) -> dict:store(X, <<0:4096>>, Acc) end,
lists:foldl(FoldFun, dict:new(), lists:seq(0, Width - 1)). lists:foldl(FoldFun, dict:new(), lists:seq(0, Width - 1)).
@ -36,26 +36,30 @@ empty(Width) when Width =< 256 ->
enter({hash, no_lookup}, Bloom) -> enter({hash, no_lookup}, Bloom) ->
Bloom; Bloom;
enter({hash, Hash}, Bloom) -> enter({hash, Hash}, Bloom) ->
{H0, Bit1, Bit2} = split_hash(Hash), {Slot0, Bit1, Bit2} = split_hash(Hash),
Slot = H0 rem dict:size(Bloom), Slot = Slot0 rem dict:size(Bloom),
BitArray0 = dict:fetch(Slot, Bloom), BitArray0 = dict:fetch(Slot, Bloom),
BitArray1 = lists:foldl(fun add_to_array/2, FoldFun =
fun(Bit, Arr) -> add_to_array(Bit, Arr, 4096) end,
BitArray1 = lists:foldl(FoldFun,
BitArray0, BitArray0,
lists:usort([Bit1, Bit2])), lists:usort([Bit1, Bit2])),
dict:store(Slot, BitArray1, Bloom); dict:store(Slot, <<BitArray1/binary>>, Bloom);
enter(Key, Bloom) -> enter(Key, Bloom) ->
Hash = leveled_codec:magic_hash(Key), Hash = leveled_codec:magic_hash(Key),
enter({hash, Hash}, Bloom). enter({hash, Hash}, Bloom).
check({hash, Hash}, Bloom) -> check({hash, Hash}, Bloom) ->
{H0, Bit1, Bit2} = split_hash(Hash), {Slot0, Bit1, Bit2} = split_hash(Hash),
Slot = H0 rem dict:size(Bloom), Slot = Slot0 rem dict:size(Bloom),
BitArray = dict:fetch(Slot, Bloom), BitArray = dict:fetch(Slot, Bloom),
case getbit(Bit1, BitArray) of
case getbit(Bit1, BitArray, 4096) of
<<0:1>> -> <<0:1>> ->
false; false;
<<1:1>> -> <<1:1>> ->
case getbit(Bit2, BitArray) of case getbit(Bit2, BitArray, 4096) of
<<0:1>> -> <<0:1>> ->
false; false;
<<1:1>> -> <<1:1>> ->
@ -66,6 +70,7 @@ check(Key, Bloom) ->
Hash = leveled_codec:magic_hash(Key), Hash = leveled_codec:magic_hash(Key),
check({hash, Hash}, Bloom). check({hash, Hash}, Bloom).
%%%============================================================================ %%%============================================================================
%%% Internal Functions %%% Internal Functions
%%%============================================================================ %%%============================================================================
@ -76,15 +81,15 @@ split_hash(Hash) ->
H2 = Hash bsr 20, H2 = Hash bsr 20,
{H0, H1, H2}. {H0, H1, H2}.
add_to_array(Bit, BitArray) -> add_to_array(Bit, BitArray, ArrayLength) ->
RestLen = 4096 - Bit - 1, RestLen = ArrayLength - Bit - 1,
<<Head:Bit/bitstring, <<Head:Bit/bitstring,
_B:1/bitstring, _B:1/integer,
Rest:RestLen/bitstring>> = BitArray, Rest:RestLen/bitstring>> = BitArray,
<<Head/bitstring, 1:1, Rest/bitstring>>. <<Head/bitstring, 1:1, Rest/bitstring>>.
getbit(Bit, BitArray) -> getbit(Bit, BitArray, ArrayLength) ->
RestLen = 4096 - Bit - 1, RestLen = ArrayLength - Bit - 1,
<<_Head:Bit/bitstring, <<_Head:Bit/bitstring,
B:1/bitstring, B:1/bitstring,
_Rest:RestLen/bitstring>> = BitArray, _Rest:RestLen/bitstring>> = BitArray,
@ -99,7 +104,7 @@ getbit(Bit, BitArray) ->
simple_test() -> simple_test() ->
N = 4000, N = 4000,
W = 4, W = 6,
KLin = lists:map(fun(X) -> "Key_" ++ KLin = lists:map(fun(X) -> "Key_" ++
integer_to_list(X) ++ integer_to_list(X) ++
integer_to_list(random:uniform(100)) ++ integer_to_list(random:uniform(100)) ++
@ -150,4 +155,5 @@ simple_test() ->
?assertMatch(true, FP < (N div 4)). ?assertMatch(true, FP < (N div 4)).
-endif. -endif.