diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index a055cdf..4116ad0 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -18,9 +18,11 @@ %% -------- The actors --------- %% %% The store is fronted by a Bookie, who takes support from different actors: -%% - An Inker who persists new data into the jornal, and returns items from +%% - An Inker who persists new data into the journal, and returns items from %% the journal based on sequence number -%% - A Penciller who periodically redraws the ledger +%% - A Penciller who periodically redraws the ledger, that associates keys with +%% sequence numbers and other metadata, as well as secondary keys (for index +%% queries) %% - One or more Clerks, who may be used by either the inker or the penciller %% to fulfill background tasks %% @@ -61,11 +63,10 @@ %% well as the object size on disk within the Journal. %% %% Once the object has been persisted to the Journal, the Ledger can be updated. -%% The Ledger is updated by the Bookie applying a function (passed in at -%% startup) to the Value to return the Object Metadata, a function to generate -%% a hash of the Value and also taking the Primary Key, the IndexSpecs, the -%% Sequence Number in the Journal and the Object Size (returned from the -%% Inker). +%% The Ledger is updated by the Bookie applying a function (extract_metadata/4) +%% to the Value to return the Object Metadata, a function to generate a hash +%% of the Value and also taking the Primary Key, the IndexSpecs, the Sequence +%% Number in the Journal and the Object Size (returned from the Inker). %% %% The Bookie should generate a series of ledger key changes from this %% information, using a function passed in at startup. For Riak this will be @@ -79,20 +80,20 @@ %% null, %% {active, TS}|{tomb, TS}} %% -%% Recent Ledger changes are retained initially in the Bookies' memory (in an -%% in-memory ets table). Periodically, the current table is pushed to the -%% Penciller for eventual persistence, and a new table is started. +%% Recent Ledger changes are retained initially in the Bookies' memory (in a +%% small generally balanced tree). Periodically, the current table is pushed to +%% the Penciller for eventual persistence, and a new table is started. %% %% This completes the non-deferrable work associated with a PUT %% %% -------- Snapshots (Key & Metadata Only) -------- %% %% If there is a snapshot request (e.g. to iterate over the keys) the Bookie -%% must first produce a tree representing the results of the request which are -%% present in its in-memory view of the ledger. The Bookie then requests -%% a copy of the current Ledger manifest from the Penciller, and the Penciller -%5 should interest of the iterator at the manifest sequence number at the time -%% of the request. +%% may request a clone of the Penciller, or the Penciller and the Inker. +%% +%% The clone is seeded with the manifest. Teh clone should be registered with +%% the real Inker/Penciller, so that the real Inker/Penciller may prevent the +%% deletion of files still in use by a snapshot clone. %% %% Iterators should de-register themselves from the Penciller on completion. %% Iterators should be automatically release after a timeout period. A file @@ -100,10 +101,6 @@ %% there are no registered iterators from before the point the file was %% removed from the manifest. %% -%% Snapshots may be non-recent, if recency is unimportant. Non-recent -%% snapshots do no require the Bookie to return the results of the in-memory -%% table, the Penciller alone cna be asked. -%% %% -------- Special Ops -------- %% %% e.g. Get all for SegmentID/Partition @@ -115,10 +112,11 @@ %% On startup the Bookie must restart both the Inker to load the Journal, and %% the Penciller to load the Ledger. Once the Penciller has started, the %% Bookie should request the highest sequence number in the Ledger, and then -%% and try and rebuild any missing information from the Journal +%% and try and rebuild any missing information from the Journal. %% %% To rebuild the Ledger it requests the Inker to scan over the files from -%% the sequence number and re-generate the Ledger changes. +%% the sequence number and re-generate the Ledger changes - pushing the changes +%% directly back into the Ledger. @@ -359,7 +357,6 @@ shutdown_wait([TopPause|Rest], Inker) -> set_options(Opts) -> - %% TODO: Change the max size default, and allow setting through options MaxJournalSize = case Opts#bookie_options.max_journalsize of undefined -> 30000; @@ -497,8 +494,6 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> {MinSQN, MaxSQN, Output} = Acc0, {SQN, PK} = KeyInLedger, % VBin may already be a term - % TODO: Should VSize include CRC? - % Using ExtractFun means we ignore simple way of getting size (from length) {VBin, VSize} = ExtractFun(ValueInLedger), {Obj, IndexSpecs} = case is_binary(VBin) of true -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 7957d94..b54c2d2 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -59,7 +59,7 @@ %% %% Compaction is a process whereby an Inker's clerk will: %% - Request a view of the current Inker manifest and a snaphot of the Ledger -%% - Test all files within the Journal to find th eapproximate comapction +%% - Test all files within the Journal to find the approximate comapction %% potential percentage (the volume of the Journal that has been replaced) %% - Attempts to find the optimal "run" of files to compact %% - Compacts those files in the run, by rolling over the files re-writing diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 42001c1..307eab6 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -1,6 +1,52 @@ -%% Controlling asynchronous work in leveleddb to manage compaction within a -%% level and cleaning out of old files across a level - +%% -------- PENCILLER's CLERK --------- +%% +%% The Penciller's clerk is responsible for compaction work within the Ledger. +%% +%% The Clerk will periodically poll the Penciller to see if there is work for +%% it to complete, except if the Clerk has informed the Penciller that it has +%% readied a manifest change to be committed - in which case it will wait to +%% be called by the Penciller. +%% +%% -------- COMMITTING MANIFEST CHANGES --------- +%% +%% Once the Penciller has taken a manifest change, the SFT file owners which no +%% longer form part of the manifest will be marked for delete. By marking for +%% deletion, the owners will poll to confirm when it is safe for them to be +%% deleted. +%% +%% It is imperative that the file is not marked for deletion until it is +%% certain that the manifest change has been committed. Some uncollected +%% garbage is considered acceptable. +%% +%% The process of committing a manifest change is as follows: +%% +%% A - The Clerk completes a merge, and casts a prompt to the Penciller with +%% a work item describing the change +%% +%% B - The Penciller commits the change to disk, and then calls the Clerk to +%% confirm the manifest change +%% +%% C - The Clerk replies immediately to acknowledge this call, then marks the +%% removed files for deletion +%% +%% Shutdown < A/B - If the Penciller starts the shutdown process before the +%% merge is complete, in the shutdown the Penciller will call a request for the +%% manifest change which will pick up the pending change. It will then confirm +%% the change, and now the Clerk will mark the files for delete before it +%% replies to the Penciller so it can complete the shutdown process (which will +%% prompt erasing of the removed files). +%% +%% The clerk will not request work on timeout if the committing of a manifest +%5 change is pending confirmation. +%% +%% -------- TIMEOUTS --------- +%% +%% The Penciller may prompt the Clerk to callback soon (i.e. reduce the +%% Timeout) if it has urgent work ready (i.e. it has written a L0 file). +%% +%% There will also be a natural quick timeout once the committing of a manifest +%% change has occurred. +%% -module(leveled_pclerk). @@ -15,15 +61,15 @@ terminate/2, clerk_new/1, clerk_prompt/1, - clerk_returnmanifestchange/2, + clerk_manifestchange/3, code_change/3, perform_merge/4]). -include_lib("eunit/include/eunit.hrl"). --define(INACTIVITY_TIMEOUT, 2000). +-define(INACTIVITY_TIMEOUT, 5000). -define(QUICK_TIMEOUT, 500). --define(HAPPYTIME_MULTIPLIER, 5). +-define(HAPPYTIME_MULTIPLIER, 2). -record(state, {owner :: pid(), change_pending=false :: boolean(), @@ -38,8 +84,8 @@ clerk_new(Owner) -> ok = gen_server:call(Pid, {register, Owner}, infinity), {ok, Pid}. -clerk_returnmanifestchange(Pid, Closing) -> - gen_server:call(Pid, {return_manifest_change, Closing}). +clerk_manifestchange(Pid, Action, Closing) -> + gen_server:call(Pid, {manifest_change, Action, Closing}, infinity). clerk_prompt(Pid) -> gen_server:cast(Pid, prompt). @@ -53,23 +99,29 @@ init([]) -> handle_call({register, Owner}, _From, State) -> {reply, ok, State#state{owner=Owner}, ?INACTIVITY_TIMEOUT}; -handle_call({return_manifest_change, Closing}, From, State) -> - case {State#state.change_pending, Closing} of - {true, true} -> +handle_call({manifest_change, return, true}, _From, State) -> + case State#state.change_pending of + true -> + WI = State#state.work_item, + {reply, {ok, WI}, State}; + false -> + {reply, no_change, State} + end; +handle_call({manifest_change, confirm, Closing}, From, State) -> + case Closing of + true -> WI = State#state.work_item, ok = mark_for_delete(WI#penciller_work.unreferenced_files, State#state.owner), - {stop, normal, {ok, WI}, State}; - {true, false} -> + {stop, normal, ok, State}; + false -> + gen_server:reply(From, ok), WI = State#state.work_item, - gen_server:reply(From, {ok, WI}), mark_for_delete(WI#penciller_work.unreferenced_files, State#state.owner), {noreply, State#state{work_item=null, change_pending=false}, - ?INACTIVITY_TIMEOUT}; - {false, true} -> - {stop, normal, no_change_required, State} + ?QUICK_TIMEOUT} end. handle_cast(prompt, State) -> @@ -116,7 +168,8 @@ requestandhandle_work(State) -> {NewManifest, FilesToDelete} = merge(WI), UpdWI = WI#penciller_work{new_manifest=NewManifest, unreferenced_files=FilesToDelete}, - ok = leveled_penciller:pcl_promptmanifestchange(State#state.owner), + ok = leveled_penciller:pcl_promptmanifestchange(State#state.owner, + UpdWI), {true, UpdWI} end. @@ -203,8 +256,8 @@ check_for_merge_candidates(SrcF, SinkFiles) -> %% - The one that overlaps with the fewest files below? %% - The smallest file? %% We could try and be fair in some way (merge oldest first) -%% Ultimately, there is alack of certainty that being fair or optimal is -%% genuinely better - ultimately every file has to be compacted. +%% Ultimately, there is a lack of certainty that being fair or optimal is +%% genuinely better - eventually every file has to be compacted. %% %% Hence, the initial implementation is to select files to merge at random @@ -286,6 +339,7 @@ get_item(Index, List, Default) -> %%% Test %%%============================================================================ +-ifdef(TEST). generate_randomkeys(Count, BucketRangeLow, BucketRangeHigh) -> generate_randomkeys(Count, [], BucketRangeLow, BucketRangeHigh). @@ -398,4 +452,6 @@ select_merge_file_test() -> Manifest = [{0, L0}, {1, L1}], {FileRef, NewManifest} = select_filetomerge(0, Manifest), ?assertMatch(FileRef, {{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}), - ?assertMatch(NewManifest, [{0, []}, {1, L1}]). \ No newline at end of file + ?assertMatch(NewManifest, [{0, []}, {1, L1}]). + +-endif. \ No newline at end of file diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index fb20819..dda82cc 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -7,8 +7,8 @@ %% Ledger. %% - The Penciller provides re-write (compaction) work up to be managed by %% the Penciller's Clerk -%% - The Penciller maintains a register of iterators who have requested -%% snapshots of the Ledger +%% - The Penciller can be cloned and maintains a register of clones who have +%% requested snapshots of the Ledger %% - The accepts new dumps (in the form of lists of keys) from the Bookie, and %% calls the Bookie once the process of pencilling this data in the Ledger is %% complete - and the Bookie is free to forget about the data @@ -236,7 +236,7 @@ pcl_fetch/2, pcl_checksequencenumber/3, pcl_workforclerk/1, - pcl_promptmanifestchange/1, + pcl_promptmanifestchange/2, pcl_confirmdelete/2, pcl_close/1, pcl_registersnapshot/2, @@ -307,8 +307,8 @@ pcl_checksequencenumber(Pid, Key, SQN) -> pcl_workforclerk(Pid) -> gen_server:call(Pid, work_for_clerk, infinity). -pcl_promptmanifestchange(Pid) -> - gen_server:cast(Pid, manifest_change). +pcl_promptmanifestchange(Pid, WI) -> + gen_server:cast(Pid, {manifest_change, WI}). pcl_confirmdelete(Pid, FileName) -> gen_server:call(Pid, {confirm_delete, FileName}, infinity). @@ -511,10 +511,11 @@ handle_call(close, _From, State) -> handle_cast({update_snapshotcache, Tree, SQN}, State) -> MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN), {noreply, State#state{memtable_copy=MemTableC}}; -handle_cast(manifest_change, State) -> - {ok, WI} = leveled_pclerk:clerk_returnmanifestchange(State#state.clerk, - false), +handle_cast({manifest_change, WI}, State) -> {ok, UpdState} = commit_manifest_change(WI, State), + ok = leveled_pclerk:clerk_manifestchange(State#state.clerk, + confirm, + false), {noreply, UpdState}; handle_cast(_Msg, State) -> {noreply, State}. @@ -541,12 +542,18 @@ terminate(_Reason, State) -> %% The cast may not succeed as the clerk could be synchronously calling %% the penciller looking for a manifest commit %% - MC = leveled_pclerk:clerk_returnmanifestchange(State#state.clerk, true), + MC = leveled_pclerk:clerk_manifestchange(State#state.clerk, + return, + true), UpdState = case MC of {ok, WI} -> {ok, NewState} = commit_manifest_change(WI, State), + Clerk = State#state.clerk, + ok = leveled_pclerk:clerk_manifestchange(Clerk, + confirm, + true), NewState; - no_change_required -> + no_change -> State end, Dump = ets:tab2list(UpdState#state.memtable), @@ -1233,12 +1240,9 @@ simple_server_test() -> ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"})), ?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"})), ?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"})), - S4 = pcl_pushmem(PCLr, KL3), - if S4 == pause -> timer:sleep(1000); true -> ok end, - S5 = pcl_pushmem(PCLr, [Key4]), - if S5 == pause -> timer:sleep(1000); true -> ok end, - S6 = pcl_pushmem(PCLr, KL4), - if S6 == pause -> timer:sleep(1000); true -> ok end, + maybe_pause_push(pcl_pushmem(PCLr, KL3)), + maybe_pause_push(pcl_pushmem(PCLr, [Key4])), + maybe_pause_push(pcl_pushmem(PCLr, KL4)), ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"})), ?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"})), ?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"})), @@ -1268,10 +1272,8 @@ simple_server_test() -> % in a new snapshot Key1A = {{o,"Bucket0001", "Key0001"}, {4002, {active, infinity}, null}}, KL1A = lists:sort(leveled_sft:generate_randomkeys({4002, 2})), - S7 = pcl_pushmem(PCLr, [Key1A]), - if S7 == pause -> timer:sleep(1000); true -> ok end, - S8 = pcl_pushmem(PCLr, KL1A), - if S8 == pause -> timer:sleep(1000); true -> ok end, + maybe_pause_push(pcl_pushmem(PCLr, [Key1A])), + maybe_pause_push(pcl_pushmem(PCLr, KL1A)), ?assertMatch(true, pcl_checksequencenumber(PclSnap, {o,"Bucket0001", "Key0001"}, 1)),