Penciller Manifest and Locking

The penciller had the concept of a manifest_lock - but it wasn't clear
what the purpose of it was.

The updating of the manifest has now been updated to reduce the code and
make the process cleaner and more obvious.  Now the committed manifest
only covers non-L0 levels.  A clerk can work concurrently on a manifest
change whilst the Penciller is accepting a new L0 file.

On startup the manifest is opened as well as any L0 file.  There is a
possible race condition with killing process where there may be a L0
file which is merged but undeleted - and this is believed to be inert.

There is some outstanding work still.  Currently the whole store is
paused if a push_mem is received by the Penciller, and the writing of a
L0 sft file has not been completed.  The creation of a L0 file appears
to take about 300ms, so if the ledger_cache fills in this period a pause
will occur (perhaps due to objects with lots of index entries).  It
would be preferable to pause more elegantly in this situation.  Perhaps
there should be a harsh timeout on the call to check the SFT complete,
and catching it should cause a refused response.  The next PUT will then
wait, but any queued GETs can progress.
This commit is contained in:
martinsumner 2016-10-19 17:34:58 +01:00
parent f16f71ae81
commit 12fe1d01bd
7 changed files with 88 additions and 80 deletions

View file

@ -46,8 +46,6 @@
{root_path :: string(), {root_path :: string(),
cache_size :: integer(), cache_size :: integer(),
max_journalsize :: integer(), max_journalsize :: integer(),
metadata_extractor :: function(),
indexspec_converter :: function(),
snapshot_bookie :: pid()}). snapshot_bookie :: pid()}).
-record(iclerk_options, -record(iclerk_options,

View file

@ -133,6 +133,7 @@
terminate/2, terminate/2,
code_change/3, code_change/3,
book_start/1, book_start/1,
book_start/3,
book_riakput/3, book_riakput/3,
book_riakdelete/4, book_riakdelete/4,
book_riakget/3, book_riakget/3,
@ -169,6 +170,11 @@
%%% API %%% API
%%%============================================================================ %%%============================================================================
book_start(RootPath, LedgerCacheSize, JournalSize) ->
book_start(#bookie_options{root_path=RootPath,
cache_size=LedgerCacheSize,
max_journalsize=JournalSize}).
book_start(Opts) -> book_start(Opts) ->
gen_server:start(?MODULE, [Opts], []). gen_server:start(?MODULE, [Opts], []).

View file

@ -39,7 +39,6 @@
strip_to_keyseqstatusonly/1, strip_to_keyseqstatusonly/1,
striphead_to_details/1, striphead_to_details/1,
is_active/2, is_active/2,
is_indexkey/1,
endkey_passed/2, endkey_passed/2,
key_dominates/2, key_dominates/2,
print_key/1, print_key/1,
@ -108,11 +107,6 @@ to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
to_ledgerkey(Bucket, Key, Tag) -> to_ledgerkey(Bucket, Key, Tag) ->
{Tag, Bucket, Key, null}. {Tag, Bucket, Key, null}.
is_indexkey({Tag, _, _, _}) when Tag == ?IDX_TAG ->
true;
is_indexkey(_Key) ->
false.
hash(Obj) -> hash(Obj) ->
erlang:phash2(term_to_binary(Obj)). erlang:phash2(term_to_binary(Obj)).

View file

@ -695,7 +695,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) ->
TmpFN = filename:join(ManPath, TmpFN = filename:join(ManPath,
integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX), integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX),
MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end, MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end,
Manifest)), Manifest), [compressed]),
case filelib:is_file(NewFN) of case filelib:is_file(NewFN) of
true -> true ->
io:format("Error - trying to write manifest for" io:format("Error - trying to write manifest for"

View file

@ -139,7 +139,10 @@
%% - nonzero_<ManifestSQN#>.crr %% - nonzero_<ManifestSQN#>.crr
%% %%
%% On startup, the Penciller should look for the nonzero_*.crr file with the %% On startup, the Penciller should look for the nonzero_*.crr file with the
%% highest such manifest sequence number. %% highest such manifest sequence number. This will be started as the
%% manifest, together with any _0_0.sft file found at that Manifest SQN.
%% Level zero files are not kept in the persisted manifest, and adding a L0
%% file does not advance the Manifest SQN.
%% %%
%% The pace at which the store can accept updates will be dependent on the %% The pace at which the store can accept updates will be dependent on the
%% speed at which the Penciller's Clerk can merge files at lower levels plus %% speed at which the Penciller's Clerk can merge files at lower levels plus
@ -157,6 +160,27 @@
%% table and build a new table starting with the remainder, and the keys from %% table and build a new table starting with the remainder, and the keys from
%% the latest push. %% the latest push.
%% %%
%% Only a single L0 file may exist at any one moment in time. If pushes are
%% received when memory is over the maximum size, the pushes must be kept in
%% memory.
%%
%% 1 - A L0 file is prompted to be created at ManifestSQN n
%% 2 - The next push to memory will be stalled until the L0 write is reported
%% as completed (as the memory needs to be flushed)
%% 3 - The completion of the L0 file will cause a prompt to be cast to the
%% clerk for them to look for work
%% 4 - On completion of the merge (of the L0 file into L1, as this will be the
%% highest priority work), the clerk will create a new manifest file at
%% manifest SQN n+1
%% 5 - The clerk will prompt the penciller about the change, and the Penciller
%% will then commit the change (by renaming the manifest file to be active, and
%% advancing the in-memory state of the manifest and manifest SQN)
%% 6 - The Penciller having committed the change will cast back to the Clerk
%% to inform the Clerk that the change has been committed, and so it can carry
%% on requesting new work
%% 7 - If the Penciller now receives a Push to over the max size, a new L0 file
%% can now be created with the ManifestSQN of n+1
%%
%% ---------- NOTES ON THE USE OF ETS ---------- %% ---------- NOTES ON THE USE OF ETS ----------
%% %%
%% Insertion into ETS is very fast, and so using ETS does not slow the PUT %% Insertion into ETS is very fast, and so using ETS does not slow the PUT
@ -177,7 +201,7 @@
%% they need to iterate, or iterate through map functions scanning all the %% they need to iterate, or iterate through map functions scanning all the
%% keys. The conversion may not be expensive, as we know loading into an ETS %% keys. The conversion may not be expensive, as we know loading into an ETS
%% table is fast - but there may be some hidden overheads with creating and %% table is fast - but there may be some hidden overheads with creating and
%5 destroying many ETS tables. %% destroying many ETS tables.
%% %%
%% A3 - keep a parallel list of lists of things that have gone in the ETS %% A3 - keep a parallel list of lists of things that have gone in the ETS
%% table in the format they arrived in %% table in the format they arrived in
@ -252,7 +276,8 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(LEVEL_SCALEFACTOR, [{0, 0}, {1, 8}, {2, 64}, {3, 512}, -define(LEVEL_SCALEFACTOR, [{0, 0}, {1, 8}, {2, 64}, {3, 512},
{4, 4096}, {5, 32768}, {6, 262144}, {7, infinity}]). {4, 4096}, {5, 32768}, {6, 262144},
{7, infinity}]).
-define(MAX_LEVELS, 8). -define(MAX_LEVELS, 8).
-define(MAX_WORK_WAIT, 300). -define(MAX_WORK_WAIT, 300).
-define(MANIFEST_FP, "ledger_manifest"). -define(MANIFEST_FP, "ledger_manifest").
@ -383,9 +408,6 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap})
% If not the change has still been made but the the L0 file will not have % If not the change has still been made but the the L0 file will not have
% been prompted - so the reply does not indicate failure but returns the % been prompted - so the reply does not indicate failure but returns the
% atom 'pause' to signal a loose desire for back-pressure to be applied. % atom 'pause' to signal a loose desire for back-pressure to be applied.
% The only reason in this case why there should be a pause is if the
% manifest is locked pending completion of a manifest change - so reacting
% to the pause signal may not be sensible
StartWatch = os:timestamp(), StartWatch = os:timestamp(),
case assess_sqn(DumpList) of case assess_sqn(DumpList) of
{MinSQN, MaxSQN} when MaxSQN >= MinSQN, {MinSQN, MaxSQN} when MaxSQN >= MinSQN,
@ -494,8 +516,6 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc},
SFTiter = initiate_rangequery_frommanifest(StartKey, SFTiter = initiate_rangequery_frommanifest(StartKey,
EndKey, EndKey,
State#state.manifest), State#state.manifest),
io:format("Manifest for iterator of:~n"),
print_manifest(SFTiter),
Acc = keyfolder(L0iter, SFTiter, StartKey, EndKey, {AccFun, InitAcc}), Acc = keyfolder(L0iter, SFTiter, StartKey, EndKey, {AccFun, InitAcc}),
{reply, Acc, State}; {reply, Acc, State};
handle_call(work_for_clerk, From, State) -> handle_call(work_for_clerk, From, State) ->
@ -588,18 +608,12 @@ terminate(Reason, State) ->
no_change -> no_change ->
State State
end, end,
% TODO:
% This next section (to the end of the case clause), appears to be
% pointless. It will persist the in-memory state to a SFT file, but on
% startup that file will be ignored as the manifest has not bene updated
%
% Should we update the manifest, or stop trying to persist on closure?
Dump = roll_into_list(State#state.memtable_copy), Dump = roll_into_list(State#state.memtable_copy),
case {UpdState#state.levelzero_pending, case {UpdState#state.levelzero_pending,
get_item(0, UpdState#state.manifest, []), get_item(0, UpdState#state.manifest, []),
length(Dump)} of length(Dump)} of
{?L0PEND_RESET, [], L} when L > 0 -> {?L0PEND_RESET, [], L} when L > 0 ->
MSN = UpdState#state.manifest_sqn + 1, MSN = UpdState#state.manifest_sqn,
FileName = UpdState#state.root_path FileName = UpdState#state.root_path
++ "/" ++ ?FILES_FP ++ "/" ++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0", ++ integer_to_list(MSN) ++ "_0_0",
@ -701,7 +715,7 @@ start_from_file(PCLopts) ->
%% Find any L0 files %% Find any L0 files
L0FN = filepath(RootPath, L0FN = filepath(RootPath,
TopManSQN + 1, TopManSQN,
new_merge_files) ++ "_0_0.sft", new_merge_files) ++ "_0_0.sft",
case filelib:is_file(L0FN) of case filelib:is_file(L0FN) of
true -> true ->
@ -785,9 +799,9 @@ roll_memory(State, MaxSize, MemTableCopy) ->
case ets:info(State#state.memtable, size) of case ets:info(State#state.memtable, size) of
Size when Size > MaxSize -> Size when Size > MaxSize ->
L0 = get_item(0, State#state.manifest, []), L0 = get_item(0, State#state.manifest, []),
case {L0, manifest_locked(State)} of case L0 of
{[], false} -> [] ->
MSN = State#state.manifest_sqn + 1, MSN = State#state.manifest_sqn,
FileName = State#state.root_path FileName = State#state.root_path
++ "/" ++ ?FILES_FP ++ "/" ++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0", ++ integer_to_list(MSN) ++ "_0_0",
@ -798,10 +812,6 @@ roll_memory(State, MaxSize, MemTableCopy) ->
0, 0,
Opts), Opts),
{ok, {true, L0Pid, os:timestamp()}, MSN, Size}; {ok, {true, L0Pid, os:timestamp()}, MSN, Size};
{[], true} ->
{pause,
"L0 file write blocked by change at sqn=~w~n",
[State#state.manifest_sqn]};
_ -> _ ->
{pause, {pause,
"L0 file write blocked by L0 file in manifest~n", "L0 file write blocked by L0 file in manifest~n",
@ -870,23 +880,6 @@ compare_to_sqn(Obj, SQN) ->
end. end.
%% Manifest lock - don't have two changes to the manifest happening
%% concurrently
% TODO: Is this necessary now?
manifest_locked(State) ->
if
length(State#state.ongoing_work) > 0 ->
true;
true ->
case State#state.levelzero_pending of
{true, _Pid, _TS} ->
true;
_ ->
false
end
end.
%% Work out what the current work queue should be %% Work out what the current work queue should be
%% %%
%% The work queue should have a lower level work at the front, and no work %% The work queue should have a lower level work at the front, and no work
@ -905,8 +898,24 @@ return_work(State, From) ->
io:format("Work at Level ~w to be scheduled for ~w with ~w " ++ io:format("Work at Level ~w to be scheduled for ~w with ~w " ++
"queue items outstanding~n", "queue items outstanding~n",
[SrcLevel, From, length(OtherWork)]), [SrcLevel, From, length(OtherWork)]),
case {manifest_locked(State), State#state.ongoing_work} of case {element(1, State#state.levelzero_pending),
{false, _} -> State#state.ongoing_work} of
{true, _} ->
% Once the L0 file is completed there will be more work
% - so don't be busy doing other work now
io:format("Allocation of work blocked as L0 pending~n"),
{State, none};
{_, [OutstandingWork]} ->
% Still awaiting a response
io:format("Ongoing work requested by ~w " ++
"but work outstanding from Level ~w " ++
"and Clerk ~w at sequence number ~w~n",
[From,
OutstandingWork#penciller_work.src_level,
OutstandingWork#penciller_work.clerk,
OutstandingWork#penciller_work.next_sqn]),
{State, none};
_ ->
%% No work currently outstanding %% No work currently outstanding
%% Can allocate work %% Can allocate work
NextSQN = State#state.manifest_sqn + 1, NextSQN = State#state.manifest_sqn + 1,
@ -923,22 +932,7 @@ return_work(State, From) ->
start_time = os:timestamp(), start_time = os:timestamp(),
ledger_filepath = FP, ledger_filepath = FP,
manifest_file = ManFile}, manifest_file = ManFile},
{State#state{ongoing_work=[WI]}, WI}; {State#state{ongoing_work=[WI]}, WI}
{true, [OutstandingWork]} ->
%% Still awaiting a response
io:format("Ongoing work requested by ~w " ++
"but work outstanding from Level ~w " ++
"and Clerk ~w at sequence number ~w~n",
[From,
OutstandingWork#penciller_work.src_level,
OutstandingWork#penciller_work.clerk,
OutstandingWork#penciller_work.next_sqn]),
{State, none};
{true, _} ->
%% Manifest locked
io:format("Manifest locked but no work outstanding " ++
"with clerk~n"),
{State, none}
end; end;
_ -> _ ->
{State, none} {State, none}
@ -1313,11 +1307,12 @@ commit_manifest_change(ReturnedWorkItem, State) ->
{NewMSN, _From} -> {NewMSN, _From} ->
MTime = timer:now_diff(os:timestamp(), MTime = timer:now_diff(os:timestamp(),
SentWorkItem#penciller_work.start_time), SentWorkItem#penciller_work.start_time),
WISrcLevel = SentWorkItem#penciller_work.src_level,
io:format("Merge to sqn ~w completed in ~w microseconds " ++ io:format("Merge to sqn ~w completed in ~w microseconds " ++
"at Level ~w~n", "from Level ~w~n",
[SentWorkItem#penciller_work.next_sqn, [SentWorkItem#penciller_work.next_sqn,
MTime, MTime,
SentWorkItem#penciller_work.src_level]), WISrcLevel]),
ok = rename_manifest_files(RootPath, NewMSN), ok = rename_manifest_files(RootPath, NewMSN),
FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files, FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files,
UnreferencedFilesUpd = update_deletions(FilesToDelete, UnreferencedFilesUpd = update_deletions(FilesToDelete,
@ -1326,10 +1321,26 @@ commit_manifest_change(ReturnedWorkItem, State) ->
io:format("Merge has been commmitted at sequence number ~w~n", io:format("Merge has been commmitted at sequence number ~w~n",
[NewMSN]), [NewMSN]),
NewManifest = ReturnedWorkItem#penciller_work.new_manifest, NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
print_manifest(NewManifest),
CurrL0 = get_item(0, State#state.manifest, []),
% If the work isn't L0 work, then we may have an uncommitted
% manifest change at L0 - so add this back into the Manifest loop
% state
RevisedManifest = case {WISrcLevel, CurrL0} of
{0, _} ->
NewManifest;
{_, []} ->
NewManifest;
{_, [L0ManEntry]} ->
lists:keystore(0,
1,
NewManifest,
{0, [L0ManEntry]})
end,
print_manifest(RevisedManifest),
{ok, State#state{ongoing_work=[], {ok, State#state{ongoing_work=[],
manifest_sqn=NewMSN, manifest_sqn=NewMSN,
manifest=NewManifest, manifest=RevisedManifest,
unreferenced_files=UnreferencedFilesUpd}}; unreferenced_files=UnreferencedFilesUpd}};
{MaybeWrongMSN, From} -> {MaybeWrongMSN, From} ->
io:format("Merge commit at sqn ~w not matched to expected" ++ io:format("Merge commit at sqn ~w not matched to expected" ++
@ -1508,6 +1519,7 @@ simple_server_test() ->
?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})),
maybe_pause_push(pcl_pushmem(PCL, KL2)), maybe_pause_push(pcl_pushmem(PCL, KL2)),
maybe_pause_push(pcl_pushmem(PCL, [Key3])), maybe_pause_push(pcl_pushmem(PCL, [Key3])),
?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})),
?assertMatch(Key3, pcl_fetch(PCL, {o,"Bucket0003", "Key0003", null})), ?assertMatch(Key3, pcl_fetch(PCL, {o,"Bucket0003", "Key0003", null})),

View file

@ -282,9 +282,6 @@ handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) ->
FileMD, FileMD,
KL1, KL2, KL1, KL2,
Level), Level),
{KL1Rem, KL2Rem} = KeyRemainders,
io:format("File created with remainders of size ~w ~w~n",
[length(KL1Rem), length(KL2Rem)]),
{reply, {KeyRemainders, {reply, {KeyRemainders,
UpdFileMD#state.smallest_key, UpdFileMD#state.smallest_key,
UpdFileMD#state.highest_key}, UpdFileMD#state.highest_key},
@ -334,7 +331,10 @@ handle_call(get_maxsqn, _From, State) ->
{reply, State#state.highest_sqn, State}. {reply, State#state.highest_sqn, State}.
handle_cast({sft_new, Filename, Inp1, [], 0}, _State) -> handle_cast({sft_new, Filename, Inp1, [], 0}, _State) ->
SW = os:timestamp(),
{ok, State} = create_levelzero(Inp1, Filename), {ok, State} = create_levelzero(Inp1, Filename),
io:format("File creation of L0 file ~s took ~w microseconds~n",
[Filename, timer:now_diff(os:timestamp(), SW)]),
{noreply, State}; {noreply, State};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.

View file

@ -40,9 +40,7 @@ simple_load_with2i(_Config) ->
simple_querycount(_Config) -> simple_querycount(_Config) ->
RootPath = testutil:reset_filestructure(), RootPath = testutil:reset_filestructure(),
StartOpts1 = #bookie_options{root_path=RootPath, {ok, Book1} = leveled_bookie:book_start(RootPath, 4000, 50000000),
max_journalsize=50000000},
{ok, Book1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(), {TestObject, TestSpec} = testutil:generate_testobject(),
ok = leveled_bookie:book_riakput(Book1, TestObject, TestSpec), ok = leveled_bookie:book_riakput(Book1, TestObject, TestSpec),
testutil:check_forobject(Book1, TestObject), testutil:check_forobject(Book1, TestObject),
@ -91,7 +89,7 @@ simple_querycount(_Config) ->
Book1, Book1,
?KEY_ONLY), ?KEY_ONLY),
ok = leveled_bookie:book_close(Book1), ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(StartOpts1), {ok, Book2} = leveled_bookie:book_start(RootPath, 2000, 50000000),
Index1Count = count_termsonindex("Bucket", Index1Count = count_termsonindex("Bucket",
"idx1_bin", "idx1_bin",
Book2, Book2,
@ -206,7 +204,7 @@ simple_querycount(_Config) ->
end, end,
R9), R9),
ok = leveled_bookie:book_close(Book2), ok = leveled_bookie:book_close(Book2),
{ok, Book3} = leveled_bookie:book_start(StartOpts1), {ok, Book3} = leveled_bookie:book_start(RootPath, 2000, 50000000),
lists:foreach(fun({IdxF, IdxT, X}) -> lists:foreach(fun({IdxF, IdxT, X}) ->
R = leveled_bookie:book_returnfolder(Book3, R = leveled_bookie:book_returnfolder(Book3,
{index_query, {index_query,
@ -223,7 +221,7 @@ simple_querycount(_Config) ->
end, end,
R9), R9),
ok = leveled_bookie:book_riakput(Book3, Obj9, Spc9), ok = leveled_bookie:book_riakput(Book3, Obj9, Spc9),
{ok, Book4} = leveled_bookie:book_start(StartOpts1), {ok, Book4} = leveled_bookie:book_start(RootPath, 2000, 50000000),
lists:foreach(fun({IdxF, IdxT, X}) -> lists:foreach(fun({IdxF, IdxT, X}) ->
R = leveled_bookie:book_returnfolder(Book4, R = leveled_bookie:book_returnfolder(Book4,
{index_query, {index_query,