diff --git a/include/leveled.hrl b/include/leveled.hrl index 25216f6..13d862e 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -20,18 +20,6 @@ expire_tombstones = false :: boolean(), penciller :: pid()}). --record(penciller_work, - {next_sqn :: integer(), - clerk :: pid(), - src_level :: integer(), - manifest :: list(), - start_time :: tuple(), - ledger_filepath :: string(), - manifest_file :: string(), - new_manifest :: list(), - unreferenced_files :: list(), - target_is_basement = false ::boolean()}). - -record(level, {level :: integer(), is_basement = false :: boolean(), diff --git a/rebar.lock b/rebar.lock deleted file mode 100644 index 57afcca..0000000 --- a/rebar.lock +++ /dev/null @@ -1 +0,0 @@ -[]. diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 10ffcc7..6dbbff4 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -282,6 +282,8 @@ turn_to_string(Item) -> % Compare a key against a query key, only comparing elements that are non-null % in the Query key. This is used for comparing against end keys in queries. +endkey_passed(all, _) -> + false; endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) -> EK1 < CK1; endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) -> diff --git a/src/leveled_log.erl b/src/leveled_log.erl index a8c94d9..d2c8a3b 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -65,8 +65,7 @@ {"P0004", {info, "Remaining ledger snapshots are ~w"}}, {"P0005", - {info, "Delete confirmed as file ~s is removed from " ++ - "unreferenced files"}}, + {info, "Delete confirmed as file ~s is removed from Manifest"}}, {"P0006", {info, "Orphaned reply after timeout on L0 file write ~s"}}, {"P0007", @@ -74,8 +73,6 @@ ++ "reason ~w"}}, {"P0008", {info, "Penciller closing for reason ~w"}}, - {"P0009", - {info, "Level 0 cache empty at close of Penciller"}}, {"P0010", {info, "No level zero action on close of Penciller ~w"}}, {"P0011", @@ -97,9 +94,6 @@ ++ "L0 pending ~w and merge backlog ~w"}}, {"P0019", {info, "Rolling level zero to filename ~s at ledger sqn ~w"}}, - {"P0020", - {info, "Work at Level ~w to be scheduled for ~w with ~w " - ++ "queue items outstanding at all levels"}}, {"P0021", {info, "Allocation of work blocked as L0 pending"}}, {"P0022", @@ -108,7 +102,8 @@ {info, "Manifest entry of startkey ~s ~s ~s endkey ~s ~s ~s " ++ "filename=~s~n"}}, {"P0024", - {info, "Outstanding compaction work items of ~w at level ~w"}}, + {info, "Outstanding compaction work items of ~w with backlog status " + ++ "of ~w"}}, {"P0025", {info, "Merge to sqn ~w from Level ~w completed"}}, {"P0026", @@ -125,7 +120,17 @@ {info, "Completion of update to levelzero"}}, {"P0032", {info, "Head timing for result ~w is sample ~w total ~w and max ~w"}}, - + {"P0033", + {error, "Corrupted manifest file at path ~s to be ignored " + ++ "due to error ~w"}}, + {"P0034", + {warn, "Snapshot with pid ~w timed out and so deletion will " + ++ "continue regardless"}}, + {"P0035", + {info, "Startup with Manifest SQN of ~w"}}, + {"P0036", + {info, "Garbage collection on mnaifest removes key for filename ~s"}}, + {"PC001", {info, "Penciller's clerk ~w started with owner ~w"}}, {"PC002", @@ -147,15 +152,22 @@ {"PC010", {info, "Merge to be commenced for FileToMerge=~s with MSN=~w"}}, {"PC011", - {info, "Merge completed with MSN=~w Level=~w and FileCounter=~w"}}, + {info, "Merge completed with MSN=~w to Level=~w and FileCounter=~w"}}, {"PC012", - {info, "File to be created as part of MSN=~w Filename=~s"}}, + {info, "File to be created as part of MSN=~w Filename=~s " + ++ "IsBasement=~w"}}, {"PC013", {warn, "Merge resulted in empty file ~s"}}, {"PC015", {info, "File created"}}, {"PC016", {info, "Slow fetch from SFT ~w of ~w microseconds with result ~w"}}, + {"PC017", + {info, "Notified clerk of manifest change"}}, + {"PC018", + {info, "Saved manifest file"}}, + {"PC019", + {debug, "After ~s level ~w is ~w"}}, {"I0001", {info, "Unexpected failure to fetch value for Key=~w SQN=~w " @@ -250,6 +262,8 @@ {info, "Completed creation of ~s at level ~w with max sqn ~w"}}, {"SST09", {warn, "Read request exposes slot with bad CRC"}}, + {"SST10", + {info, "Expansion sought to support pointer to pid ~w status ~w"}}, {"CDB01", {info, "Opening file for writing with filename ~s"}}, diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 9ccc791..66b7c74 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -2,10 +2,9 @@ %% %% The Penciller's clerk is responsible for compaction work within the Ledger. %% -%% The Clerk will periodically poll the Penciller to see if there is work for -%% it to complete, except if the Clerk has informed the Penciller that it has -%% readied a manifest change to be committed - in which case it will wait to -%% be called by the Penciller. +%% The Clerk will periodically poll the Penciller to check there is no work +%% at level zero pending completion, and if not the Clerk will examine the +%% manifest to see if work is necessary. %% %% -------- COMMITTING MANIFEST CHANGES --------- %% @@ -18,35 +17,7 @@ %% certain that the manifest change has been committed. Some uncollected %% garbage is considered acceptable. %% -%% The process of committing a manifest change is as follows: -%% -%% A - The Clerk completes a merge, and casts a prompt to the Penciller with -%% a work item describing the change -%% -%% B - The Penciller commits the change to disk, and then calls the Clerk to -%% confirm the manifest change -%% -%% C - The Clerk replies immediately to acknowledge this call, then marks the -%% removed files for deletion -%% -%% Shutdown < A/B - If the Penciller starts the shutdown process before the -%% merge is complete, in the shutdown the Penciller will call a request for the -%% manifest change which will pick up the pending change. It will then confirm -%% the change, and now the Clerk will mark the files for delete before it -%% replies to the Penciller so it can complete the shutdown process (which will -%% prompt erasing of the removed files). -%% -%% The clerk will not request work on timeout if the committing of a manifest -%% change is pending confirmation. -%% -%% -------- TIMEOUTS --------- -%% -%% The Penciller may prompt the Clerk to callback soon (i.e. reduce the -%% Timeout) if it has urgent work ready (i.e. it has written a L0 file). -%% -%% There will also be a natural quick timeout once the committing of a manifest -%% change has occurred. -%% + -module(leveled_pclerk). @@ -54,42 +25,54 @@ -include("include/leveled.hrl"). --export([init/1, +-export([ + init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - clerk_new/1, + code_change/3 + ]). + +-export([ + clerk_new/2, clerk_prompt/1, - clerk_manifestchange/3, - code_change/3]). + clerk_push/2, + clerk_close/1, + clerk_promptdeletions/2 + ]). -include_lib("eunit/include/eunit.hrl"). -define(MAX_TIMEOUT, 2000). --define(MIN_TIMEOUT, 50). +-define(MIN_TIMEOUT, 200). -record(state, {owner :: pid(), - change_pending=false :: boolean(), - work_item :: #penciller_work{}|null}). + root_path :: string(), + pending_deletions = dict:new() % OTP 16 does not like type + }). %%%============================================================================ %%% API %%%============================================================================ -clerk_new(Owner) -> +clerk_new(Owner, Manifest) -> {ok, Pid} = gen_server:start(?MODULE, [], []), - ok = gen_server:call(Pid, {register, Owner}, infinity), + ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity), leveled_log:log("PC001", [Pid, Owner]), {ok, Pid}. -clerk_manifestchange(Pid, Action, Closing) -> - gen_server:call(Pid, {manifest_change, Action, Closing}, infinity). - clerk_prompt(Pid) -> gen_server:cast(Pid, prompt). +clerk_promptdeletions(Pid, ManifestSQN) -> + gen_server:cast(Pid, {prompt_deletions, ManifestSQN}). +clerk_push(Pid, Work) -> + gen_server:cast(Pid, {push_work, Work}). + +clerk_close(Pid) -> + gen_server:call(Pid, close, 20000). %%%============================================================================ %%% gen_server callbacks @@ -98,53 +81,26 @@ clerk_prompt(Pid) -> init([]) -> {ok, #state{}}. -handle_call({register, Owner}, _From, State) -> - {reply, - ok, - State#state{owner=Owner}, - ?MIN_TIMEOUT}; -handle_call({manifest_change, return, true}, _From, State) -> - leveled_log:log("PC002", []), - case State#state.change_pending of - true -> - WI = State#state.work_item, - {reply, {ok, WI}, State}; - false -> - {stop, normal, no_change, State} - end; -handle_call({manifest_change, confirm, Closing}, From, State) -> - case Closing of - true -> - leveled_log:log("PC003", []), - WI = State#state.work_item, - ok = mark_for_delete(WI#penciller_work.unreferenced_files, - State#state.owner), - {stop, normal, ok, State}; - false -> - leveled_log:log("PC004", []), - gen_server:reply(From, ok), - WI = State#state.work_item, - ok = mark_for_delete(WI#penciller_work.unreferenced_files, - State#state.owner), - {noreply, - State#state{work_item=null, change_pending=false}, - ?MIN_TIMEOUT} - end. +handle_call({load, Owner, RootPath}, _From, State) -> + {reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT}; +handle_call(close, _From, State) -> + {stop, normal, ok, State}. handle_cast(prompt, State) -> - {noreply, State, ?MIN_TIMEOUT}. - -handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false -> - case requestandhandle_work(State) of - {false, Timeout} -> - {noreply, State, Timeout}; - {true, WI} -> - % No timeout now as will wait for call to return manifest - % change - {noreply, - State#state{change_pending=true, work_item=WI}} - end. + handle_info(timeout, State); +handle_cast({push_work, Work}, State) -> + {ManifestSQN, Deletions} = handle_work(Work, State), + PDs = dict:store(ManifestSQN, Deletions, State#state.pending_deletions), + {noreply, State#state{pending_deletions = PDs}, ?MAX_TIMEOUT}; +handle_cast({prompt_deletions, ManifestSQN}, State) -> + Deletions = dict:fetch(ManifestSQN, State#state.pending_deletions), + ok = notify_deletions(Deletions, State#state.owner), + UpdDeletions = dict:erase(ManifestSQN, State#state.pending_deletions), + {noreply, State#state{pending_deletions = UpdDeletions}, ?MIN_TIMEOUT}. +handle_info(timeout, State) -> + request_work(State), + {noreply, State, ?MAX_TIMEOUT}. terminate(Reason, _State) -> leveled_log:log("PC005", [self(), Reason]). @@ -157,185 +113,117 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -requestandhandle_work(State) -> - case leveled_penciller:pcl_workforclerk(State#state.owner) of - none -> - leveled_log:log("PC006", []), - {false, ?MAX_TIMEOUT}; - WI -> - {NewManifest, FilesToDelete} = merge(WI), - UpdWI = WI#penciller_work{new_manifest=NewManifest, - unreferenced_files=FilesToDelete}, - leveled_log:log("PC007", []), - ok = leveled_penciller:pcl_promptmanifestchange(State#state.owner, - UpdWI), - {true, UpdWI} - end. +request_work(State) -> + ok = leveled_penciller:pcl_workforclerk(State#state.owner). +handle_work({SrcLevel, Manifest}, State) -> + {UpdManifest, EntriesToDelete} = merge(SrcLevel, + Manifest, + State#state.root_path), + leveled_log:log("PC007", []), + SWMC = os:timestamp(), + ok = leveled_penciller:pcl_manifestchange(State#state.owner, + UpdManifest), + leveled_log:log_timer("PC017", [], SWMC), + SWSM = os:timestamp(), + ok = leveled_pmanifest:save_manifest(UpdManifest, + State#state.root_path), + leveled_log:log_timer("PC018", [], SWSM), + {leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}. -merge(WI) -> - SrcLevel = WI#penciller_work.src_level, - {SrcF, UpdMFest1} = select_filetomerge(SrcLevel, - WI#penciller_work.manifest), - SinkFiles = get_item(SrcLevel + 1, UpdMFest1, []), - {Candidates, Others} = check_for_merge_candidates(SrcF, SinkFiles), - %% TODO: - %% Need to work out if this is the top level - %% And then tell merge process to create files at the top level - %% Which will include the reaping of expired tombstones - leveled_log:log("PC008", [SrcLevel, length(Candidates)]), - - MergedFiles = case length(Candidates) of +merge(SrcLevel, Manifest, RootPath) -> + Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel), + NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1, + SinkList = leveled_pmanifest:merge_lookup(Manifest, + SrcLevel + 1, + Src#manifest_entry.start_key, + Src#manifest_entry.end_key), + Candidates = length(SinkList), + leveled_log:log("PC008", [SrcLevel, Candidates]), + case Candidates of 0 -> - %% If no overlapping candiates, manifest change only required - %% - %% TODO: need to think still about simply renaming when at - %% lower level leveled_log:log("PC009", - [SrcF#manifest_entry.filename, SrcLevel + 1]), - [SrcF]; + [Src#manifest_entry.filename, SrcLevel + 1]), + Man0 = leveled_pmanifest:switch_manifest_entry(Manifest, + NewSQN, + SrcLevel, + Src), + {Man0, []}; _ -> - perform_merge({SrcF#manifest_entry.owner, - SrcF#manifest_entry.filename}, - Candidates, - {SrcLevel, WI#penciller_work.target_is_basement}, - {WI#penciller_work.ledger_filepath, - WI#penciller_work.next_sqn}) - end, - NewLevel = lists:sort(lists:append(MergedFiles, Others)), - UpdMFest2 = lists:keystore(SrcLevel + 1, - 1, - UpdMFest1, - {SrcLevel + 1, NewLevel}), - - ok = filelib:ensure_dir(WI#penciller_work.manifest_file), - {ok, Handle} = file:open(WI#penciller_work.manifest_file, - [binary, raw, write]), - ok = file:write(Handle, term_to_binary(UpdMFest2)), - ok = file:close(Handle), - case lists:member(SrcF, MergedFiles) of - true -> - {UpdMFest2, Candidates}; - false -> - %% Can rub out src file as it is not part of output - {UpdMFest2, Candidates ++ [SrcF]} + FilePath = leveled_penciller:filepath(RootPath, + NewSQN, + new_merge_files), + perform_merge(Manifest, Src, SinkList, SrcLevel, FilePath, NewSQN) end. - -mark_for_delete([], _Penciller) -> +notify_deletions([], _Penciller) -> ok; -mark_for_delete([Head|Tail], Penciller) -> +notify_deletions([Head|Tail], Penciller) -> ok = leveled_sst:sst_setfordelete(Head#manifest_entry.owner, Penciller), - mark_for_delete(Tail, Penciller). - - -check_for_merge_candidates(SrcF, SinkFiles) -> - lists:partition(fun(Ref) -> - case {Ref#manifest_entry.start_key, - Ref#manifest_entry.end_key} of - {_, EK} when SrcF#manifest_entry.start_key > EK -> - false; - {SK, _} when SrcF#manifest_entry.end_key < SK -> - false; - _ -> - true - end end, - SinkFiles). - - -%% An algorithm for discovering which files to merge .... -%% We can find the most optimal file: -%% - The one with the most overlapping data below? -%% - The one that overlaps with the fewest files below? -%% - The smallest file? -%% We could try and be fair in some way (merge oldest first) -%% Ultimately, there is a lack of certainty that being fair or optimal is -%% genuinely better - eventually every file has to be compacted. -%% -%% Hence, the initial implementation is to select files to merge at random - -select_filetomerge(SrcLevel, Manifest) -> - {SrcLevel, LevelManifest} = lists:keyfind(SrcLevel, 1, Manifest), - Selected = lists:nth(random:uniform(length(LevelManifest)), - LevelManifest), - UpdManifest = lists:keyreplace(SrcLevel, - 1, - Manifest, - {SrcLevel, - lists:delete(Selected, - LevelManifest)}), - {Selected, UpdManifest}. - - + notify_deletions(Tail, Penciller). + %% Assumption is that there is a single SST from a higher level that needs -%% to be merged into multiple SSTs at a lower level. This should create an -%% entirely new set of SSTs, and the calling process can then update the -%% manifest. +%% to be merged into multiple SSTs at a lower level. %% -%% Once the FileToMerge has been emptied, the remainder of the candidate list -%% needs to be placed in a remainder SST that may be of a sub-optimal (small) -%% size. This stops the need to perpetually roll over the whole level if the -%% level consists of already full files. Some smartness may be required when -%% selecting the candidate list so that small files just outside the candidate -%% list be included to avoid a proliferation of small files. -%% -%% FileToMerge should be a tuple of {FileName, Pid} where the Pid is the Pid of -%% the gen_server leveled_sft process representing the file. -%% -%% CandidateList should be a list of {StartKey, EndKey, Pid} tuples -%% representing different gen_server leveled_sft processes, sorted by StartKey. -%% -%% The level is the level which the new files should be created at. +%% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1 -perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) -> - leveled_log:log("PC010", [SrcFN, MSN]), - PointerList = lists:map(fun(P) -> - {next, P#manifest_entry.owner, all} end, - CandidateList), - MaxSQN = leveled_sst:sst_getmaxsequencenumber(SrcPid), - do_merge([{next, SrcPid, all}], - PointerList, - LevelInfo, - {Filepath, MSN}, - MaxSQN, - 0, - []). +perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) -> + leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]), + SrcList = [{next, Src, all}], + MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner), + SinkLevel = SrcLevel + 1, + SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel), + Additions = do_merge(SrcList, SinkList, + SinkLevel, SinkBasement, + RootPath, NewSQN, MaxSQN, + []), + RevertPointerFun = + fun({next, ME, _SK}) -> + ME + end, + SinkManifestList = lists:map(RevertPointerFun, SinkList), + Man0 = leveled_pmanifest:remove_manifest_entry(Manifest, + NewSQN, + SinkLevel, + SinkManifestList), + Man1 = leveled_pmanifest:insert_manifest_entry(Man0, + NewSQN, + SinkLevel, + Additions), + + Man2 = leveled_pmanifest:remove_manifest_entry(Man1, + NewSQN, + SrcLevel, + Src), + {Man2, [Src|SinkManifestList]}. -do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, _MaxSQN, - FileCounter, OutList) -> - leveled_log:log("PC011", [MSN, SrcLevel, FileCounter]), - OutList; -do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, MaxSQN, - FileCounter, OutList) -> - FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sst", - [SrcLevel + 1, FileCounter])), - leveled_log:log("PC012", [MSN, FileName]), +do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) -> + leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]), + Additions; +do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) -> + FileName = lists:flatten(io_lib:format(RP ++ "_~w_~w.sst", + [SinkLevel, length(Additions)])), + leveled_log:log("PC012", [NewSQN, FileName, SinkB]), TS1 = os:timestamp(), - case leveled_sst:sst_new(FileName, KL1, KL2, IsB, SrcLevel + 1, MaxSQN) of + case leveled_sst:sst_new(FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN) of empty -> leveled_log:log("PC013", [FileName]), - OutList; + do_merge([], [], + SinkLevel, SinkB, + RP, NewSQN, MaxSQN, + Additions); {ok, Pid, Reply} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, - ExtMan = lists:append(OutList, - [#manifest_entry{start_key=SmallestKey, - end_key=HighestKey, - owner=Pid, - filename=FileName}]), + Entry = #manifest_entry{start_key=SmallestKey, + end_key=HighestKey, + owner=Pid, + filename=FileName}, leveled_log:log_timer("PC015", [], TS1), do_merge(KL1Rem, KL2Rem, - {SrcLevel, IsB}, {Filepath, MSN}, MaxSQN, - FileCounter + 1, ExtMan) - end. - - -get_item(Index, List, Default) -> - case lists:keysearch(Index, 1, List) of - {value, {Index, Value}} -> - Value; - false -> - Default + SinkLevel, SinkB, + RP, NewSQN, MaxSQN, + Additions ++ [Entry]) end. @@ -361,26 +249,6 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) -> null}}, generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange). -choose_pid_toquery([ManEntry|_T], Key) when - Key >= ManEntry#manifest_entry.start_key, - ManEntry#manifest_entry.end_key >= Key -> - ManEntry#manifest_entry.owner; -choose_pid_toquery([_H|T], Key) -> - choose_pid_toquery(T, Key). - - -find_randomkeys(_FList, 0, _Source) -> - ok; -find_randomkeys(FList, Count, Source) -> - KV1 = lists:nth(random:uniform(length(Source)), Source), - K1 = leveled_codec:strip_to_keyonly(KV1), - P1 = choose_pid_toquery(FList, K1), - FoundKV = leveled_sst:sst_get(P1, K1), - Found = leveled_codec:strip_to_keyonly(FoundKV), - io:format("success finding ~w in ~w~n", [K1, P1]), - ?assertMatch(K1, Found), - find_randomkeys(FList, Count - 1, Source). - merge_file_test() -> KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)), @@ -408,57 +276,40 @@ merge_file_test() -> 2, KL4_L2, undefined), - Result = perform_merge({PidL1_1, "../test/KL1_L1.sst"}, - [#manifest_entry{owner=PidL2_1}, - #manifest_entry{owner=PidL2_2}, - #manifest_entry{owner=PidL2_3}, - #manifest_entry{owner=PidL2_4}], - {2, false}, {"../test/", 99}), - lists:foreach(fun(ManEntry) -> - {o, B1, K1} = ManEntry#manifest_entry.start_key, - {o, B2, K2} = ManEntry#manifest_entry.end_key, - io:format("Result of ~s ~s and ~s ~s with Pid ~w~n", - [B1, K1, B2, K2, ManEntry#manifest_entry.owner]) end, - Result), - io:format("Finding keys in KL1_L1~n"), - ok = find_randomkeys(Result, 50, KL1_L1), - io:format("Finding keys in KL1_L2~n"), - ok = find_randomkeys(Result, 50, KL1_L2), - io:format("Finding keys in KL2_L2~n"), - ok = find_randomkeys(Result, 50, KL2_L2), - io:format("Finding keys in KL3_L2~n"), - ok = find_randomkeys(Result, 50, KL3_L2), - io:format("Finding keys in KL4_L2~n"), - ok = find_randomkeys(Result, 50, KL4_L2), - leveled_sst:sst_clear(PidL1_1), - leveled_sst:sst_clear(PidL2_1), - leveled_sst:sst_clear(PidL2_2), - leveled_sst:sst_clear(PidL2_3), - leveled_sst:sst_clear(PidL2_4), - lists:foreach(fun(ManEntry) -> - leveled_sst:sst_clear(ManEntry#manifest_entry.owner) end, - Result). - -select_merge_candidates_test() -> - Sink1 = #manifest_entry{start_key = {o, "Bucket", "Key1"}, - end_key = {o, "Bucket", "Key20000"}}, - Sink2 = #manifest_entry{start_key = {o, "Bucket", "Key20001"}, - end_key = {o, "Bucket1", "Key1"}}, - Src1 = #manifest_entry{start_key = {o, "Bucket", "Key40001"}, - end_key = {o, "Bucket", "Key60000"}}, - {Candidates, Others} = check_for_merge_candidates(Src1, [Sink1, Sink2]), - ?assertMatch([Sink2], Candidates), - ?assertMatch([Sink1], Others). - - -select_merge_file_test() -> - L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}], - L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid}, - {{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}], - Manifest = [{0, L0}, {1, L1}], - {FileRef, NewManifest} = select_filetomerge(0, Manifest), - ?assertMatch(FileRef, {{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}), - ?assertMatch(NewManifest, [{0, []}, {1, L1}]). + + E1 = #manifest_entry{owner = PidL1_1, + filename = "../test/KL1_L1.sst", + end_key = lists:last(KL1_L1), + start_key = lists:nth(1, KL1_L1)}, + E2 = #manifest_entry{owner = PidL2_1, + filename = "../test/KL1_L2.sst", + end_key = lists:last(KL1_L2), + start_key = lists:nth(1, KL1_L2)}, + E3 = #manifest_entry{owner = PidL2_2, + filename = "../test/KL2_L2.sst", + end_key = lists:last(KL2_L2), + start_key = lists:nth(1, KL2_L2)}, + E4 = #manifest_entry{owner = PidL2_3, + filename = "../test/KL3_L2.sst", + end_key = lists:last(KL3_L2), + start_key = lists:nth(1, KL3_L2)}, + E5 = #manifest_entry{owner = PidL2_4, + filename = "../test/KL4_L2.sst", + end_key = lists:last(KL4_L2), + start_key = lists:nth(1, KL4_L2)}, + + Man0 = leveled_pmanifest:new_manifest(), + Man1 = leveled_pmanifest:insert_manifest_entry(Man0, 1, 2, E2), + Man2 = leveled_pmanifest:insert_manifest_entry(Man1, 1, 2, E3), + Man3 = leveled_pmanifest:insert_manifest_entry(Man2, 1, 2, E4), + Man4 = leveled_pmanifest:insert_manifest_entry(Man3, 1, 2, E5), + Man5 = leveled_pmanifest:insert_manifest_entry(Man4, 2, 1, E1), + + PointerList = lists:map(fun(ME) -> {next, ME, all} end, + [E2, E3, E4, E5]), + {Man6, _Dels} = perform_merge(Man5, E1, PointerList, 1, "../test", 3), + + ?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)). coverage_cheat_test() -> {ok, _State1} = code_change(null, #state{}, null). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a07065b..9a6daf3 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -161,12 +161,15 @@ -include("include/leveled.hrl"). --export([init/1, +-export([ + init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, + code_change/3]). + +-export([ pcl_start/1, pcl_pushmem/2, pcl_fetchlevelzero/2, @@ -176,15 +179,18 @@ pcl_fetchnextkey/5, pcl_checksequencenumber/3, pcl_workforclerk/1, - pcl_promptmanifestchange/2, + pcl_manifestchange/2, pcl_confirml0complete/4, - pcl_confirmdelete/2, + pcl_confirmdelete/3, pcl_close/1, pcl_doom/1, pcl_registersnapshot/2, pcl_releasesnapshot/2, pcl_loadsnapshot/2, - pcl_getstartupsequencenumber/1, + pcl_getstartupsequencenumber/1]). + +-export([ + filepath/3, clean_testdir/1]). -include_lib("eunit/include/eunit.hrl"). @@ -206,13 +212,12 @@ -define(COIN_SIDECOUNT, 5). -define(SLOW_FETCH, 20000). -define(ITERATOR_SCANWIDTH, 4). +-define(SNAPSHOT_TIMEOUT, 3600). --record(state, {manifest = [] :: list(), - manifest_sqn = 0 :: integer(), - ledger_sqn = 0 :: integer(), % The highest SQN added to L0 +-record(state, {manifest, % a manifest record from the leveled_manifest module persisted_sqn = 0 :: integer(), % The highest SQN persisted - registered_snapshots = [] :: list(), - unreferenced_files = [] :: list(), + + ledger_sqn = 0 :: integer(), % The highest SQN added to L0 root_path = "../test" :: string(), clerk :: pid(), @@ -230,8 +235,8 @@ source_penciller :: pid(), levelzero_astree :: list(), - ongoing_work = [] :: list(), - work_backlog = false :: boolean(), + work_ongoing = false :: boolean(), % i.e. compaction work + work_backlog = false :: boolean(), % i.e. compaction work head_timing :: tuple()}). @@ -284,16 +289,16 @@ pcl_checksequencenumber(Pid, Key, SQN) -> end. pcl_workforclerk(Pid) -> - gen_server:call(Pid, work_for_clerk, infinity). + gen_server:cast(Pid, work_for_clerk). -pcl_promptmanifestchange(Pid, WI) -> - gen_server:cast(Pid, {manifest_change, WI}). +pcl_manifestchange(Pid, Manifest) -> + gen_server:cast(Pid, {manifest_change, Manifest}). pcl_confirml0complete(Pid, FN, StartKey, EndKey) -> gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey}). -pcl_confirmdelete(Pid, FileName) -> - gen_server:cast(Pid, {confirm_delete, FileName}). +pcl_confirmdelete(Pid, FileName, FilePid) -> + gen_server:cast(Pid, {confirm_delete, FileName, FilePid}). pcl_getstartupsequencenumber(Pid) -> gen_server:call(Pid, get_startup_sqn, infinity). @@ -324,9 +329,11 @@ init([PCLopts]) -> {undefined, true} -> SrcPenciller = PCLopts#penciller_options.source_penciller, {ok, State} = pcl_registersnapshot(SrcPenciller, self()), + ManifestClone = leveled_pmanifest:copy_manifest(State#state.manifest), leveled_log:log("P0001", [self()]), - io:format("Snapshot ledger sqn at ~w~n", [State#state.ledger_sqn]), - {ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}}; + {ok, State#state{is_snapshot=true, + source_penciller=SrcPenciller, + manifest=ManifestClone}}; %% Need to do something about timeout {_RootPath, false} -> start_from_file(PCLopts) @@ -401,23 +408,33 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, List -> List end, - SSTiter = initiate_rangequery_frommanifest(StartKey, - EndKey, - State#state.manifest), + + SetupFoldFun = + fun(Level, Acc) -> + Pointers = leveled_pmanifest:range_lookup(State#state.manifest, + Level, + StartKey, + EndKey), + case Pointers of + [] -> Acc; + PL -> Acc ++ [{Level, PL}] + end + end, + SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)), + Acc = keyfolder({L0AsList, SSTiter}, {StartKey, EndKey}, {AccFun, InitAcc}, MaxKeys), + {reply, Acc, State#state{levelzero_astree = L0AsList}}; -handle_call(work_for_clerk, From, State) -> - {UpdState, Work} = return_work(State, From), - {reply, Work, UpdState}; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; handle_call({register_snapshot, Snapshot}, _From, State) -> - Rs = [{Snapshot, - State#state.manifest_sqn}|State#state.registered_snapshots], - {reply, {ok, State}, State#state{registered_snapshots = Rs}}; + Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest, + Snapshot, + ?SNAPSHOT_TIMEOUT), + {reply, {ok, State}, State#state{manifest = Manifest0}}; handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}}, _From, State) -> L0D = leveled_pmem:add_to_cache(State#state.levelzero_size, @@ -443,29 +460,33 @@ handle_call(doom, _From, State) -> FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/", {stop, normal, {ok, [ManifestFP, FilesFP]}, State}. -handle_cast({manifest_change, WI}, State) -> - {ok, UpdState} = commit_manifest_change(WI, State), - ok = leveled_pclerk:clerk_manifestchange(State#state.clerk, - confirm, - false), - {noreply, UpdState}; +handle_cast({manifest_change, NewManifest}, State) -> + NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest), + ok = leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN), + {noreply, State#state{manifest = NewManifest, work_ongoing=false}}; handle_cast({release_snapshot, Snapshot}, State) -> - Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), + Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest, + Snapshot), leveled_log:log("P0003", [Snapshot]), - leveled_log:log("P0004", [Rs]), - {noreply, State#state{registered_snapshots=Rs}}; -handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap}) + {noreply, State#state{manifest=Manifest0}}; +handle_cast({confirm_delete, Filename, FilePid}, State=#state{is_snapshot=Snap}) when Snap == false -> - Reply = confirm_delete(FileName, - State#state.unreferenced_files, - State#state.registered_snapshots), - case Reply of - {true, Pid} -> - UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), - leveled_log:log("P0005", [FileName]), - ok = leveled_sst:sst_deleteconfirmed(Pid), - {noreply, State#state{unreferenced_files=UF1}}; - _ -> + case State#state.work_ongoing of + false -> + R2D = leveled_pmanifest:ready_to_delete(State#state.manifest, + Filename), + case R2D of + {true, M0} -> + leveled_log:log("P0005", [Filename]), + ok = leveled_sst:sst_deleteconfirmed(FilePid), + {noreply, State#state{manifest=M0}}; + {false, _M0} -> + {noreply, State} + end; + true -> + % If there is ongoing work, then we can't safely update the pidmap + % as any change will be reverted when the manifest is passed back + % from the Clerk {noreply, State} end; handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> @@ -474,7 +495,11 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> end_key=EndKey, owner=State#state.levelzero_constructor, filename=FN}, - UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}), + ManifestSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1, + UpdMan = leveled_pmanifest:insert_manifest_entry(State#state.manifest, + ManifestSQN, + 0, + ManEntry), % Prompt clerk to ask about work - do this for every L0 roll UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index), ok = leveled_pclerk:clerk_prompt(State#state.clerk), @@ -484,7 +509,33 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> levelzero_constructor=undefined, levelzero_size=0, manifest=UpdMan, - persisted_sqn=State#state.ledger_sqn}}. + persisted_sqn=State#state.ledger_sqn}}; +handle_cast(work_for_clerk, State) -> + case State#state.levelzero_pending of + true -> + {noreply, State}; + false -> + {WL, WC} = leveled_pmanifest:check_for_work(State#state.manifest, + ?LEVEL_SCALEFACTOR), + case WC of + 0 -> + {noreply, State#state{work_backlog=false}}; + N when N > ?WORKQUEUE_BACKLOG_TOLERANCE -> + leveled_log:log("P0024", [N, true]), + [TL|_Tail] = WL, + ok = leveled_pclerk:clerk_push(State#state.clerk, + {TL, State#state.manifest}), + {noreply, + State#state{work_backlog=true, work_ongoing=true}}; + N -> + leveled_log:log("P0024", [N, false]), + [TL|_Tail] = WL, + ok = leveled_pclerk:clerk_push(State#state.clerk, + {TL, State#state.manifest}), + {noreply, + State#state{work_backlog=false, work_ongoing=true}} + end + end. handle_info(_Info, State) -> @@ -495,10 +546,6 @@ terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true -> leveled_log:log("P0007", [Reason]), ok; terminate(Reason, State) -> - %% When a Penciller shuts down it isn't safe to try an manage the safe - %% finishing of any outstanding work. The last commmitted manifest will - %% be used. - %% %% Level 0 files lie outside of the manifest, and so if there is no L0 %% file present it is safe to write the current contents of memory. If %% there is a L0 file present - then the memory can be dropped (it is @@ -506,43 +553,27 @@ terminate(Reason, State) -> %% as presumably the ETS file has been recently flushed, hence the presence %% of a L0 file). %% - %% The penciller should close each file in the unreferenced files, and - %% then each file in the manifest, and cast a close on the clerk. - %% The cast may not succeed as the clerk could be synchronously calling - %% the penciller looking for a manifest commit - %% + %% The penciller should close each file in the manifest, and cast a close + %% on the clerk. + ok = leveled_pclerk:clerk_close(State#state.clerk), + leveled_log:log("P0008", [Reason]), - MC = leveled_pclerk:clerk_manifestchange(State#state.clerk, - return, - true), - UpdState = case MC of - {ok, WI} -> - {ok, NewState} = commit_manifest_change(WI, State), - Clerk = State#state.clerk, - ok = leveled_pclerk:clerk_manifestchange(Clerk, - confirm, - true), - NewState; - no_change -> - State - end, - case {UpdState#state.levelzero_pending, - get_item(0, UpdState#state.manifest, []), - UpdState#state.levelzero_size} of - {false, [], 0} -> - leveled_log:log("P0009", []); - {false, [], _N} -> - L0Pid = roll_memory(UpdState, true), + L0_Present = leveled_pmanifest:key_lookup(State#state.manifest, 0, all), + L0_Left = State#state.levelzero_size > 0, + case {State#state.levelzero_pending, L0_Present, L0_Left} of + {false, false, true} -> + L0Pid = roll_memory(State, true), ok = leveled_sst:sst_close(L0Pid); StatusTuple -> leveled_log:log("P0010", [StatusTuple]) end, % Tidy shutdown of individual files - ok = close_files(0, UpdState#state.manifest), - lists:foreach(fun({_FN, Pid, _SN}) -> - ok = leveled_sst:sst_close(Pid) end, - UpdState#state.unreferenced_files), + EntryCloseFun = + fun(ME) -> + ok = leveled_sst:sst_close(ME#manifest_entry.owner) + end, + leveled_pmanifest:close_manifest(State#state.manifest, EntryCloseFun), leveled_log:log("P0011", []), ok. @@ -565,7 +596,7 @@ start_from_file(PCLopts) -> M end, - {ok, MergeClerk} = leveled_pclerk:clerk_new(self()), + {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath), CoinToss = PCLopts#penciller_options.levelzero_cointoss, % Used to randomly defer the writing of L0 file. Intended to help with @@ -579,47 +610,21 @@ start_from_file(PCLopts) -> levelzero_index=leveled_pmem:new_index()}, %% Open manifest - ManifestPath = filepath(InitState#state.root_path, manifest) ++ "/", - SSTPath = filepath(InitState#state.root_path, files) ++ "/", - ok = filelib:ensure_dir(ManifestPath), - ok = filelib:ensure_dir(SSTPath), - - {ok, Filenames} = file:list_dir(ManifestPath), - CurrRegex = "nonzero_(?[0-9]+)\\." ++ ?CURRENT_FILEX, - ValidManSQNs = lists:foldl(fun(FN, Acc) -> - case re:run(FN, - CurrRegex, - [{capture, ['MSN'], list}]) of - nomatch -> - Acc; - {match, [Int]} when is_list(Int) -> - Acc ++ [list_to_integer(Int)] - end - end, - [], - Filenames), - TopManSQN = lists:foldl(fun(X, MaxSQN) -> max(X, MaxSQN) end, - 0, - ValidManSQNs), - leveled_log:log("P0012", [TopManSQN]), - ManUpdate = case TopManSQN of - 0 -> - leveled_log:log("P0013", []), - {[], 0}; - _ -> - CurrManFile = filepath(InitState#state.root_path, - TopManSQN, - current_manifest), - {ok, Bin} = file:read_file(CurrManFile), - Manifest = binary_to_term(Bin), - open_all_filesinmanifest(Manifest) - end, - - {UpdManifest, MaxSQN} = ManUpdate, + Manifest0 = leveled_pmanifest:open_manifest(RootPath), + OpenFun = + fun(FN) -> + {ok, Pid, {_FK, _LK}} = leveled_sst:sst_open(FN), + Pid + end, + SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1, + {MaxSQN, Manifest1} = leveled_pmanifest:load_manifest(Manifest0, + OpenFun, + SQNFun), leveled_log:log("P0014", [MaxSQN]), - + ManSQN = leveled_pmanifest:get_manifest_sqn(Manifest1), + leveled_log:log("P0035", [ManSQN]), %% Find any L0 files - L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sst", + L0FN = filepath(RootPath, ManSQN + 1, new_merge_files) ++ "_0_0.sst", case filelib:is_file(L0FN) of true -> leveled_log:log("P0015", [L0FN]), @@ -627,32 +632,29 @@ start_from_file(PCLopts) -> L0Pid, {L0StartKey, L0EndKey}} = leveled_sst:sst_open(L0FN), L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid), - ManifestEntry = #manifest_entry{start_key=L0StartKey, - end_key=L0EndKey, - owner=L0Pid, - filename=L0FN}, - UpdManifest2 = lists:keystore(0, - 1, - UpdManifest, - {0, [ManifestEntry]}), + L0Entry = #manifest_entry{start_key = L0StartKey, + end_key = L0EndKey, + filename = L0FN, + owner = L0Pid}, + Manifest2 = leveled_pmanifest:insert_manifest_entry(Manifest1, + ManSQN + 1, + 0, + L0Entry), leveled_log:log("P0016", [L0SQN]), LedgerSQN = max(MaxSQN, L0SQN), {ok, - InitState#state{manifest=UpdManifest2, - manifest_sqn=TopManSQN, - ledger_sqn=LedgerSQN, - persisted_sqn=LedgerSQN}}; + InitState#state{manifest = Manifest2, + ledger_sqn = LedgerSQN, + persisted_sqn = LedgerSQN}}; false -> leveled_log:log("P0017", []), {ok, - InitState#state{manifest=UpdManifest, - manifest_sqn=TopManSQN, - ledger_sqn=MaxSQN, - persisted_sqn=MaxSQN}} + InitState#state{manifest = Manifest1, + ledger_sqn = MaxSQN, + persisted_sqn = MaxSQN}} end. - update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, LedgerSQN, L0Cache, State) -> SW = os:timestamp(), @@ -673,7 +675,7 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, ledger_sqn=UpdMaxSQN}, CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize, CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE, - Level0Free = length(get_item(0, State#state.manifest, [])) == 0, + L0Free = not leveled_pmanifest:levelzero_present(State#state.manifest), RandomFactor = case State#state.levelzero_cointoss of true -> @@ -686,9 +688,10 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, false -> true end, + NoPendingManifestChange = not State#state.work_ongoing, JitterCheck = RandomFactor or CacheMuchTooBig, - case {CacheTooBig, Level0Free, JitterCheck} of - {true, true, true} -> + case {CacheTooBig, L0Free, JitterCheck, NoPendingManifestChange} of + {true, true, true, true} -> L0Constructor = roll_memory(UpdState, false), leveled_log:log_timer("P0031", [], SW), UpdState#state{levelzero_pending=true, @@ -732,10 +735,10 @@ roll_memory(State, true) -> Constructor. levelzero_filename(State) -> - MSN = State#state.manifest_sqn, + ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1, FileName = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/" - ++ integer_to_list(MSN) ++ "_0_0", + ++ integer_to_list(ManSQN) ++ "_0_0", FileName. timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) -> @@ -767,22 +770,11 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> {not_present, basement}; fetch(Key, Hash, Manifest, Level, FetchFun) -> - LevelManifest = get_item(Level, Manifest, []), - case lists:foldl(fun(File, Acc) -> - case Acc of - not_present when - Key >= File#manifest_entry.start_key, - File#manifest_entry.end_key >= Key -> - File#manifest_entry.owner; - FoundDetails -> - FoundDetails - end end, - not_present, - LevelManifest) of - not_present -> + case leveled_pmanifest:key_lookup(Manifest, Level, Key) of + false -> fetch(Key, Hash, Manifest, Level + 1, FetchFun); - FileToCheck -> - case FetchFun(FileToCheck, Key, Hash) of + FP -> + case FetchFun(FP, Key, Hash) of not_present -> fetch(Key, Hash, Manifest, Level + 1, FetchFun); ObjectFound -> @@ -821,135 +813,6 @@ compare_to_sqn(Obj, SQN) -> end. -%% Work out what the current work queue should be -%% -%% The work queue should have a lower level work at the front, and no work -%% should be added to the queue if a compaction worker has already been asked -%% to look at work at that level -%% -%% The full queue is calculated for logging purposes only - -return_work(State, From) -> - {WorkQ, BasementL} = assess_workqueue([], 0, State#state.manifest, 0), - case length(WorkQ) of - L when L > 0 -> - Excess = lists:foldl(fun({_, _, OH}, Acc) -> Acc+OH end, 0, WorkQ), - [{SrcLevel, Manifest, _Overhead}|_OtherWork] = WorkQ, - leveled_log:log("P0020", [SrcLevel, From, Excess]), - IsBasement = if - SrcLevel + 1 == BasementL -> - true; - true -> - false - end, - Backlog = Excess >= ?WORKQUEUE_BACKLOG_TOLERANCE, - case State#state.levelzero_pending of - true -> - % Once the L0 file is completed there will be more work - % - so don't be busy doing other work now - leveled_log:log("P0021", []), - {State#state{work_backlog=Backlog}, none}; - false -> - %% No work currently outstanding - %% Can allocate work - NextSQN = State#state.manifest_sqn + 1, - FP = filepath(State#state.root_path, - NextSQN, - new_merge_files), - ManFile = filepath(State#state.root_path, - NextSQN, - pending_manifest), - WI = #penciller_work{next_sqn=NextSQN, - clerk=From, - src_level=SrcLevel, - manifest=Manifest, - start_time = os:timestamp(), - ledger_filepath = FP, - manifest_file = ManFile, - target_is_basement = IsBasement}, - {State#state{ongoing_work=[WI], work_backlog=Backlog}, WI} - end; - _ -> - {State#state{work_backlog=false}, none} - end. - - -close_files(?MAX_LEVELS - 1, _Manifest) -> - ok; -close_files(Level, Manifest) -> - LevelList = get_item(Level, Manifest, []), - lists:foreach(fun(F) -> - ok = leveled_sst:sst_close(F#manifest_entry.owner) end, - LevelList), - close_files(Level + 1, Manifest). - - -open_all_filesinmanifest(Manifest) -> - open_all_filesinmanifest({Manifest, 0}, 0). - -open_all_filesinmanifest(Result, ?MAX_LEVELS - 1) -> - Result; -open_all_filesinmanifest({Manifest, TopSQN}, Level) -> - LevelList = get_item(Level, Manifest, []), - %% The Pids in the saved manifest related to now closed references - %% Need to roll over the manifest at this level starting new processes to - %5 replace them - LvlR = lists:foldl(fun(F, {FL, FL_SQN}) -> - FN = F#manifest_entry.filename, - {ok, P, _Keys} = leveled_sst:sst_open(FN), - F_SQN = leveled_sst:sst_getmaxsequencenumber(P), - {lists:append(FL, - [F#manifest_entry{owner = P}]), - max(FL_SQN, F_SQN)} - end, - {[], 0}, - LevelList), - %% Result is tuple of revised file list for this level in manifest, and - %% the maximum sequence number seen at this level - {LvlFL, LvlSQN} = LvlR, - UpdManifest = lists:keystore(Level, 1, Manifest, {Level, LvlFL}), - open_all_filesinmanifest({UpdManifest, max(TopSQN, LvlSQN)}, Level + 1). - -print_manifest(Manifest) -> - lists:foreach(fun(L) -> - leveled_log:log("P0022", [L]), - Level = get_item(L, Manifest, []), - lists:foreach(fun print_manifest_entry/1, Level) - end, - lists:seq(0, ?MAX_LEVELS - 1)), - ok. - -print_manifest_entry(Entry) -> - {S1, S2, S3} = leveled_codec:print_key(Entry#manifest_entry.start_key), - {E1, E2, E3} = leveled_codec:print_key(Entry#manifest_entry.end_key), - leveled_log:log("P0023", - [S1, S2, S3, E1, E2, E3, Entry#manifest_entry.filename]). - -initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) -> - CompareFun = fun(M) -> - C1 = StartKey > M#manifest_entry.end_key, - C2 = leveled_codec:endkey_passed(EndKey, - M#manifest_entry.start_key), - not (C1 or C2) end, - FoldFun = - fun(L, AccL) -> - Level = get_item(L, Manifest, []), - FL = lists:foldl(fun(M, Acc) -> - case CompareFun(M) of - true -> - Acc ++ [{next, M, StartKey}]; - false -> - Acc - end end, - [], - Level), - case FL of - [] -> AccL; - FL -> AccL ++ [{L, FL}] - end - end, - lists:foldl(FoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)). - %% Looks to find the best choice for the next key across the levels (other %% than in-memory table) %% In finding the best choice, the next key in a given level may be a next @@ -995,10 +858,9 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, LCnt + 1, {BKL, BKV}, StartKey, EndKey, Width); - {{next, ManifestEntry, _SK}, BKL, BKV} -> + {{next, Owner, _SK}, BKL, BKV} -> % The first key at this level is pointer to a file - need to query % the file to expand this level out before proceeding - Owner = ManifestEntry#manifest_entry.owner, Pointer = {next, Owner, StartKey, EndKey}, UpdList = leveled_sst:expand_list_by_pointer(Pointer, RestOfKeys, @@ -1153,143 +1015,14 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange, end. -assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Man, BasementLevel) -> - {WorkQ, BasementLevel}; -assess_workqueue(WorkQ, LevelToAssess, Man, BasementLevel) -> - MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0), - case length(get_item(LevelToAssess, Man, [])) of - FileCount when FileCount > 0 -> - NewWQ = maybe_append_work(WorkQ, - LevelToAssess, - Man, - MaxFiles, - FileCount), - assess_workqueue(NewWQ, LevelToAssess + 1, Man, LevelToAssess); - 0 -> - assess_workqueue(WorkQ, LevelToAssess + 1, Man, BasementLevel) - end. - - -maybe_append_work(WorkQ, Level, Manifest, - MaxFiles, FileCount) - when FileCount > MaxFiles -> - Overhead = FileCount - MaxFiles, - leveled_log:log("P0024", [Overhead, Level]), - lists:append(WorkQ, [{Level, Manifest, Overhead}]); -maybe_append_work(WorkQ, _Level, _Manifest, - _MaxFiles, _FileCount) -> - WorkQ. - - -get_item(Index, List, Default) -> - case lists:keysearch(Index, 1, List) of - {value, {Index, Value}} -> - Value; - false -> - Default - end. - - -%% Request a manifest change -%% The clerk should have completed the work, and created a new manifest -%% and persisted the new view of the manifest -%% -%% To complete the change of manifest: -%% - the state of the manifest file needs to be changed from pending to current -%% - the list of unreferenced files needs to be updated on State -%% - the current manifest needs to be update don State -%% - the list of ongoing work needs to be cleared of this item - - -commit_manifest_change(ReturnedWorkItem, State) -> - NewMSN = State#state.manifest_sqn + 1, - [SentWorkItem] = State#state.ongoing_work, - RootPath = State#state.root_path, - UnreferencedFiles = State#state.unreferenced_files, - - if - NewMSN == SentWorkItem#penciller_work.next_sqn -> - WISrcLevel = SentWorkItem#penciller_work.src_level, - leveled_log:log_timer("P0025", - [SentWorkItem#penciller_work.next_sqn, - WISrcLevel], - SentWorkItem#penciller_work.start_time), - ok = rename_manifest_files(RootPath, NewMSN), - FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files, - UnreferencedFilesUpd = update_deletions(FilesToDelete, - NewMSN, - UnreferencedFiles), - leveled_log:log("P0026", [NewMSN]), - NewManifest = ReturnedWorkItem#penciller_work.new_manifest, - - CurrL0 = get_item(0, State#state.manifest, []), - % If the work isn't L0 work, then we may have an uncommitted - % manifest change at L0 - so add this back into the Manifest loop - % state - RevisedManifest = case {WISrcLevel, CurrL0} of - {0, _} -> - NewManifest; - {_, []} -> - NewManifest; - {_, [L0ManEntry]} -> - lists:keystore(0, - 1, - NewManifest, - {0, [L0ManEntry]}) - end, - {ok, State#state{ongoing_work=[], - manifest_sqn=NewMSN, - manifest=RevisedManifest, - unreferenced_files=UnreferencedFilesUpd}} - end. - - -rename_manifest_files(RootPath, NewMSN) -> - OldFN = filepath(RootPath, NewMSN, pending_manifest), - NewFN = filepath(RootPath, NewMSN, current_manifest), - leveled_log:log("P0027", [OldFN, filelib:is_file(OldFN), - NewFN, filelib:is_file(NewFN)]), - ok = file:rename(OldFN,NewFN). - -filepath(RootPath, manifest) -> - RootPath ++ "/" ++ ?MANIFEST_FP; filepath(RootPath, files) -> - RootPath ++ "/" ++ ?FILES_FP. + FP = RootPath ++ "/" ++ ?FILES_FP, + filelib:ensure_dir(FP ++ "/"), + FP. -filepath(RootPath, NewMSN, pending_manifest) -> - filepath(RootPath, manifest) ++ "/" ++ "nonzero_" - ++ integer_to_list(NewMSN) ++ "." ++ ?PENDING_FILEX; -filepath(RootPath, NewMSN, current_manifest) -> - filepath(RootPath, manifest) ++ "/" ++ "nonzero_" - ++ integer_to_list(NewMSN) ++ "." ++ ?CURRENT_FILEX; filepath(RootPath, NewMSN, new_merge_files) -> filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN). -update_deletions([], _NewMSN, UnreferencedFiles) -> - UnreferencedFiles; -update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> - leveled_log:log("P0028", [ClearedFile#manifest_entry.filename]), - update_deletions(Tail, - MSN, - lists:append(UnreferencedFiles, - [{ClearedFile#manifest_entry.filename, - ClearedFile#manifest_entry.owner, - MSN}])). - -confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) -> - case lists:keyfind(Filename, 1, UnreferencedFiles) of - {Filename, Pid, MSN} -> - LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end, - infinity, - RegisteredSnapshots), - if - MSN >= LowSQN -> - false; - true -> - {true, Pid} - end - end. - %%%============================================================================ @@ -1320,7 +1053,7 @@ generate_randomkeys(Count, SQN, Acc) -> clean_testdir(RootPath) -> - clean_subdir(filepath(RootPath, manifest)), + clean_subdir(leveled_pmanifest:filepath(RootPath, manifest)), clean_subdir(filepath(RootPath, files)). clean_subdir(DirPath) -> @@ -1338,47 +1071,6 @@ clean_subdir(DirPath) -> end. -compaction_work_assessment_test() -> - L0 = [{{o, "B1", "K1", null}, {o, "B3", "K3", null}, dummy_pid}], - L1 = [{{o, "B1", "K1", null}, {o, "B2", "K2", null}, dummy_pid}, - {{o, "B2", "K3", null}, {o, "B4", "K4", null}, dummy_pid}], - Manifest = [{0, L0}, {1, L1}], - {WorkQ1, 1} = assess_workqueue([], 0, Manifest, 0), - ?assertMatch([{0, Manifest, 1}], WorkQ1), - L1Alt = lists:append(L1, - [{{o, "B5", "K0001", null}, {o, "B5", "K9999", null}, - dummy_pid}, - {{o, "B6", "K0001", null}, {o, "B6", "K9999", null}, - dummy_pid}, - {{o, "B7", "K0001", null}, {o, "B7", "K9999", null}, - dummy_pid}, - {{o, "B8", "K0001", null}, {o, "B8", "K9999", null}, - dummy_pid}, - {{o, "B9", "K0001", null}, {o, "B9", "K9999", null}, - dummy_pid}, - {{o, "BA", "K0001", null}, {o, "BA", "K9999", null}, - dummy_pid}, - {{o, "BB", "K0001", null}, {o, "BB", "K9999", null}, - dummy_pid}]), - Manifest3 = [{0, []}, {1, L1Alt}], - {WorkQ3, 1} = assess_workqueue([], 0, Manifest3, 0), - ?assertMatch([{1, Manifest3, 1}], WorkQ3). - -confirm_delete_test() -> - Filename = 'test.sst', - UnreferencedFiles = [{'other.sst', dummy_owner, 15}, - {Filename, dummy_owner, 10}], - RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}], - R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1), - ?assertMatch(R1, {true, dummy_owner}), - RegisteredIterators2 = [{dummy_pid, 10}, {dummy_pid, 12}], - R2 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators2), - ?assertMatch(R2, false), - RegisteredIterators3 = [{dummy_pid, 9}, {dummy_pid, 12}], - R3 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators3), - ?assertMatch(R3, false). - - maybe_pause_push(PCL, KL) -> T0 = leveled_skiplist:empty(true), I0 = leveled_pmem:new_index(), @@ -1461,11 +1153,13 @@ simple_server_test() -> ?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})), ?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004", null})), + SnapOpts = #penciller_options{start_snapshot = true, source_penciller = PCLr}, {ok, PclSnap} = pcl_start(SnapOpts), leveled_bookie:load_snapshot(PclSnap, leveled_bookie:empty_ledgercache()), + ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})), @@ -1497,6 +1191,7 @@ simple_server_test() -> % Add some more keys and confirm that check sequence number still % sees the old version in the previous snapshot, but will see the new version % in a new snapshot + Key1A_Pre = {{o,"Bucket0001", "Key0001", null}, {4005, {active, infinity}, null}}, Key1A = add_missing_hash(Key1A_Pre), @@ -1510,11 +1205,7 @@ simple_server_test() -> null}, 1)), ok = pcl_close(PclSnap), - - % Ignore a fake pending mnaifest on startup - ok = file:write_file(RootPath ++ "/" ++ ?MANIFEST_FP ++ "nonzero_99.pnd", - term_to_binary("Hello")), - + {ok, PclSnap2} = pcl_start(SnapOpts), leveled_bookie:load_snapshot(PclSnap2, leveled_bookie:empty_ledgercache()), ?assertMatch(false, pcl_checksequencenumber(PclSnap2, @@ -1540,57 +1231,6 @@ simple_server_test() -> clean_testdir(RootPath). -rangequery_manifest_test() -> - {E1, - E2, - E3} = {#manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, - end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, - filename="Z1"}, - #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"}, - end_key={o, "Bucket1", "K71", null}, - filename="Z2"}, - #manifest_entry{start_key={o, "Bucket1", "K75", null}, - end_key={o, "Bucket1", "K993", null}, - filename="Z3"}}, - {E4, - E5, - E6} = {#manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, - end_key={i, "Bucket1", {"Idx1", "Fld7"}, "K93"}, - filename="Z4"}, - #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld7"}, "K97"}, - end_key={o, "Bucket1", "K78", null}, - filename="Z5"}, - #manifest_entry{start_key={o, "Bucket1", "K81", null}, - end_key={o, "Bucket1", "K996", null}, - filename="Z6"}}, - Man = [{1, [E1, E2, E3]}, {2, [E4, E5, E6]}], - SK1 = {o, "Bucket1", "K711", null}, - EK1 = {o, "Bucket1", "K999", null}, - R1 = initiate_rangequery_frommanifest(SK1, EK1, Man), - ?assertMatch([{1, [{next, E3, SK1}]}, - {2, [{next, E5, SK1}, {next, E6, SK1}]}], - R1), - SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, - EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, - R2 = initiate_rangequery_frommanifest(SK2, EK2, Man), - ?assertMatch([{1, [{next, E1, SK2}]}, {2, [{next, E5, SK2}]}], R2), - R3 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx0", "Fld8"}, null}, - {i, "Bucket1", {"Idx0", "Fld9"}, null}, - Man), - ?assertMatch([], R3). - -print_manifest_test() -> - M1 = #manifest_entry{start_key={i, "Bucket1", {<<"Idx1">>, "Fld1"}, "K8"}, - end_key={i, 4565, {"Idx1", "Fld9"}, "K93"}, - filename="Z1"}, - M2 = #manifest_entry{start_key={i, self(), {null, "Fld1"}, "K8"}, - end_key={i, <<200:32/integer>>, {"Idx1", "Fld9"}, "K93"}, - filename="Z1"}, - M3 = #manifest_entry{start_key={?STD_TAG, self(), {null, "Fld1"}, "K8"}, - end_key={?RIAK_TAG, <<200:32/integer>>, {"Idx1", "Fld9"}, "K93"}, - filename="Z1"}, - print_manifest([{1, [M1, M2, M3]}]). - simple_findnextkey_test() -> QueryArray = [ {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, @@ -1757,81 +1397,6 @@ create_file_test() -> {ok, Bin} = file:read_file("../test/new_file.sst.discarded"), ?assertMatch("hello", binary_to_term(Bin)). -commit_manifest_test() -> - Sent_WI = #penciller_work{next_sqn=1, - src_level=0, - start_time=os:timestamp()}, - Resp_WI = #penciller_work{next_sqn=1, - src_level=0}, - State = #state{ongoing_work = [Sent_WI], - root_path = "test", - manifest_sqn = 0}, - ManifestFP = "test" ++ "/" ++ ?MANIFEST_FP ++ "/", - ok = filelib:ensure_dir(ManifestFP), - ok = file:write_file(ManifestFP ++ "nonzero_1.pnd", - term_to_binary("dummy data")), - - L1_0 = [{1, [#manifest_entry{filename="1.sst"}]}], - Resp_WI0 = Resp_WI#penciller_work{new_manifest=L1_0, - unreferenced_files=[]}, - {ok, State0} = commit_manifest_change(Resp_WI0, State), - ?assertMatch(1, State0#state.manifest_sqn), - ?assertMatch([], get_item(0, State0#state.manifest, [])), - - L0Entry = [#manifest_entry{filename="0.sst"}], - ManifestPlus = [{0, L0Entry}|State0#state.manifest], - - NxtSent_WI = #penciller_work{next_sqn=2, - src_level=1, - start_time=os:timestamp()}, - NxtResp_WI = #penciller_work{next_sqn=2, - src_level=1}, - State1 = State0#state{ongoing_work=[NxtSent_WI], - manifest = ManifestPlus}, - - ok = file:write_file(ManifestFP ++ "nonzero_2.pnd", - term_to_binary("dummy data")), - - L2_0 = [#manifest_entry{filename="2.sst"}], - NxtResp_WI0 = NxtResp_WI#penciller_work{new_manifest=[{2, L2_0}], - unreferenced_files=[]}, - {ok, State2} = commit_manifest_change(NxtResp_WI0, State1), - - ?assertMatch(1, State1#state.manifest_sqn), - ?assertMatch(2, State2#state.manifest_sqn), - ?assertMatch(L0Entry, get_item(0, State2#state.manifest, [])), - ?assertMatch(L2_0, get_item(2, State2#state.manifest, [])), - - clean_testdir(State#state.root_path). - - -badmanifest_test() -> - RootPath = "../test/ledger", - clean_testdir(RootPath), - {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000}), - Key1_pre = {{o,"Bucket0001", "Key0001", null}, - {1001, {active, infinity}, null}}, - Key1 = add_missing_hash(Key1_pre), - KL1 = generate_randomkeys({1000, 1}), - - ok = maybe_pause_push(PCL, KL1 ++ [Key1]), - %% Added together, as split apart there will be a race between the close - %% call to the penciller and the second fetch of the cache entry - ?assertMatch(Key1, pcl_fetch(PCL, {o, "Bucket0001", "Key0001", null})), - - timer:sleep(100), % Avoids confusion if L0 file not written before close - ok = pcl_close(PCL), - - ManifestFP = filepath(RootPath, manifest), - ok = file:write_file(filename:join(ManifestFP, "yeszero_123.man"), - term_to_binary("hello")), - {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000}), - ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001", null})), - ok = pcl_close(PCLr), - clean_testdir(RootPath). - checkready(Pid) -> try leveled_sst:sst_checkready(Pid) diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl new file mode 100644 index 0000000..9fa50ef --- /dev/null +++ b/src/leveled_pmanifest.erl @@ -0,0 +1,701 @@ +%% -------- PENCILLER MANIFEST --------- +%% +%% The manifest is an ordered set of files for each level to be used to find +%% which file is relevant for a given key or range lookup at a given level. +%% +%% This implementation is incomplete, in that it just uses a plain list at +%% each level. This is fine for short-lived volume tests, but as the deeper +%% levels are used there will be an exponential penalty. +%% +%% The originial intention was to swap out this implementation for a +%% multi-version ETS table - but that became complex. So one of two changes +%% are pending: +%% - Use a single version ES cache for lower levels (and not allow snapshots to +%% access the cache) +%% - Use a skiplist like enhanced list at lower levels. + + +-module(leveled_pmanifest). + +-include("include/leveled.hrl"). + +-export([ + new_manifest/0, + open_manifest/1, + copy_manifest/1, + load_manifest/3, + close_manifest/2, + save_manifest/2, + get_manifest_sqn/1, + key_lookup/3, + range_lookup/4, + merge_lookup/4, + insert_manifest_entry/4, + remove_manifest_entry/4, + switch_manifest_entry/4, + mergefile_selector/2, + add_snapshot/3, + release_snapshot/2, + ready_to_delete/2, + check_for_work/2, + is_basement/2, + levelzero_present/1 + ]). + +-export([ + filepath/2 + ]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(MANIFEST_FILEX, "man"). +-define(MANIFEST_FP, "ledger_manifest"). +-define(MAX_LEVELS, 8). + +-record(manifest, {levels, + % an array of lists or trees representing the manifest + manifest_sqn = 0 :: integer(), + % The current manifest SQN + snapshots :: list(), + % A list of snaphots (i.e. clones) + min_snapshot_sqn = 0 :: integer(), + % The smallest snapshot manifest SQN in the snapshot + % list + pending_deletes, % OTP16 does not like defining type + % a dictionary mapping keys (filenames) to SQN when the + % deletion was made, and the original Manifest Entry + basement :: integer() + % Currently the lowest level (the largest number) + }). + +%%%============================================================================ +%%% API +%%%============================================================================ + +new_manifest() -> + #manifest{ + levels = array:new([{size, ?MAX_LEVELS + 1}, {default, []}]), + manifest_sqn = 0, + snapshots = [], + pending_deletes = dict:new(), + basement = 0 + }. + +open_manifest(RootPath) -> + % Open the manifest in the file path which has the highest SQN, and will + % open without error + ManifestPath = filepath(RootPath, manifest), + {ok, Filenames} = file:list_dir(ManifestPath), + CurrRegex = "nonzero_(?[0-9]+)\\." ++ ?MANIFEST_FILEX, + ExtractSQNFun = + fun(FN, Acc) -> + case re:run(FN, CurrRegex, [{capture, ['MSN'], list}]) of + nomatch -> + Acc; + {match, [Int]} when is_list(Int) -> + Acc ++ [list_to_integer(Int)] + end + end, + ValidManSQNs = lists:reverse(lists:sort(lists:foldl(ExtractSQNFun, + [], + Filenames))), + open_manifestfile(RootPath, ValidManSQNs). + +copy_manifest(Manifest) -> + % Copy the manifest ensuring anything only the master process should care + % about is switched to undefined + Manifest#manifest{snapshots = undefined, pending_deletes = undefined}. + +load_manifest(Manifest, PidFun, SQNFun) -> + UpdateLevelFun = + fun(LevelIdx, {AccMaxSQN, AccMan}) -> + L0 = array:get(LevelIdx, AccMan#manifest.levels), + {L1, SQN1} = load_level(LevelIdx, L0, PidFun, SQNFun), + UpdLevels = array:set(LevelIdx, L1, AccMan#manifest.levels), + {max(AccMaxSQN, SQN1), AccMan#manifest{levels = UpdLevels}} + end, + lists:foldl(UpdateLevelFun, {0, Manifest}, + lists:seq(0, Manifest#manifest.basement)). + +close_manifest(Manifest, CloseEntryFun) -> + CloseLevelFun = + fun(LevelIdx) -> + Level = array:get(LevelIdx, Manifest#manifest.levels), + close_level(LevelIdx, Level, CloseEntryFun) + end, + lists:foreach(CloseLevelFun, lists:seq(0, Manifest#manifest.basement)), + + ClosePDFun = + fun({_FN, {_SQN, ME}}) -> + CloseEntryFun(ME) + end, + lists:foreach(ClosePDFun, dict:to_list(Manifest#manifest.pending_deletes)). + +save_manifest(Manifest, RootPath) -> + FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest), + ManBin = term_to_binary(Manifest#manifest{snapshots = [], + pending_deletes = dict:new(), + min_snapshot_sqn = 0}), + CRC = erlang:crc32(ManBin), + ok = file:write_file(FP, <>). + +insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> + Levels = Manifest#manifest.levels, + Level = array:get(LevelIdx, Levels), + UpdLevel = add_entry(LevelIdx, Level, Entry), + leveled_log:log("PC019", ["insert", LevelIdx, UpdLevel]), + Basement = max(LevelIdx, Manifest#manifest.basement), + Manifest#manifest{levels = array:set(LevelIdx, UpdLevel, Levels), + basement = Basement, + manifest_sqn = ManSQN}. + +remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> + Levels = Manifest#manifest.levels, + Level = array:get(LevelIdx, Levels), + UpdLevel = remove_entry(LevelIdx, Level, Entry), + leveled_log:log("PC019", ["remove", LevelIdx, UpdLevel]), + DelFun = + fun(E, Acc) -> + dict:store(E#manifest_entry.filename, + {ManSQN, E}, + Acc) + end, + Entries = + case is_list(Entry) of + true -> + Entry; + false -> + [Entry] + end, + PendingDeletes = lists:foldl(DelFun, + Manifest#manifest.pending_deletes, + Entries), + UpdLevels = array:set(LevelIdx, UpdLevel, Levels), + case is_empty(LevelIdx, UpdLevel) of + true -> + Manifest#manifest{levels = UpdLevels, + basement = get_basement(UpdLevels), + manifest_sqn = ManSQN, + pending_deletes = PendingDeletes}; + false -> + Manifest#manifest{levels = UpdLevels, + manifest_sqn = ManSQN, + pending_deletes = PendingDeletes} + end. + +switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) -> + % Move to level below - so needs to be removed but not marked as a + % pending deletion + Levels = Manifest#manifest.levels, + Level = array:get(SrcLevel, Levels), + UpdLevel = remove_entry(SrcLevel, Level, Entry), + UpdLevels = array:set(SrcLevel, UpdLevel, Levels), + insert_manifest_entry(Manifest#manifest{levels = UpdLevels}, + ManSQN, + SrcLevel + 1, + Entry). + +get_manifest_sqn(Manifest) -> + Manifest#manifest.manifest_sqn. + +key_lookup(Manifest, LevelIdx, Key) -> + case LevelIdx > Manifest#manifest.basement of + true -> + false; + false -> + key_lookup_level(LevelIdx, + array:get(LevelIdx, Manifest#manifest.levels), + Key) + end. + +range_lookup(Manifest, LevelIdx, StartKey, EndKey) -> + MakePointerFun = + fun(M) -> + {next, M, StartKey} + end, + range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). + +merge_lookup(Manifest, LevelIdx, StartKey, EndKey) -> + MakePointerFun = + fun(M) -> + {next, M, all} + end, + range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). + + + +%% An algorithm for discovering which files to merge .... +%% We can find the most optimal file: +%% - The one with the most overlapping data below? +%% - The one that overlaps with the fewest files below? +%% - The smallest file? +%% We could try and be fair in some way (merge oldest first) +%% Ultimately, there is a lack of certainty that being fair or optimal is +%% genuinely better - eventually every file has to be compacted. +%% +%% Hence, the initial implementation is to select files to merge at random +mergefile_selector(Manifest, LevelIdx) -> + Level = array:get(LevelIdx, Manifest#manifest.levels), + lists:nth(random:uniform(length(Level)), Level). + +add_snapshot(Manifest, Pid, Timeout) -> + {MegaNow, SecNow, _} = os:timestamp(), + TimeToTimeout = MegaNow * 1000000 + SecNow + Timeout, + SnapEntry = {Pid, Manifest#manifest.manifest_sqn, TimeToTimeout}, + SnapList0 = [SnapEntry|Manifest#manifest.snapshots], + ManSQN = Manifest#manifest.manifest_sqn, + case Manifest#manifest.min_snapshot_sqn of + 0 -> + + Manifest#manifest{snapshots = SnapList0, + min_snapshot_sqn = ManSQN}; + N -> + N0 = min(N, ManSQN), + Manifest#manifest{snapshots = SnapList0, min_snapshot_sqn = N0} + end. + +release_snapshot(Manifest, Pid) -> + FilterFun = + fun({P, SQN, TS}, {Acc, MinSQN}) -> + case P of + Pid -> + {Acc, MinSQN}; + _ -> + {[{P, SQN, TS}|Acc], min(SQN, MinSQN)} + end + end, + {SnapList0, MinSnapSQN} = lists:foldl(FilterFun, + {[], infinity}, + Manifest#manifest.snapshots), + leveled_log:log("P0004", [SnapList0]), + case SnapList0 of + [] -> + Manifest#manifest{snapshots = SnapList0, + min_snapshot_sqn = 0}; + _ -> + Manifest#manifest{snapshots = SnapList0, + min_snapshot_sqn = MinSnapSQN} + end. + +ready_to_delete(Manifest, Filename) -> + {ChangeSQN, _ME} = dict:fetch(Filename, Manifest#manifest.pending_deletes), + case Manifest#manifest.min_snapshot_sqn of + 0 -> + % no shapshots + PDs = dict:erase(Filename, Manifest#manifest.pending_deletes), + {true, Manifest#manifest{pending_deletes = PDs}}; + N when N >= ChangeSQN -> + % Every snapshot is looking at a version of history after this + % was removed + PDs = dict:erase(Filename, Manifest#manifest.pending_deletes), + {true, Manifest#manifest{pending_deletes = PDs}}; + _N -> + {false, Manifest} + end. + +check_for_work(Manifest, Thresholds) -> + CheckLevelFun = + fun({LevelIdx, MaxCount}, {AccL, AccC}) -> + case LevelIdx > Manifest#manifest.basement of + true -> + {AccL, AccC}; + false -> + Level = array:get(LevelIdx, Manifest#manifest.levels), + S = size(LevelIdx, Level), + case S > MaxCount of + true -> + {[LevelIdx|AccL], AccC + S - MaxCount}; + false -> + {AccL, AccC} + end + end + end, + lists:foldr(CheckLevelFun, {[], 0}, Thresholds). + +is_basement(Manifest, Level) -> + Level >= Manifest#manifest.basement. + +levelzero_present(Manifest) -> + not is_empty(0, array:get(0, Manifest#manifest.levels)). + +%%%============================================================================ +%%% Internal Functions +%%%============================================================================ + +%% All these internal functions that work on a level are also passed LeveIdx +%% even if this is not presently relevant. Currnetly levels are lists, but +%% future branches may make lower levels trees or skiplists to improve fetch +%% efficiency + +load_level(_LevelIdx, Level, PidFun, SQNFun) -> + LevelLoadFun = + fun(ME, {L_Out, L_MaxSQN}) -> + FN = ME#manifest_entry.filename, + P = PidFun(FN), + SQN = SQNFun(P), + {[ME#manifest_entry{owner=P}|L_Out], max(SQN, L_MaxSQN)} + end, + lists:foldr(LevelLoadFun, {[], 0}, Level). + +close_level(_LevelIdx, Level, CloseEntryFun) -> + lists:foreach(CloseEntryFun, Level). + +is_empty(_LevelIdx, []) -> + true; +is_empty(_LevelIdx, _Level) -> + false. + +size(_LevelIdx, Level) -> + length(Level). + +add_entry(_LevelIdx, Level, Entries) when is_list(Entries) -> + lists:sort(Level ++ Entries); +add_entry(_LevelIdx, Level, Entry) -> + lists:sort([Entry|Level]). + +remove_entry(_LevelIdx, Level, Entries) when is_list(Entries) -> + % We're assuming we're removing a sorted sublist + RemLength = length(Entries), + [RemStart|_Tail] = Entries, + remove_section(Level, RemStart#manifest_entry.start_key, RemLength); +remove_entry(_LevelIdx, Level, Entry) -> + remove_section(Level, Entry#manifest_entry.start_key, 1). + +remove_section(Level, SectionStartKey, SectionLength) -> + PredFun = + fun(E) -> + E#manifest_entry.start_key < SectionStartKey + end, + {Pre, Rest} = lists:splitwith(PredFun, Level), + Post = lists:nthtail(SectionLength, Rest), + Pre ++ Post. + + +key_lookup_level(_LevelIdx, [], _Key) -> + false; +key_lookup_level(LevelIdx, [Entry|Rest], Key) -> + case Entry#manifest_entry.end_key >= Key of + true -> + case Key >= Entry#manifest_entry.start_key of + true -> + Entry#manifest_entry.owner; + false -> + false + end; + false -> + key_lookup_level(LevelIdx, Rest, Key) + end. + +range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun) -> + Range = + case LevelIdx > Manifest#manifest.basement of + true -> + []; + false -> + range_lookup_level(LevelIdx, + array:get(LevelIdx, + Manifest#manifest.levels), + StartKey, + EndKey) + end, + lists:map(MakePointerFun, Range). + +range_lookup_level(_LevelIdx, Level, QStartKey, QEndKey) -> + BeforeFun = + fun(M) -> + QStartKey > M#manifest_entry.end_key + end, + NotAfterFun = + fun(M) -> + not leveled_codec:endkey_passed(QEndKey, + M#manifest_entry.start_key) + end, + {_Before, MaybeIn} = lists:splitwith(BeforeFun, Level), + {In, _After} = lists:splitwith(NotAfterFun, MaybeIn), + In. + +get_basement(Levels) -> + GetBaseFun = + fun(L, Acc) -> + case is_empty(L, array:get(L, Levels)) of + false -> + max(L, Acc); + true -> + Acc + end + end, + lists:foldl(GetBaseFun, 0, lists:seq(0, ?MAX_LEVELS)). + + +filepath(RootPath, manifest) -> + MFP = RootPath ++ "/" ++ ?MANIFEST_FP ++ "/", + filelib:ensure_dir(MFP), + MFP. + +filepath(RootPath, NewMSN, current_manifest) -> + filepath(RootPath, manifest) ++ "nonzero_" + ++ integer_to_list(NewMSN) ++ "." ++ ?MANIFEST_FILEX. + + +open_manifestfile(_RootPath, []) -> + leveled_log:log("P0013", []), + new_manifest(); +open_manifestfile(_RootPath, [0]) -> + leveled_log:log("P0013", []), + new_manifest(); +open_manifestfile(RootPath, [TopManSQN|Rest]) -> + CurrManFile = filepath(RootPath, TopManSQN, current_manifest), + {ok, FileBin} = file:read_file(CurrManFile), + <> = FileBin, + case erlang:crc32(BinaryOfTerm) of + CRC -> + leveled_log:log("P0012", [TopManSQN]), + binary_to_term(BinaryOfTerm); + _ -> + leveled_log:log("P0033", [CurrManFile, "crc wonky"]), + open_manifestfile(RootPath, Rest) + end. + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + +initial_setup() -> + E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, + end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, + filename="Z1", + owner="pid_z1"}, + E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"}, + end_key={o, "Bucket1", "K71", null}, + filename="Z2", + owner="pid_z2"}, + E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null}, + end_key={o, "Bucket1", "K993", null}, + filename="Z3", + owner="pid_z3"}, + E4 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, + end_key={i, "Bucket1", {"Idx1", "Fld7"}, "K93"}, + filename="Z4", + owner="pid_z4"}, + E5 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld7"}, "K97"}, + end_key={o, "Bucket1", "K78", null}, + filename="Z5", + owner="pid_z5"}, + E6 = #manifest_entry{start_key={o, "Bucket1", "K81", null}, + end_key={o, "Bucket1", "K996", null}, + filename="Z6", + owner="pid_z6"}, + + Man0 = new_manifest(), + % insert_manifest_entry(Manifest, ManSQN, Level, Entry) + Man1 = insert_manifest_entry(Man0, 1, 1, E1), + Man2 = insert_manifest_entry(Man1, 1, 1, E2), + Man3 = insert_manifest_entry(Man2, 1, 1, E3), + Man4 = insert_manifest_entry(Man3, 1, 2, E4), + Man5 = insert_manifest_entry(Man4, 1, 2, E5), + Man6 = insert_manifest_entry(Man5, 1, 2, E6), + {Man0, Man1, Man2, Man3, Man4, Man5, Man6}. + +changeup_setup(Man6) -> + E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, + end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, + filename="Z1", + owner="pid_z1"}, + E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"}, + end_key={o, "Bucket1", "K71", null}, + filename="Z2", + owner="pid_z2"}, + E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null}, + end_key={o, "Bucket1", "K993", null}, + filename="Z3", + owner="pid_z3"}, + + E1_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld4"}, "K8"}, + end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K62"}, + owner="pid_y1", + filename="Y1"}, + E2_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K67"}, + end_key={o, "Bucket1", "K45", null}, + owner="pid_y2", + filename="Y2"}, + E3_2 = #manifest_entry{start_key={o, "Bucket1", "K47", null}, + end_key={o, "Bucket1", "K812", null}, + owner="pid_y3", + filename="Y3"}, + E4_2 = #manifest_entry{start_key={o, "Bucket1", "K815", null}, + end_key={o, "Bucket1", "K998", null}, + owner="pid_y4", + filename="Y4"}, + + Man7 = remove_manifest_entry(Man6, 2, 1, E1), + Man8 = remove_manifest_entry(Man7, 2, 1, E2), + Man9 = remove_manifest_entry(Man8, 2, 1, E3), + + Man10 = insert_manifest_entry(Man9, 2, 1, E1_2), + Man11 = insert_manifest_entry(Man10, 2, 1, E2_2), + Man12 = insert_manifest_entry(Man11, 2, 1, E3_2), + Man13 = insert_manifest_entry(Man12, 2, 1, E4_2), + % remove_manifest_entry(Manifest, ManSQN, Level, Entry) + + {Man7, Man8, Man9, Man10, Man11, Man12, Man13}. + +keylookup_manifest_test() -> + {Man0, Man1, Man2, Man3, _Man4, _Man5, Man6} = initial_setup(), + LK1_1 = {o, "Bucket1", "K711", null}, + LK1_2 = {o, "Bucket1", "K70", null}, + LK1_3 = {o, "Bucket1", "K71", null}, + LK1_4 = {o, "Bucket1", "K75", null}, + LK1_5 = {o, "Bucket1", "K76", null}, + + ?assertMatch(false, key_lookup(Man0, 1, LK1_1)), + ?assertMatch(false, key_lookup(Man1, 1, LK1_1)), + ?assertMatch(false, key_lookup(Man2, 1, LK1_1)), + ?assertMatch(false, key_lookup(Man3, 1, LK1_1)), + ?assertMatch(false, key_lookup(Man6, 1, LK1_1)), + + ?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_2)), + ?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_3)), + ?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_4)), + ?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_5)), + + ?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_2)), + ?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_3)), + ?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_4)), + ?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_5)), + + {_Man7, _Man8, _Man9, _Man10, _Man11, _Man12, + Man13} = changeup_setup(Man6), + + ?assertMatch(false, key_lookup(Man0, 1, LK1_1)), + ?assertMatch(false, key_lookup(Man1, 1, LK1_1)), + ?assertMatch(false, key_lookup(Man2, 1, LK1_1)), + ?assertMatch(false, key_lookup(Man3, 1, LK1_1)), + ?assertMatch(false, key_lookup(Man6, 1, LK1_1)), + + ?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_2)), + ?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_3)), + ?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_4)), + ?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_5)), + + ?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_2)), + ?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_3)), + ?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_4)), + ?assertMatch("pid_z5", key_lookup(Man6, 2, LK1_5)), + + ?assertMatch("pid_y3", key_lookup(Man13, 1, LK1_4)), + ?assertMatch("pid_z5", key_lookup(Man13, 2, LK1_4)). + + +rangequery_manifest_test() -> + {_Man0, _Man1, _Man2, _Man3, _Man4, _Man5, Man6} = initial_setup(), + + PidMapFun = + fun(Pointer) -> + {next, ME, _SK} = Pointer, + ME#manifest_entry.owner + end, + + SK1 = {o, "Bucket1", "K711", null}, + EK1 = {o, "Bucket1", "K999", null}, + RL1_1 = lists:map(PidMapFun, range_lookup(Man6, 1, SK1, EK1)), + ?assertMatch(["pid_z3"], RL1_1), + RL1_2 = lists:map(PidMapFun, range_lookup(Man6, 2, SK1, EK1)), + ?assertMatch(["pid_z5", "pid_z6"], RL1_2), + SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, + EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, + RL2_1 = lists:map(PidMapFun, range_lookup(Man6, 1, SK2, EK2)), + ?assertMatch(["pid_z1"], RL2_1), + RL2_2 = lists:map(PidMapFun, range_lookup(Man6, 2, SK2, EK2)), + ?assertMatch(["pid_z5"], RL2_2), + + SK3 = {o, "Bucket1", "K994", null}, + EK3 = {o, "Bucket1", "K995", null}, + RL3_1 = lists:map(PidMapFun, range_lookup(Man6, 1, SK3, EK3)), + ?assertMatch([], RL3_1), + RL3_2 = lists:map(PidMapFun, range_lookup(Man6, 2, SK3, EK3)), + ?assertMatch(["pid_z6"], RL3_2), + + {_Man7, _Man8, _Man9, _Man10, _Man11, _Man12, + Man13} = changeup_setup(Man6), + + RL1_1A = lists:map(PidMapFun, range_lookup(Man6, 1, SK1, EK1)), + ?assertMatch(["pid_z3"], RL1_1A), + RL2_1A = lists:map(PidMapFun, range_lookup(Man6, 1, SK2, EK2)), + ?assertMatch(["pid_z1"], RL2_1A), + RL3_1A = lists:map(PidMapFun, range_lookup(Man6, 1, SK3, EK3)), + ?assertMatch([], RL3_1A), + + RL1_1B = lists:map(PidMapFun, range_lookup(Man13, 1, SK1, EK1)), + ?assertMatch(["pid_y3", "pid_y4"], RL1_1B), + RL2_1B = lists:map(PidMapFun, range_lookup(Man13, 1, SK2, EK2)), + ?assertMatch(["pid_y1"], RL2_1B), + RL3_1B = lists:map(PidMapFun, range_lookup(Man13, 1, SK3, EK3)), + ?assertMatch(["pid_y4"], RL3_1B). + +levelzero_present_test() -> + E0 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, + end_key={o, "Bucket1", "Key996", null}, + filename="Z0", + owner="pid_z0"}, + + Man0 = new_manifest(), + ?assertMatch(false, levelzero_present(Man0)), + % insert_manifest_entry(Manifest, ManSQN, Level, Entry) + Man1 = insert_manifest_entry(Man0, 1, 0, E0), + ?assertMatch(true, levelzero_present(Man1)). + +snapshot_release_test() -> + Man6 = element(7, initial_setup()), + E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, + end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, + filename="Z1", + owner="pid_z1"}, + E2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"}, + end_key={o, "Bucket1", "K71", null}, + filename="Z2", + owner="pid_z2"}, + E3 = #manifest_entry{start_key={o, "Bucket1", "K75", null}, + end_key={o, "Bucket1", "K993", null}, + filename="Z3", + owner="pid_z3"}, + + Man7 = add_snapshot(Man6, "pid_a1", 3600), + Man8 = remove_manifest_entry(Man7, 2, 1, E1), + Man9 = add_snapshot(Man8, "pid_a2", 3600), + Man10 = remove_manifest_entry(Man9, 3, 1, E2), + Man11 = add_snapshot(Man10, "pid_a3", 3600), + Man12 = remove_manifest_entry(Man11, 4, 1, E3), + Man13 = add_snapshot(Man12, "pid_a4", 3600), + + ?assertMatch(false, element(1, ready_to_delete(Man8, "Z1"))), + ?assertMatch(false, element(1, ready_to_delete(Man10, "Z2"))), + ?assertMatch(false, element(1, ready_to_delete(Man12, "Z3"))), + + Man14 = release_snapshot(Man13, "pid_a1"), + ?assertMatch(false, element(1, ready_to_delete(Man14, "Z2"))), + ?assertMatch(false, element(1, ready_to_delete(Man14, "Z3"))), + {Bool14, Man15} = ready_to_delete(Man14, "Z1"), + ?assertMatch(true, Bool14), + + %This doesn't change anything - released snaphsot not the min + Man16 = release_snapshot(Man15, "pid_a4"), + ?assertMatch(false, element(1, ready_to_delete(Man16, "Z2"))), + ?assertMatch(false, element(1, ready_to_delete(Man16, "Z3"))), + + Man17 = release_snapshot(Man16, "pid_a2"), + ?assertMatch(false, element(1, ready_to_delete(Man17, "Z3"))), + {Bool17, Man18} = ready_to_delete(Man17, "Z2"), + ?assertMatch(true, Bool17), + + Man19 = release_snapshot(Man18, "pid_a3"), + + io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]), + + {Bool19, _Man20} = ready_to_delete(Man19, "Z3"), + ?assertMatch(true, Bool19). + + + +-endif. \ No newline at end of file diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 62aa904..c9102d1 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -363,7 +363,8 @@ delete_pending(close, _From, State) -> delete_pending(timeout, State) -> ok = leveled_penciller:pcl_confirmdelete(State#state.penciller, - State#state.filename), + State#state.filename, + self()), {next_state, delete_pending, State, ?DELETE_TIMEOUT}; delete_pending(close, State) -> leveled_log:log("SST07", [State#state.filename]), @@ -1175,8 +1176,8 @@ maybe_expand_pointer([{pointer, SSTPid, Slot, StartKey, all}|Tail]) -> expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, all}, Tail, ?MERGE_SCANWIDTH); -maybe_expand_pointer([{next, SSTPid, StartKey}|Tail]) -> - expand_list_by_pointer({next, SSTPid, StartKey, all}, +maybe_expand_pointer([{next, ManEntry, StartKey}|Tail]) -> + expand_list_by_pointer({next, ManEntry, StartKey, all}, Tail, ?MERGE_SCANWIDTH); maybe_expand_pointer(List) -> @@ -1202,7 +1203,9 @@ expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) - {AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail), ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers), lists:append(ExpPointers, AccTail); -expand_list_by_pointer({next, SSTPid, StartKey, EndKey}, Tail, Width) -> +expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, Tail, Width) -> + SSTPid = ManEntry#manifest_entry.owner, + leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]), ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width), ExpPointer ++ Tail. @@ -1440,8 +1443,8 @@ merge_test() -> ?assertMatch(ExpFK2, FK2), ?assertMatch(ExpLK1, LK1), ?assertMatch(ExpLK2, LK2), - ML1 = [{next, P1, FK1}], - ML2 = [{next, P2, FK2}], + ML1 = [{next, #manifest_entry{owner = P1}, FK1}], + ML2 = [{next, #manifest_entry{owner = P2}, FK2}], {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = sst_new("../test/level2_merge", ML1, ML2,