Merge pull request #16 from martinsumner/mas-manifesttidy-simple

Penciller Manifest - Clean API
This commit is contained in:
martinsumner 2017-01-17 21:14:12 +00:00 committed by GitHub
commit 213a2e93fb
8 changed files with 1068 additions and 945 deletions

View file

@ -20,18 +20,6 @@
expire_tombstones = false :: boolean(),
penciller :: pid()}).
-record(penciller_work,
{next_sqn :: integer(),
clerk :: pid(),
src_level :: integer(),
manifest :: list(),
start_time :: tuple(),
ledger_filepath :: string(),
manifest_file :: string(),
new_manifest :: list(),
unreferenced_files :: list(),
target_is_basement = false ::boolean()}).
-record(level,
{level :: integer(),
is_basement = false :: boolean(),

View file

@ -1 +0,0 @@
[].

View file

@ -282,6 +282,8 @@ turn_to_string(Item) ->
% Compare a key against a query key, only comparing elements that are non-null
% in the Query key. This is used for comparing against end keys in queries.
endkey_passed(all, _) ->
false;
endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) ->
EK1 < CK1;
endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) ->

View file

@ -65,8 +65,7 @@
{"P0004",
{info, "Remaining ledger snapshots are ~w"}},
{"P0005",
{info, "Delete confirmed as file ~s is removed from " ++
"unreferenced files"}},
{info, "Delete confirmed as file ~s is removed from Manifest"}},
{"P0006",
{info, "Orphaned reply after timeout on L0 file write ~s"}},
{"P0007",
@ -74,8 +73,6 @@
++ "reason ~w"}},
{"P0008",
{info, "Penciller closing for reason ~w"}},
{"P0009",
{info, "Level 0 cache empty at close of Penciller"}},
{"P0010",
{info, "No level zero action on close of Penciller ~w"}},
{"P0011",
@ -97,9 +94,6 @@
++ "L0 pending ~w and merge backlog ~w"}},
{"P0019",
{info, "Rolling level zero to filename ~s at ledger sqn ~w"}},
{"P0020",
{info, "Work at Level ~w to be scheduled for ~w with ~w "
++ "queue items outstanding at all levels"}},
{"P0021",
{info, "Allocation of work blocked as L0 pending"}},
{"P0022",
@ -108,7 +102,8 @@
{info, "Manifest entry of startkey ~s ~s ~s endkey ~s ~s ~s "
++ "filename=~s~n"}},
{"P0024",
{info, "Outstanding compaction work items of ~w at level ~w"}},
{info, "Outstanding compaction work items of ~w with backlog status "
++ "of ~w"}},
{"P0025",
{info, "Merge to sqn ~w from Level ~w completed"}},
{"P0026",
@ -125,7 +120,17 @@
{info, "Completion of update to levelzero"}},
{"P0032",
{info, "Head timing for result ~w is sample ~w total ~w and max ~w"}},
{"P0033",
{error, "Corrupted manifest file at path ~s to be ignored "
++ "due to error ~w"}},
{"P0034",
{warn, "Snapshot with pid ~w timed out and so deletion will "
++ "continue regardless"}},
{"P0035",
{info, "Startup with Manifest SQN of ~w"}},
{"P0036",
{info, "Garbage collection on mnaifest removes key for filename ~s"}},
{"PC001",
{info, "Penciller's clerk ~w started with owner ~w"}},
{"PC002",
@ -147,15 +152,22 @@
{"PC010",
{info, "Merge to be commenced for FileToMerge=~s with MSN=~w"}},
{"PC011",
{info, "Merge completed with MSN=~w Level=~w and FileCounter=~w"}},
{info, "Merge completed with MSN=~w to Level=~w and FileCounter=~w"}},
{"PC012",
{info, "File to be created as part of MSN=~w Filename=~s"}},
{info, "File to be created as part of MSN=~w Filename=~s "
++ "IsBasement=~w"}},
{"PC013",
{warn, "Merge resulted in empty file ~s"}},
{"PC015",
{info, "File created"}},
{"PC016",
{info, "Slow fetch from SFT ~w of ~w microseconds with result ~w"}},
{"PC017",
{info, "Notified clerk of manifest change"}},
{"PC018",
{info, "Saved manifest file"}},
{"PC019",
{debug, "After ~s level ~w is ~w"}},
{"I0001",
{info, "Unexpected failure to fetch value for Key=~w SQN=~w "
@ -250,6 +262,8 @@
{info, "Completed creation of ~s at level ~w with max sqn ~w"}},
{"SST09",
{warn, "Read request exposes slot with bad CRC"}},
{"SST10",
{info, "Expansion sought to support pointer to pid ~w status ~w"}},
{"CDB01",
{info, "Opening file for writing with filename ~s"}},

View file

@ -2,10 +2,9 @@
%%
%% The Penciller's clerk is responsible for compaction work within the Ledger.
%%
%% The Clerk will periodically poll the Penciller to see if there is work for
%% it to complete, except if the Clerk has informed the Penciller that it has
%% readied a manifest change to be committed - in which case it will wait to
%% be called by the Penciller.
%% The Clerk will periodically poll the Penciller to check there is no work
%% at level zero pending completion, and if not the Clerk will examine the
%% manifest to see if work is necessary.
%%
%% -------- COMMITTING MANIFEST CHANGES ---------
%%
@ -18,35 +17,7 @@
%% certain that the manifest change has been committed. Some uncollected
%% garbage is considered acceptable.
%%
%% The process of committing a manifest change is as follows:
%%
%% A - The Clerk completes a merge, and casts a prompt to the Penciller with
%% a work item describing the change
%%
%% B - The Penciller commits the change to disk, and then calls the Clerk to
%% confirm the manifest change
%%
%% C - The Clerk replies immediately to acknowledge this call, then marks the
%% removed files for deletion
%%
%% Shutdown < A/B - If the Penciller starts the shutdown process before the
%% merge is complete, in the shutdown the Penciller will call a request for the
%% manifest change which will pick up the pending change. It will then confirm
%% the change, and now the Clerk will mark the files for delete before it
%% replies to the Penciller so it can complete the shutdown process (which will
%% prompt erasing of the removed files).
%%
%% The clerk will not request work on timeout if the committing of a manifest
%% change is pending confirmation.
%%
%% -------- TIMEOUTS ---------
%%
%% The Penciller may prompt the Clerk to callback soon (i.e. reduce the
%% Timeout) if it has urgent work ready (i.e. it has written a L0 file).
%%
%% There will also be a natural quick timeout once the committing of a manifest
%% change has occurred.
%%
-module(leveled_pclerk).
@ -54,42 +25,54 @@
-include("include/leveled.hrl").
-export([init/1,
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
clerk_new/1,
code_change/3
]).
-export([
clerk_new/2,
clerk_prompt/1,
clerk_manifestchange/3,
code_change/3]).
clerk_push/2,
clerk_close/1,
clerk_promptdeletions/2
]).
-include_lib("eunit/include/eunit.hrl").
-define(MAX_TIMEOUT, 2000).
-define(MIN_TIMEOUT, 50).
-define(MIN_TIMEOUT, 200).
-record(state, {owner :: pid(),
change_pending=false :: boolean(),
work_item :: #penciller_work{}|null}).
root_path :: string(),
pending_deletions = dict:new() % OTP 16 does not like type
}).
%%%============================================================================
%%% API
%%%============================================================================
clerk_new(Owner) ->
clerk_new(Owner, Manifest) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
ok = gen_server:call(Pid, {register, Owner}, infinity),
ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity),
leveled_log:log("PC001", [Pid, Owner]),
{ok, Pid}.
clerk_manifestchange(Pid, Action, Closing) ->
gen_server:call(Pid, {manifest_change, Action, Closing}, infinity).
clerk_prompt(Pid) ->
gen_server:cast(Pid, prompt).
clerk_promptdeletions(Pid, ManifestSQN) ->
gen_server:cast(Pid, {prompt_deletions, ManifestSQN}).
clerk_push(Pid, Work) ->
gen_server:cast(Pid, {push_work, Work}).
clerk_close(Pid) ->
gen_server:call(Pid, close, 20000).
%%%============================================================================
%%% gen_server callbacks
@ -98,53 +81,26 @@ clerk_prompt(Pid) ->
init([]) ->
{ok, #state{}}.
handle_call({register, Owner}, _From, State) ->
{reply,
ok,
State#state{owner=Owner},
?MIN_TIMEOUT};
handle_call({manifest_change, return, true}, _From, State) ->
leveled_log:log("PC002", []),
case State#state.change_pending of
true ->
WI = State#state.work_item,
{reply, {ok, WI}, State};
false ->
{stop, normal, no_change, State}
end;
handle_call({manifest_change, confirm, Closing}, From, State) ->
case Closing of
true ->
leveled_log:log("PC003", []),
WI = State#state.work_item,
ok = mark_for_delete(WI#penciller_work.unreferenced_files,
State#state.owner),
{stop, normal, ok, State};
false ->
leveled_log:log("PC004", []),
gen_server:reply(From, ok),
WI = State#state.work_item,
ok = mark_for_delete(WI#penciller_work.unreferenced_files,
State#state.owner),
{noreply,
State#state{work_item=null, change_pending=false},
?MIN_TIMEOUT}
end.
handle_call({load, Owner, RootPath}, _From, State) ->
{reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT};
handle_call(close, _From, State) ->
{stop, normal, ok, State}.
handle_cast(prompt, State) ->
{noreply, State, ?MIN_TIMEOUT}.
handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false ->
case requestandhandle_work(State) of
{false, Timeout} ->
{noreply, State, Timeout};
{true, WI} ->
% No timeout now as will wait for call to return manifest
% change
{noreply,
State#state{change_pending=true, work_item=WI}}
end.
handle_info(timeout, State);
handle_cast({push_work, Work}, State) ->
{ManifestSQN, Deletions} = handle_work(Work, State),
PDs = dict:store(ManifestSQN, Deletions, State#state.pending_deletions),
{noreply, State#state{pending_deletions = PDs}, ?MAX_TIMEOUT};
handle_cast({prompt_deletions, ManifestSQN}, State) ->
Deletions = dict:fetch(ManifestSQN, State#state.pending_deletions),
ok = notify_deletions(Deletions, State#state.owner),
UpdDeletions = dict:erase(ManifestSQN, State#state.pending_deletions),
{noreply, State#state{pending_deletions = UpdDeletions}, ?MIN_TIMEOUT}.
handle_info(timeout, State) ->
request_work(State),
{noreply, State, ?MAX_TIMEOUT}.
terminate(Reason, _State) ->
leveled_log:log("PC005", [self(), Reason]).
@ -157,185 +113,117 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%============================================================================
requestandhandle_work(State) ->
case leveled_penciller:pcl_workforclerk(State#state.owner) of
none ->
leveled_log:log("PC006", []),
{false, ?MAX_TIMEOUT};
WI ->
{NewManifest, FilesToDelete} = merge(WI),
UpdWI = WI#penciller_work{new_manifest=NewManifest,
unreferenced_files=FilesToDelete},
leveled_log:log("PC007", []),
ok = leveled_penciller:pcl_promptmanifestchange(State#state.owner,
UpdWI),
{true, UpdWI}
end.
request_work(State) ->
ok = leveled_penciller:pcl_workforclerk(State#state.owner).
handle_work({SrcLevel, Manifest}, State) ->
{UpdManifest, EntriesToDelete} = merge(SrcLevel,
Manifest,
State#state.root_path),
leveled_log:log("PC007", []),
SWMC = os:timestamp(),
ok = leveled_penciller:pcl_manifestchange(State#state.owner,
UpdManifest),
leveled_log:log_timer("PC017", [], SWMC),
SWSM = os:timestamp(),
ok = leveled_pmanifest:save_manifest(UpdManifest,
State#state.root_path),
leveled_log:log_timer("PC018", [], SWSM),
{leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}.
merge(WI) ->
SrcLevel = WI#penciller_work.src_level,
{SrcF, UpdMFest1} = select_filetomerge(SrcLevel,
WI#penciller_work.manifest),
SinkFiles = get_item(SrcLevel + 1, UpdMFest1, []),
{Candidates, Others} = check_for_merge_candidates(SrcF, SinkFiles),
%% TODO:
%% Need to work out if this is the top level
%% And then tell merge process to create files at the top level
%% Which will include the reaping of expired tombstones
leveled_log:log("PC008", [SrcLevel, length(Candidates)]),
MergedFiles = case length(Candidates) of
merge(SrcLevel, Manifest, RootPath) ->
Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel),
NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1,
SinkList = leveled_pmanifest:merge_lookup(Manifest,
SrcLevel + 1,
Src#manifest_entry.start_key,
Src#manifest_entry.end_key),
Candidates = length(SinkList),
leveled_log:log("PC008", [SrcLevel, Candidates]),
case Candidates of
0 ->
%% If no overlapping candiates, manifest change only required
%%
%% TODO: need to think still about simply renaming when at
%% lower level
leveled_log:log("PC009",
[SrcF#manifest_entry.filename, SrcLevel + 1]),
[SrcF];
[Src#manifest_entry.filename, SrcLevel + 1]),
Man0 = leveled_pmanifest:switch_manifest_entry(Manifest,
NewSQN,
SrcLevel,
Src),
{Man0, []};
_ ->
perform_merge({SrcF#manifest_entry.owner,
SrcF#manifest_entry.filename},
Candidates,
{SrcLevel, WI#penciller_work.target_is_basement},
{WI#penciller_work.ledger_filepath,
WI#penciller_work.next_sqn})
end,
NewLevel = lists:sort(lists:append(MergedFiles, Others)),
UpdMFest2 = lists:keystore(SrcLevel + 1,
1,
UpdMFest1,
{SrcLevel + 1, NewLevel}),
ok = filelib:ensure_dir(WI#penciller_work.manifest_file),
{ok, Handle} = file:open(WI#penciller_work.manifest_file,
[binary, raw, write]),
ok = file:write(Handle, term_to_binary(UpdMFest2)),
ok = file:close(Handle),
case lists:member(SrcF, MergedFiles) of
true ->
{UpdMFest2, Candidates};
false ->
%% Can rub out src file as it is not part of output
{UpdMFest2, Candidates ++ [SrcF]}
FilePath = leveled_penciller:filepath(RootPath,
NewSQN,
new_merge_files),
perform_merge(Manifest, Src, SinkList, SrcLevel, FilePath, NewSQN)
end.
mark_for_delete([], _Penciller) ->
notify_deletions([], _Penciller) ->
ok;
mark_for_delete([Head|Tail], Penciller) ->
notify_deletions([Head|Tail], Penciller) ->
ok = leveled_sst:sst_setfordelete(Head#manifest_entry.owner, Penciller),
mark_for_delete(Tail, Penciller).
check_for_merge_candidates(SrcF, SinkFiles) ->
lists:partition(fun(Ref) ->
case {Ref#manifest_entry.start_key,
Ref#manifest_entry.end_key} of
{_, EK} when SrcF#manifest_entry.start_key > EK ->
false;
{SK, _} when SrcF#manifest_entry.end_key < SK ->
false;
_ ->
true
end end,
SinkFiles).
%% An algorithm for discovering which files to merge ....
%% We can find the most optimal file:
%% - The one with the most overlapping data below?
%% - The one that overlaps with the fewest files below?
%% - The smallest file?
%% We could try and be fair in some way (merge oldest first)
%% Ultimately, there is a lack of certainty that being fair or optimal is
%% genuinely better - eventually every file has to be compacted.
%%
%% Hence, the initial implementation is to select files to merge at random
select_filetomerge(SrcLevel, Manifest) ->
{SrcLevel, LevelManifest} = lists:keyfind(SrcLevel, 1, Manifest),
Selected = lists:nth(random:uniform(length(LevelManifest)),
LevelManifest),
UpdManifest = lists:keyreplace(SrcLevel,
1,
Manifest,
{SrcLevel,
lists:delete(Selected,
LevelManifest)}),
{Selected, UpdManifest}.
notify_deletions(Tail, Penciller).
%% Assumption is that there is a single SST from a higher level that needs
%% to be merged into multiple SSTs at a lower level. This should create an
%% entirely new set of SSTs, and the calling process can then update the
%% manifest.
%% to be merged into multiple SSTs at a lower level.
%%
%% Once the FileToMerge has been emptied, the remainder of the candidate list
%% needs to be placed in a remainder SST that may be of a sub-optimal (small)
%% size. This stops the need to perpetually roll over the whole level if the
%% level consists of already full files. Some smartness may be required when
%% selecting the candidate list so that small files just outside the candidate
%% list be included to avoid a proliferation of small files.
%%
%% FileToMerge should be a tuple of {FileName, Pid} where the Pid is the Pid of
%% the gen_server leveled_sft process representing the file.
%%
%% CandidateList should be a list of {StartKey, EndKey, Pid} tuples
%% representing different gen_server leveled_sft processes, sorted by StartKey.
%%
%% The level is the level which the new files should be created at.
%% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1
perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) ->
leveled_log:log("PC010", [SrcFN, MSN]),
PointerList = lists:map(fun(P) ->
{next, P#manifest_entry.owner, all} end,
CandidateList),
MaxSQN = leveled_sst:sst_getmaxsequencenumber(SrcPid),
do_merge([{next, SrcPid, all}],
PointerList,
LevelInfo,
{Filepath, MSN},
MaxSQN,
0,
[]).
perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) ->
leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]),
SrcList = [{next, Src, all}],
MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
SinkLevel = SrcLevel + 1,
SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
Additions = do_merge(SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
[]),
RevertPointerFun =
fun({next, ME, _SK}) ->
ME
end,
SinkManifestList = lists:map(RevertPointerFun, SinkList),
Man0 = leveled_pmanifest:remove_manifest_entry(Manifest,
NewSQN,
SinkLevel,
SinkManifestList),
Man1 = leveled_pmanifest:insert_manifest_entry(Man0,
NewSQN,
SinkLevel,
Additions),
Man2 = leveled_pmanifest:remove_manifest_entry(Man1,
NewSQN,
SrcLevel,
Src),
{Man2, [Src|SinkManifestList]}.
do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, _MaxSQN,
FileCounter, OutList) ->
leveled_log:log("PC011", [MSN, SrcLevel, FileCounter]),
OutList;
do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, MaxSQN,
FileCounter, OutList) ->
FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sst",
[SrcLevel + 1, FileCounter])),
leveled_log:log("PC012", [MSN, FileName]),
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) ->
leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]),
Additions;
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) ->
FileName = lists:flatten(io_lib:format(RP ++ "_~w_~w.sst",
[SinkLevel, length(Additions)])),
leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
TS1 = os:timestamp(),
case leveled_sst:sst_new(FileName, KL1, KL2, IsB, SrcLevel + 1, MaxSQN) of
case leveled_sst:sst_new(FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN) of
empty ->
leveled_log:log("PC013", [FileName]),
OutList;
do_merge([], [],
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
Additions);
{ok, Pid, Reply} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
ExtMan = lists:append(OutList,
[#manifest_entry{start_key=SmallestKey,
end_key=HighestKey,
owner=Pid,
filename=FileName}]),
Entry = #manifest_entry{start_key=SmallestKey,
end_key=HighestKey,
owner=Pid,
filename=FileName},
leveled_log:log_timer("PC015", [], TS1),
do_merge(KL1Rem, KL2Rem,
{SrcLevel, IsB}, {Filepath, MSN}, MaxSQN,
FileCounter + 1, ExtMan)
end.
get_item(Index, List, Default) ->
case lists:keysearch(Index, 1, List) of
{value, {Index, Value}} ->
Value;
false ->
Default
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
Additions ++ [Entry])
end.
@ -361,26 +249,6 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) ->
null}},
generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange).
choose_pid_toquery([ManEntry|_T], Key) when
Key >= ManEntry#manifest_entry.start_key,
ManEntry#manifest_entry.end_key >= Key ->
ManEntry#manifest_entry.owner;
choose_pid_toquery([_H|T], Key) ->
choose_pid_toquery(T, Key).
find_randomkeys(_FList, 0, _Source) ->
ok;
find_randomkeys(FList, Count, Source) ->
KV1 = lists:nth(random:uniform(length(Source)), Source),
K1 = leveled_codec:strip_to_keyonly(KV1),
P1 = choose_pid_toquery(FList, K1),
FoundKV = leveled_sst:sst_get(P1, K1),
Found = leveled_codec:strip_to_keyonly(FoundKV),
io:format("success finding ~w in ~w~n", [K1, P1]),
?assertMatch(K1, Found),
find_randomkeys(FList, Count - 1, Source).
merge_file_test() ->
KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)),
@ -408,57 +276,40 @@ merge_file_test() ->
2,
KL4_L2,
undefined),
Result = perform_merge({PidL1_1, "../test/KL1_L1.sst"},
[#manifest_entry{owner=PidL2_1},
#manifest_entry{owner=PidL2_2},
#manifest_entry{owner=PidL2_3},
#manifest_entry{owner=PidL2_4}],
{2, false}, {"../test/", 99}),
lists:foreach(fun(ManEntry) ->
{o, B1, K1} = ManEntry#manifest_entry.start_key,
{o, B2, K2} = ManEntry#manifest_entry.end_key,
io:format("Result of ~s ~s and ~s ~s with Pid ~w~n",
[B1, K1, B2, K2, ManEntry#manifest_entry.owner]) end,
Result),
io:format("Finding keys in KL1_L1~n"),
ok = find_randomkeys(Result, 50, KL1_L1),
io:format("Finding keys in KL1_L2~n"),
ok = find_randomkeys(Result, 50, KL1_L2),
io:format("Finding keys in KL2_L2~n"),
ok = find_randomkeys(Result, 50, KL2_L2),
io:format("Finding keys in KL3_L2~n"),
ok = find_randomkeys(Result, 50, KL3_L2),
io:format("Finding keys in KL4_L2~n"),
ok = find_randomkeys(Result, 50, KL4_L2),
leveled_sst:sst_clear(PidL1_1),
leveled_sst:sst_clear(PidL2_1),
leveled_sst:sst_clear(PidL2_2),
leveled_sst:sst_clear(PidL2_3),
leveled_sst:sst_clear(PidL2_4),
lists:foreach(fun(ManEntry) ->
leveled_sst:sst_clear(ManEntry#manifest_entry.owner) end,
Result).
select_merge_candidates_test() ->
Sink1 = #manifest_entry{start_key = {o, "Bucket", "Key1"},
end_key = {o, "Bucket", "Key20000"}},
Sink2 = #manifest_entry{start_key = {o, "Bucket", "Key20001"},
end_key = {o, "Bucket1", "Key1"}},
Src1 = #manifest_entry{start_key = {o, "Bucket", "Key40001"},
end_key = {o, "Bucket", "Key60000"}},
{Candidates, Others} = check_for_merge_candidates(Src1, [Sink1, Sink2]),
?assertMatch([Sink2], Candidates),
?assertMatch([Sink1], Others).
select_merge_file_test() ->
L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}],
L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid},
{{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}],
Manifest = [{0, L0}, {1, L1}],
{FileRef, NewManifest} = select_filetomerge(0, Manifest),
?assertMatch(FileRef, {{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}),
?assertMatch(NewManifest, [{0, []}, {1, L1}]).
E1 = #manifest_entry{owner = PidL1_1,
filename = "../test/KL1_L1.sst",
end_key = lists:last(KL1_L1),
start_key = lists:nth(1, KL1_L1)},
E2 = #manifest_entry{owner = PidL2_1,
filename = "../test/KL1_L2.sst",
end_key = lists:last(KL1_L2),
start_key = lists:nth(1, KL1_L2)},
E3 = #manifest_entry{owner = PidL2_2,
filename = "../test/KL2_L2.sst",
end_key = lists:last(KL2_L2),
start_key = lists:nth(1, KL2_L2)},
E4 = #manifest_entry{owner = PidL2_3,
filename = "../test/KL3_L2.sst",
end_key = lists:last(KL3_L2),
start_key = lists:nth(1, KL3_L2)},
E5 = #manifest_entry{owner = PidL2_4,
filename = "../test/KL4_L2.sst",
end_key = lists:last(KL4_L2),
start_key = lists:nth(1, KL4_L2)},
Man0 = leveled_pmanifest:new_manifest(),
Man1 = leveled_pmanifest:insert_manifest_entry(Man0, 1, 2, E2),
Man2 = leveled_pmanifest:insert_manifest_entry(Man1, 1, 2, E3),
Man3 = leveled_pmanifest:insert_manifest_entry(Man2, 1, 2, E4),
Man4 = leveled_pmanifest:insert_manifest_entry(Man3, 1, 2, E5),
Man5 = leveled_pmanifest:insert_manifest_entry(Man4, 2, 1, E1),
PointerList = lists:map(fun(ME) -> {next, ME, all} end,
[E2, E3, E4, E5]),
{Man6, _Dels} = perform_merge(Man5, E1, PointerList, 1, "../test", 3),
?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)).
coverage_cheat_test() ->
{ok, _State1} = code_change(null, #state{}, null).

View file

@ -161,12 +161,15 @@
-include("include/leveled.hrl").
-export([init/1,
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
code_change/3]).
-export([
pcl_start/1,
pcl_pushmem/2,
pcl_fetchlevelzero/2,
@ -176,15 +179,18 @@
pcl_fetchnextkey/5,
pcl_checksequencenumber/3,
pcl_workforclerk/1,
pcl_promptmanifestchange/2,
pcl_manifestchange/2,
pcl_confirml0complete/4,
pcl_confirmdelete/2,
pcl_confirmdelete/3,
pcl_close/1,
pcl_doom/1,
pcl_registersnapshot/2,
pcl_releasesnapshot/2,
pcl_loadsnapshot/2,
pcl_getstartupsequencenumber/1,
pcl_getstartupsequencenumber/1]).
-export([
filepath/3,
clean_testdir/1]).
-include_lib("eunit/include/eunit.hrl").
@ -206,13 +212,12 @@
-define(COIN_SIDECOUNT, 5).
-define(SLOW_FETCH, 20000).
-define(ITERATOR_SCANWIDTH, 4).
-define(SNAPSHOT_TIMEOUT, 3600).
-record(state, {manifest = [] :: list(),
manifest_sqn = 0 :: integer(),
ledger_sqn = 0 :: integer(), % The highest SQN added to L0
-record(state, {manifest, % a manifest record from the leveled_manifest module
persisted_sqn = 0 :: integer(), % The highest SQN persisted
registered_snapshots = [] :: list(),
unreferenced_files = [] :: list(),
ledger_sqn = 0 :: integer(), % The highest SQN added to L0
root_path = "../test" :: string(),
clerk :: pid(),
@ -230,8 +235,8 @@
source_penciller :: pid(),
levelzero_astree :: list(),
ongoing_work = [] :: list(),
work_backlog = false :: boolean(),
work_ongoing = false :: boolean(), % i.e. compaction work
work_backlog = false :: boolean(), % i.e. compaction work
head_timing :: tuple()}).
@ -284,16 +289,16 @@ pcl_checksequencenumber(Pid, Key, SQN) ->
end.
pcl_workforclerk(Pid) ->
gen_server:call(Pid, work_for_clerk, infinity).
gen_server:cast(Pid, work_for_clerk).
pcl_promptmanifestchange(Pid, WI) ->
gen_server:cast(Pid, {manifest_change, WI}).
pcl_manifestchange(Pid, Manifest) ->
gen_server:cast(Pid, {manifest_change, Manifest}).
pcl_confirml0complete(Pid, FN, StartKey, EndKey) ->
gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey}).
pcl_confirmdelete(Pid, FileName) ->
gen_server:cast(Pid, {confirm_delete, FileName}).
pcl_confirmdelete(Pid, FileName, FilePid) ->
gen_server:cast(Pid, {confirm_delete, FileName, FilePid}).
pcl_getstartupsequencenumber(Pid) ->
gen_server:call(Pid, get_startup_sqn, infinity).
@ -324,9 +329,11 @@ init([PCLopts]) ->
{undefined, true} ->
SrcPenciller = PCLopts#penciller_options.source_penciller,
{ok, State} = pcl_registersnapshot(SrcPenciller, self()),
ManifestClone = leveled_pmanifest:copy_manifest(State#state.manifest),
leveled_log:log("P0001", [self()]),
io:format("Snapshot ledger sqn at ~w~n", [State#state.ledger_sqn]),
{ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}};
{ok, State#state{is_snapshot=true,
source_penciller=SrcPenciller,
manifest=ManifestClone}};
%% Need to do something about timeout
{_RootPath, false} ->
start_from_file(PCLopts)
@ -401,23 +408,33 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
List ->
List
end,
SSTiter = initiate_rangequery_frommanifest(StartKey,
EndKey,
State#state.manifest),
SetupFoldFun =
fun(Level, Acc) ->
Pointers = leveled_pmanifest:range_lookup(State#state.manifest,
Level,
StartKey,
EndKey),
case Pointers of
[] -> Acc;
PL -> Acc ++ [{Level, PL}]
end
end,
SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)),
Acc = keyfolder({L0AsList, SSTiter},
{StartKey, EndKey},
{AccFun, InitAcc},
MaxKeys),
{reply, Acc, State#state{levelzero_astree = L0AsList}};
handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From),
{reply, Work, UpdState};
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State};
handle_call({register_snapshot, Snapshot}, _From, State) ->
Rs = [{Snapshot,
State#state.manifest_sqn}|State#state.registered_snapshots],
{reply, {ok, State}, State#state{registered_snapshots = Rs}};
Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest,
Snapshot,
?SNAPSHOT_TIMEOUT),
{reply, {ok, State}, State#state{manifest = Manifest0}};
handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
_From, State) ->
L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
@ -443,29 +460,33 @@ handle_call(doom, _From, State) ->
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
{stop, normal, {ok, [ManifestFP, FilesFP]}, State}.
handle_cast({manifest_change, WI}, State) ->
{ok, UpdState} = commit_manifest_change(WI, State),
ok = leveled_pclerk:clerk_manifestchange(State#state.clerk,
confirm,
false),
{noreply, UpdState};
handle_cast({manifest_change, NewManifest}, State) ->
NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest),
ok = leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN),
{noreply, State#state{manifest = NewManifest, work_ongoing=false}};
handle_cast({release_snapshot, Snapshot}, State) ->
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest,
Snapshot),
leveled_log:log("P0003", [Snapshot]),
leveled_log:log("P0004", [Rs]),
{noreply, State#state{registered_snapshots=Rs}};
handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap})
{noreply, State#state{manifest=Manifest0}};
handle_cast({confirm_delete, Filename, FilePid}, State=#state{is_snapshot=Snap})
when Snap == false ->
Reply = confirm_delete(FileName,
State#state.unreferenced_files,
State#state.registered_snapshots),
case Reply of
{true, Pid} ->
UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
leveled_log:log("P0005", [FileName]),
ok = leveled_sst:sst_deleteconfirmed(Pid),
{noreply, State#state{unreferenced_files=UF1}};
_ ->
case State#state.work_ongoing of
false ->
R2D = leveled_pmanifest:ready_to_delete(State#state.manifest,
Filename),
case R2D of
{true, M0} ->
leveled_log:log("P0005", [Filename]),
ok = leveled_sst:sst_deleteconfirmed(FilePid),
{noreply, State#state{manifest=M0}};
{false, _M0} ->
{noreply, State}
end;
true ->
% If there is ongoing work, then we can't safely update the pidmap
% as any change will be reverted when the manifest is passed back
% from the Clerk
{noreply, State}
end;
handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
@ -474,7 +495,11 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
end_key=EndKey,
owner=State#state.levelzero_constructor,
filename=FN},
UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}),
ManifestSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1,
UpdMan = leveled_pmanifest:insert_manifest_entry(State#state.manifest,
ManifestSQN,
0,
ManEntry),
% Prompt clerk to ask about work - do this for every L0 roll
UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index),
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
@ -484,7 +509,33 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
levelzero_constructor=undefined,
levelzero_size=0,
manifest=UpdMan,
persisted_sqn=State#state.ledger_sqn}}.
persisted_sqn=State#state.ledger_sqn}};
handle_cast(work_for_clerk, State) ->
case State#state.levelzero_pending of
true ->
{noreply, State};
false ->
{WL, WC} = leveled_pmanifest:check_for_work(State#state.manifest,
?LEVEL_SCALEFACTOR),
case WC of
0 ->
{noreply, State#state{work_backlog=false}};
N when N > ?WORKQUEUE_BACKLOG_TOLERANCE ->
leveled_log:log("P0024", [N, true]),
[TL|_Tail] = WL,
ok = leveled_pclerk:clerk_push(State#state.clerk,
{TL, State#state.manifest}),
{noreply,
State#state{work_backlog=true, work_ongoing=true}};
N ->
leveled_log:log("P0024", [N, false]),
[TL|_Tail] = WL,
ok = leveled_pclerk:clerk_push(State#state.clerk,
{TL, State#state.manifest}),
{noreply,
State#state{work_backlog=false, work_ongoing=true}}
end
end.
handle_info(_Info, State) ->
@ -495,10 +546,6 @@ terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true ->
leveled_log:log("P0007", [Reason]),
ok;
terminate(Reason, State) ->
%% When a Penciller shuts down it isn't safe to try an manage the safe
%% finishing of any outstanding work. The last commmitted manifest will
%% be used.
%%
%% Level 0 files lie outside of the manifest, and so if there is no L0
%% file present it is safe to write the current contents of memory. If
%% there is a L0 file present - then the memory can be dropped (it is
@ -506,43 +553,27 @@ terminate(Reason, State) ->
%% as presumably the ETS file has been recently flushed, hence the presence
%% of a L0 file).
%%
%% The penciller should close each file in the unreferenced files, and
%% then each file in the manifest, and cast a close on the clerk.
%% The cast may not succeed as the clerk could be synchronously calling
%% the penciller looking for a manifest commit
%%
%% The penciller should close each file in the manifest, and cast a close
%% on the clerk.
ok = leveled_pclerk:clerk_close(State#state.clerk),
leveled_log:log("P0008", [Reason]),
MC = leveled_pclerk:clerk_manifestchange(State#state.clerk,
return,
true),
UpdState = case MC of
{ok, WI} ->
{ok, NewState} = commit_manifest_change(WI, State),
Clerk = State#state.clerk,
ok = leveled_pclerk:clerk_manifestchange(Clerk,
confirm,
true),
NewState;
no_change ->
State
end,
case {UpdState#state.levelzero_pending,
get_item(0, UpdState#state.manifest, []),
UpdState#state.levelzero_size} of
{false, [], 0} ->
leveled_log:log("P0009", []);
{false, [], _N} ->
L0Pid = roll_memory(UpdState, true),
L0_Present = leveled_pmanifest:key_lookup(State#state.manifest, 0, all),
L0_Left = State#state.levelzero_size > 0,
case {State#state.levelzero_pending, L0_Present, L0_Left} of
{false, false, true} ->
L0Pid = roll_memory(State, true),
ok = leveled_sst:sst_close(L0Pid);
StatusTuple ->
leveled_log:log("P0010", [StatusTuple])
end,
% Tidy shutdown of individual files
ok = close_files(0, UpdState#state.manifest),
lists:foreach(fun({_FN, Pid, _SN}) ->
ok = leveled_sst:sst_close(Pid) end,
UpdState#state.unreferenced_files),
EntryCloseFun =
fun(ME) ->
ok = leveled_sst:sst_close(ME#manifest_entry.owner)
end,
leveled_pmanifest:close_manifest(State#state.manifest, EntryCloseFun),
leveled_log:log("P0011", []),
ok.
@ -565,7 +596,7 @@ start_from_file(PCLopts) ->
M
end,
{ok, MergeClerk} = leveled_pclerk:clerk_new(self()),
{ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath),
CoinToss = PCLopts#penciller_options.levelzero_cointoss,
% Used to randomly defer the writing of L0 file. Intended to help with
@ -579,47 +610,21 @@ start_from_file(PCLopts) ->
levelzero_index=leveled_pmem:new_index()},
%% Open manifest
ManifestPath = filepath(InitState#state.root_path, manifest) ++ "/",
SSTPath = filepath(InitState#state.root_path, files) ++ "/",
ok = filelib:ensure_dir(ManifestPath),
ok = filelib:ensure_dir(SSTPath),
{ok, Filenames} = file:list_dir(ManifestPath),
CurrRegex = "nonzero_(?<MSN>[0-9]+)\\." ++ ?CURRENT_FILEX,
ValidManSQNs = lists:foldl(fun(FN, Acc) ->
case re:run(FN,
CurrRegex,
[{capture, ['MSN'], list}]) of
nomatch ->
Acc;
{match, [Int]} when is_list(Int) ->
Acc ++ [list_to_integer(Int)]
end
end,
[],
Filenames),
TopManSQN = lists:foldl(fun(X, MaxSQN) -> max(X, MaxSQN) end,
0,
ValidManSQNs),
leveled_log:log("P0012", [TopManSQN]),
ManUpdate = case TopManSQN of
0 ->
leveled_log:log("P0013", []),
{[], 0};
_ ->
CurrManFile = filepath(InitState#state.root_path,
TopManSQN,
current_manifest),
{ok, Bin} = file:read_file(CurrManFile),
Manifest = binary_to_term(Bin),
open_all_filesinmanifest(Manifest)
end,
{UpdManifest, MaxSQN} = ManUpdate,
Manifest0 = leveled_pmanifest:open_manifest(RootPath),
OpenFun =
fun(FN) ->
{ok, Pid, {_FK, _LK}} = leveled_sst:sst_open(FN),
Pid
end,
SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1,
{MaxSQN, Manifest1} = leveled_pmanifest:load_manifest(Manifest0,
OpenFun,
SQNFun),
leveled_log:log("P0014", [MaxSQN]),
ManSQN = leveled_pmanifest:get_manifest_sqn(Manifest1),
leveled_log:log("P0035", [ManSQN]),
%% Find any L0 files
L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sst",
L0FN = filepath(RootPath, ManSQN + 1, new_merge_files) ++ "_0_0.sst",
case filelib:is_file(L0FN) of
true ->
leveled_log:log("P0015", [L0FN]),
@ -627,32 +632,29 @@ start_from_file(PCLopts) ->
L0Pid,
{L0StartKey, L0EndKey}} = leveled_sst:sst_open(L0FN),
L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid),
ManifestEntry = #manifest_entry{start_key=L0StartKey,
end_key=L0EndKey,
owner=L0Pid,
filename=L0FN},
UpdManifest2 = lists:keystore(0,
1,
UpdManifest,
{0, [ManifestEntry]}),
L0Entry = #manifest_entry{start_key = L0StartKey,
end_key = L0EndKey,
filename = L0FN,
owner = L0Pid},
Manifest2 = leveled_pmanifest:insert_manifest_entry(Manifest1,
ManSQN + 1,
0,
L0Entry),
leveled_log:log("P0016", [L0SQN]),
LedgerSQN = max(MaxSQN, L0SQN),
{ok,
InitState#state{manifest=UpdManifest2,
manifest_sqn=TopManSQN,
ledger_sqn=LedgerSQN,
persisted_sqn=LedgerSQN}};
InitState#state{manifest = Manifest2,
ledger_sqn = LedgerSQN,
persisted_sqn = LedgerSQN}};
false ->
leveled_log:log("P0017", []),
{ok,
InitState#state{manifest=UpdManifest,
manifest_sqn=TopManSQN,
ledger_sqn=MaxSQN,
persisted_sqn=MaxSQN}}
InitState#state{manifest = Manifest1,
ledger_sqn = MaxSQN,
persisted_sqn = MaxSQN}}
end.
update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
LedgerSQN, L0Cache, State) ->
SW = os:timestamp(),
@ -673,7 +675,7 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
ledger_sqn=UpdMaxSQN},
CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize,
CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE,
Level0Free = length(get_item(0, State#state.manifest, [])) == 0,
L0Free = not leveled_pmanifest:levelzero_present(State#state.manifest),
RandomFactor =
case State#state.levelzero_cointoss of
true ->
@ -686,9 +688,10 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
false ->
true
end,
NoPendingManifestChange = not State#state.work_ongoing,
JitterCheck = RandomFactor or CacheMuchTooBig,
case {CacheTooBig, Level0Free, JitterCheck} of
{true, true, true} ->
case {CacheTooBig, L0Free, JitterCheck, NoPendingManifestChange} of
{true, true, true, true} ->
L0Constructor = roll_memory(UpdState, false),
leveled_log:log_timer("P0031", [], SW),
UpdState#state{levelzero_pending=true,
@ -732,10 +735,10 @@ roll_memory(State, true) ->
Constructor.
levelzero_filename(State) ->
MSN = State#state.manifest_sqn,
ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1,
FileName = State#state.root_path
++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0",
++ integer_to_list(ManSQN) ++ "_0_0",
FileName.
timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) ->
@ -767,22 +770,11 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
{not_present, basement};
fetch(Key, Hash, Manifest, Level, FetchFun) ->
LevelManifest = get_item(Level, Manifest, []),
case lists:foldl(fun(File, Acc) ->
case Acc of
not_present when
Key >= File#manifest_entry.start_key,
File#manifest_entry.end_key >= Key ->
File#manifest_entry.owner;
FoundDetails ->
FoundDetails
end end,
not_present,
LevelManifest) of
not_present ->
case leveled_pmanifest:key_lookup(Manifest, Level, Key) of
false ->
fetch(Key, Hash, Manifest, Level + 1, FetchFun);
FileToCheck ->
case FetchFun(FileToCheck, Key, Hash) of
FP ->
case FetchFun(FP, Key, Hash) of
not_present ->
fetch(Key, Hash, Manifest, Level + 1, FetchFun);
ObjectFound ->
@ -821,135 +813,6 @@ compare_to_sqn(Obj, SQN) ->
end.
%% Work out what the current work queue should be
%%
%% The work queue should have a lower level work at the front, and no work
%% should be added to the queue if a compaction worker has already been asked
%% to look at work at that level
%%
%% The full queue is calculated for logging purposes only
return_work(State, From) ->
{WorkQ, BasementL} = assess_workqueue([], 0, State#state.manifest, 0),
case length(WorkQ) of
L when L > 0 ->
Excess = lists:foldl(fun({_, _, OH}, Acc) -> Acc+OH end, 0, WorkQ),
[{SrcLevel, Manifest, _Overhead}|_OtherWork] = WorkQ,
leveled_log:log("P0020", [SrcLevel, From, Excess]),
IsBasement = if
SrcLevel + 1 == BasementL ->
true;
true ->
false
end,
Backlog = Excess >= ?WORKQUEUE_BACKLOG_TOLERANCE,
case State#state.levelzero_pending of
true ->
% Once the L0 file is completed there will be more work
% - so don't be busy doing other work now
leveled_log:log("P0021", []),
{State#state{work_backlog=Backlog}, none};
false ->
%% No work currently outstanding
%% Can allocate work
NextSQN = State#state.manifest_sqn + 1,
FP = filepath(State#state.root_path,
NextSQN,
new_merge_files),
ManFile = filepath(State#state.root_path,
NextSQN,
pending_manifest),
WI = #penciller_work{next_sqn=NextSQN,
clerk=From,
src_level=SrcLevel,
manifest=Manifest,
start_time = os:timestamp(),
ledger_filepath = FP,
manifest_file = ManFile,
target_is_basement = IsBasement},
{State#state{ongoing_work=[WI], work_backlog=Backlog}, WI}
end;
_ ->
{State#state{work_backlog=false}, none}
end.
close_files(?MAX_LEVELS - 1, _Manifest) ->
ok;
close_files(Level, Manifest) ->
LevelList = get_item(Level, Manifest, []),
lists:foreach(fun(F) ->
ok = leveled_sst:sst_close(F#manifest_entry.owner) end,
LevelList),
close_files(Level + 1, Manifest).
open_all_filesinmanifest(Manifest) ->
open_all_filesinmanifest({Manifest, 0}, 0).
open_all_filesinmanifest(Result, ?MAX_LEVELS - 1) ->
Result;
open_all_filesinmanifest({Manifest, TopSQN}, Level) ->
LevelList = get_item(Level, Manifest, []),
%% The Pids in the saved manifest related to now closed references
%% Need to roll over the manifest at this level starting new processes to
%5 replace them
LvlR = lists:foldl(fun(F, {FL, FL_SQN}) ->
FN = F#manifest_entry.filename,
{ok, P, _Keys} = leveled_sst:sst_open(FN),
F_SQN = leveled_sst:sst_getmaxsequencenumber(P),
{lists:append(FL,
[F#manifest_entry{owner = P}]),
max(FL_SQN, F_SQN)}
end,
{[], 0},
LevelList),
%% Result is tuple of revised file list for this level in manifest, and
%% the maximum sequence number seen at this level
{LvlFL, LvlSQN} = LvlR,
UpdManifest = lists:keystore(Level, 1, Manifest, {Level, LvlFL}),
open_all_filesinmanifest({UpdManifest, max(TopSQN, LvlSQN)}, Level + 1).
print_manifest(Manifest) ->
lists:foreach(fun(L) ->
leveled_log:log("P0022", [L]),
Level = get_item(L, Manifest, []),
lists:foreach(fun print_manifest_entry/1, Level)
end,
lists:seq(0, ?MAX_LEVELS - 1)),
ok.
print_manifest_entry(Entry) ->
{S1, S2, S3} = leveled_codec:print_key(Entry#manifest_entry.start_key),
{E1, E2, E3} = leveled_codec:print_key(Entry#manifest_entry.end_key),
leveled_log:log("P0023",
[S1, S2, S3, E1, E2, E3, Entry#manifest_entry.filename]).
initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) ->
CompareFun = fun(M) ->
C1 = StartKey > M#manifest_entry.end_key,
C2 = leveled_codec:endkey_passed(EndKey,
M#manifest_entry.start_key),
not (C1 or C2) end,
FoldFun =
fun(L, AccL) ->
Level = get_item(L, Manifest, []),
FL = lists:foldl(fun(M, Acc) ->
case CompareFun(M) of
true ->
Acc ++ [{next, M, StartKey}];
false ->
Acc
end end,
[],
Level),
case FL of
[] -> AccL;
FL -> AccL ++ [{L, FL}]
end
end,
lists:foldl(FoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)).
%% Looks to find the best choice for the next key across the levels (other
%% than in-memory table)
%% In finding the best choice, the next key in a given level may be a next
@ -995,10 +858,9 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
LCnt + 1,
{BKL, BKV},
StartKey, EndKey, Width);
{{next, ManifestEntry, _SK}, BKL, BKV} ->
{{next, Owner, _SK}, BKL, BKV} ->
% The first key at this level is pointer to a file - need to query
% the file to expand this level out before proceeding
Owner = ManifestEntry#manifest_entry.owner,
Pointer = {next, Owner, StartKey, EndKey},
UpdList = leveled_sst:expand_list_by_pointer(Pointer,
RestOfKeys,
@ -1153,143 +1015,14 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange,
end.
assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Man, BasementLevel) ->
{WorkQ, BasementLevel};
assess_workqueue(WorkQ, LevelToAssess, Man, BasementLevel) ->
MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0),
case length(get_item(LevelToAssess, Man, [])) of
FileCount when FileCount > 0 ->
NewWQ = maybe_append_work(WorkQ,
LevelToAssess,
Man,
MaxFiles,
FileCount),
assess_workqueue(NewWQ, LevelToAssess + 1, Man, LevelToAssess);
0 ->
assess_workqueue(WorkQ, LevelToAssess + 1, Man, BasementLevel)
end.
maybe_append_work(WorkQ, Level, Manifest,
MaxFiles, FileCount)
when FileCount > MaxFiles ->
Overhead = FileCount - MaxFiles,
leveled_log:log("P0024", [Overhead, Level]),
lists:append(WorkQ, [{Level, Manifest, Overhead}]);
maybe_append_work(WorkQ, _Level, _Manifest,
_MaxFiles, _FileCount) ->
WorkQ.
get_item(Index, List, Default) ->
case lists:keysearch(Index, 1, List) of
{value, {Index, Value}} ->
Value;
false ->
Default
end.
%% Request a manifest change
%% The clerk should have completed the work, and created a new manifest
%% and persisted the new view of the manifest
%%
%% To complete the change of manifest:
%% - the state of the manifest file needs to be changed from pending to current
%% - the list of unreferenced files needs to be updated on State
%% - the current manifest needs to be update don State
%% - the list of ongoing work needs to be cleared of this item
commit_manifest_change(ReturnedWorkItem, State) ->
NewMSN = State#state.manifest_sqn + 1,
[SentWorkItem] = State#state.ongoing_work,
RootPath = State#state.root_path,
UnreferencedFiles = State#state.unreferenced_files,
if
NewMSN == SentWorkItem#penciller_work.next_sqn ->
WISrcLevel = SentWorkItem#penciller_work.src_level,
leveled_log:log_timer("P0025",
[SentWorkItem#penciller_work.next_sqn,
WISrcLevel],
SentWorkItem#penciller_work.start_time),
ok = rename_manifest_files(RootPath, NewMSN),
FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files,
UnreferencedFilesUpd = update_deletions(FilesToDelete,
NewMSN,
UnreferencedFiles),
leveled_log:log("P0026", [NewMSN]),
NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
CurrL0 = get_item(0, State#state.manifest, []),
% If the work isn't L0 work, then we may have an uncommitted
% manifest change at L0 - so add this back into the Manifest loop
% state
RevisedManifest = case {WISrcLevel, CurrL0} of
{0, _} ->
NewManifest;
{_, []} ->
NewManifest;
{_, [L0ManEntry]} ->
lists:keystore(0,
1,
NewManifest,
{0, [L0ManEntry]})
end,
{ok, State#state{ongoing_work=[],
manifest_sqn=NewMSN,
manifest=RevisedManifest,
unreferenced_files=UnreferencedFilesUpd}}
end.
rename_manifest_files(RootPath, NewMSN) ->
OldFN = filepath(RootPath, NewMSN, pending_manifest),
NewFN = filepath(RootPath, NewMSN, current_manifest),
leveled_log:log("P0027", [OldFN, filelib:is_file(OldFN),
NewFN, filelib:is_file(NewFN)]),
ok = file:rename(OldFN,NewFN).
filepath(RootPath, manifest) ->
RootPath ++ "/" ++ ?MANIFEST_FP;
filepath(RootPath, files) ->
RootPath ++ "/" ++ ?FILES_FP.
FP = RootPath ++ "/" ++ ?FILES_FP,
filelib:ensure_dir(FP ++ "/"),
FP.
filepath(RootPath, NewMSN, pending_manifest) ->
filepath(RootPath, manifest) ++ "/" ++ "nonzero_"
++ integer_to_list(NewMSN) ++ "." ++ ?PENDING_FILEX;
filepath(RootPath, NewMSN, current_manifest) ->
filepath(RootPath, manifest) ++ "/" ++ "nonzero_"
++ integer_to_list(NewMSN) ++ "." ++ ?CURRENT_FILEX;
filepath(RootPath, NewMSN, new_merge_files) ->
filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN).
update_deletions([], _NewMSN, UnreferencedFiles) ->
UnreferencedFiles;
update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) ->
leveled_log:log("P0028", [ClearedFile#manifest_entry.filename]),
update_deletions(Tail,
MSN,
lists:append(UnreferencedFiles,
[{ClearedFile#manifest_entry.filename,
ClearedFile#manifest_entry.owner,
MSN}])).
confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
case lists:keyfind(Filename, 1, UnreferencedFiles) of
{Filename, Pid, MSN} ->
LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end,
infinity,
RegisteredSnapshots),
if
MSN >= LowSQN ->
false;
true ->
{true, Pid}
end
end.
%%%============================================================================
@ -1320,7 +1053,7 @@ generate_randomkeys(Count, SQN, Acc) ->
clean_testdir(RootPath) ->
clean_subdir(filepath(RootPath, manifest)),
clean_subdir(leveled_pmanifest:filepath(RootPath, manifest)),
clean_subdir(filepath(RootPath, files)).
clean_subdir(DirPath) ->
@ -1338,47 +1071,6 @@ clean_subdir(DirPath) ->
end.
compaction_work_assessment_test() ->
L0 = [{{o, "B1", "K1", null}, {o, "B3", "K3", null}, dummy_pid}],
L1 = [{{o, "B1", "K1", null}, {o, "B2", "K2", null}, dummy_pid},
{{o, "B2", "K3", null}, {o, "B4", "K4", null}, dummy_pid}],
Manifest = [{0, L0}, {1, L1}],
{WorkQ1, 1} = assess_workqueue([], 0, Manifest, 0),
?assertMatch([{0, Manifest, 1}], WorkQ1),
L1Alt = lists:append(L1,
[{{o, "B5", "K0001", null}, {o, "B5", "K9999", null},
dummy_pid},
{{o, "B6", "K0001", null}, {o, "B6", "K9999", null},
dummy_pid},
{{o, "B7", "K0001", null}, {o, "B7", "K9999", null},
dummy_pid},
{{o, "B8", "K0001", null}, {o, "B8", "K9999", null},
dummy_pid},
{{o, "B9", "K0001", null}, {o, "B9", "K9999", null},
dummy_pid},
{{o, "BA", "K0001", null}, {o, "BA", "K9999", null},
dummy_pid},
{{o, "BB", "K0001", null}, {o, "BB", "K9999", null},
dummy_pid}]),
Manifest3 = [{0, []}, {1, L1Alt}],
{WorkQ3, 1} = assess_workqueue([], 0, Manifest3, 0),
?assertMatch([{1, Manifest3, 1}], WorkQ3).
confirm_delete_test() ->
Filename = 'test.sst',
UnreferencedFiles = [{'other.sst', dummy_owner, 15},
{Filename, dummy_owner, 10}],
RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}],
R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1),
?assertMatch(R1, {true, dummy_owner}),
RegisteredIterators2 = [{dummy_pid, 10}, {dummy_pid, 12}],
R2 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators2),
?assertMatch(R2, false),
RegisteredIterators3 = [{dummy_pid, 9}, {dummy_pid, 12}],
R3 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators3),
?assertMatch(R3, false).
maybe_pause_push(PCL, KL) ->
T0 = leveled_skiplist:empty(true),
I0 = leveled_pmem:new_index(),
@ -1461,11 +1153,13 @@ simple_server_test() ->
?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002", null})),
?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})),
?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004", null})),
SnapOpts = #penciller_options{start_snapshot = true,
source_penciller = PCLr},
{ok, PclSnap} = pcl_start(SnapOpts),
leveled_bookie:load_snapshot(PclSnap,
leveled_bookie:empty_ledgercache()),
?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})),
@ -1497,6 +1191,7 @@ simple_server_test() ->
% Add some more keys and confirm that check sequence number still
% sees the old version in the previous snapshot, but will see the new version
% in a new snapshot
Key1A_Pre = {{o,"Bucket0001", "Key0001", null},
{4005, {active, infinity}, null}},
Key1A = add_missing_hash(Key1A_Pre),
@ -1510,11 +1205,7 @@ simple_server_test() ->
null},
1)),
ok = pcl_close(PclSnap),
% Ignore a fake pending mnaifest on startup
ok = file:write_file(RootPath ++ "/" ++ ?MANIFEST_FP ++ "nonzero_99.pnd",
term_to_binary("Hello")),
{ok, PclSnap2} = pcl_start(SnapOpts),
leveled_bookie:load_snapshot(PclSnap2, leveled_bookie:empty_ledgercache()),
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
@ -1540,57 +1231,6 @@ simple_server_test() ->
clean_testdir(RootPath).
rangequery_manifest_test() ->
{E1,
E2,
E3} = {#manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"},
filename="Z1"},
#manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"},
end_key={o, "Bucket1", "K71", null},
filename="Z2"},
#manifest_entry{start_key={o, "Bucket1", "K75", null},
end_key={o, "Bucket1", "K993", null},
filename="Z3"}},
{E4,
E5,
E6} = {#manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
end_key={i, "Bucket1", {"Idx1", "Fld7"}, "K93"},
filename="Z4"},
#manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld7"}, "K97"},
end_key={o, "Bucket1", "K78", null},
filename="Z5"},
#manifest_entry{start_key={o, "Bucket1", "K81", null},
end_key={o, "Bucket1", "K996", null},
filename="Z6"}},
Man = [{1, [E1, E2, E3]}, {2, [E4, E5, E6]}],
SK1 = {o, "Bucket1", "K711", null},
EK1 = {o, "Bucket1", "K999", null},
R1 = initiate_rangequery_frommanifest(SK1, EK1, Man),
?assertMatch([{1, [{next, E3, SK1}]},
{2, [{next, E5, SK1}, {next, E6, SK1}]}],
R1),
SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
R2 = initiate_rangequery_frommanifest(SK2, EK2, Man),
?assertMatch([{1, [{next, E1, SK2}]}, {2, [{next, E5, SK2}]}], R2),
R3 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx0", "Fld8"}, null},
{i, "Bucket1", {"Idx0", "Fld9"}, null},
Man),
?assertMatch([], R3).
print_manifest_test() ->
M1 = #manifest_entry{start_key={i, "Bucket1", {<<"Idx1">>, "Fld1"}, "K8"},
end_key={i, 4565, {"Idx1", "Fld9"}, "K93"},
filename="Z1"},
M2 = #manifest_entry{start_key={i, self(), {null, "Fld1"}, "K8"},
end_key={i, <<200:32/integer>>, {"Idx1", "Fld9"}, "K93"},
filename="Z1"},
M3 = #manifest_entry{start_key={?STD_TAG, self(), {null, "Fld1"}, "K8"},
end_key={?RIAK_TAG, <<200:32/integer>>, {"Idx1", "Fld9"}, "K93"},
filename="Z1"},
print_manifest([{1, [M1, M2, M3]}]).
simple_findnextkey_test() ->
QueryArray = [
{2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}},
@ -1757,81 +1397,6 @@ create_file_test() ->
{ok, Bin} = file:read_file("../test/new_file.sst.discarded"),
?assertMatch("hello", binary_to_term(Bin)).
commit_manifest_test() ->
Sent_WI = #penciller_work{next_sqn=1,
src_level=0,
start_time=os:timestamp()},
Resp_WI = #penciller_work{next_sqn=1,
src_level=0},
State = #state{ongoing_work = [Sent_WI],
root_path = "test",
manifest_sqn = 0},
ManifestFP = "test" ++ "/" ++ ?MANIFEST_FP ++ "/",
ok = filelib:ensure_dir(ManifestFP),
ok = file:write_file(ManifestFP ++ "nonzero_1.pnd",
term_to_binary("dummy data")),
L1_0 = [{1, [#manifest_entry{filename="1.sst"}]}],
Resp_WI0 = Resp_WI#penciller_work{new_manifest=L1_0,
unreferenced_files=[]},
{ok, State0} = commit_manifest_change(Resp_WI0, State),
?assertMatch(1, State0#state.manifest_sqn),
?assertMatch([], get_item(0, State0#state.manifest, [])),
L0Entry = [#manifest_entry{filename="0.sst"}],
ManifestPlus = [{0, L0Entry}|State0#state.manifest],
NxtSent_WI = #penciller_work{next_sqn=2,
src_level=1,
start_time=os:timestamp()},
NxtResp_WI = #penciller_work{next_sqn=2,
src_level=1},
State1 = State0#state{ongoing_work=[NxtSent_WI],
manifest = ManifestPlus},
ok = file:write_file(ManifestFP ++ "nonzero_2.pnd",
term_to_binary("dummy data")),
L2_0 = [#manifest_entry{filename="2.sst"}],
NxtResp_WI0 = NxtResp_WI#penciller_work{new_manifest=[{2, L2_0}],
unreferenced_files=[]},
{ok, State2} = commit_manifest_change(NxtResp_WI0, State1),
?assertMatch(1, State1#state.manifest_sqn),
?assertMatch(2, State2#state.manifest_sqn),
?assertMatch(L0Entry, get_item(0, State2#state.manifest, [])),
?assertMatch(L2_0, get_item(2, State2#state.manifest, [])),
clean_testdir(State#state.root_path).
badmanifest_test() ->
RootPath = "../test/ledger",
clean_testdir(RootPath),
{ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000}),
Key1_pre = {{o,"Bucket0001", "Key0001", null},
{1001, {active, infinity}, null}},
Key1 = add_missing_hash(Key1_pre),
KL1 = generate_randomkeys({1000, 1}),
ok = maybe_pause_push(PCL, KL1 ++ [Key1]),
%% Added together, as split apart there will be a race between the close
%% call to the penciller and the second fetch of the cache entry
?assertMatch(Key1, pcl_fetch(PCL, {o, "Bucket0001", "Key0001", null})),
timer:sleep(100), % Avoids confusion if L0 file not written before close
ok = pcl_close(PCL),
ManifestFP = filepath(RootPath, manifest),
ok = file:write_file(filename:join(ManifestFP, "yeszero_123.man"),
term_to_binary("hello")),
{ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000}),
?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001", null})),
ok = pcl_close(PCLr),
clean_testdir(RootPath).
checkready(Pid) ->
try
leveled_sst:sst_checkready(Pid)

701
src/leveled_pmanifest.erl Normal file
View file

@ -0,0 +1,701 @@
%% -------- PENCILLER MANIFEST ---------
%%
%% The manifest is an ordered set of files for each level to be used to find
%% which file is relevant for a given key or range lookup at a given level.
%%
%% This implementation is incomplete, in that it just uses a plain list at
%% each level. This is fine for short-lived volume tests, but as the deeper
%% levels are used there will be an exponential penalty.
%%
%% The originial intention was to swap out this implementation for a
%% multi-version ETS table - but that became complex. So one of two changes
%% are pending:
%% - Use a single version ES cache for lower levels (and not allow snapshots to
%% access the cache)
%% - Use a skiplist like enhanced list at lower levels.
-module(leveled_pmanifest).
-include("include/leveled.hrl").
-export([
new_manifest/0,
open_manifest/1,
copy_manifest/1,
load_manifest/3,
close_manifest/2,
save_manifest/2,
get_manifest_sqn/1,
key_lookup/3,
range_lookup/4,
merge_lookup/4,
insert_manifest_entry/4,
remove_manifest_entry/4,
switch_manifest_entry/4,
mergefile_selector/2,
add_snapshot/3,
release_snapshot/2,
ready_to_delete/2,
check_for_work/2,
is_basement/2,
levelzero_present/1
]).
-export([
filepath/2
]).
-include_lib("eunit/include/eunit.hrl").
-define(MANIFEST_FILEX, "man").
-define(MANIFEST_FP, "ledger_manifest").
-define(MAX_LEVELS, 8).
-record(manifest, {levels,
% an array of lists or trees representing the manifest
manifest_sqn = 0 :: integer(),
% The current manifest SQN
snapshots :: list(),
% A list of snaphots (i.e. clones)
min_snapshot_sqn = 0 :: integer(),
% The smallest snapshot manifest SQN in the snapshot
% list
pending_deletes, % OTP16 does not like defining type
% a dictionary mapping keys (filenames) to SQN when the
% deletion was made, and the original Manifest Entry
basement :: integer()
% Currently the lowest level (the largest number)
}).
%%%============================================================================
%%% API
%%%============================================================================
new_manifest() ->
#manifest{
levels = array:new([{size, ?MAX_LEVELS + 1}, {default, []}]),
manifest_sqn = 0,
snapshots = [],
pending_deletes = dict:new(),
basement = 0
}.
open_manifest(RootPath) ->
% Open the manifest in the file path which has the highest SQN, and will
% open without error
ManifestPath = filepath(RootPath, manifest),
{ok, Filenames} = file:list_dir(ManifestPath),
CurrRegex = "nonzero_(?<MSN>[0-9]+)\\." ++ ?MANIFEST_FILEX,
ExtractSQNFun =
fun(FN, Acc) ->
case re:run(FN, CurrRegex, [{capture, ['MSN'], list}]) of
nomatch ->
Acc;
{match, [Int]} when is_list(Int) ->
Acc ++ [list_to_integer(Int)]
end
end,
ValidManSQNs = lists:reverse(lists:sort(lists:foldl(ExtractSQNFun,
[],
Filenames))),
open_manifestfile(RootPath, ValidManSQNs).
copy_manifest(Manifest) ->
% Copy the manifest ensuring anything only the master process should care
% about is switched to undefined
Manifest#manifest{snapshots = undefined, pending_deletes = undefined}.
load_manifest(Manifest, PidFun, SQNFun) ->
UpdateLevelFun =
fun(LevelIdx, {AccMaxSQN, AccMan}) ->
L0 = array:get(LevelIdx, AccMan#manifest.levels),
{L1, SQN1} = load_level(LevelIdx, L0, PidFun, SQNFun),
UpdLevels = array:set(LevelIdx, L1, AccMan#manifest.levels),
{max(AccMaxSQN, SQN1), AccMan#manifest{levels = UpdLevels}}
end,
lists:foldl(UpdateLevelFun, {0, Manifest},
lists:seq(0, Manifest#manifest.basement)).
close_manifest(Manifest, CloseEntryFun) ->
CloseLevelFun =
fun(LevelIdx) ->
Level = array:get(LevelIdx, Manifest#manifest.levels),
close_level(LevelIdx, Level, CloseEntryFun)
end,
lists:foreach(CloseLevelFun, lists:seq(0, Manifest#manifest.basement)),
ClosePDFun =
fun({_FN, {_SQN, ME}}) ->
CloseEntryFun(ME)
end,
lists:foreach(ClosePDFun, dict:to_list(Manifest#manifest.pending_deletes)).
save_manifest(Manifest, RootPath) ->
FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest),
ManBin = term_to_binary(Manifest#manifest{snapshots = [],
pending_deletes = dict:new(),
min_snapshot_sqn = 0}),
CRC = erlang:crc32(ManBin),
ok = file:write_file(FP, <<CRC:32/integer, ManBin/binary>>).
insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
Levels = Manifest#manifest.levels,
Level = array:get(LevelIdx, Levels),
UpdLevel = add_entry(LevelIdx, Level, Entry),
leveled_log:log("PC019", ["insert", LevelIdx, UpdLevel]),
Basement = max(LevelIdx, Manifest#manifest.basement),
Manifest#manifest{levels = array:set(LevelIdx, UpdLevel, Levels),
basement = Basement,
manifest_sqn = ManSQN}.
remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
Levels = Manifest#manifest.levels,
Level = array:get(LevelIdx, Levels),
UpdLevel = remove_entry(LevelIdx, Level, Entry),
leveled_log:log("PC019", ["remove", LevelIdx, UpdLevel]),
DelFun =
fun(E, Acc) ->
dict:store(E#manifest_entry.filename,
{ManSQN, E},
Acc)
end,
Entries =
case is_list(Entry) of
true ->
Entry;
false ->
[Entry]
end,
PendingDeletes = lists:foldl(DelFun,
Manifest#manifest.pending_deletes,
Entries),
UpdLevels = array:set(LevelIdx, UpdLevel, Levels),
case is_empty(LevelIdx, UpdLevel) of
true ->
Manifest#manifest{levels = UpdLevels,
basement = get_basement(UpdLevels),
manifest_sqn = ManSQN,
pending_deletes = PendingDeletes};
false ->
Manifest#manifest{levels = UpdLevels,
manifest_sqn = ManSQN,
pending_deletes = PendingDeletes}
end.
switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) ->
% Move to level below - so needs to be removed but not marked as a
% pending deletion
Levels = Manifest#manifest.levels,
Level = array:get(SrcLevel, Levels),
UpdLevel = remove_entry(SrcLevel, Level, Entry),
UpdLevels = array:set(SrcLevel, UpdLevel, Levels),
insert_manifest_entry(Manifest#manifest{levels = UpdLevels},
ManSQN,
SrcLevel + 1,
Entry).
get_manifest_sqn(Manifest) ->
Manifest#manifest.manifest_sqn.
key_lookup(Manifest, LevelIdx, Key) ->
case LevelIdx > Manifest#manifest.basement of
true ->
false;
false ->
key_lookup_level(LevelIdx,
array:get(LevelIdx, Manifest#manifest.levels),
Key)
end.
range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
MakePointerFun =
fun(M) ->
{next, M, StartKey}
end,
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
MakePointerFun =
fun(M) ->
{next, M, all}
end,
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
%% An algorithm for discovering which files to merge ....
%% We can find the most optimal file:
%% - The one with the most overlapping data below?
%% - The one that overlaps with the fewest files below?
%% - The smallest file?
%% We could try and be fair in some way (merge oldest first)
%% Ultimately, there is a lack of certainty that being fair or optimal is
%% genuinely better - eventually every file has to be compacted.
%%
%% Hence, the initial implementation is to select files to merge at random
mergefile_selector(Manifest, LevelIdx) ->
Level = array:get(LevelIdx, Manifest#manifest.levels),
lists:nth(random:uniform(length(Level)), Level).
add_snapshot(Manifest, Pid, Timeout) ->
{MegaNow, SecNow, _} = os:timestamp(),
TimeToTimeout = MegaNow * 1000000 + SecNow + Timeout,
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, TimeToTimeout},
SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
ManSQN = Manifest#manifest.manifest_sqn,
case Manifest#manifest.min_snapshot_sqn of
0 ->
Manifest#manifest{snapshots = SnapList0,
min_snapshot_sqn = ManSQN};
N ->
N0 = min(N, ManSQN),
Manifest#manifest{snapshots = SnapList0, min_snapshot_sqn = N0}
end.
release_snapshot(Manifest, Pid) ->
FilterFun =
fun({P, SQN, TS}, {Acc, MinSQN}) ->
case P of
Pid ->
{Acc, MinSQN};
_ ->
{[{P, SQN, TS}|Acc], min(SQN, MinSQN)}
end
end,
{SnapList0, MinSnapSQN} = lists:foldl(FilterFun,
{[], infinity},
Manifest#manifest.snapshots),
leveled_log:log("P0004", [SnapList0]),
case SnapList0 of
[] ->
Manifest#manifest{snapshots = SnapList0,
min_snapshot_sqn = 0};
_ ->
Manifest#manifest{snapshots = SnapList0,
min_snapshot_sqn = MinSnapSQN}
end.
ready_to_delete(Manifest, Filename) ->
{ChangeSQN, _ME} = dict:fetch(Filename, Manifest#manifest.pending_deletes),
case Manifest#manifest.min_snapshot_sqn of
0 ->
% no shapshots
PDs = dict:erase(Filename, Manifest#manifest.pending_deletes),
{true, Manifest#manifest{pending_deletes = PDs}};
N when N >= ChangeSQN ->
% Every snapshot is looking at a version of history after this
% was removed
PDs = dict:erase(Filename, Manifest#manifest.pending_deletes),
{true, Manifest#manifest{pending_deletes = PDs}};
_N ->
{false, Manifest}
end.
check_for_work(Manifest, Thresholds) ->
CheckLevelFun =
fun({LevelIdx, MaxCount}, {AccL, AccC}) ->
case LevelIdx > Manifest#manifest.basement of
true ->
{AccL, AccC};
false ->
Level = array:get(LevelIdx, Manifest#manifest.levels),
S = size(LevelIdx, Level),
case S > MaxCount of
true ->
{[LevelIdx|AccL], AccC + S - MaxCount};
false ->
{AccL, AccC}
end
end
end,
lists:foldr(CheckLevelFun, {[], 0}, Thresholds).
is_basement(Manifest, Level) ->
Level >= Manifest#manifest.basement.
levelzero_present(Manifest) ->
not is_empty(0, array:get(0, Manifest#manifest.levels)).
%%%============================================================================
%%% Internal Functions
%%%============================================================================
%% All these internal functions that work on a level are also passed LeveIdx
%% even if this is not presently relevant. Currnetly levels are lists, but
%% future branches may make lower levels trees or skiplists to improve fetch
%% efficiency
load_level(_LevelIdx, Level, PidFun, SQNFun) ->
LevelLoadFun =
fun(ME, {L_Out, L_MaxSQN}) ->
FN = ME#manifest_entry.filename,
P = PidFun(FN),
SQN = SQNFun(P),
{[ME#manifest_entry{owner=P}|L_Out], max(SQN, L_MaxSQN)}
end,
lists:foldr(LevelLoadFun, {[], 0}, Level).
close_level(_LevelIdx, Level, CloseEntryFun) ->
lists:foreach(CloseEntryFun, Level).
is_empty(_LevelIdx, []) ->
true;
is_empty(_LevelIdx, _Level) ->
false.
size(_LevelIdx, Level) ->
length(Level).
add_entry(_LevelIdx, Level, Entries) when is_list(Entries) ->
lists:sort(Level ++ Entries);
add_entry(_LevelIdx, Level, Entry) ->
lists:sort([Entry|Level]).
remove_entry(_LevelIdx, Level, Entries) when is_list(Entries) ->
% We're assuming we're removing a sorted sublist
RemLength = length(Entries),
[RemStart|_Tail] = Entries,
remove_section(Level, RemStart#manifest_entry.start_key, RemLength);
remove_entry(_LevelIdx, Level, Entry) ->
remove_section(Level, Entry#manifest_entry.start_key, 1).
remove_section(Level, SectionStartKey, SectionLength) ->
PredFun =
fun(E) ->
E#manifest_entry.start_key < SectionStartKey
end,
{Pre, Rest} = lists:splitwith(PredFun, Level),
Post = lists:nthtail(SectionLength, Rest),
Pre ++ Post.
key_lookup_level(_LevelIdx, [], _Key) ->
false;
key_lookup_level(LevelIdx, [Entry|Rest], Key) ->
case Entry#manifest_entry.end_key >= Key of
true ->
case Key >= Entry#manifest_entry.start_key of
true ->
Entry#manifest_entry.owner;
false ->
false
end;
false ->
key_lookup_level(LevelIdx, Rest, Key)
end.
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun) ->
Range =
case LevelIdx > Manifest#manifest.basement of
true ->
[];
false ->
range_lookup_level(LevelIdx,
array:get(LevelIdx,
Manifest#manifest.levels),
StartKey,
EndKey)
end,
lists:map(MakePointerFun, Range).
range_lookup_level(_LevelIdx, Level, QStartKey, QEndKey) ->
BeforeFun =
fun(M) ->
QStartKey > M#manifest_entry.end_key
end,
NotAfterFun =
fun(M) ->
not leveled_codec:endkey_passed(QEndKey,
M#manifest_entry.start_key)
end,
{_Before, MaybeIn} = lists:splitwith(BeforeFun, Level),
{In, _After} = lists:splitwith(NotAfterFun, MaybeIn),
In.
get_basement(Levels) ->
GetBaseFun =
fun(L, Acc) ->
case is_empty(L, array:get(L, Levels)) of
false ->
max(L, Acc);
true ->
Acc
end
end,
lists:foldl(GetBaseFun, 0, lists:seq(0, ?MAX_LEVELS)).
filepath(RootPath, manifest) ->
MFP = RootPath ++ "/" ++ ?MANIFEST_FP ++ "/",
filelib:ensure_dir(MFP),
MFP.
filepath(RootPath, NewMSN, current_manifest) ->
filepath(RootPath, manifest) ++ "nonzero_"
++ integer_to_list(NewMSN) ++ "." ++ ?MANIFEST_FILEX.
open_manifestfile(_RootPath, []) ->
leveled_log:log("P0013", []),
new_manifest();
open_manifestfile(_RootPath, [0]) ->
leveled_log:log("P0013", []),
new_manifest();
open_manifestfile(RootPath, [TopManSQN|Rest]) ->
CurrManFile = filepath(RootPath, TopManSQN, current_manifest),
{ok, FileBin} = file:read_file(CurrManFile),
<<CRC:32/integer, BinaryOfTerm/binary>> = FileBin,
case erlang:crc32(BinaryOfTerm) of
CRC ->
leveled_log:log("P0012", [TopManSQN]),
binary_to_term(BinaryOfTerm);
_ ->
leveled_log:log("P0033", [CurrManFile, "crc wonky"]),
open_manifestfile(RootPath, Rest)
end.
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
initial_setup() ->
E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"},
filename="Z1",
owner="pid_z1"},
E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"},
end_key={o, "Bucket1", "K71", null},
filename="Z2",
owner="pid_z2"},
E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null},
end_key={o, "Bucket1", "K993", null},
filename="Z3",
owner="pid_z3"},
E4 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
end_key={i, "Bucket1", {"Idx1", "Fld7"}, "K93"},
filename="Z4",
owner="pid_z4"},
E5 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld7"}, "K97"},
end_key={o, "Bucket1", "K78", null},
filename="Z5",
owner="pid_z5"},
E6 = #manifest_entry{start_key={o, "Bucket1", "K81", null},
end_key={o, "Bucket1", "K996", null},
filename="Z6",
owner="pid_z6"},
Man0 = new_manifest(),
% insert_manifest_entry(Manifest, ManSQN, Level, Entry)
Man1 = insert_manifest_entry(Man0, 1, 1, E1),
Man2 = insert_manifest_entry(Man1, 1, 1, E2),
Man3 = insert_manifest_entry(Man2, 1, 1, E3),
Man4 = insert_manifest_entry(Man3, 1, 2, E4),
Man5 = insert_manifest_entry(Man4, 1, 2, E5),
Man6 = insert_manifest_entry(Man5, 1, 2, E6),
{Man0, Man1, Man2, Man3, Man4, Man5, Man6}.
changeup_setup(Man6) ->
E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"},
filename="Z1",
owner="pid_z1"},
E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"},
end_key={o, "Bucket1", "K71", null},
filename="Z2",
owner="pid_z2"},
E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null},
end_key={o, "Bucket1", "K993", null},
filename="Z3",
owner="pid_z3"},
E1_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld4"}, "K8"},
end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K62"},
owner="pid_y1",
filename="Y1"},
E2_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K67"},
end_key={o, "Bucket1", "K45", null},
owner="pid_y2",
filename="Y2"},
E3_2 = #manifest_entry{start_key={o, "Bucket1", "K47", null},
end_key={o, "Bucket1", "K812", null},
owner="pid_y3",
filename="Y3"},
E4_2 = #manifest_entry{start_key={o, "Bucket1", "K815", null},
end_key={o, "Bucket1", "K998", null},
owner="pid_y4",
filename="Y4"},
Man7 = remove_manifest_entry(Man6, 2, 1, E1),
Man8 = remove_manifest_entry(Man7, 2, 1, E2),
Man9 = remove_manifest_entry(Man8, 2, 1, E3),
Man10 = insert_manifest_entry(Man9, 2, 1, E1_2),
Man11 = insert_manifest_entry(Man10, 2, 1, E2_2),
Man12 = insert_manifest_entry(Man11, 2, 1, E3_2),
Man13 = insert_manifest_entry(Man12, 2, 1, E4_2),
% remove_manifest_entry(Manifest, ManSQN, Level, Entry)
{Man7, Man8, Man9, Man10, Man11, Man12, Man13}.
keylookup_manifest_test() ->
{Man0, Man1, Man2, Man3, _Man4, _Man5, Man6} = initial_setup(),
LK1_1 = {o, "Bucket1", "K711", null},
LK1_2 = {o, "Bucket1", "K70", null},
LK1_3 = {o, "Bucket1", "K71", null},
LK1_4 = {o, "Bucket1", "K75", null},
LK1_5 = {o, "Bucket1", "K76", null},
?assertMatch(false, key_lookup(Man0, 1, LK1_1)),
?assertMatch(false, key_lookup(Man1, 1, LK1_1)),
?assertMatch(false, key_lookup(Man2, 1, LK1_1)),
?assertMatch(false, key_lookup(Man3, 1, LK1_1)),
?assertMatch(false, key_lookup(Man6, 1, LK1_1)),
?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_2)),
?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_3)),
?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_4)),
?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_5)),
?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_2)),
?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_3)),
?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_4)),
?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_5)),
{_Man7, _Man8, _Man9, _Man10, _Man11, _Man12,
Man13} = changeup_setup(Man6),
?assertMatch(false, key_lookup(Man0, 1, LK1_1)),
?assertMatch(false, key_lookup(Man1, 1, LK1_1)),
?assertMatch(false, key_lookup(Man2, 1, LK1_1)),
?assertMatch(false, key_lookup(Man3, 1, LK1_1)),
?assertMatch(false, key_lookup(Man6, 1, LK1_1)),
?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_2)),
?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_3)),
?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_4)),
?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_5)),
?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_2)),
?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_3)),
?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_4)),
?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_5)),
?assertMatch("pid_y3", key_lookup(Man13, 1, LK1_4)),
?assertMatch("pid_z5", key_lookup(Man13, 2, LK1_4)).
rangequery_manifest_test() ->
{_Man0, _Man1, _Man2, _Man3, _Man4, _Man5, Man6} = initial_setup(),
PidMapFun =
fun(Pointer) ->
{next, ME, _SK} = Pointer,
ME#manifest_entry.owner
end,
SK1 = {o, "Bucket1", "K711", null},
EK1 = {o, "Bucket1", "K999", null},
RL1_1 = lists:map(PidMapFun, range_lookup(Man6, 1, SK1, EK1)),
?assertMatch(["pid_z3"], RL1_1),
RL1_2 = lists:map(PidMapFun, range_lookup(Man6, 2, SK1, EK1)),
?assertMatch(["pid_z5", "pid_z6"], RL1_2),
SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
RL2_1 = lists:map(PidMapFun, range_lookup(Man6, 1, SK2, EK2)),
?assertMatch(["pid_z1"], RL2_1),
RL2_2 = lists:map(PidMapFun, range_lookup(Man6, 2, SK2, EK2)),
?assertMatch(["pid_z5"], RL2_2),
SK3 = {o, "Bucket1", "K994", null},
EK3 = {o, "Bucket1", "K995", null},
RL3_1 = lists:map(PidMapFun, range_lookup(Man6, 1, SK3, EK3)),
?assertMatch([], RL3_1),
RL3_2 = lists:map(PidMapFun, range_lookup(Man6, 2, SK3, EK3)),
?assertMatch(["pid_z6"], RL3_2),
{_Man7, _Man8, _Man9, _Man10, _Man11, _Man12,
Man13} = changeup_setup(Man6),
RL1_1A = lists:map(PidMapFun, range_lookup(Man6, 1, SK1, EK1)),
?assertMatch(["pid_z3"], RL1_1A),
RL2_1A = lists:map(PidMapFun, range_lookup(Man6, 1, SK2, EK2)),
?assertMatch(["pid_z1"], RL2_1A),
RL3_1A = lists:map(PidMapFun, range_lookup(Man6, 1, SK3, EK3)),
?assertMatch([], RL3_1A),
RL1_1B = lists:map(PidMapFun, range_lookup(Man13, 1, SK1, EK1)),
?assertMatch(["pid_y3", "pid_y4"], RL1_1B),
RL2_1B = lists:map(PidMapFun, range_lookup(Man13, 1, SK2, EK2)),
?assertMatch(["pid_y1"], RL2_1B),
RL3_1B = lists:map(PidMapFun, range_lookup(Man13, 1, SK3, EK3)),
?assertMatch(["pid_y4"], RL3_1B).
levelzero_present_test() ->
E0 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
end_key={o, "Bucket1", "Key996", null},
filename="Z0",
owner="pid_z0"},
Man0 = new_manifest(),
?assertMatch(false, levelzero_present(Man0)),
% insert_manifest_entry(Manifest, ManSQN, Level, Entry)
Man1 = insert_manifest_entry(Man0, 1, 0, E0),
?assertMatch(true, levelzero_present(Man1)).
snapshot_release_test() ->
Man6 = element(7, initial_setup()),
E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"},
filename="Z1",
owner="pid_z1"},
E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"},
end_key={o, "Bucket1", "K71", null},
filename="Z2",
owner="pid_z2"},
E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null},
end_key={o, "Bucket1", "K993", null},
filename="Z3",
owner="pid_z3"},
Man7 = add_snapshot(Man6, "pid_a1", 3600),
Man8 = remove_manifest_entry(Man7, 2, 1, E1),
Man9 = add_snapshot(Man8, "pid_a2", 3600),
Man10 = remove_manifest_entry(Man9, 3, 1, E2),
Man11 = add_snapshot(Man10, "pid_a3", 3600),
Man12 = remove_manifest_entry(Man11, 4, 1, E3),
Man13 = add_snapshot(Man12, "pid_a4", 3600),
?assertMatch(false, element(1, ready_to_delete(Man8, "Z1"))),
?assertMatch(false, element(1, ready_to_delete(Man10, "Z2"))),
?assertMatch(false, element(1, ready_to_delete(Man12, "Z3"))),
Man14 = release_snapshot(Man13, "pid_a1"),
?assertMatch(false, element(1, ready_to_delete(Man14, "Z2"))),
?assertMatch(false, element(1, ready_to_delete(Man14, "Z3"))),
{Bool14, Man15} = ready_to_delete(Man14, "Z1"),
?assertMatch(true, Bool14),
%This doesn't change anything - released snaphsot not the min
Man16 = release_snapshot(Man15, "pid_a4"),
?assertMatch(false, element(1, ready_to_delete(Man16, "Z2"))),
?assertMatch(false, element(1, ready_to_delete(Man16, "Z3"))),
Man17 = release_snapshot(Man16, "pid_a2"),
?assertMatch(false, element(1, ready_to_delete(Man17, "Z3"))),
{Bool17, Man18} = ready_to_delete(Man17, "Z2"),
?assertMatch(true, Bool17),
Man19 = release_snapshot(Man18, "pid_a3"),
io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]),
{Bool19, _Man20} = ready_to_delete(Man19, "Z3"),
?assertMatch(true, Bool19).
-endif.

View file

@ -363,7 +363,8 @@ delete_pending(close, _From, State) ->
delete_pending(timeout, State) ->
ok = leveled_penciller:pcl_confirmdelete(State#state.penciller,
State#state.filename),
State#state.filename,
self()),
{next_state, delete_pending, State, ?DELETE_TIMEOUT};
delete_pending(close, State) ->
leveled_log:log("SST07", [State#state.filename]),
@ -1175,8 +1176,8 @@ maybe_expand_pointer([{pointer, SSTPid, Slot, StartKey, all}|Tail]) ->
expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, all},
Tail,
?MERGE_SCANWIDTH);
maybe_expand_pointer([{next, SSTPid, StartKey}|Tail]) ->
expand_list_by_pointer({next, SSTPid, StartKey, all},
maybe_expand_pointer([{next, ManEntry, StartKey}|Tail]) ->
expand_list_by_pointer({next, ManEntry, StartKey, all},
Tail,
?MERGE_SCANWIDTH);
maybe_expand_pointer(List) ->
@ -1202,7 +1203,9 @@ expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) -
{AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail),
ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers),
lists:append(ExpPointers, AccTail);
expand_list_by_pointer({next, SSTPid, StartKey, EndKey}, Tail, Width) ->
expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, Tail, Width) ->
SSTPid = ManEntry#manifest_entry.owner,
leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]),
ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width),
ExpPointer ++ Tail.
@ -1440,8 +1443,8 @@ merge_test() ->
?assertMatch(ExpFK2, FK2),
?assertMatch(ExpLK1, LK1),
?assertMatch(ExpLK2, LK2),
ML1 = [{next, P1, FK1}],
ML2 = [{next, P2, FK2}],
ML1 = [{next, #manifest_entry{owner = P1}, FK1}],
ML2 = [{next, #manifest_entry{owner = P2}, FK2}],
{ok, P3, {{Rem1, Rem2}, FK3, LK3}} = sst_new("../test/level2_merge",
ML1,
ML2,