Removed SFT
Now moved over to SST on this branch
parent c664483f03
commit dc28388c76
7 changed files with 481 additions and 2246 deletions
@@ -22,17 +22,17 @@
 %%
 %% The Ledger is divided into many levels
 %% - L0: New keys are received from the Bookie and merged into a single
-%% gb_tree, until that tree is the size of a SFT file, and it is then persisted
-%% as a SFT file at this level. L0 SFT files can be larger than the normal
+%% gb_tree, until that tree is the size of a SST file, and it is then persisted
+%% as a SST file at this level. L0 SST files can be larger than the normal
 %% maximum size - so we don't have to consider problems of either having more
 %% than one L0 file (and handling what happens on a crash between writing the
 %% files when the second may have overlapping sequence numbers), or having a
 %% remainder with overlapping in sequence numbers in memory after the file is
 %% written. Once the persistence is completed, the L0 tree can be erased.
-%% There can be only one SFT file at Level 0, so the work to merge that file
+%% There can be only one SST file at Level 0, so the work to merge that file
 %% to the lower level must be the highest priority, as otherwise writes to the
 %% ledger will stall, when there is next a need to persist.
-%% - L1 TO L7: May contain multiple processes managing non-overlapping sft
+%% - L1 TO L7: May contain multiple processes managing non-overlapping SST
 %% files. Compaction work should be sheduled if the number of files exceeds
 %% the target size of the level, where the target size is 8 ^ n.
 %%
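
For context on the level-sizing rule in the hunk above ("the target size of the level, where the target size is 8 ^ n"), a minimal stand-alone sketch of that check follows. The module and function names are hypothetical and not part of this commit.

    %% Illustrative helper only: the 8 ^ n level target described in the
    %% design comments, and the corresponding "schedule compaction" test.
    -module(level_target_sketch).
    -export([target_size/1, needs_compaction/2]).

    %% Target number of files for level N (L1 to L7), i.e. 8 ^ N.
    target_size(LevelN) when LevelN >= 1 ->
        trunc(math:pow(8, LevelN)).

    %% Compaction work should be scheduled once a level holds more files
    %% than its target size.
    needs_compaction(LevelN, FileCount) ->
        FileCount > target_size(LevelN).
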
@@ -67,14 +67,14 @@
 %% completed to merge the tree into the L0 tree.
 %%
 %% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the
-%% conversion of the current L0 tree into a SFT file, but not completed this
+%% conversion of the current L0 tree into a SST file, but not completed this
 %% change. The Penciller in this case returns the push, and the Bookie should
 %% continue to grow the cache before trying again.
 %%
 %% ---------- FETCH ----------
 %%
 %% On request to fetch a key the Penciller should look first in the in-memory
-%% L0 tree, then look in the SFT files Level by Level (including level 0),
+%% L0 tree, then look in the SST files Level by Level (including level 0),
 %% consulting the Manifest to determine which file should be checked at each
 %% level.
 %%
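
The FETCH order described above (check the in-memory L0 tree first, then consult the manifest level by level) can be sketched as a self-contained module. Here the manifest is simplified to a flat {Level, FileRef} list and the per-file lookup to a callback; every name below is a hypothetical stand-in, not the penciller's actual API.

    %% Sketch of the fetch order only - the real fetch_mem/fetch functions
    %% appear later in this diff and also handle hashes and level indexes.
    -module(fetch_order_sketch).
    -export([fetch/4]).

    %% Look in the in-memory L0 tree first, then fall back to the files.
    fetch(Key, L0Tree, Manifest, FileGetFun) ->
        case gb_trees:lookup(Key, L0Tree) of
            {value, Value} ->
                {Key, Value};
            none ->
                fetch_from_levels(Key, Manifest, FileGetFun)
        end.

    %% Manifest is assumed to be ordered from level 0 downwards as
    %% [{Level, FileRef}]; FileGetFun(FileRef, Key) returns {Key, Value}
    %% or not_present.
    fetch_from_levels(_Key, [], _FileGetFun) ->
        not_present;
    fetch_from_levels(Key, [{_Level, FileRef}|Rest], FileGetFun) ->
        case FileGetFun(FileRef, Key) of
            not_present ->
                fetch_from_levels(Key, Rest, FileGetFun);
            {Key, Value} ->
                {Key, Value}
        end.
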
@@ -82,16 +82,16 @@
 %%
 %% Iterators may request a snapshot of the database. A snapshot is a cloned
 %% Penciller seeded not from disk, but by the in-memory L0 gb_tree and the
-%% in-memory manifest, allowing for direct reference for the SFT file processes.
+%% in-memory manifest, allowing for direct reference for the SST file processes.
 %%
 %% Clones formed to support snapshots are registered by the Penciller, so that
-%% SFT files valid at the point of the snapshot until either the iterator is
+%% SST files valid at the point of the snapshot until either the iterator is
 %% completed or has timed out.
 %%
 %% ---------- ON STARTUP ----------
 %%
 %% On Startup the Bookie with ask the Penciller to initiate the Ledger first.
-%% To initiate the Ledger the must consult the manifest, and then start a SFT
+%% To initiate the Ledger the must consult the manifest, and then start a SST
 %% management process for each file in the manifest.
 %%
 %% The penciller should then try and read any Level 0 file which has the
@@ -103,14 +103,14 @@
 %% ---------- ON SHUTDOWN ----------
 %%
 %% On a controlled shutdown the Penciller should attempt to write any in-memory
-%% ETS table to a L0 SFT file, assuming one is nto already pending. If one is
+%% ETS table to a L0 SST file, assuming one is nto already pending. If one is
 %% already pending then the Penciller will not persist this part of the Ledger.
 %%
 %% ---------- FOLDER STRUCTURE ----------
 %%
 %% The following folders are used by the Penciller
 %% $ROOT/ledger/ledger_manifest/ - used for keeping manifest files
-%% $ROOT/ledger/ledger_files/ - containing individual SFT files
+%% $ROOT/ledger/ledger_files/ - containing individual SST files
 %%
 %% In larger stores there could be a large number of files in the ledger_file
 %% folder - perhaps o(1000). It is assumed that modern file systems should
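
The folder layout listed above can be expressed with filename:join; a tiny hypothetical sketch (the paths follow the comment text, the module and function names do not exist in the codebase):

    %% $ROOT/ledger/ledger_manifest/ and $ROOT/ledger/ledger_files/
    -module(ledger_paths_sketch).
    -export([manifest_path/1, files_path/1]).

    manifest_path(RootPath) ->
        filename:join([RootPath, "ledger", "ledger_manifest"]).

    files_path(RootPath) ->
        filename:join([RootPath, "ledger", "ledger_files"]).
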
@@ -120,7 +120,7 @@
 %%
 %% The Penciller can have one and only one Clerk for performing compaction
 %% work. When the Clerk has requested and taken work, it should perform the
-%5 compaction work starting the new SFT process to manage the new Ledger state
+%5 compaction work starting the new SST process to manage the new Ledger state
 %% and then write a new manifest file that represents that state with using
 %% the next Manifest sequence number as the filename:
 %% - nonzero_<ManifestSQN#>.pnd
@@ -130,14 +130,14 @@
 %%
 %% On startup, the Penciller should look for the nonzero_*.crr file with the
 %% highest such manifest sequence number. This will be started as the
-%% manifest, together with any _0_0.sft file found at that Manifest SQN.
+%% manifest, together with any _0_0.sst file found at that Manifest SQN.
 %% Level zero files are not kept in the persisted manifest, and adding a L0
 %% file does not advanced the Manifest SQN.
 %%
 %% The pace at which the store can accept updates will be dependent on the
 %% speed at which the Penciller's Clerk can merge files at lower levels plus
 %% the time it takes to merge from Level 0. As if a clerk has commenced
-%% compaction work at a lower level and then immediately a L0 SFT file is
+%% compaction work at a lower level and then immediately a L0 SST file is
 %% written the Penciller will need to wait for this compaction work to
 %% complete and the L0 file to be compacted before the ETS table can be
 %% allowed to again reach capacity
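
As a side note, the startup rule above ("look for the nonzero_*.crr file with the highest such manifest sequence number") can be sketched as a small stand-alone scan. It is illustrative only, assumes the nonzero_<SQN>.crr naming from the comments, and the module name is hypothetical.

    -module(manifest_scan_sketch).
    -export([highest_manifest_sqn/1]).

    %% Return the highest SQN among nonzero_<SQN>.crr files in ManifestDir,
    %% or no_manifest if none are present.
    highest_manifest_sqn(ManifestDir) ->
        {ok, Filenames} = file:list_dir(ManifestDir),
        SQNs = [sqn_from_filename(FN) || FN <- Filenames,
                                         lists:prefix("nonzero_", FN),
                                         filename:extension(FN) == ".crr"],
        case SQNs of
            [] -> no_manifest;
            _ -> lists:max(SQNs)
        end.

    sqn_from_filename(FN) ->
        Base = filename:basename(FN, ".crr"),
        ["nonzero", SQNString] = string:tokens(Base, "_"),
        list_to_integer(SQNString).
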
@@ -145,7 +145,7 @@
 %% The writing of L0 files do not require the involvement of the clerk.
 %% The L0 files are prompted directly by the penciller when the in-memory tree
 %% has reached capacity. This places the penciller in a levelzero_pending
-%% state, and in this state it must return new pushes. Once the SFT file has
+%% state, and in this state it must return new pushes. Once the SST file has
 %% been completed it will confirm completion to the penciller which can then
 %% revert the levelzero_pending state, add the file to the manifest and clear
 %% the current level zero in-memory view.
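
The levelzero_pending behaviour described above (return new pushes while a level-zero file is being written, accept them otherwise) reduces to a small gate. The state record and function below are hypothetical simplifications, not the penciller's real state or handlers.

    -module(l0_gate_sketch).
    -export([handle_push/2]).

    -record(state, {levelzero_pending = false :: boolean(),
                    levelzero_cache = [] :: list()}).

    %% While a level-zero SST write is pending, the push is returned and the
    %% Bookie keeps growing its own cache; otherwise the push is accepted.
    handle_push(_PushedTree, State = #state{levelzero_pending = true}) ->
        {returned, State};
    handle_push(PushedTree, State = #state{levelzero_cache = Cache}) ->
        {ok, State#state{levelzero_cache = [PushedTree | Cache]}}.
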
@@ -399,10 +399,11 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
 List ->
 List
 end,
-SFTiter = initiate_rangequery_frommanifest(StartKey,
+SSTiter = initiate_rangequery_frommanifest(StartKey,
 EndKey,
 State#state.manifest),
-Acc = keyfolder({L0AsList, SFTiter},
+io:format("SSTiter on query ~w~n", [SSTiter]),
+Acc = keyfolder({L0AsList, SSTiter},
 {StartKey, EndKey},
 {AccFun, InitAcc},
 MaxKeys),
@@ -456,7 +457,7 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap})
 {true, Pid} ->
 UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
 leveled_log:log("P0005", [FileName]),
-ok = leveled_sft:sft_deleteconfirmed(Pid),
+ok = leveled_sst:sst_deleteconfirmed(Pid),
 {noreply, State#state{unreferenced_files=UF1}};
 _ ->
 {noreply, State}
@@ -525,7 +526,7 @@ terminate(Reason, State) ->
 leveled_log:log("P0009", []);
 {false, [], _N} ->
 L0Pid = roll_memory(UpdState, true),
-ok = leveled_sft:sft_close(L0Pid);
+ok = leveled_sst:sst_close(L0Pid);
 StatusTuple ->
 leveled_log:log("P0010", [StatusTuple])
 end,
@@ -533,7 +534,7 @@ terminate(Reason, State) ->
 % Tidy shutdown of individual files
 ok = close_files(0, UpdState#state.manifest),
 lists:foreach(fun({_FN, Pid, _SN}) ->
-ok = leveled_sft:sft_close(Pid) end,
+ok = leveled_sst:sst_close(Pid) end,
 UpdState#state.unreferenced_files),
 leveled_log:log("P0011", []),
 ok.
@@ -608,14 +609,14 @@ start_from_file(PCLopts) ->
 leveled_log:log("P0014", [MaxSQN]),
 
 %% Find any L0 files
-L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sft",
+L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sst",
 case filelib:is_file(L0FN) of
 true ->
 leveled_log:log("P0015", [L0FN]),
 {ok,
 L0Pid,
-{L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN),
-L0SQN = leveled_sft:sft_getmaxsequencenumber(L0Pid),
+{L0StartKey, L0EndKey}} = leveled_sst:sst_open(L0FN),
+L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid),
 ManifestEntry = #manifest_entry{start_key=L0StartKey,
 end_key=L0EndKey,
 owner=L0Pid,
@@ -696,7 +697,7 @@ update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
 %% to an immediate return as expected. With 32K keys in the TreeList it could
 %% take around 35-40ms.
 %%
-%% To avoid blocking this gen_server, the SFT file can request each item of the
+%% To avoid blocking this gen_server, the SST file can request each item of the
 %% cache one at a time.
 %%
 %% The Wait is set to false to use a cast when calling this in normal operation
@@ -704,25 +705,22 @@ update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
 
 roll_memory(State, false) ->
 FileName = levelzero_filename(State),
-leveled_log:log("P0019", [FileName]),
-Opts = #sft_options{wait=false, penciller=self()},
+leveled_log:log("P0019", [FileName, State#state.ledger_sqn]),
 PCL = self(),
 FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
-% FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end,
-R = leveled_sft:sft_newfroml0cache(FileName,
+R = leveled_sst:sst_newlevelzero(FileName,
 length(State#state.levelzero_cache),
 FetchFun,
-Opts),
+PCL,
+State#state.ledger_sqn),
 {ok, Constructor, _} = R,
 Constructor;
 roll_memory(State, true) ->
 FileName = levelzero_filename(State),
-Opts = #sft_options{wait=true},
 FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end,
-R = leveled_sft:sft_newfroml0cache(FileName,
-length(State#state.levelzero_cache),
-FetchFun,
-Opts),
+KVList = leveled_pmem:to_list(length(State#state.levelzero_cache),
+FetchFun),
+R = leveled_sst:sst_new(FileName, 0, KVList, State#state.ledger_sqn),
 {ok, Constructor, _} = R,
 Constructor.
 
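
The comment block before this hunk explains why the level-zero write must not block the gen_server: the SST writer pulls the cache one slot at a time through a fun, while the synchronous shutdown path materialises the whole cache first. A stripped-down sketch of the two access patterns follows; the slot store here is a plain list (the real code delegates to leveled_pmem) and the module name is hypothetical.

    -module(roll_modes_sketch).
    -export([async_fetchfun/1, sync_kvlist/1]).

    %% Async roll: hand the writer a fun it can call as FetchFun(Slot) for
    %% slots 1..N, so the owning process is never blocked for the whole cache.
    async_fetchfun(LevelZeroCache) ->
        fun(Slot) -> lists:nth(Slot, LevelZeroCache) end.

    %% Sync roll (e.g. at shutdown): pull every slot up front into one list.
    sync_kvlist(LevelZeroCache) ->
        FetchFun = async_fetchfun(LevelZeroCache),
        [FetchFun(Slot) || Slot <- lists:seq(1, length(LevelZeroCache))].
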
@@ -753,7 +751,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
 L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
 case L0Check of
 {false, not_found} ->
-fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3);
+fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3);
 {true, KV} ->
 {KV, 0}
 end;
@@ -762,7 +760,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
 true ->
 fetch_mem(Key, Hash, Manifest, L0Cache, none);
 false ->
-fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3)
+fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3)
 end.
 
 fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
@@ -791,9 +789,9 @@ fetch(Key, Hash, Manifest, Level, FetchFun) ->
 end
 end.
 
-timed_sft_get(PID, Key, Hash) ->
+timed_sst_get(PID, Key, Hash) ->
 SW = os:timestamp(),
-R = leveled_sft:sft_get(PID, Key, Hash),
+R = leveled_sst:sst_get(PID, Key, Hash),
 T0 = timer:now_diff(os:timestamp(), SW),
 case {T0, R} of
 {T, R} when T < ?SLOW_FETCH ->
@@ -880,7 +878,7 @@ close_files(?MAX_LEVELS - 1, _Manifest) ->
 close_files(Level, Manifest) ->
 LevelList = get_item(Level, Manifest, []),
 lists:foreach(fun(F) ->
-ok = leveled_sft:sft_close(F#manifest_entry.owner) end,
+ok = leveled_sst:sst_close(F#manifest_entry.owner) end,
 LevelList),
 close_files(Level + 1, Manifest).
 
@@ -897,8 +895,8 @@ open_all_filesinmanifest({Manifest, TopSQN}, Level) ->
 %5 replace them
 LvlR = lists:foldl(fun(F, {FL, FL_SQN}) ->
 FN = F#manifest_entry.filename,
-{ok, P, _Keys} = leveled_sft:sft_open(FN),
-F_SQN = leveled_sft:sft_getmaxsequencenumber(P),
+{ok, P, _Keys} = leveled_sst:sst_open(FN),
+F_SQN = leveled_sst:sst_getmaxsequencenumber(P),
 {lists:append(FL,
 [F#manifest_entry{owner = P}]),
 max(FL_SQN, F_SQN)}
@@ -932,24 +930,24 @@ initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) ->
 C2 = leveled_codec:endkey_passed(EndKey,
 M#manifest_entry.start_key),
 not (C1 or C2) end,
-lists:foldl(fun(L, AccL) ->
-Level = get_item(L, Manifest, []),
-FL = lists:foldl(fun(M, Acc) ->
-case CompareFun(M) of
-true ->
-Acc ++ [{next_file, M}];
-false ->
-Acc
-end end,
-[],
-Level),
-case FL of
-[] -> AccL;
-FL -> AccL ++ [{L, FL}]
-end
-end,
-[],
-lists:seq(0, ?MAX_LEVELS - 1)).
+FoldFun =
+fun(L, AccL) ->
+Level = get_item(L, Manifest, []),
+FL = lists:foldl(fun(M, Acc) ->
+case CompareFun(M) of
+true ->
+Acc ++ [{next, M, StartKey}];
+false ->
+Acc
+end end,
+[],
+Level),
+case FL of
+[] -> AccL;
+FL -> AccL ++ [{L, FL}]
+end
+end,
+lists:foldl(FoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)).
 
 %% Looks to find the best choice for the next key across the levels (other
 %% than in-memory table)
@@ -960,22 +958,25 @@ find_nextkey(QueryArray, StartKey, EndKey) ->
 find_nextkey(QueryArray,
 0,
 {null, null},
-{fun leveled_sft:sft_getkvrange/4, StartKey, EndKey, 1}).
+StartKey,
+EndKey,
+1).
 
-find_nextkey(_QueryArray, LCnt, {null, null}, _QueryFunT)
+find_nextkey(_QueryArray, LCnt, {null, null}, _StartKey, _EndKey, _Width)
 when LCnt > ?MAX_LEVELS ->
 % The array has been scanned wihtout finding a best key - must be
 % exhausted - respond to indicate no more keys to be found by the
 % iterator
 no_more_keys;
-find_nextkey(QueryArray, LCnt, {BKL, BestKV}, _QueryFunT)
+find_nextkey(QueryArray, LCnt, {BKL, BestKV}, _StartKey, _EndKey, _Width)
 when LCnt > ?MAX_LEVELS ->
 % All levels have been scanned, so need to remove the best result from
 % the array, and return that array along with the best key/sqn/status
 % combination
 {BKL, [BestKV|Tail]} = lists:keyfind(BKL, 1, QueryArray),
 {lists:keyreplace(BKL, 1, QueryArray, {BKL, Tail}), BestKV};
-find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
+find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
+StartKey, EndKey, Width) ->
 % Get the next key at this level
 {NextKey, RestOfKeys} = case lists:keyfind(LCnt, 1, QueryArray) of
 false ->
@@ -989,39 +990,46 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
 case {NextKey, BestKeyLevel, BestKV} of
 {null, BKL, BKV} ->
 % There is no key at this level - go to the next level
-find_nextkey(QueryArray, LCnt + 1, {BKL, BKV}, QueryFunT);
-{{next_file, ManifestEntry}, BKL, BKV} ->
+find_nextkey(QueryArray,
+LCnt + 1,
+{BKL, BKV},
+StartKey, EndKey, Width);
+{{next, ManifestEntry, _SK}, BKL, BKV} ->
 % The first key at this level is pointer to a file - need to query
 % the file to expand this level out before proceeding
 Owner = ManifestEntry#manifest_entry.owner,
-{QueryFun, StartKey, EndKey, ScanSize} = QueryFunT,
-QueryResult = QueryFun(Owner, StartKey, EndKey, ScanSize),
-NewEntry = {LCnt, QueryResult ++ RestOfKeys},
+Pointer = {next, Owner, StartKey, EndKey},
+UpdList = leveled_sst:expand_list_by_pointer(Pointer,
+RestOfKeys,
+Width),
+NewEntry = {LCnt, UpdList},
 % Need to loop around at this level (LCnt) as we have not yet
 % examined a real key at this level
 find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
 LCnt,
 {BKL, BKV},
-QueryFunT);
-{{next, SFTpid, NewStartKey}, BKL, BKV} ->
+StartKey, EndKey, Width);
+{{pointer, SSTPid, Slot, PSK, PEK}, BKL, BKV} ->
 % The first key at this level is pointer within a file - need to
 % query the file to expand this level out before proceeding
-{QueryFun, _StartKey, EndKey, ScanSize} = QueryFunT,
-QueryResult = QueryFun(SFTpid, NewStartKey, EndKey, ScanSize),
-NewEntry = {LCnt, QueryResult ++ RestOfKeys},
+Pointer = {pointer, SSTPid, Slot, PSK, PEK},
+UpdList = leveled_sst:expand_list_by_pointer(Pointer,
+RestOfKeys,
+Width),
+NewEntry = {LCnt, UpdList},
 % Need to loop around at this level (LCnt) as we have not yet
 % examined a real key at this level
 find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
 LCnt,
 {BKL, BKV},
-QueryFunT);
+StartKey, EndKey, Width);
 {{Key, Val}, null, null} ->
 % No best key set - so can assume that this key is the best key,
 % and check the lower levels
 find_nextkey(QueryArray,
 LCnt + 1,
 {LCnt, {Key, Val}},
-QueryFunT);
+StartKey, EndKey, Width);
 {{Key, Val}, _BKL, {BestKey, _BestVal}} when Key < BestKey ->
 % There is a real key and a best key to compare, and the real key
 % at this level is before the best key, and so is now the new best
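
The find_nextkey changes above replace the old QueryFunT tuple with explicit StartKey/EndKey/Width arguments and expand file pointers through leveled_sst:expand_list_by_pointer. Setting the pointer expansion aside, the comparison rule itself (the "best" next key is the lowest key, with the higher sequence number winning a tie) can be shown in a self-contained sketch; the module and data shapes below are hypothetical simplifications, not the real QueryArray.

    -module(nextkey_sketch).
    -export([best_head/1]).

    %% QueryArray :: [{Level, [{Key, SQN}]}], each level sorted by Key.
    %% Returns {Level, {Key, SQN}} for the entry to emit next, or no_more_keys.
    best_head(QueryArray) ->
        lists:foldl(fun compare_level/2, no_more_keys, QueryArray).

    %% An exhausted level cannot contribute a candidate.
    compare_level({_Level, []}, Best) ->
        Best;
    %% First non-empty level seen becomes the provisional best.
    compare_level({Level, [KV|_]}, no_more_keys) ->
        {Level, KV};
    %% Lowest key wins; on a tie the higher sequence number wins.
    compare_level({Level, [{Key, SQN}|_]}, {BestLevel, {BestKey, BestSQN}}) ->
        if
            Key < BestKey -> {Level, {Key, SQN}};
            Key == BestKey, SQN > BestSQN -> {Level, {Key, SQN}};
            true -> {BestLevel, {BestKey, BestSQN}}
        end.
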
@@ -1030,7 +1038,7 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
 find_nextkey(QueryArray,
 LCnt + 1,
 {LCnt, {Key, Val}},
-QueryFunT);
+StartKey, EndKey, Width);
 {{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey ->
 SQN = leveled_codec:strip_to_seqonly({Key, Val}),
 BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}),
@@ -1041,7 +1049,7 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
 find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
 LCnt + 1,
 {BKL, {BestKey, BestVal}},
-QueryFunT);
+StartKey, EndKey, Width);
 SQN > BestSQN ->
 % There is a real key at the front of this level and it has
 % a higher SQN than the best key, so we should use this as
@@ -1056,29 +1064,32 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
 {BKL, BestTail}),
 LCnt + 1,
 {LCnt, {Key, Val}},
-QueryFunT)
+StartKey, EndKey, Width)
 end;
 {_, BKL, BKV} ->
 % This is not the best key
-find_nextkey(QueryArray, LCnt + 1, {BKL, BKV}, QueryFunT)
+find_nextkey(QueryArray,
+LCnt + 1,
+{BKL, BKV},
+StartKey, EndKey, Width)
 end.
 
 
-keyfolder(IMMiter, SFTiter, StartKey, EndKey, {AccFun, Acc}) ->
-keyfolder({IMMiter, SFTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1).
+keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) ->
+keyfolder({IMMiter, SSTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1).
 
 keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 ->
 Acc;
-keyfolder({[], SFTiter}, KeyRange, {AccFun, Acc}, MaxKeys) ->
+keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, MaxKeys) ->
 {StartKey, EndKey} = KeyRange,
-case find_nextkey(SFTiter, StartKey, EndKey) of
+case find_nextkey(SSTiter, StartKey, EndKey) of
 no_more_keys ->
 Acc;
-{NxSFTiter, {SFTKey, SFTVal}} ->
-Acc1 = AccFun(SFTKey, SFTVal, Acc),
-keyfolder({[], NxSFTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1)
+{NxSSTiter, {SSTKey, SSTVal}} ->
+Acc1 = AccFun(SSTKey, SSTVal, Acc),
+keyfolder({[], NxSSTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1)
 end;
-keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange,
+keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange,
 {AccFun, Acc}, MaxKeys) ->
 {StartKey, EndKey} = KeyRange,
 case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
@@ -1087,7 +1098,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange,
 % Normally everything is pre-filterd, but the IMM iterator can
 % be re-used and so may be behind the StartKey if the StartKey has
 % advanced from the previous use
-keyfolder({NxIMMiterator, SFTiterator},
+keyfolder({NxIMMiterator, SSTiterator},
 KeyRange,
 {AccFun, Acc},
 MaxKeys);
@@ -1095,44 +1106,44 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange,
 % There are no more keys in-range in the in-memory
 % iterator, so take action as if this iterator is empty
 % (see above)
-keyfolder({[], SFTiterator},
+keyfolder({[], SSTiterator},
 KeyRange,
 {AccFun, Acc},
 MaxKeys);
 {false, false} ->
-case find_nextkey(SFTiterator, StartKey, EndKey) of
+case find_nextkey(SSTiterator, StartKey, EndKey) of
 no_more_keys ->
 % No more keys in range in the persisted store, so use the
 % in-memory KV as the next
 Acc1 = AccFun(IMMKey, IMMVal, Acc),
-keyfolder({NxIMMiterator, SFTiterator},
+keyfolder({NxIMMiterator, SSTiterator},
 KeyRange,
 {AccFun, Acc1},
 MaxKeys - 1);
-{NxSFTiterator, {SFTKey, SFTVal}} ->
+{NxSSTiterator, {SSTKey, SSTVal}} ->
 % There is a next key, so need to know which is the
 % next key between the two (and handle two keys
 % with different sequence numbers).
 case leveled_codec:key_dominates({IMMKey,
 IMMVal},
-{SFTKey,
-SFTVal}) of
+{SSTKey,
+SSTVal}) of
 left_hand_first ->
 Acc1 = AccFun(IMMKey, IMMVal, Acc),
-keyfolder({NxIMMiterator, SFTiterator},
+keyfolder({NxIMMiterator, SSTiterator},
 KeyRange,
 {AccFun, Acc1},
 MaxKeys - 1);
 right_hand_first ->
-Acc1 = AccFun(SFTKey, SFTVal, Acc),
+Acc1 = AccFun(SSTKey, SSTVal, Acc),
 keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
-NxSFTiterator},
+NxSSTiterator},
 KeyRange,
 {AccFun, Acc1},
 MaxKeys - 1);
 left_hand_dominant ->
 Acc1 = AccFun(IMMKey, IMMVal, Acc),
-keyfolder({NxIMMiterator, NxSFTiterator},
+keyfolder({NxIMMiterator, NxSSTiterator},
 KeyRange,
 {AccFun, Acc1},
 MaxKeys - 1)
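
The keyfolder clauses above interleave the in-memory iterator with the persisted (SST) iterator, using leveled_codec:key_dominates to resolve the case where both sides hold the same key. Below is a self-contained sketch of that merge, with both iterators reduced to Key-sorted [{Key, SQN}] lists and the dominance rule reduced to "higher SQN wins"; all names and shapes are hypothetical simplifications.

    -module(keyfolder_sketch).
    -export([keyfolder/4]).

    %% keyfolder(IMMList, SSTList, {AccFun, Acc0}, MaxKeys) - a MaxKeys of -1
    %% means unlimited, mirroring the convention in the real code.
    keyfolder(_IMM, _SST, {_AccFun, Acc}, 0) ->
        Acc;
    keyfolder([], [], {_AccFun, Acc}, _MaxKeys) ->
        Acc;
    keyfolder([], [{K, V}|SSTRest], {AccFun, Acc}, MaxKeys) ->
        keyfolder([], SSTRest, {AccFun, AccFun(K, V, Acc)}, MaxKeys - 1);
    keyfolder([{K, V}|IMMRest], [], {AccFun, Acc}, MaxKeys) ->
        keyfolder(IMMRest, [], {AccFun, AccFun(K, V, Acc)}, MaxKeys - 1);
    keyfolder([{IK, IV}|IMMRest] = IMM, [{SK, SV}|SSTRest] = SST,
              {AccFun, Acc}, MaxKeys) ->
        if
            IK < SK ->
                keyfolder(IMMRest, SST,
                          {AccFun, AccFun(IK, IV, Acc)}, MaxKeys - 1);
            SK < IK ->
                keyfolder(IMM, SSTRest,
                          {AccFun, AccFun(SK, SV, Acc)}, MaxKeys - 1);
            true ->
                %% Same key on both sides - the higher sequence number
                %% dominates and the duplicate is consumed from both sides.
                {K, V} = if IV >= SV -> {IK, IV}; true -> {SK, SV} end,
                keyfolder(IMMRest, SSTRest,
                          {AccFun, AccFun(K, V, Acc)}, MaxKeys - 1)
        end.

For example, keyfolder_sketch:keyfolder([{a, 2}], [{a, 1}, {b, 1}], {fun(K, V, Acc) -> [{K, V}|Acc] end, []}, -1) returns [{b, 1}, {a, 2}]: the in-memory entry for a wins on sequence number, and b is then taken from the persisted side.
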
@@ -1286,6 +1297,27 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
 
 -ifdef(TEST).
 
+
+generate_randomkeys({Count, StartSQN}) ->
+generate_randomkeys(Count, StartSQN, []);
+generate_randomkeys(Count) ->
+generate_randomkeys(Count, 0, []).
+
+generate_randomkeys(0, _SQN, Acc) ->
+lists:reverse(Acc);
+generate_randomkeys(Count, SQN, Acc) ->
+K = {o,
+lists:concat(["Bucket", random:uniform(1024)]),
+lists:concat(["Key", random:uniform(1024)]),
+null},
+RandKey = {K,
+{SQN,
+{active, infinity},
+leveled_codec:magic_hash(K),
+null}},
+generate_randomkeys(Count - 1, SQN + 1, [RandKey|Acc]).
+
+
 clean_testdir(RootPath) ->
 clean_subdir(filepath(RootPath, manifest)),
 clean_subdir(filepath(RootPath, files)).
@@ -1332,8 +1364,8 @@ compaction_work_assessment_test() ->
 ?assertMatch([{1, Manifest3, 1}], WorkQ3).
 
 confirm_delete_test() ->
-Filename = 'test.sft',
-UnreferencedFiles = [{'other.sft', dummy_owner, 15},
+Filename = 'test.sst',
+UnreferencedFiles = [{'other.sst', dummy_owner, 15},
 {Filename, dummy_owner, 10}],
 RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}],
 R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1),
@@ -1376,20 +1408,20 @@ simple_server_test() ->
 Key1_Pre = {{o,"Bucket0001", "Key0001", null},
 {1, {active, infinity}, null}},
 Key1 = add_missing_hash(Key1_Pre),
-KL1 = leveled_sft:generate_randomkeys({1000, 2}),
+KL1 = generate_randomkeys({1000, 2}),
 Key2_Pre = {{o,"Bucket0002", "Key0002", null},
 {1002, {active, infinity}, null}},
 Key2 = add_missing_hash(Key2_Pre),
-KL2 = leveled_sft:generate_randomkeys({900, 1003}),
+KL2 = generate_randomkeys({900, 1003}),
 % Keep below the max table size by having 900 not 1000
 Key3_Pre = {{o,"Bucket0003", "Key0003", null},
 {2003, {active, infinity}, null}},
 Key3 = add_missing_hash(Key3_Pre),
-KL3 = leveled_sft:generate_randomkeys({1000, 2004}),
+KL3 = generate_randomkeys({1000, 2004}),
 Key4_Pre = {{o,"Bucket0004", "Key0004", null},
 {3004, {active, infinity}, null}},
 Key4 = add_missing_hash(Key4_Pre),
-KL4 = leveled_sft:generate_randomkeys({1000, 3005}),
+KL4 = generate_randomkeys({1000, 3005}),
 ok = maybe_pause_push(PCL, [Key1]),
 ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
 ok = maybe_pause_push(PCL, KL1),
@@ -1464,7 +1496,7 @@ simple_server_test() ->
 Key1A_Pre = {{o,"Bucket0001", "Key0001", null},
 {4005, {active, infinity}, null}},
 Key1A = add_missing_hash(Key1A_Pre),
-KL1A = leveled_sft:generate_randomkeys({2000, 4006}),
+KL1A = generate_randomkeys({2000, 4006}),
 ok = maybe_pause_push(PCLr, [Key1A]),
 ok = maybe_pause_push(PCLr, KL1A),
 ?assertMatch(true, pcl_checksequencenumber(PclSnap,
@@ -1528,17 +1560,16 @@ rangequery_manifest_test() ->
 end_key={o, "Bucket1", "K996", null},
 filename="Z6"}},
 Man = [{1, [E1, E2, E3]}, {2, [E4, E5, E6]}],
-R1 = initiate_rangequery_frommanifest({o, "Bucket1", "K711", null},
-{o, "Bucket1", "K999", null},
-Man),
-?assertMatch([{1, [{next_file, E3}]},
-{2, [{next_file, E5}, {next_file, E6}]}],
+SK1 = {o, "Bucket1", "K711", null},
+EK1 = {o, "Bucket1", "K999", null},
+R1 = initiate_rangequery_frommanifest(SK1, EK1, Man),
+?assertMatch([{1, [{next, E3, SK1}]},
+{2, [{next, E5, SK1}, {next, E6, SK1}]}],
 R1),
-R2 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx1", "Fld8"}, null},
-{i, "Bucket1", {"Idx1", "Fld8"}, null},
-Man),
-?assertMatch([{1, [{next_file, E1}]}, {2, [{next_file, E5}]}],
-R2),
+SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
+EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
+R2 = initiate_rangequery_frommanifest(SK2, EK2, Man),
+?assertMatch([{1, [{next, E1, SK2}]}, {2, [{next, E5, SK2}]}], R2),
 R3 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx0", "Fld8"}, null},
 {i, "Bucket1", {"Idx0", "Fld9"}, null},
 Man),
@@ -1693,17 +1724,18 @@ foldwithimm_simple_test() ->
 {{o, "Bucket1", "Key6"}, 7}], AccB).
 
 create_file_test() ->
-Filename = "../test/new_file.sft",
+Filename = "../test/new_file.sst",
 ok = file:write_file(Filename, term_to_binary("hello")),
-KVL = lists:usort(leveled_sft:generate_randomkeys(10000)),
+KVL = lists:usort(generate_randomkeys(10000)),
 Tree = leveled_skiplist:from_list(KVL),
 FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
 {ok,
 SP,
-noreply} = leveled_sft:sft_newfroml0cache(Filename,
+noreply} = leveled_sst:sst_newlevelzero(Filename,
 1,
 FetchFun,
-#sft_options{wait=false}),
+undefined,
+10000),
 lists:foreach(fun(X) ->
 case checkready(SP) of
 timeout ->
@@ -1716,9 +1748,9 @@ create_file_test() ->
 io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]),
 ?assertMatch({o, _, _, _}, StartKey),
 ?assertMatch({o, _, _, _}, EndKey),
-?assertMatch("../test/new_file.sft", SrcFN),
-ok = leveled_sft:sft_clear(SP),
-{ok, Bin} = file:read_file("../test/new_file.sft.discarded"),
+?assertMatch("../test/new_file.sst", SrcFN),
+ok = leveled_sst:sst_clear(SP),
+{ok, Bin} = file:read_file("../test/new_file.sst.discarded"),
 ?assertMatch("hello", binary_to_term(Bin)).
 
 commit_manifest_test() ->
@@ -1735,14 +1767,14 @@ commit_manifest_test() ->
 ok = file:write_file(ManifestFP ++ "nonzero_1.pnd",
 term_to_binary("dummy data")),
 
-L1_0 = [{1, [#manifest_entry{filename="1.sft"}]}],
+L1_0 = [{1, [#manifest_entry{filename="1.sst"}]}],
 Resp_WI0 = Resp_WI#penciller_work{new_manifest=L1_0,
 unreferenced_files=[]},
 {ok, State0} = commit_manifest_change(Resp_WI0, State),
 ?assertMatch(1, State0#state.manifest_sqn),
 ?assertMatch([], get_item(0, State0#state.manifest, [])),
 
-L0Entry = [#manifest_entry{filename="0.sft"}],
+L0Entry = [#manifest_entry{filename="0.sst"}],
 ManifestPlus = [{0, L0Entry}|State0#state.manifest],
 
 NxtSent_WI = #penciller_work{next_sqn=2,
@@ -1756,7 +1788,7 @@ commit_manifest_test() ->
 ok = file:write_file(ManifestFP ++ "nonzero_2.pnd",
 term_to_binary("dummy data")),
 
-L2_0 = [#manifest_entry{filename="2.sft"}],
+L2_0 = [#manifest_entry{filename="2.sst"}],
 NxtResp_WI0 = NxtResp_WI#penciller_work{new_manifest=[{2, L2_0}],
 unreferenced_files=[]},
 {ok, State2} = commit_manifest_change(NxtResp_WI0, State1),
@@ -1777,7 +1809,7 @@ badmanifest_test() ->
 Key1_pre = {{o,"Bucket0001", "Key0001", null},
 {1001, {active, infinity}, null}},
 Key1 = add_missing_hash(Key1_pre),
-KL1 = leveled_sft:generate_randomkeys({1000, 1}),
+KL1 = generate_randomkeys({1000, 1}),
 
 ok = maybe_pause_push(PCL, KL1 ++ [Key1]),
 %% Added together, as split apart there will be a race between the close
@@ -1798,7 +1830,7 @@ badmanifest_test() ->
 
 checkready(Pid) ->
 try
-leveled_sft:sft_checkready(Pid)
+leveled_sst:sst_checkready(Pid)
 catch
 exit:{timeout, _} ->
 timeout