Penciller accepting push
Standardise on record definitions between modules to make easier - then add functionality to pushing to penciller as bookie would do. Some initial manual testing of this seems OK.
This commit is contained in:
parent
75996b90ca
commit
718425633a
4 changed files with 612 additions and 138 deletions
|
@ -114,18 +114,14 @@
|
|||
%% work. When the Clerk has requested and taken work, it should perform the
|
||||
%5 compaction work starting the new SFT process to manage the new Ledger state
|
||||
%% and then write a new manifest file that represents that state with using
|
||||
%% The MergeID as the filename <MergeID>.pnd.
|
||||
%%
|
||||
%% Prior to completing the work the previous manifest file should be renamed
|
||||
%% to the filename <OldMergeID>.bak, and any .bak files other than the
|
||||
%% the most recent n files should be deleted.
|
||||
%% the next Manifest sequence number as the filename:
|
||||
%% - nonzero_<ManifestSQN#>.pnd
|
||||
%%
|
||||
%% The Penciller on accepting the change should rename the manifest file to
|
||||
%% '<MergeID>.crr'.
|
||||
%% The Penciller on accepting the change should rename the manifest file to -
|
||||
%% - nonzero_<ManifestSQN#>.crr
|
||||
%%
|
||||
%% On startup, the Penciller should look first for a *.crr file, and if
|
||||
%% one is not present it should promot the most recently modified *.bak -
|
||||
%% checking first that all files referenced in it are still present.
|
||||
%% On startup, the Penciller should look for the nonzero_*.crr file with the
|
||||
%% highest such manifest sequence number.
|
||||
%%
|
||||
%% The pace at which the store can accept updates will be dependent on the
|
||||
%% speed at which the Penciller's Clerk can merge files at lower levels plus
|
||||
|
@ -134,13 +130,36 @@
|
|||
%% written the Penciller will need to wait for this compaction work to
|
||||
%% complete and the L0 file to be compacted before the ETS table can be
|
||||
%% allowed to again reach capacity
|
||||
%%
|
||||
%% The writing of L0 files do not require the involvement of the clerk.
|
||||
%% The L0 files are prompted directly by the penciller when the in-memory ets
|
||||
%% table has reached capacity. When there is a next push into memory the
|
||||
%% penciller calls to check that the file is now active (which may pause if the
|
||||
%% write is ongoing the acceptence of the push), and if so it can clear the ets
|
||||
%% table and build a new table starting with the remainder, and the keys from
|
||||
%% the latest push.
|
||||
%%
|
||||
|
||||
|
||||
-module(leveled_penciller).
|
||||
|
||||
%% -behaviour(gen_server).
|
||||
-behaviour(gen_server).
|
||||
|
||||
-export([return_work/2, commit_manifest_change/5]).
|
||||
-include("../include/leveled.hrl").
|
||||
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3,
|
||||
pcl_new/0,
|
||||
pcl_start/1,
|
||||
pcl_pushmem/2,
|
||||
pcl_fetch/2,
|
||||
pcl_workforclerk/1,
|
||||
pcl_requestmanifestchange/2,
|
||||
commit_manifest_change/3]).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
|
@ -155,14 +174,186 @@
|
|||
-define(PENDING_FILEX, "pnd").
|
||||
-define(BACKUP_FILEX, "bak").
|
||||
-define(ARCHIVE_FILEX, "arc").
|
||||
-define(MEMTABLE, mem).
|
||||
-define(MAX_TABLESIZE, 32000).
|
||||
-define(L0PEND_RESET, {false, [], none}).
|
||||
|
||||
-record(state, {manifest :: list(),
|
||||
ongoing_work :: list(),
|
||||
manifest_sqn :: integer(),
|
||||
registered_iterators :: list(),
|
||||
unreferenced_files :: list(),
|
||||
root_path :: string(),
|
||||
mem :: ets:tid()}).
|
||||
-record(state, {manifest = [] :: list(),
|
||||
ongoing_work = [] :: list(),
|
||||
manifest_sqn = 0 :: integer(),
|
||||
levelzero_sqn =0 :: integer(),
|
||||
registered_iterators = [] :: list(),
|
||||
unreferenced_files = [] :: list(),
|
||||
root_path = "../test/" :: string(),
|
||||
table_size = 0 :: integer(),
|
||||
clerk :: pid(),
|
||||
levelzero_pending = {false, [], none} :: tuple(),
|
||||
memtable}).
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
pcl_new() ->
|
||||
gen_server:start(?MODULE, [], []).
|
||||
|
||||
pcl_start(_RootDir) ->
|
||||
%% TODO
|
||||
%% Need to call startup to rebuild from disk
|
||||
ok.
|
||||
|
||||
pcl_pushmem(Pid, DumpList) ->
|
||||
%% Bookie to dump memory onto penciller
|
||||
gen_server:call(Pid, {push_mem, DumpList}, infinity).
|
||||
|
||||
pcl_fetch(Pid, Key) ->
|
||||
gen_server:call(Pid, {fetch, Key}, infinity).
|
||||
|
||||
pcl_workforclerk(Pid) ->
|
||||
gen_server:call(Pid, work_for_clerk, infinity).
|
||||
|
||||
pcl_requestmanifestchange(Pid, WorkItem) ->
|
||||
gen_server:call(Pid, {manifest_change, WorkItem}, infinity).
|
||||
|
||||
%%%============================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%============================================================================
|
||||
|
||||
init([]) ->
|
||||
TID = ets:new(?MEMTABLE, [ordered_set, private]),
|
||||
{ok, #state{memtable=TID}}.
|
||||
|
||||
handle_call({push_mem, DumpList}, _From, State) ->
|
||||
{TableSize, Manifest, L0Pend} = case State#state.levelzero_pending of
|
||||
{true, Remainder, {StartKey, EndKey, Pid}} ->
|
||||
%% Need to handle not error scenarios?
|
||||
%% N.B. Sync call - so will be ready
|
||||
ok = leveled_sft:sft_checkready(Pid),
|
||||
%% Reset ETS, but re-insert any remainder
|
||||
true = ets:delete_all_objects(State#state.memtable),
|
||||
true = ets:insert(State#state.memtable, Remainder),
|
||||
{length(Remainder),
|
||||
lists:keystore(0,
|
||||
1,
|
||||
State#state.manifest,
|
||||
{0, [{StartKey, EndKey, Pid}]}),
|
||||
?L0PEND_RESET};
|
||||
{false, _, _} ->
|
||||
{State#state.table_size,
|
||||
State#state.manifest,
|
||||
State#state.levelzero_pending};
|
||||
Unexpected ->
|
||||
io:format("Unexpected value of ~w~n", [Unexpected]),
|
||||
error
|
||||
end,
|
||||
case do_push_to_mem(DumpList, TableSize, State#state.memtable) of
|
||||
{twist, ApproxTableSize} ->
|
||||
{reply, ok, State#state{table_size=ApproxTableSize,
|
||||
manifest=Manifest,
|
||||
levelzero_pending=L0Pend}};
|
||||
{roll, ApproxTableSize} ->
|
||||
case {get_item(0, Manifest, []), L0Pend} of
|
||||
{[], ?L0PEND_RESET} ->
|
||||
L0SN = State#state.levelzero_sqn + 1,
|
||||
FileName = State#state.root_path
|
||||
++ ?FILES_FP ++ "/"
|
||||
++ integer_to_list(L0SN),
|
||||
SFT = leveled_sft:sft_new(FileName,
|
||||
ets:tab2list(State#state.memtable),
|
||||
[],
|
||||
0,
|
||||
#sft_options{wait=false}),
|
||||
{ok, L0Pid, Reply} = SFT,
|
||||
{{KL1Rem, []}, L0StartKey, L0EndKey} = Reply,
|
||||
{reply, ok, State#state{levelzero_pending={true,
|
||||
KL1Rem,
|
||||
{L0StartKey,
|
||||
L0EndKey,
|
||||
L0Pid}},
|
||||
table_size=ApproxTableSize,
|
||||
levelzero_sqn=L0SN}};
|
||||
_ ->
|
||||
io:format("Memory has exceeded limit but L0 file is still
|
||||
awaiting compaction ~n"),
|
||||
{reply, pause, State#state{table_size=ApproxTableSize,
|
||||
manifest=Manifest,
|
||||
levelzero_pending=L0Pend}}
|
||||
end
|
||||
end;
|
||||
handle_call({fetch, Key}, _From, State) ->
|
||||
{reply, fetch(Key, State#state.manifest, State#state.memtable), State};
|
||||
handle_call(work_for_clerk, From, State) ->
|
||||
{UpdState, Work} = return_work(State, From),
|
||||
{reply, Work, UpdState}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% Internal functions
|
||||
%%%============================================================================
|
||||
|
||||
|
||||
fetch(Key, Manifest, TID) ->
|
||||
case ets:lookup(TID, Key) of
|
||||
[Object] ->
|
||||
Object;
|
||||
[] ->
|
||||
fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2)
|
||||
end.
|
||||
|
||||
fetch(_Key, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
|
||||
not_present;
|
||||
fetch(Key, Manifest, Level, FetchFun) ->
|
||||
LevelManifest = get_item(Level, Manifest, []),
|
||||
case lists:foldl(fun(File, Acc) ->
|
||||
case Acc of
|
||||
not_present when
|
||||
Key >= File#manifest_entry.start_key,
|
||||
File#manifest_entry.end_key >= Key ->
|
||||
File#manifest_entry.owner;
|
||||
PidFound ->
|
||||
PidFound
|
||||
end end,
|
||||
not_present,
|
||||
LevelManifest) of
|
||||
not_present ->
|
||||
fetch(Key, Manifest, Level + 1, FetchFun);
|
||||
FileToCheck ->
|
||||
case FetchFun(FileToCheck, Key) of
|
||||
not_present ->
|
||||
fetch(Key, Manifest, Level + 1, FetchFun);
|
||||
ObjectFound ->
|
||||
ObjectFound
|
||||
end
|
||||
end.
|
||||
|
||||
do_push_to_mem(DumpList, TableSize, MemTable) ->
|
||||
ets:insert(MemTable, DumpList),
|
||||
case TableSize + length(DumpList) of
|
||||
ApproxTableSize when ApproxTableSize > ?MAX_TABLESIZE ->
|
||||
case ets:info(MemTable, size) of
|
||||
ActTableSize when ActTableSize > ?MAX_TABLESIZE ->
|
||||
{roll, ActTableSize};
|
||||
ActTableSize ->
|
||||
io:format("Table size is actually ~w~n", [ActTableSize]),
|
||||
{twist, ActTableSize}
|
||||
end;
|
||||
ApproxTableSize ->
|
||||
io:format("Table size is approximately ~w~n", [ApproxTableSize]),
|
||||
{twist, ApproxTableSize}
|
||||
end.
|
||||
|
||||
|
||||
|
||||
|
@ -173,56 +364,70 @@
|
|||
%% to look at work at that level
|
||||
|
||||
return_work(State, From) ->
|
||||
case State#state.ongoing_work of
|
||||
[] ->
|
||||
WorkQueue = assess_workqueue([],
|
||||
0,
|
||||
State#state.manifest,
|
||||
[]),
|
||||
case length(WorkQueue) of
|
||||
L when L > 0 ->
|
||||
[{SrcLevel, Manifest}|OtherWork] = WorkQueue,
|
||||
io:format("Work at Level ~w to be scheduled for ~w
|
||||
with ~w queue items outstanding~n",
|
||||
[SrcLevel, From, length(OtherWork)]),
|
||||
{State#state{ongoing_work={SrcLevel, From, os:timestamp()}},
|
||||
{SrcLevel, Manifest}};
|
||||
_ ->
|
||||
WorkQueue = assess_workqueue([],
|
||||
0,
|
||||
State#state.manifest),
|
||||
case length(WorkQueue) of
|
||||
L when L > 0 ->
|
||||
[{SrcLevel, Manifest}|OtherWork] = WorkQueue,
|
||||
io:format("Work at Level ~w to be scheduled for ~w
|
||||
with ~w queue items outstanding~n",
|
||||
[SrcLevel, From, length(OtherWork)]),
|
||||
case State#state.ongoing_work of
|
||||
[] ->
|
||||
%% No work currently outstanding
|
||||
%% Can allocate work
|
||||
NextSQN = State#state.manifest_sqn + 1,
|
||||
FP = filepath(State#state.root_path,
|
||||
NextSQN,
|
||||
new_merge_files),
|
||||
ManFile = filepath(State#state.root_path,
|
||||
NextSQN,
|
||||
pending_manifest),
|
||||
WI = #penciller_work{next_sqn=NextSQN,
|
||||
clerk=From,
|
||||
src_level=SrcLevel,
|
||||
manifest=Manifest,
|
||||
start_time = os:timestamp(),
|
||||
ledger_filepath = FP,
|
||||
manifest_file = ManFile},
|
||||
{State#state{ongoing_work=[WI]}, WI};
|
||||
[OutstandingWork] ->
|
||||
%% Still awaiting a response
|
||||
io:format("Ongoing work requested by ~w but work
|
||||
outstanding from Level ~w and Clerk ~w
|
||||
at sequence number ~w~n",
|
||||
[From,
|
||||
OutstandingWork#penciller_work.src_level,
|
||||
OutstandingWork#penciller_work.clerk,
|
||||
OutstandingWork#penciller_work.next_sqn]),
|
||||
{State, none}
|
||||
end;
|
||||
[{SrcLevel, OtherFrom, _TS}|T] ->
|
||||
io:format("Ongoing work requested by ~w but work
|
||||
outstanding from Level ~w and Clerk ~w with
|
||||
~w other items outstanding~n",
|
||||
[From, SrcLevel, OtherFrom, length(T)]),
|
||||
_ ->
|
||||
{State, none}
|
||||
end.
|
||||
|
||||
assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest, _OngoingWork) ->
|
||||
|
||||
|
||||
|
||||
assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest) ->
|
||||
WorkQ;
|
||||
assess_workqueue(WorkQ, LevelToAssess, Manifest, OngoingWork)->
|
||||
assess_workqueue(WorkQ, LevelToAssess, Manifest)->
|
||||
MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0),
|
||||
FileCount = length(get_item(LevelToAssess, Manifest, [])),
|
||||
NewWQ = maybe_append_work(WorkQ, LevelToAssess, Manifest, MaxFiles,
|
||||
FileCount, OngoingWork),
|
||||
assess_workqueue(NewWQ, LevelToAssess + 1, Manifest, OngoingWork).
|
||||
FileCount),
|
||||
assess_workqueue(NewWQ, LevelToAssess + 1, Manifest).
|
||||
|
||||
|
||||
maybe_append_work(WorkQ, Level, Manifest,
|
||||
MaxFiles, FileCount, OngoingWork)
|
||||
MaxFiles, FileCount)
|
||||
when FileCount > MaxFiles ->
|
||||
io:format("Outstanding compaction work items of ~w at level ~w~n",
|
||||
[FileCount - MaxFiles, Level]),
|
||||
case lists:keyfind(Level, 1, OngoingWork) of
|
||||
{Level, Pid, TS} ->
|
||||
io:format("Work will not be added to queue due to
|
||||
outstanding work with ~w assigned at ~w~n", [Pid, TS]),
|
||||
WorkQ;
|
||||
false ->
|
||||
lists:append(WorkQ, [{Level, Manifest}])
|
||||
end;
|
||||
lists:append(WorkQ, [{Level, Manifest}]);
|
||||
maybe_append_work(WorkQ, Level, _Manifest,
|
||||
_MaxFiles, FileCount, _OngoingWork) ->
|
||||
_MaxFiles, FileCount) ->
|
||||
io:format("No compaction work due to file count ~w at level ~w~n",
|
||||
[FileCount, Level]),
|
||||
WorkQ.
|
||||
|
@ -238,54 +443,65 @@ get_item(Index, List, Default) ->
|
|||
|
||||
|
||||
%% Request a manifest change
|
||||
%% Should be passed the
|
||||
%% - {SrcLevel, NewManifest, ClearedFiles, MergeID, From, State}
|
||||
%% To complete a manifest change need to:
|
||||
%% - Update the Manifest Sequence Number (msn)
|
||||
%% - Confirm this Pid has a current element of manifest work outstanding at
|
||||
%% that level
|
||||
%% - Rename the manifest file created under the MergeID (<mergeID>.manifest)
|
||||
%% to the filename current.manifest
|
||||
%% - Update the state of the LevelFileRef lists
|
||||
%% - Add the ClearedFiles to the list of files to be cleared (as a tuple with
|
||||
%% the new msn)
|
||||
%% The clerk should have completed the work, and created a new manifest
|
||||
%% and persisted the new view of the manifest
|
||||
%%
|
||||
%% To complete the change of manifest:
|
||||
%% - the state of the manifest file needs to be changed from pending to current
|
||||
%% - the list of unreferenced files needs to be updated on State
|
||||
%% - the current manifest needs to be update don State
|
||||
%% - the list of ongoing work needs to be cleared of this item
|
||||
|
||||
|
||||
commit_manifest_change(NewManifest, ClearedFiles, MergeID, From, State) ->
|
||||
commit_manifest_change(ReturnedWorkItem, From, State) ->
|
||||
NewMSN = State#state.manifest_sqn + 1,
|
||||
OngoingWork = State#state.ongoing_work,
|
||||
[SentWorkItem] = State#state.ongoing_work,
|
||||
RootPath = State#state.root_path,
|
||||
UnreferencedFiles = State#state.unreferenced_files,
|
||||
case OngoingWork of
|
||||
{SrcLevel, From, TS} ->
|
||||
io:format("Merge ~s completed in ~w microseconds at Level ~w~n",
|
||||
[MergeID, timer:diff_now(os:timestamp(), TS), SrcLevel]),
|
||||
ok = rename_manifest_files(RootPath, MergeID),
|
||||
UnreferencedFilesUpd = update_deletions(ClearedFiles,
|
||||
|
||||
case {SentWorkItem#penciller_work.next_sqn,
|
||||
SentWorkItem#penciller_work.clerk} of
|
||||
{NewMSN, From} ->
|
||||
MTime = timer:diff_now(os:timestamp(),
|
||||
SentWorkItem#penciller_work.start_time),
|
||||
io:format("Merge to sqn ~w completed in ~w microseconds
|
||||
at Level ~w~n",
|
||||
[SentWorkItem#penciller_work.next_sqn,
|
||||
MTime,
|
||||
SentWorkItem#penciller_work.src_level]),
|
||||
ok = rename_manifest_files(RootPath, NewMSN),
|
||||
FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files,
|
||||
UnreferencedFilesUpd = update_deletions(FilesToDelete,
|
||||
NewMSN,
|
||||
UnreferencedFiles),
|
||||
io:format("Merge ~s has been commmitted at sequence number ~w~n",
|
||||
[MergeID, NewMSN]),
|
||||
io:format("Merge has been commmitted at sequence number ~w~n",
|
||||
[NewMSN]),
|
||||
NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
|
||||
{ok, State#state{ongoing_work=null,
|
||||
manifest_sqn=NewMSN,
|
||||
manifest=NewManifest,
|
||||
unreferenced_files=UnreferencedFilesUpd}};
|
||||
_ ->
|
||||
io:format("Merge commit ~s not matched to known work~n",
|
||||
[MergeID]),
|
||||
{MaybeWrongMSN, MaybeWrongClerk} ->
|
||||
io:format("Merge commit from ~w at sqn ~w not matched to expected
|
||||
clerk ~w or sqn ~w~n",
|
||||
[From, NewMSN, MaybeWrongClerk, MaybeWrongMSN]),
|
||||
{error, State}
|
||||
end.
|
||||
|
||||
end.
|
||||
|
||||
|
||||
rename_manifest_files(RootPath, MergeID) ->
|
||||
ManifestFP = RootPath ++ "/" ++ ?MANIFEST_FP ++ "/",
|
||||
ok = file:rename(ManifestFP ++ MergeID
|
||||
++ "." ++ ?PENDING_FILEX,
|
||||
ManifestFP ++ MergeID
|
||||
++ "." ++ ?CURRENT_FILEX),
|
||||
ok.
|
||||
rename_manifest_files(RootPath, NewMSN) ->
|
||||
file:rename(filepath(RootPath, NewMSN, pending_manifest),
|
||||
filepath(RootPath, NewMSN, current_manifest)).
|
||||
|
||||
filepath(RootPath, NewMSN, pending_manifest) ->
|
||||
RootPath ++ "/" ++ ?MANIFEST_FP ++ "/" ++ "nonzero_"
|
||||
++ integer_to_list(NewMSN) ++ "." ++ ?PENDING_FILEX;
|
||||
filepath(RootPath, NewMSN, current_manifest) ->
|
||||
RootPath ++ "/" ++ ?MANIFEST_FP ++ "/" ++ "nonzero_"
|
||||
++ integer_to_list(NewMSN) ++ "." ++ ?CURRENT_FILEX;
|
||||
filepath(RootPath, NewMSN, new_merge_files) ->
|
||||
RootPath ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(NewMSN).
|
||||
|
||||
update_deletions([], _NewMSN, UnreferencedFiles) ->
|
||||
UnreferencedFiles;
|
||||
update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) ->
|
||||
|
@ -303,12 +519,8 @@ compaction_work_assessment_test() ->
|
|||
L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid},
|
||||
{{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}],
|
||||
Manifest = [{0, L0}, {1, L1}],
|
||||
OngoingWork1 = [],
|
||||
WorkQ1 = assess_workqueue([], 0, Manifest, OngoingWork1),
|
||||
WorkQ1 = assess_workqueue([], 0, Manifest),
|
||||
?assertMatch(WorkQ1, [{0, Manifest}]),
|
||||
OngoingWork2 = [{0, dummy_pid, os:timestamp()}],
|
||||
WorkQ2 = assess_workqueue([], 0, Manifest, OngoingWork2),
|
||||
?assertMatch(WorkQ2, []),
|
||||
L1Alt = lists:append(L1,
|
||||
[{{o, "B5", "K0001"}, {o, "B5", "K9999"}, dummy_pid},
|
||||
{{o, "B6", "K0001"}, {o, "B6", "K9999"}, dummy_pid},
|
||||
|
@ -318,7 +530,5 @@ compaction_work_assessment_test() ->
|
|||
{{o, "BA", "K0001"}, {o, "BA", "K9999"}, dummy_pid},
|
||||
{{o, "BB", "K0001"}, {o, "BB", "K9999"}, dummy_pid}]),
|
||||
Manifest3 = [{0, []}, {1, L1Alt}],
|
||||
WorkQ3 = assess_workqueue([], 0, Manifest3, OngoingWork1),
|
||||
?assertMatch(WorkQ3, [{1, Manifest3}]),
|
||||
WorkQ4 = assess_workqueue([], 0, Manifest3, OngoingWork2),
|
||||
?assertMatch(WorkQ4, [{1, Manifest3}]).
|
||||
WorkQ3 = assess_workqueue([], 0, Manifest3),
|
||||
?assertMatch(WorkQ3, [{1, Manifest3}]).
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue