From a2d873a06d5ce8c14e0b3f177e5864fc7f818106 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Wed, 27 Jul 2016 18:03:44 +0100 Subject: [PATCH] Add first draft of manager Start to build up functions for the keymanager --- src/leveled_keymanager.erl | 220 +++++++++++++++++++++++++++++++++++++ src/leveled_worker.erl | 2 +- 2 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 src/leveled_keymanager.erl diff --git a/src/leveled_keymanager.erl b/src/leveled_keymanager.erl new file mode 100644 index 0000000..b08b3ae --- /dev/null +++ b/src/leveled_keymanager.erl @@ -0,0 +1,220 @@ +%% The manager is responsible for controlling access to the store and +%% maintaining both an in-memory view and a persisted state of all the sft +%% files in use across the store. +%% +%% The store is divided into many levels +%% L0: May contain one, and only one sft file PID which is the most recent file +%% added to the top of the store. Access to the store will be stalled when a +%% second file is added whilst one still remains at this level. The target +%% size of L0 is therefore 0. +%% L1 - Ln: May contain multiple non-overlapping PIDs managing sft files. +%% Compaction work should be sheduled if the number of files exceeds the target +%% size of the level, where the target size is 8 ^ n. +%% +%% The most recent revision of a Key can be found by checking each level until +%% the key is found. To check a level the write file must be sought from the +%% manifest for that level, and then a call is made to that level. +%% +%% If a compaction change takes the size of a level beyond the target size, +%% then compaction work for that level + 1 should be added to the compaction +%% work queue. +%% Compaction work is fetched from the compaction worker because: +%% - it has timed out due to a period of inactivity +%% - it has been triggered by the a cast to indicate the arrival of high +%% priority compaction work +%% The compaction worker will always call the level manager to find out the +%% highest priority work currently in the queue before proceeding. +%% +%% When the compaction worker picks work off the queue it will take the current +%% manifest for the level and level - 1. The compaction worker will choose +%% which file to compact from level - 1, and once the compaction is complete +%% will call to the manager with the new version of the manifest to be written. +%% Once the new version of the manifest had been persisted, the state of any +%% deleted files will be changed to pending deletion. In pending deletion they +%% will call the manifets manager on a timeout to confirm that they are no +%% longer in use (by any iterators). +%% +%% If there is an iterator request, the manager will simply handoff a copy of +%% the manifest, and register the interest of the iterator at the manifest +%% sequence number at the time of the request. Iterators should de-register +%% themselves from the manager on completion. Iterators should be +%% automatically release after a timeout period. A file can be deleted if +%% there are no registered iterators from before the point the file was +%% removed from the manifest. + +-module(leveled_keymanager). + +%% -behaviour(gen_server). + +-export([return_work/2, commit_manifest_change/7]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(LEVEL_SCALEFACTOR, [0, 8, 64, 512, + 4096, 32768, 262144, infinity]). +-define(MAX_LEVELS, 8). +-define(MAX_WORK_WAIT, 300). + +-record(state, {level_fileref :: list(), + ongoing_work :: list(), + manifest_sqn :: integer(), + registered_iterators :: list(), + unreferenced_files :: list()}). + + +%% Work out what the current work queue should be +%% +%% The work queue should have a lower level work at the front, and no work +%% should be added to the queue if a compaction worker has already been asked +%% to look at work at that level + +return_work(State, From) -> + OngoingWork = State#state.ongoing_work, + WorkQueue = assess_workqueue([], + 0, + State#state.level_fileref, + OngoingWork), + case length(WorkQueue) of + L when L > 0 -> + [{SrcLevel, SrcManifest, SnkManifest}|OtherWork] = WorkQueue, + UpdatedWork = lists:append(OngoingWork, + [{SrcLevel, From, os:timestamp()}, + {SrcLevel + 1, From, os:timestamp()}]), + io:format("Work at Level ~w to be scheduled for ~w with ~w queue + items outstanding", [SrcLevel, From, length(OtherWork)]), + {State#state{ongoing_work=UpdatedWork}, + {SrcLevel, SrcManifest, SnkManifest}}; + _ -> + {State, none} + end. + + +assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _LevelFileRef, _OngoingWork) -> + WorkQ; +assess_workqueue(WorkQ, LevelToAssess, LevelFileRef, OngoingWork)-> + MaxFiles = get_item(LevelToAssess + 1, ?LEVEL_SCALEFACTOR, 0), + FileCount = length(get_item(LevelToAssess + 1, LevelFileRef, [])), + NewWQ = maybe_append_work(WorkQ, LevelToAssess, LevelFileRef, MaxFiles, + FileCount, OngoingWork), + assess_workqueue(NewWQ, LevelToAssess + 1, LevelFileRef, OngoingWork). + + +maybe_append_work(WorkQ, Level, LevelFileRef, + MaxFiles, FileCount, OngoingWork) + when FileCount > MaxFiles -> + io:format("Outstanding compaction work items of ~w at level ~w~n", + [FileCount - MaxFiles, Level]), + case lists:keyfind(Level, 1, OngoingWork) of + {Level, Pid, TS} -> + io:format("Work will not be added to queue due to + outstanding work with ~w assigned at ~w~n", [Pid, TS]), + WorkQ; + false -> + lists:append(WorkQ, [{Level, + get_item(Level + 1, LevelFileRef, []), + get_item(Level + 2, LevelFileRef, [])}]) + end; +maybe_append_work(WorkQ, Level, _LevelFileRef, + _MaxFiles, FileCount, _OngoingWork) -> + io:format("No compaction work due to file count ~w at level ~w~n", + [FileCount, Level]), + WorkQ. + + +get_item(Index, List, Default) when Index > length(List) -> + Default; +get_item(Index, List, _Default) -> + lists:nth(Index, List). + + +%% Request a manifest change +%% Should be passed the +%% - {SrcLevel, NewSrcManifest, NewSnkManifest, ClearedFiles, MergeID, From, +%% State} +%% To complete a manifest change need to: +%% - Update the Manifest Sequence Number (msn) +%% - Confirm this Pid has a current element of manifest work outstanding at +%% that level +%% - Rename the manifest file created under the MergeID at the sink Level +%% to be the current manifest file (current..sink) +%% (Note than on startup if the highest msn in all the current. files for that +%% level is a sink file, it must be confirmed that th elevel above is at the +%% same or higher msn. If not the next lowest current..sink must be +%% chosen. This avoids inconsistency on crash between these steps - although +%% the inconsistency would have been survivable) +%% - Rename the manifest file created under the MergeID at the source levl +%% to the current manifest file (current..src) +%% - Update the state of the LevelFileRef lists +%% - Add the ClearedFiles to the list of files to be cleared (as a tuple with +%% the new msn) + + +commit_manifest_change(SrcLevel, NewSrcMan, NewSnkMan, ClearedFiles, + MergeID, From, State) -> + NewMSN = State#state.manifest_sqn + 1, + OngoingWork = State#state.ongoing_work, + SnkLevel = SrcLevel + 1, + case {lists:keyfind(SrcLevel, 1, OngoingWork), + lists:keyfind(SrcLevel + 1, 1, OngoingWork)} of + {{SrcLevel, From, TS}, {SnkLevel, From, TS}} -> + io:format("Merge ~s was a success in ~w microseconds", + [MergeID, timer:diff_now(os:timestamp(), TS)]), + _OutstandingWork = lists:keydelete(SnkLevel, 1, + lists:keydelete(SrcLevel, 1, OngoingWork)), + rename_manifest_file(MergeID, sink, NewMSN, SnkLevel), + rename_manifest_file(MergeID, src, NewMSN, SrcLevel), + _NewLFR = update_levelfileref(NewSrcMan, + NewSnkMan, + SrcLevel, + State#state.level_fileref), + _UnreferencedFiles = update_deletions(ClearedFiles, + NewMSN, + State#state.unreferenced_files), + ok; + _ -> + error + end. + + + +rename_manifest_file(_MergeID, _SrcOrSink, _NewMSN, _Level) -> + ok. + +update_levelfileref(_NewSrcMan, _NewSinkMan, _SrcLevel, CurrLFR) -> + CurrLFR. + +update_deletions(_ClearedFiles, _NewMSN, UnreferencedFiles) -> + UnreferencedFiles. + +%%%============================================================================ +%%% Test +%%%============================================================================ + + +compaction_work_assessment_test() -> + L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}], + L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid}, + {{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}], + LevelFileRef = [L0, L1], + OngoingWork1 = [], + WorkQ1 = assess_workqueue([], 0, LevelFileRef, OngoingWork1), + ?assertMatch(WorkQ1, [{0, L0, L1}]), + OngoingWork2 = [{0, dummy_pid, os:timestamp()}], + WorkQ2 = assess_workqueue([], 0, LevelFileRef, OngoingWork2), + ?assertMatch(WorkQ2, []), + L1Alt = lists:append(L1, + [{{o, "B5", "K0001"}, {o, "B5", "K9999"}, dummy_pid}, + {{o, "B6", "K0001"}, {o, "B6", "K9999"}, dummy_pid}, + {{o, "B7", "K0001"}, {o, "B7", "K9999"}, dummy_pid}, + {{o, "B8", "K0001"}, {o, "B8", "K9999"}, dummy_pid}, + {{o, "B9", "K0001"}, {o, "B9", "K9999"}, dummy_pid}, + {{o, "BA", "K0001"}, {o, "BA", "K9999"}, dummy_pid}, + {{o, "BB", "K0001"}, {o, "BB", "K9999"}, dummy_pid}]), + WorkQ3 = assess_workqueue([], 0, [[], L1Alt], OngoingWork1), + ?assertMatch(WorkQ3, [{1, L1Alt, []}]), + WorkQ4 = assess_workqueue([], 0, [[], L1Alt], OngoingWork2), + ?assertMatch(WorkQ4, [{1, L1Alt, []}]), + OngoingWork3 = lists:append(OngoingWork2, [{1, dummy_pid, os:timestamp()}]), + WorkQ5 = assess_workqueue([], 0, [[], L1Alt], OngoingWork3), + ?assertMatch(WorkQ5, []). + diff --git a/src/leveled_worker.erl b/src/leveled_worker.erl index cdeed7b..50b3faf 100644 --- a/src/leveled_worker.erl +++ b/src/leveled_worker.erl @@ -158,4 +158,4 @@ merge_file_test() -> leveled_sft:sft_clear(PidL2_2), leveled_sft:sft_clear(PidL2_3), leveled_sft:sft_clear(PidL2_4), - lists:foreach(fun({_StK, _EndK, Pid}) -> leveled_sft:sft_clear(Pid) end, Result). \ No newline at end of file + lists:foreach(fun({_StK, _EndK, Pid}) -> leveled_sft:sft_clear(Pid) end, Result).