From 33f1efd5762771ee4d41e131b8a5170ac0102999 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Tue, 2 Aug 2016 17:51:43 +0100 Subject: [PATCH] Work on descriptions Add further descriptions of roles following name changes. Attempt to simplify manifest management in the Penciller by assuming there is only one Penciller's Clerk active - and so only one piece of work can be ongoing --- src/leveled_bookie.erl | 18 +-- src/leveled_inker.erl | 3 + src/leveled_penciller.erl | 267 ++++++++++++++++++++++++-------------- 3 files changed, 173 insertions(+), 115 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 321c7f8..84f7b5a 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -57,21 +57,9 @@ %% be added to the ledger. Initially this will be added to the Bookie's %% in-memory view of recent changes only. %% -%% The Bookie's memory consists of up to two in-memory ets tables -%% - the 'cmem' (current in-memory table) which is always subject to potential -%% change; -%% - the 'imem' (the immutable in-memory table) which is awaiting persistence -%% to the disk-based lsm-tree by the Penciller. -%% -%% The key and metadata should be written to the cmem store if it has -%% sufficient capacity, but this potentially should include the secondary key -%% changes which have been made as part of the transaction. -%% -%% If there is insufficient space in the cmem, the cmem should be converted -%% into the imem, and a new cmem be created. This requires the previous imem -%% to have been cleared from state due to compaction into the persisted Ledger -%% by the Penciller - otherwise the PUT is blocked. On creation of an imem, -%% the compaction process for that imem by the Penciller should be triggered. +%% The Bookie's memory consists of an in-memory ets table. Periodically, the +%% current table is pushed to the Penciller for eventual persistence, and a +%% new table is started. 
%% %% This completes the non-deferrable work associated with a PUT %% diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index c192433..7564ed4 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -6,3 +6,6 @@ %% %% + +-module(leveled_inker). + diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 2ac3384..0e1cfaa 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -1,4 +1,4 @@ -%% -------- Penciller --------- +%% -------- PENCILLER --------- %% %% The penciller is repsonsible for writing and re-writing the ledger - a %% persisted, ordered view of non-recent Keys and Metadata which have been @@ -12,10 +12,16 @@ %% Bookie, and calls the Bookie once the process of pencilling this data in %% the Ledger is complete - and the Bookie is free to forget about the data %% -%% -------- Ledger --------- +%% -------- LEDGER --------- %% %% The Ledger is divided into many levels -%% L1 - Ln: May contain multiple non-overlapping PIDs managing sft files. +%% L0: ETS tables are received from the Bookie and merged into a single ETS +%% table, until that table is the size of a SFT file, and it is then persisted +%% as a SFT file at this level. Once the persistence is completed, the ETS +%% table can be dropped. There can be only one SFT file at Level 0, so +%% the work to merge that file to the lower level must be the highest priority, +%% as otherwise the database will stall. +%% L1 TO L7: May contain multiple non-overlapping PIDs managing sft files. %% Compaction work should be sheduled if the number of files exceeds the target %% size of the level, where the target size is 8 ^ n. %% @@ -27,12 +33,14 @@ %% If a compaction change takes the size of a level beyond the target size, %% then compaction work for that level + 1 should be added to the compaction %% work queue. 
-%% Compaction work is fetched from the compaction worker because: +%% Compaction work is fetched by the Penciller's Clerk because: %% - it has timed out due to a period of inactivity %% - it has been triggered by the a cast to indicate the arrival of high %% priority compaction work -%% The compaction worker will always call the level manager to find out the -%% highest priority work currently in the queue before proceeding. +%% The Penciller's Clerk (which performs compaction work) will always call +%% the Penciller to find out the highest priority work currently in the queue +%% whenever it has either completed work, or a timeout has occurred since it +%% was informed there was no work to do. %% %% When the clerk picks work off the queue it will take the current manifest %% for the level and level - 1. The clerk will choose which file to compact @@ -44,13 +52,95 @@ %% will call the Penciller on a timeout to confirm that they are no longer in %% use (by any iterators). %% +%% ---------- PUSH ---------- +%% +%% The Penciller must support the PUSH of an ETS table from the Bookie. The +%% call to PUSH should be immediately acknowledged, and then work should be +%% completed to merge the ETS table into the L0 ETS table. +%% +%% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the +%% conversion of the current ETS table into a SFT file, but not completed this +%% change. This should prompt a stall. +%% +%% ---------- FETCH ---------- +%% +%% On request to fetch a key the Penciller should look first in the L0 ETS +%% table, and then look in the SFT files Level by Level, consulting the +%% Manifest to determine which file should be checked at each level. +%% +%% ---------- SNAPSHOT ---------- +%% +%% Iterators may request a snapshot of the database. To provide a snapshot +%% the Penciller must snapshot the ETS table, and then send this with a copy +%% of the manifest. 
+%% +%% Iterators requesting snapshots are registered by the Penciller, so that SFT +%% files valid at the point of the snapshot are retained until either the iterator is +%% completed or has timed out. +%% +%% ---------- ON STARTUP ---------- +%% +%% On Startup the Bookie will ask the Penciller to initiate the Ledger first. +%% To initiate the Ledger the Penciller must consult the manifest, and then start a SFT +%% management process for each file in the manifest. +%% +%% The penciller should then try and read any persisted ETS table in the +%% ledger_onshutdown folder. The Penciller must then discover the highest sequence +%% number in the ledger, and respond to the Bookie with that sequence number. +%% +%% The Bookie will ask the Inker for any Keys seen beyond that sequence number +%% before the startup of the overall store can be completed. +%% +%% ---------- ON SHUTDOWN ---------- +%% +%% On a controlled shutdown the Penciller should attempt to write any in-memory +%% ETS table to disk into the special ledger_onshutdown folder +%% +%% ---------- FOLDER STRUCTURE ---------- +%% +%% The following folders are used by the Penciller +%% $ROOT/ledger_manifest/ - used for keeping manifest files +%% $ROOT/ledger_onshutdown/ - containing the persisted view of the ETS table +%% written on controlled shutdown +%% $ROOT/ledger_files/ - containing individual SFT files +%% +%% In larger stores there could be a large number of files in the ledger_files +%% folder - perhaps o(1000). It is assumed that modern file systems should +%% handle this efficiently. +%% +%% ---------- COMPACTION & MANIFEST UPDATES ---------- +%% +%% The Penciller can have one and only one Clerk for performing compaction +%% work. When the Clerk has requested and taken work, it should perform the +%% compaction work starting the new SFT process to manage the new Ledger state +%% and then write a new manifest file that represents that state using +%% the MergeID as the filename, with the extension .pnd. 
+%% +%% Prior to completing the work the previous manifest file should be renamed +%% to the filename .bak, and any .bak files other than +%% the most recent n files should be deleted. +%% +%% The Penciller on accepting the change should rename the manifest file to +%% use the extension '.crr'. +%% +%% On startup, the Penciller should look first for a *.crr file, and if +%% one is not present it should promote the most recently modified *.bak - +%% checking first that all files referenced in it are still present. +%% +%% The pace at which the store can accept updates will be dependent on the +%% speed at which the Penciller's Clerk can merge files at lower levels plus +%% the time it takes to merge from Level 0. This is because if a clerk has commenced +%% compaction work at a lower level and then immediately a L0 SFT file is +%% written, the Penciller will need to wait for this compaction work to +%% complete and the L0 file to be compacted before the ETS table can be +%% allowed to again reach capacity. -module(leveled_penciller). %% -behaviour(gen_server). --export([return_work/2, commit_manifest_change/7]). +-export([return_work/2, commit_manifest_change/5]). -include_lib("eunit/include/eunit.hrl"). @@ -58,15 +148,22 @@ {4, 4096}, {5, 32768}, {6, 262144}, {7, infinity}]). -define(MAX_LEVELS, 8). -define(MAX_WORK_WAIT, 300). --define(MANIFEST_FP, "manifest"). --define(FILES_FP, "files"). +-define(MANIFEST_FP, "ledger_manifest"). +-define(FILES_FP, "ledger_files"). +-define(SHUTDOWN_FP, "ledger_onshutdown"). +-define(CURRENT_FILEX, "crr"). +-define(PENDING_FILEX, "pnd"). +-define(BACKUP_FILEX, "bak"). +-define(ARCHIVE_FILEX, "arc"). --record(state, {level_fileref :: list(), +-record(state, {manifest :: list(), ongoing_work :: list(), manifest_sqn :: integer(), registered_iterators :: list(), unreferenced_files :: list(), - root_path :: string()}). + root_path :: string(), + mem :: ets:tid()}). 
+ %% Work out what the current work queue should be @@ -76,37 +173,42 @@ %% to look at work at that level return_work(State, From) -> - OngoingWork = State#state.ongoing_work, - WorkQueue = assess_workqueue([], - 0, - State#state.level_fileref, - OngoingWork), - case length(WorkQueue) of - L when L > 0 -> - [{SrcLevel, SrcManifest, SnkManifest}|OtherWork] = WorkQueue, - UpdatedWork = lists:append(OngoingWork, - [{SrcLevel, From, os:timestamp()}, - {SrcLevel + 1, From, os:timestamp()}]), - io:format("Work at Level ~w to be scheduled for ~w with ~w queue - items outstanding", [SrcLevel, From, length(OtherWork)]), - {State#state{ongoing_work=UpdatedWork}, - {SrcLevel, SrcManifest, SnkManifest}}; - _ -> + case State#state.ongoing_work of + [] -> + WorkQueue = assess_workqueue([], + 0, + State#state.manifest, + []), + case length(WorkQueue) of + L when L > 0 -> + [{SrcLevel, Manifest}|OtherWork] = WorkQueue, + io:format("Work at Level ~w to be scheduled for ~w + with ~w queue items outstanding~n", + [SrcLevel, From, length(OtherWork)]), + {State#state{ongoing_work={SrcLevel, From, os:timestamp()}}, + {SrcLevel, Manifest}}; + _ -> + {State, none} + end; + [{SrcLevel, OtherFrom, _TS}|T] -> + io:format("Ongoing work requested by ~w but work + outstanding from Level ~w and Clerk ~w with + ~w other items outstanding~n", + [From, SrcLevel, OtherFrom, length(T)]), {State, none} end. 
- -assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _LevelFileRef, _OngoingWork) -> +assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest, _OngoingWork) -> WorkQ; -assess_workqueue(WorkQ, LevelToAssess, LevelFileRef, OngoingWork)-> +assess_workqueue(WorkQ, LevelToAssess, Manifest, OngoingWork)-> MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0), - FileCount = length(get_item(LevelToAssess, LevelFileRef, [])), - NewWQ = maybe_append_work(WorkQ, LevelToAssess, LevelFileRef, MaxFiles, + FileCount = length(get_item(LevelToAssess, Manifest, [])), + NewWQ = maybe_append_work(WorkQ, LevelToAssess, Manifest, MaxFiles, FileCount, OngoingWork), - assess_workqueue(NewWQ, LevelToAssess + 1, LevelFileRef, OngoingWork). + assess_workqueue(NewWQ, LevelToAssess + 1, Manifest, OngoingWork). -maybe_append_work(WorkQ, Level, LevelFileRef, +maybe_append_work(WorkQ, Level, Manifest, MaxFiles, FileCount, OngoingWork) when FileCount > MaxFiles -> io:format("Outstanding compaction work items of ~w at level ~w~n", @@ -117,11 +219,9 @@ maybe_append_work(WorkQ, Level, LevelFileRef, outstanding work with ~w assigned at ~w~n", [Pid, TS]), WorkQ; false -> - lists:append(WorkQ, [{Level, - get_item(Level, LevelFileRef, []), - get_item(Level + 1, LevelFileRef, [])}]) + lists:append(WorkQ, [{Level, Manifest}]) end; -maybe_append_work(WorkQ, Level, _LevelFileRef, +maybe_append_work(WorkQ, Level, _Manifest, _MaxFiles, FileCount, _OngoingWork) -> io:format("No compaction work due to file count ~w at level ~w~n", [FileCount, Level]), @@ -139,55 +239,37 @@ get_item(Index, List, Default) -> %% Request a manifest change %% Should be passed the -%% - {SrcLevel, NewSrcManifest, NewSnkManifest, ClearedFiles, MergeID, From, -%% State} +%% - {SrcLevel, NewManifest, ClearedFiles, MergeID, From, State} %% To complete a manifest change need to: %% - Update the Manifest Sequence Number (msn) %% - Confirm this Pid has a current element of manifest work outstanding at %% that level -%% - Rename the manifest file 
created under the MergeID (.) -%% at the sink Level to be the current manifest file (current_.) -%% -------- NOTE -------- -%% If there is a crash between these two points, the K/V data that has been -%% merged from the source level will now be in both the source and the sink -%% level. Therefore in store operations this potential duplication must be -%% handled. -%% -------- NOTE -------- -%% - Rename the manifest file created under the MergeID (.) -%% at the source level to the current manifest file (current_.) +%% - Rename the manifest file created under the MergeID (.manifest) +%% to the filename current.manifest %% - Update the state of the LevelFileRef lists %% - Add the ClearedFiles to the list of files to be cleared (as a tuple with %% the new msn) -commit_manifest_change(SrcLevel, NewSrcMan, NewSnkMan, ClearedFiles, - MergeID, From, State) -> +commit_manifest_change(NewManifest, ClearedFiles, MergeID, From, State) -> NewMSN = State#state.manifest_sqn + 1, OngoingWork = State#state.ongoing_work, RootPath = State#state.root_path, - SnkLevel = SrcLevel + 1, - case {lists:keyfind(SrcLevel, 1, OngoingWork), - lists:keyfind(SrcLevel + 1, 1, OngoingWork)} of - {{SrcLevel, From, TS}, {SnkLevel, From, TS}} -> - io:format("Merge ~s was a success in ~w microseconds", - [MergeID, timer:diff_now(os:timestamp(), TS)]), - OutstandingWork = lists:keydelete(SnkLevel, 1, - lists:keydelete(SrcLevel, 1, OngoingWork)), - ok = rename_manifest_files(RootPath, MergeID, - NewMSN, SrcLevel, SnkLevel), - NewLFR = update_levelfileref(NewSrcMan, - NewSnkMan, - SrcLevel, - State#state.level_fileref), - UnreferencedFiles = update_deletions(ClearedFiles, - NewMSN, - State#state.unreferenced_files), + UnreferencedFiles = State#state.unreferenced_files, + case OngoingWork of + {SrcLevel, From, TS} -> + io:format("Merge ~s completed in ~w microseconds at Level ~w~n", + [MergeID, timer:diff_now(os:timestamp(), TS), SrcLevel]), + ok = rename_manifest_files(RootPath, MergeID), + 
UnreferencedFilesUpd = update_deletions(ClearedFiles, + NewMSN, + UnreferencedFiles), io:format("Merge ~s has been commmitted at sequence number ~w~n", [MergeID, NewMSN]), - {ok, State#state{ongoing_work=OutstandingWork, + {ok, State#state{ongoing_work=null, manifest_sqn=NewMSN, - level_fileref=NewLFR, - unreferenced_files=UnreferencedFiles}}; + manifest=NewManifest, + unreferenced_files=UnreferencedFilesUpd}}; _ -> io:format("Merge commit ~s not matched to known work~n", [MergeID]), @@ -196,27 +278,14 @@ commit_manifest_change(SrcLevel, NewSrcMan, NewSnkMan, ClearedFiles, -rename_manifest_files(RootPath, MergeID, NewMSN, SrcLevel, SnkLevel) -> +rename_manifest_files(RootPath, MergeID) -> ManifestFP = RootPath ++ "/" ++ ?MANIFEST_FP ++ "/", ok = file:rename(ManifestFP ++ MergeID - ++ "." ++ integer_to_list(SnkLevel), - ManifestFP ++ "current_" ++ integer_to_list(SnkLevel) - ++ "." ++ integer_to_list(NewMSN)), - ok = file:rename(ManifestFP ++ MergeID - ++ "." ++ integer_to_list(SrcLevel), - ManifestFP ++ "current_" ++ integer_to_list(SrcLevel) - ++ "." ++ integer_to_list(NewMSN)), + ++ "." ++ ?PENDING_FILEX, + ManifestFP ++ MergeID + ++ "." ++ ?CURRENT_FILEX), ok. -update_levelfileref(NewSrcMan, NewSinkMan, SrcLevel, CurrLFR) -> - lists:keyreplace(SrcLevel + 1, - 1, - lists:keyreplace(SrcLevel, - 1, - CurrLFR, - {SrcLevel, NewSrcMan}), - {SrcLevel + 1, NewSinkMan}). 
- update_deletions([], _NewMSN, UnreferencedFiles) -> UnreferencedFiles; update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> @@ -233,12 +302,12 @@ compaction_work_assessment_test() -> L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}], L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid}, {{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}], - LevelFileRef = [{0, L0}, {1, L1}], + Manifest = [{0, L0}, {1, L1}], OngoingWork1 = [], - WorkQ1 = assess_workqueue([], 0, LevelFileRef, OngoingWork1), - ?assertMatch(WorkQ1, [{0, L0, L1}]), + WorkQ1 = assess_workqueue([], 0, Manifest, OngoingWork1), + ?assertMatch(WorkQ1, [{0, Manifest}]), OngoingWork2 = [{0, dummy_pid, os:timestamp()}], - WorkQ2 = assess_workqueue([], 0, LevelFileRef, OngoingWork2), + WorkQ2 = assess_workqueue([], 0, Manifest, OngoingWork2), ?assertMatch(WorkQ2, []), L1Alt = lists:append(L1, [{{o, "B5", "K0001"}, {o, "B5", "K9999"}, dummy_pid}, @@ -248,10 +317,8 @@ compaction_work_assessment_test() -> {{o, "B9", "K0001"}, {o, "B9", "K9999"}, dummy_pid}, {{o, "BA", "K0001"}, {o, "BA", "K9999"}, dummy_pid}, {{o, "BB", "K0001"}, {o, "BB", "K9999"}, dummy_pid}]), - WorkQ3 = assess_workqueue([], 0, [{0, []}, {1, L1Alt}], OngoingWork1), - ?assertMatch(WorkQ3, [{1, L1Alt, []}]), - WorkQ4 = assess_workqueue([], 0, [{0, []}, {1, L1Alt}], OngoingWork2), - ?assertMatch(WorkQ4, [{1, L1Alt, []}]), - OngoingWork3 = lists:append(OngoingWork2, [{1, dummy_pid, os:timestamp()}]), - WorkQ5 = assess_workqueue([], 0, [{0, []}, {1, L1Alt}], OngoingWork3), - ?assertMatch(WorkQ5, []). + Manifest3 = [{0, []}, {1, L1Alt}], + WorkQ3 = assess_workqueue([], 0, Manifest3, OngoingWork1), + ?assertMatch(WorkQ3, [{1, Manifest3}]), + WorkQ4 = assess_workqueue([], 0, Manifest3, OngoingWork2), + ?assertMatch(WorkQ4, [{1, Manifest3}]).