From 08641e05cf569701cae8440004ad38adb9f7a812 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Thu, 12 Jan 2017 13:48:43 +0000 Subject: [PATCH] Manifest changes - BROKEN Going to abandon this branch for now. The change is becoming excessively time consuming, and it is not clear that a smaller change might not achieve more of the objectives. All this is broken - but perhaps could get picked up another day. --- include/leveled.hrl | 5 +- src/leveled_log.erl | 8 +- src/leveled_manifest.erl | 34 ++- src/leveled_pclerk.erl | 137 +++------ src/leveled_penciller.erl | 590 ++++++++++---------------------------- 5 files changed, 216 insertions(+), 558 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 25216f6..7843739 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -24,12 +24,11 @@ {next_sqn :: integer(), clerk :: pid(), src_level :: integer(), - manifest :: list(), start_time :: tuple(), ledger_filepath :: string(), - manifest_file :: string(), - new_manifest :: list(), unreferenced_files :: list(), + new_files :: list(), + level_counts :: dict(), target_is_basement = false ::boolean()}). 
-record(level, diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 898c910..f4b59a2 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -74,8 +74,6 @@ ++ "reason ~w"}}, {"P0008", {info, "Penciller closing for reason ~w"}}, - {"P0009", - {info, "Level 0 cache empty at close of Penciller"}}, {"P0010", {info, "No level zero action on close of Penciller ~w"}}, {"P0011", @@ -97,9 +95,6 @@ ++ "L0 pending ~w and merge backlog ~w"}}, {"P0019", {info, "Rolling level zero to filename ~s at ledger sqn ~w"}}, - {"P0020", - {info, "Work at Level ~w to be scheduled for ~w with ~w " - ++ "queue items outstanding at all levels"}}, {"P0021", {info, "Allocation of work blocked as L0 pending"}}, {"P0022", @@ -108,7 +103,8 @@ {info, "Manifest entry of startkey ~s ~s ~s endkey ~s ~s ~s " ++ "filename=~s~n"}}, {"P0024", - {info, "Outstanding compaction work items of ~w at level ~w"}}, + {info, "Outstanding compaction work items of ~w with backlog status " + ++ "of ~w"}}, {"P0025", {info, "Merge to sqn ~w from Level ~w completed"}}, {"P0026", diff --git a/src/leveled_manifest.erl b/src/leveled_manifest.erl index f7e4f68..82c323a 100644 --- a/src/leveled_manifest.erl +++ b/src/leveled_manifest.erl @@ -87,15 +87,17 @@ save_manifest(Manifest, RootPath, ManSQN) -> initiate_from_manifest(Manifest) -> FlatManifest = ets:tab2list(Manifest), InitiateFun = - fun({{_L, _EK, FN}, {_SK, ActSt, DelSt}}, {FNList, MaxSQN}) -> + fun({{L, _EK, FN}, {_SK, ActSt, DelSt}}, {FNList, MaxSQN, LCount}) -> case {ActSt, DelSt} of {{active, ActSQN}, {tomb, infinity}} -> - {[FN|FNList], max(ActSQN, MaxSQN)}; + {[FN|FNList], + max(ActSQN, MaxSQN), + dict:update_counter(L, 1, LCount)}; {_, {tomb, TombSQN}} -> - {FNList, max(TombSQN, MaxSQN)} + {FNList, max(TombSQN, MaxSQN), LCount} end end, - lists:foldl(InitiateFun, {[], 0}, FlatManifest). + lists:foldl(InitiateFun, {[], 0, dict:new()}, FlatManifest). 
insert_manifest_entry(Manifest, ManSQN, Level, Entry) -> @@ -274,6 +276,8 @@ range_lookup(Manifest, Level, {LastKey, LastFN}, SK, EK, Acc, ManSQN) -> -ifdef(TEST). + + rangequery_manifest_test() -> E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, @@ -358,8 +362,7 @@ rangequery_manifest_test() -> ?assertMatch(["Y4"], RL3_1B). - -keyquery_manifest_test() -> +startup_manifest() E1 = #manifest_entry{start_key={o, "Bucket1", "K0001", null}, end_key={o, "Bucket1", "K0990", null}, filename="Z1"}, @@ -369,15 +372,19 @@ keyquery_manifest_test() -> E3 = #manifest_entry{start_key={o, "Bucket1", "K3750", null}, end_key={o, "Bucket1", "K9930", null}, filename="Z3"}, - EToRemove = #manifest_entry{start_key={o, "Bucket99", "K3750", null}, - end_key={o, "Bucket99", "K9930", null}, - filename="ZR"}, - Manifest0 = open_manifestfile(dummy, []), insert_manifest_entry(Manifest0, 1, 1, E1), insert_manifest_entry(Manifest0, 1, 1, E2), insert_manifest_entry(Manifest0, 1, 1, E3), + Manifest0 + +keyquery_manifest_test() -> + Manifest0 = startup_manifest(), + + EToRemove = #manifest_entry{start_key={o, "Bucket99", "K3750", null}, + end_key={o, "Bucket99", "K9930", null}, + filename="ZR"}, insert_manifest_entry(Manifest0, 1, 1, EToRemove), remove_manifest_entry(Manifest0, 2, 1, EToRemove), @@ -394,9 +401,10 @@ keyquery_manifest_test() -> ?assertMatch(true, filelib:is_file(BadFP)), Manifest = open_manifest(RootPath), - {FNList, ManSQN} = initiate_from_manifest(Manifest), + {FNList, ManSQN, LCount} = initiate_from_manifest(Manifest), ?assertMatch(["Z1", "Z2", "Z3"], lists:sort(FNList)), ?assertMatch(2, ManSQN), + ?assertMatch(3, dict:fetch(1, LCount)), K1 = {o, "Bucket1", "K0000", null}, K2 = {o, "Bucket1", "K0001", null}, @@ -502,5 +510,9 @@ snapshot_test() -> ready_to_delete(Snap5, 3, {MegaS0, S0, MicroS0})). 
+allatlevel_test() -> + Manifest0 = startup_manifest(), + AllAtL1 = range_lookup(Manifest, 1, all, {null, null, null, null}, 1), + ?assertMatch(["Z1", "Z2", "Z3"], AllAtL1). -endif. \ No newline at end of file diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 9ccc791..58ed347 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -2,10 +2,9 @@ %% %% The Penciller's clerk is responsible for compaction work within the Ledger. %% -%% The Clerk will periodically poll the Penciller to see if there is work for -%% it to complete, except if the Clerk has informed the Penciller that it has -%% readied a manifest change to be committed - in which case it will wait to -%% be called by the Penciller. +%% The Clerk will periodically poll the Penciller to check there is no work +%% at level zero pending completion, and if not the Clerk will examine the +%% manifest to see if work is necessary. %% %% -------- COMMITTING MANIFEST CHANGES --------- %% @@ -18,35 +17,7 @@ %% certain that the manifest change has been committed. Some uncollected %% garbage is considered acceptable. %% -%% The process of committing a manifest change is as follows: -%% -%% A - The Clerk completes a merge, and casts a prompt to the Penciller with -%% a work item describing the change -%% -%% B - The Penciller commits the change to disk, and then calls the Clerk to -%% confirm the manifest change -%% -%% C - The Clerk replies immediately to acknowledge this call, then marks the -%% removed files for deletion -%% -%% Shutdown < A/B - If the Penciller starts the shutdown process before the -%% merge is complete, in the shutdown the Penciller will call a request for the -%% manifest change which will pick up the pending change. It will then confirm -%% the change, and now the Clerk will mark the files for delete before it -%% replies to the Penciller so it can complete the shutdown process (which will -%% prompt erasing of the removed files). 
-%% -%% The clerk will not request work on timeout if the committing of a manifest -%% change is pending confirmation. -%% -%% -------- TIMEOUTS --------- -%% -%% The Penciller may prompt the Clerk to callback soon (i.e. reduce the -%% Timeout) if it has urgent work ready (i.e. it has written a L0 file). -%% -%% There will also be a natural quick timeout once the committing of a manifest -%% change has occurred. -%% + -module(leveled_pclerk). @@ -68,8 +39,10 @@ -define(MAX_TIMEOUT, 2000). -define(MIN_TIMEOUT, 50). +-define(END_KEY, {null, null, null, null}). -record(state, {owner :: pid(), + manifest, % ets table reference change_pending=false :: boolean(), work_item :: #penciller_work{}|null}). @@ -77,19 +50,17 @@ %%% API %%%============================================================================ -clerk_new(Owner) -> +clerk_new(Owner, Manifest) -> {ok, Pid} = gen_server:start(?MODULE, [], []), - ok = gen_server:call(Pid, {register, Owner}, infinity), + ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity), leveled_log:log("PC001", [Pid, Owner]), {ok, Pid}. -clerk_manifestchange(Pid, Action, Closing) -> - gen_server:call(Pid, {manifest_change, Action, Closing}, infinity). - clerk_prompt(Pid) -> gen_server:cast(Pid, prompt). - +clerk_close(Pid) -> + gen_server:cast(Pid, close). %%%============================================================================ %%% gen_server callbacks @@ -98,41 +69,16 @@ clerk_prompt(Pid) -> init([]) -> {ok, #state{}}. 
-handle_call({register, Owner}, _From, State) -> +handle_call({load, Owner, Manifest}, _From, State) -> {reply, ok, - State#state{owner=Owner}, - ?MIN_TIMEOUT}; -handle_call({manifest_change, return, true}, _From, State) -> - leveled_log:log("PC002", []), - case State#state.change_pending of - true -> - WI = State#state.work_item, - {reply, {ok, WI}, State}; - false -> - {stop, normal, no_change, State} - end; -handle_call({manifest_change, confirm, Closing}, From, State) -> - case Closing of - true -> - leveled_log:log("PC003", []), - WI = State#state.work_item, - ok = mark_for_delete(WI#penciller_work.unreferenced_files, - State#state.owner), - {stop, normal, ok, State}; - false -> - leveled_log:log("PC004", []), - gen_server:reply(From, ok), - WI = State#state.work_item, - ok = mark_for_delete(WI#penciller_work.unreferenced_files, - State#state.owner), - {noreply, - State#state{work_item=null, change_pending=false}, - ?MIN_TIMEOUT} - end. + State#state{owner=Owner, manifest=Manifest}, + ?MIN_TIMEOUT}. handle_cast(prompt, State) -> - {noreply, State, ?MIN_TIMEOUT}. + {noreply, State, ?MIN_TIMEOUT}; +handle_cast(close, State) -> + (stop, normal, State). 
handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false -> case requestandhandle_work(State) of @@ -159,26 +105,28 @@ code_change(_OldVsn, State, _Extra) -> requestandhandle_work(State) -> case leveled_penciller:pcl_workforclerk(State#state.owner) of - none -> + false -> leveled_log:log("PC006", []), - {false, ?MAX_TIMEOUT}; - WI -> - {NewManifest, FilesToDelete} = merge(WI), - UpdWI = WI#penciller_work{new_manifest=NewManifest, - unreferenced_files=FilesToDelete}, + false; + {SrcLevel, ManifestSQN} -> + {Additions, Removals} = merge(Level, + State#state.manifest, + ManifestSQN), leveled_log:log("PC007", []), - ok = leveled_penciller:pcl_promptmanifestchange(State#state.owner, - UpdWI), - {true, UpdWI} + ok = leveled_penciller:pcl_commitmanifestchange(State#state.owner, + SrcLevel, + Additions, + Removals, + ManifestSQN), + true end. -merge(WI) -> - SrcLevel = WI#penciller_work.src_level, - {SrcF, UpdMFest1} = select_filetomerge(SrcLevel, - WI#penciller_work.manifest), - SinkFiles = get_item(SrcLevel + 1, UpdMFest1, []), - {Candidates, Others} = check_for_merge_candidates(SrcF, SinkFiles), +merge(SrcLevel, Manifest, ManifestSQN) -> + SrcF = select_filetomerge(SrcLevel, Manifest), + + + Candidates = check_for_merge_candidates(SrcF, SinkFiles), %% TODO: %% Need to work out if this is the top level %% And then tell merge process to create files at the top level @@ -254,17 +202,14 @@ check_for_merge_candidates(SrcF, SinkFiles) -> %% %% Hence, the initial implementation is to select files to merge at random -select_filetomerge(SrcLevel, Manifest) -> - {SrcLevel, LevelManifest} = lists:keyfind(SrcLevel, 1, Manifest), - Selected = lists:nth(random:uniform(length(LevelManifest)), - LevelManifest), - UpdManifest = lists:keyreplace(SrcLevel, - 1, - Manifest, - {SrcLevel, - lists:delete(Selected, - LevelManifest)}), - {Selected, UpdManifest}. 
+select_filetomerge(SrcLevel, Manifest, ManifestSQN) -> + Level = leveled_manifest:range_lookup(Manifest, + 1, + all, + ?END_KEY, + ManifestSQN), + + FN = lists:nth(random:uniform(length(Level)), Level). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a07065b..0f4fa2d 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -176,7 +176,7 @@ pcl_fetchnextkey/5, pcl_checksequencenumber/3, pcl_workforclerk/1, - pcl_promptmanifestchange/2, + pcl_confirmmanifestchange/2, pcl_confirml0complete/4, pcl_confirmdelete/2, pcl_close/1, @@ -206,13 +206,17 @@ -define(COIN_SIDECOUNT, 5). -define(SLOW_FETCH, 20000). -define(ITERATOR_SCANWIDTH, 4). +-define(SNAPSHOT_TIMEOUT, 3600). --record(state, {manifest = [] :: list(), +-record(state, {manifest, % an ETS table reference manifest_sqn = 0 :: integer(), - ledger_sqn = 0 :: integer(), % The highest SQN added to L0 persisted_sqn = 0 :: integer(), % The highest SQN persisted registered_snapshots = [] :: list(), - unreferenced_files = [] :: list(), + pidmap = dict:new() :: dict(), + level_counts :: dict(), + deletions_pending = dict:new() ::dict(), + + ledger_sqn = 0 :: integer(), % The highest SQN added to L0 root_path = "../test" :: string(), clerk :: pid(), @@ -286,7 +290,7 @@ pcl_checksequencenumber(Pid, Key, SQN) -> pcl_workforclerk(Pid) -> gen_server:call(Pid, work_for_clerk, infinity). -pcl_promptmanifestchange(Pid, WI) -> +pcl_confirmmanifestchange(Pid, WI) -> gen_server:cast(Pid, {manifest_change, WI}). 
pcl_confirml0complete(Pid, FN, StartKey, EndKey) -> @@ -371,18 +375,24 @@ handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}}, State)} end; handle_call({fetch, Key, Hash}, _From, State) -> + Structure = {State#state.manifest, + State#state.pid_map, + State#state.manifest_sqn}, {R, HeadTimer} = timed_fetch_mem(Key, Hash, - State#state.manifest, + Structure, State#state.levelzero_cache, State#state.levelzero_index, State#state.head_timing), {reply, R, State#state{head_timing=HeadTimer}}; handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> + Structure = {State#state.manifest, + State#state.pid_map, + State#state.manifest_sqn}, {reply, compare_to_sqn(plain_fetch_mem(Key, Hash, - State#state.manifest, + Structure, State#state.levelzero_cache, State#state.levelzero_index), SQN), @@ -401,23 +411,53 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, List -> List end, - SSTiter = initiate_rangequery_frommanifest(StartKey, + + ConvertToPointerFun = + fun(FN) -> {next, dict:fetch(FN, State#state.pid_map), StartKey} end, + SetupFoldFun = + fun(Level, Acc) -> + FNs = leveled_manifest:range_lookup(State#state.manifest, + Level, + StartKey, EndKey, - State#state.manifest), + State#state.manifest_sqn), + Pointers = lists:map(ConvertToPointerFun, FNs), + case Pointers of + [] -> Acc; + PL -> Acc ++ [{L, PL}] + end + end, + SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)), + Acc = keyfolder({L0AsList, SSTiter}, {StartKey, EndKey}, {AccFun, InitAcc}, MaxKeys), + {reply, Acc, State#state{levelzero_astree = L0AsList}}; handle_call(work_for_clerk, From, State) -> - {UpdState, Work} = return_work(State, From), - {reply, Work, UpdState}; + DelayForPendingL0 = State#state.levelzero_pending, + {WL, WC} = check_for_work(State#state.level_counts), + case WC of + 0 -> + {reply, none, State#state{work_backlog=false}}; + N when N > ?WORKQUEUE_BACKLOG_TOLERANCE -> + leveled_log:log("P0024", [N, true]), + [TL|_Tail] = WL, + {reply, TL, 
State#state{work_backlog=true}}; + N -> + leveled_log:log("P0024", [N, false]), + [TL|_Tail] = WL, + {reply, TL, State#state{work_backlog=false}} + end; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; handle_call({register_snapshot, Snapshot}, _From, State) -> - Rs = [{Snapshot, - State#state.manifest_sqn}|State#state.registered_snapshots], - {reply, {ok, State}, State#state{registered_snapshots = Rs}}; + RegisteredSnaps = add_snapshot(State#state.registered_snapshots, + Snapshot, + State#state.manifest_sqn, + ?SNAPSHOT_TIMEOUT), + {reply, {ok, State}, State#state{registered_snapshots = RegisteredSnaps}}; handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}}, _From, State) -> L0D = leveled_pmem:add_to_cache(State#state.levelzero_size, @@ -444,28 +484,36 @@ handle_call(doom, _From, State) -> {stop, normal, {ok, [ManifestFP, FilesFP]}, State}. handle_cast({manifest_change, WI}, State) -> - {ok, UpdState} = commit_manifest_change(WI, State), - ok = leveled_pclerk:clerk_manifestchange(State#state.clerk, - confirm, - false), - {noreply, UpdState}; + NewManifestSQN = WI#next_sqn, + UnreferenceFun = + fun(FN, Acc) -> + dict:store(FN, NewManifestSQN, Acc) + end, + DelPending = lists:foldl(UnreferenceFun, + State#state.deletions_pending, + WI#unreferenced_files), + {noreply, State{deletions_pending = DelPending, + manifest_sqn = NewManifestSQN}}; handle_cast({release_snapshot, Snapshot}, State) -> - Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), + Rs = leveled_manifest:release_snapshot(State#state.registered_snapshots, + Snapshot), leveled_log:log("P0003", [Snapshot]), leveled_log:log("P0004", [Rs]), {noreply, State#state{registered_snapshots=Rs}}; -handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap}) +handle_cast({confirm_delete, Filename}, State=#state{is_snapshot=Snap}) when Snap == false -> - Reply = confirm_delete(FileName, - State#state.unreferenced_files, - 
State#state.registered_snapshots), - case Reply of - {true, Pid} -> - UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), + DeleteSQN = dict:fetch(Filename, State#state.deletions_pending), + R2D = leveled_manifest:ready_to_delete(State#state.registered_snapshots, + DeleteSQN), + case R2D of + true -> + PidToDelete = dict:fetch(Filename, State#state.pidmap), leveled_log:log("P0005", [FileName]), + DP0 = dict:erase(Filename, State#state.deletions_pending), + PM0 = dict:erase(Filename, State#state.pidmap), ok = leveled_sst:sst_deleteconfirmed(Pid), - {noreply, State#state{unreferenced_files=UF1}}; - _ -> + {noreply, State#state{deletions_pending = DP0, pidmap = PM0}}; + false -> {noreply, State} end; handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> @@ -478,11 +526,13 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> % Prompt clerk to ask about work - do this for every L0 roll UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index), ok = leveled_pclerk:clerk_prompt(State#state.clerk), + UpdLevelCounts = dict:store(0, 1, State#state.level_counts), {noreply, State#state{levelzero_cache=[], levelzero_index=UpdIndex, levelzero_pending=false, levelzero_constructor=undefined, levelzero_size=0, + level_counts=UpdLevelCounts, manifest=UpdMan, persisted_sqn=State#state.ledger_sqn}}. @@ -495,10 +545,6 @@ terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true -> leveled_log:log("P0007", [Reason]), ok; terminate(Reason, State) -> - %% When a Penciller shuts down it isn't safe to try an manage the safe - %% finishing of any outstanding work. The last commmitted manifest will - %% be used. - %% %% Level 0 files lie outside of the manifest, and so if there is no L0 %% file present it is safe to write the current contents of memory. 
If %% there is a L0 file present - then the memory can be dropped (it is @@ -506,44 +552,27 @@ terminate(Reason, State) -> %% as presumably the ETS file has been recently flushed, hence the presence %% of a L0 file). %% - %% The penciller should close each file in the unreferenced files, and - %% then each file in the manifest, and cast a close on the clerk. - %% The cast may not succeed as the clerk could be synchronously calling - %% the penciller looking for a manifest commit - %% + %% The penciller should close each file in the manifest, and cast a close + %% on the clerk. + ok = leveled_pclerk:clerk_close(State#state.clerk), + leveled_log:log("P0008", [Reason]), - MC = leveled_pclerk:clerk_manifestchange(State#state.clerk, - return, - true), - UpdState = case MC of - {ok, WI} -> - {ok, NewState} = commit_manifest_change(WI, State), - Clerk = State#state.clerk, - ok = leveled_pclerk:clerk_manifestchange(Clerk, - confirm, - true), - NewState; - no_change -> - State - end, - case {UpdState#state.levelzero_pending, - get_item(0, UpdState#state.manifest, []), - UpdState#state.levelzero_size} of - {false, [], 0} -> - leveled_log:log("P0009", []); - {false, [], _N} -> - L0Pid = roll_memory(UpdState, true), + L0 = key_lookup(State#state.manifest, 0, all, State#state.manifest_sqn), + case {UpdState#state.levelzero_pending, L0} of + {false, false} -> + L0Pid = roll_memory(UpdState, true), ok = leveled_sst:sst_close(L0Pid); StatusTuple -> leveled_log:log("P0010", [StatusTuple]) end, % Tidy shutdown of individual files - ok = close_files(0, UpdState#state.manifest), - lists:foreach(fun({_FN, Pid, _SN}) -> - ok = leveled_sst:sst_close(Pid) end, - UpdState#state.unreferenced_files), + lists:foreach(fun({_FN, Pid}) -> + ok = leveled_sst:sst_close(Pid) + end, + dict:to_list(State#state.pidmap)), leveled_log:log("P0011", []), + ok. 
@@ -579,47 +608,21 @@ start_from_file(PCLopts) -> levelzero_index=leveled_pmem:new_index()}, %% Open manifest - ManifestPath = filepath(InitState#state.root_path, manifest) ++ "/", - SSTPath = filepath(InitState#state.root_path, files) ++ "/", - ok = filelib:ensure_dir(ManifestPath), - ok = filelib:ensure_dir(SSTPath), - - {ok, Filenames} = file:list_dir(ManifestPath), - CurrRegex = "nonzero_(?[0-9]+)\\." ++ ?CURRENT_FILEX, - ValidManSQNs = lists:foldl(fun(FN, Acc) -> - case re:run(FN, - CurrRegex, - [{capture, ['MSN'], list}]) of - nomatch -> - Acc; - {match, [Int]} when is_list(Int) -> - Acc ++ [list_to_integer(Int)] - end - end, - [], - Filenames), - TopManSQN = lists:foldl(fun(X, MaxSQN) -> max(X, MaxSQN) end, - 0, - ValidManSQNs), - leveled_log:log("P0012", [TopManSQN]), - ManUpdate = case TopManSQN of - 0 -> - leveled_log:log("P0013", []), - {[], 0}; - _ -> - CurrManFile = filepath(InitState#state.root_path, - TopManSQN, - current_manifest), - {ok, Bin} = file:read_file(CurrManFile), - Manifest = binary_to_term(Bin), - open_all_filesinmanifest(Manifest) - end, - - {UpdManifest, MaxSQN} = ManUpdate, + Manifest = leveled_manifest:open_manifest(RootPath), + {FNList, + ManSQN, + LevelCounts) = leveled_manifest:initiate_from_manifest(Manifest), + InitiateFun = + fun(FN, {AccMaxSQN, AccPidMap}) -> + {ok, P, {_FK, _LK}} = leveled_sst:sst_open(FN), + FileMaxSQN = leveled_sst:sst_getmaxsequencenumber(P), + {max(AccMaxSQN, FileMaxSQN), dict:store(FN, P, AccPidMap)} + end, + {MaxSQN, PidMap} = lists:foldl(InitiateFun, {0, dict:new()}, FNList), leveled_log:log("P0014", [MaxSQN]), %% Find any L0 files - L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sst", + L0FN = filepath(RootPath, ManSQN, new_merge_files) ++ "_0_0.sst", case filelib:is_file(L0FN) of true -> leveled_log:log("P0015", [L0FN]), @@ -627,30 +630,42 @@ start_from_file(PCLopts) -> L0Pid, {L0StartKey, L0EndKey}} = leveled_sst:sst_open(L0FN), L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid), - 
ManifestEntry = #manifest_entry{start_key=L0StartKey, - end_key=L0EndKey, - owner=L0Pid, - filename=L0FN}, - UpdManifest2 = lists:keystore(0, - 1, - UpdManifest, - {0, [ManifestEntry]}), + L0Entry = #manifest_entry{start_key = L0StartKey, + end_key = L0EndKey, + filename = L0FN}, + PidMap0 = dict:store(L0FN, L0Pid, PidMap), + insert_manifest_entry(Manifest, ManSQN, 0, L0Entry) leveled_log:log("P0016", [L0SQN]), LedgerSQN = max(MaxSQN, L0SQN), {ok, - InitState#state{manifest=UpdManifest2, - manifest_sqn=TopManSQN, - ledger_sqn=LedgerSQN, - persisted_sqn=LedgerSQN}}; + InitState#state{manifest = Manifest, + manifest_sqn = ManSQN, + ledger_sqn = LedgerSQN, + persisted_sqn = LedgerSQN, + level_counts = LevelCounts, + pid_map = PidMap0}}; false -> leveled_log:log("P0017", []), {ok, - InitState#state{manifest=UpdManifest, - manifest_sqn=TopManSQN, - ledger_sqn=MaxSQN, - persisted_sqn=MaxSQN}} + InitState#state{manifest = Manifest, + manifest_sqn = ManSQN, + ledger_sqn = MaxSQN, + persisted_sqn = MaxSQN, + level_counts = LevelCounts, + pid_map = PidMap}} end. +check_for_work(LevelCounts) -> + CheckLevelFun = + fun({Level, MaxCount}, {AccL, AccC}) -> + case dict:fetch(Level, LevelCounts) of + LC when LC > MaxCount -> + {[Level|AccL], AccC + LC - MaxCount}; + _ -> + {AccL, AccC} + end + end, + lists:foldl(CheckLevelFun, {[], 0}, ?LEVEL_SCALEFACTOR). update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, @@ -738,9 +753,9 @@ levelzero_filename(State) -> ++ integer_to_list(MSN) ++ "_0_0", FileName. -timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) -> +timed_fetch_mem(Key, Hash, Structure, L0Cache, L0Index, HeadTimer) -> SW = os:timestamp(), - {R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index), + {R, Level} = fetch_mem(Key, Hash, Structure, L0Cache, L0Index), UpdHeadTimer = case R of not_present -> @@ -750,41 +765,32 @@ timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) -> end, {R, UpdHeadTimer}. 
-plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> - R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index), +plain_fetch_mem(Key, Hash, Structure, L0Cache, L0Index) -> + R = fetch_mem(Key, Hash, Structure, L0Cache, L0Index), element(1, R). -fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> +fetch_mem(Key, Hash, Structure, L0Cache, L0Index) -> PosList = leveled_pmem:check_index(Hash, L0Index), L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache), case L0Check of {false, not_found} -> - fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3); + fetch(Key, Hash, Structure, 0, fun timed_sst_get/3); {true, KV} -> {KV, 0} end. -fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> +fetch(_Key, _Hash, _Structure, ?MAX_LEVELS + 1, _FetchFun) -> {not_present, basement}; -fetch(Key, Hash, Manifest, Level, FetchFun) -> - LevelManifest = get_item(Level, Manifest, []), - case lists:foldl(fun(File, Acc) -> - case Acc of - not_present when - Key >= File#manifest_entry.start_key, - File#manifest_entry.end_key >= Key -> - File#manifest_entry.owner; - FoundDetails -> - FoundDetails - end end, - not_present, - LevelManifest) of - not_present -> - fetch(Key, Hash, Manifest, Level + 1, FetchFun); - FileToCheck -> - case FetchFun(FileToCheck, Key, Hash) of +fetch(Key, Hash, Structure, Level, FetchFun) -> + {Manifest, PidMap, ManSQN} = Structure, + case leveled_manifest:key_lookup(Manifest, Level, Key, ManSQN) of + false -> + fetch(Key, Hash, Structure, Level + 1, FetchFun); + FN -> + FP = dict:fetch(FN, PidMap), + case FetchFun(FP, Key, Hash) of not_present -> - fetch(Key, Hash, Manifest, Level + 1, FetchFun); + fetch(Key, Hash, Structure, Level + 1, FetchFun); ObjectFound -> {ObjectFound, Level} end @@ -821,134 +827,6 @@ compare_to_sqn(Obj, SQN) -> end. 
-%% Work out what the current work queue should be -%% -%% The work queue should have a lower level work at the front, and no work -%% should be added to the queue if a compaction worker has already been asked -%% to look at work at that level -%% -%% The full queue is calculated for logging purposes only - -return_work(State, From) -> - {WorkQ, BasementL} = assess_workqueue([], 0, State#state.manifest, 0), - case length(WorkQ) of - L when L > 0 -> - Excess = lists:foldl(fun({_, _, OH}, Acc) -> Acc+OH end, 0, WorkQ), - [{SrcLevel, Manifest, _Overhead}|_OtherWork] = WorkQ, - leveled_log:log("P0020", [SrcLevel, From, Excess]), - IsBasement = if - SrcLevel + 1 == BasementL -> - true; - true -> - false - end, - Backlog = Excess >= ?WORKQUEUE_BACKLOG_TOLERANCE, - case State#state.levelzero_pending of - true -> - % Once the L0 file is completed there will be more work - % - so don't be busy doing other work now - leveled_log:log("P0021", []), - {State#state{work_backlog=Backlog}, none}; - false -> - %% No work currently outstanding - %% Can allocate work - NextSQN = State#state.manifest_sqn + 1, - FP = filepath(State#state.root_path, - NextSQN, - new_merge_files), - ManFile = filepath(State#state.root_path, - NextSQN, - pending_manifest), - WI = #penciller_work{next_sqn=NextSQN, - clerk=From, - src_level=SrcLevel, - manifest=Manifest, - start_time = os:timestamp(), - ledger_filepath = FP, - manifest_file = ManFile, - target_is_basement = IsBasement}, - {State#state{ongoing_work=[WI], work_backlog=Backlog}, WI} - end; - _ -> - {State#state{work_backlog=false}, none} - end. - - -close_files(?MAX_LEVELS - 1, _Manifest) -> - ok; -close_files(Level, Manifest) -> - LevelList = get_item(Level, Manifest, []), - lists:foreach(fun(F) -> - ok = leveled_sst:sst_close(F#manifest_entry.owner) end, - LevelList), - close_files(Level + 1, Manifest). - - -open_all_filesinmanifest(Manifest) -> - open_all_filesinmanifest({Manifest, 0}, 0). 
- -open_all_filesinmanifest(Result, ?MAX_LEVELS - 1) -> - Result; -open_all_filesinmanifest({Manifest, TopSQN}, Level) -> - LevelList = get_item(Level, Manifest, []), - %% The Pids in the saved manifest related to now closed references - %% Need to roll over the manifest at this level starting new processes to - %5 replace them - LvlR = lists:foldl(fun(F, {FL, FL_SQN}) -> - FN = F#manifest_entry.filename, - {ok, P, _Keys} = leveled_sst:sst_open(FN), - F_SQN = leveled_sst:sst_getmaxsequencenumber(P), - {lists:append(FL, - [F#manifest_entry{owner = P}]), - max(FL_SQN, F_SQN)} - end, - {[], 0}, - LevelList), - %% Result is tuple of revised file list for this level in manifest, and - %% the maximum sequence number seen at this level - {LvlFL, LvlSQN} = LvlR, - UpdManifest = lists:keystore(Level, 1, Manifest, {Level, LvlFL}), - open_all_filesinmanifest({UpdManifest, max(TopSQN, LvlSQN)}, Level + 1). - -print_manifest(Manifest) -> - lists:foreach(fun(L) -> - leveled_log:log("P0022", [L]), - Level = get_item(L, Manifest, []), - lists:foreach(fun print_manifest_entry/1, Level) - end, - lists:seq(0, ?MAX_LEVELS - 1)), - ok. - -print_manifest_entry(Entry) -> - {S1, S2, S3} = leveled_codec:print_key(Entry#manifest_entry.start_key), - {E1, E2, E3} = leveled_codec:print_key(Entry#manifest_entry.end_key), - leveled_log:log("P0023", - [S1, S2, S3, E1, E2, E3, Entry#manifest_entry.filename]). - -initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) -> - CompareFun = fun(M) -> - C1 = StartKey > M#manifest_entry.end_key, - C2 = leveled_codec:endkey_passed(EndKey, - M#manifest_entry.start_key), - not (C1 or C2) end, - FoldFun = - fun(L, AccL) -> - Level = get_item(L, Manifest, []), - FL = lists:foldl(fun(M, Acc) -> - case CompareFun(M) of - true -> - Acc ++ [{next, M, StartKey}]; - false -> - Acc - end end, - [], - Level), - case FL of - [] -> AccL; - FL -> AccL ++ [{L, FL}] - end - end, - lists:foldl(FoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)). 
%% Looks to find the best choice for the next key across the levels (other %% than in-memory table) @@ -1153,143 +1031,12 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange, end. -assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Man, BasementLevel) -> - {WorkQ, BasementLevel}; -assess_workqueue(WorkQ, LevelToAssess, Man, BasementLevel) -> - MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0), - case length(get_item(LevelToAssess, Man, [])) of - FileCount when FileCount > 0 -> - NewWQ = maybe_append_work(WorkQ, - LevelToAssess, - Man, - MaxFiles, - FileCount), - assess_workqueue(NewWQ, LevelToAssess + 1, Man, LevelToAssess); - 0 -> - assess_workqueue(WorkQ, LevelToAssess + 1, Man, BasementLevel) - end. - - -maybe_append_work(WorkQ, Level, Manifest, - MaxFiles, FileCount) - when FileCount > MaxFiles -> - Overhead = FileCount - MaxFiles, - leveled_log:log("P0024", [Overhead, Level]), - lists:append(WorkQ, [{Level, Manifest, Overhead}]); -maybe_append_work(WorkQ, _Level, _Manifest, - _MaxFiles, _FileCount) -> - WorkQ. - - -get_item(Index, List, Default) -> - case lists:keysearch(Index, 1, List) of - {value, {Index, Value}} -> - Value; - false -> - Default - end. 
- - -%% Request a manifest change -%% The clerk should have completed the work, and created a new manifest -%% and persisted the new view of the manifest -%% -%% To complete the change of manifest: -%% - the state of the manifest file needs to be changed from pending to current -%% - the list of unreferenced files needs to be updated on State -%% - the current manifest needs to be update don State -%% - the list of ongoing work needs to be cleared of this item - - -commit_manifest_change(ReturnedWorkItem, State) -> - NewMSN = State#state.manifest_sqn + 1, - [SentWorkItem] = State#state.ongoing_work, - RootPath = State#state.root_path, - UnreferencedFiles = State#state.unreferenced_files, - - if - NewMSN == SentWorkItem#penciller_work.next_sqn -> - WISrcLevel = SentWorkItem#penciller_work.src_level, - leveled_log:log_timer("P0025", - [SentWorkItem#penciller_work.next_sqn, - WISrcLevel], - SentWorkItem#penciller_work.start_time), - ok = rename_manifest_files(RootPath, NewMSN), - FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files, - UnreferencedFilesUpd = update_deletions(FilesToDelete, - NewMSN, - UnreferencedFiles), - leveled_log:log("P0026", [NewMSN]), - NewManifest = ReturnedWorkItem#penciller_work.new_manifest, - - CurrL0 = get_item(0, State#state.manifest, []), - % If the work isn't L0 work, then we may have an uncommitted - % manifest change at L0 - so add this back into the Manifest loop - % state - RevisedManifest = case {WISrcLevel, CurrL0} of - {0, _} -> - NewManifest; - {_, []} -> - NewManifest; - {_, [L0ManEntry]} -> - lists:keystore(0, - 1, - NewManifest, - {0, [L0ManEntry]}) - end, - {ok, State#state{ongoing_work=[], - manifest_sqn=NewMSN, - manifest=RevisedManifest, - unreferenced_files=UnreferencedFilesUpd}} - end. 
- - -rename_manifest_files(RootPath, NewMSN) -> - OldFN = filepath(RootPath, NewMSN, pending_manifest), - NewFN = filepath(RootPath, NewMSN, current_manifest), - leveled_log:log("P0027", [OldFN, filelib:is_file(OldFN), - NewFN, filelib:is_file(NewFN)]), - ok = file:rename(OldFN,NewFN). - -filepath(RootPath, manifest) -> - RootPath ++ "/" ++ ?MANIFEST_FP; filepath(RootPath, files) -> RootPath ++ "/" ++ ?FILES_FP. -filepath(RootPath, NewMSN, pending_manifest) -> - filepath(RootPath, manifest) ++ "/" ++ "nonzero_" - ++ integer_to_list(NewMSN) ++ "." ++ ?PENDING_FILEX; -filepath(RootPath, NewMSN, current_manifest) -> - filepath(RootPath, manifest) ++ "/" ++ "nonzero_" - ++ integer_to_list(NewMSN) ++ "." ++ ?CURRENT_FILEX; filepath(RootPath, NewMSN, new_merge_files) -> filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN). -update_deletions([], _NewMSN, UnreferencedFiles) -> - UnreferencedFiles; -update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> - leveled_log:log("P0028", [ClearedFile#manifest_entry.filename]), - update_deletions(Tail, - MSN, - lists:append(UnreferencedFiles, - [{ClearedFile#manifest_entry.filename, - ClearedFile#manifest_entry.owner, - MSN}])). - -confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) -> - case lists:keyfind(Filename, 1, UnreferencedFiles) of - {Filename, Pid, MSN} -> - LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end, - infinity, - RegisteredSnapshots), - if - MSN >= LowSQN -> - false; - true -> - {true, Pid} - end - end. - %%%============================================================================ @@ -1338,47 +1085,6 @@ clean_subdir(DirPath) -> end. 
-compaction_work_assessment_test() -> - L0 = [{{o, "B1", "K1", null}, {o, "B3", "K3", null}, dummy_pid}], - L1 = [{{o, "B1", "K1", null}, {o, "B2", "K2", null}, dummy_pid}, - {{o, "B2", "K3", null}, {o, "B4", "K4", null}, dummy_pid}], - Manifest = [{0, L0}, {1, L1}], - {WorkQ1, 1} = assess_workqueue([], 0, Manifest, 0), - ?assertMatch([{0, Manifest, 1}], WorkQ1), - L1Alt = lists:append(L1, - [{{o, "B5", "K0001", null}, {o, "B5", "K9999", null}, - dummy_pid}, - {{o, "B6", "K0001", null}, {o, "B6", "K9999", null}, - dummy_pid}, - {{o, "B7", "K0001", null}, {o, "B7", "K9999", null}, - dummy_pid}, - {{o, "B8", "K0001", null}, {o, "B8", "K9999", null}, - dummy_pid}, - {{o, "B9", "K0001", null}, {o, "B9", "K9999", null}, - dummy_pid}, - {{o, "BA", "K0001", null}, {o, "BA", "K9999", null}, - dummy_pid}, - {{o, "BB", "K0001", null}, {o, "BB", "K9999", null}, - dummy_pid}]), - Manifest3 = [{0, []}, {1, L1Alt}], - {WorkQ3, 1} = assess_workqueue([], 0, Manifest3, 0), - ?assertMatch([{1, Manifest3, 1}], WorkQ3). - -confirm_delete_test() -> - Filename = 'test.sst', - UnreferencedFiles = [{'other.sst', dummy_owner, 15}, - {Filename, dummy_owner, 10}], - RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}], - R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1), - ?assertMatch(R1, {true, dummy_owner}), - RegisteredIterators2 = [{dummy_pid, 10}, {dummy_pid, 12}], - R2 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators2), - ?assertMatch(R2, false), - RegisteredIterators3 = [{dummy_pid, 9}, {dummy_pid, 12}], - R3 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators3), - ?assertMatch(R3, false). - - maybe_pause_push(PCL, KL) -> T0 = leveled_skiplist:empty(true), I0 = leveled_pmem:new_index(),