Resolve issue of Remainders

Two issues looked at
- There shouldn't be a remainder after writing the L0 file, as this
could have overlapping sequence numbers which will be missed on restart
- There should be a safety-check to stop the Clerk from doing a fake
push too soon after a background L0 file has been written (as the fake
push would lock the ledger waiting for the L0 file write to finish)
This commit is contained in:
martinsumner 2016-08-16 12:45:48 +01:00
parent 4586e2340a
commit ce0c55a2ec
3 changed files with 177 additions and 131 deletions

View file

@ -251,7 +251,7 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) ->
io:format("File to be created as part of MSN=~w Filename=~s~n",
[MSN, FileName]),
TS1 = os:timestamp(),
case leveled_sft:sft_new(FileName, KL1, KL2, Level) of
case leveled_sft:sft_new(FileName, KL1, KL2, Level + 1) of
{ok, _Pid, {error, Reason}} ->
io:format("Exiting due to error~w~n", [Reason]),
error;
@ -263,7 +263,7 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) ->
owner=Pid,
filename=FileName}]),
MTime = timer:now_diff(os:timestamp(), TS1),
io:format("file creation took ~w microseconds ~n", [MTime]),
io:format("File creation took ~w microseconds ~n", [MTime]),
do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN},
FileCounter + 1, ExtMan)
end.

View file

@ -1,29 +1,35 @@
%% -------- PENCILLER ---------
%%
%% The penciller is repsonsible for writing and re-writing the ledger - a
%% The penciller is responsible for writing and re-writing the ledger - a
%% persisted, ordered view of non-recent Keys and Metadata which have been
%% added to the store.
%% - The penciller maintains a manifest of all the files within the current
%% Ledger.
%% - The Penciller queues re-write (compaction) work up to be managed by Clerks
%% - The Penciller provides re-write (compaction) work up to be managed by
%% the Penciller's Clerk
%% - The Penciller maintains a register of iterators who have requested
%% snapshots of the Ledger
%% - The accepts new dumps (in the form of immutable ets tables) from the
%% Bookie, and calls the Bookie once the process of pencilling this data in
%% the Ledger is complete - and the Bookie is free to forget about the data
%% - The Penciller accepts new dumps (in the form of lists of keys) from the
%% Bookie, and
%% calls the Bookie once the process of pencilling this data in the Ledger is
%% complete - and the Bookie is free to forget about the data
%%
%% -------- LEDGER ---------
%%
%% The Ledger is divided into many levels
%% L0: ETS tables are received from the Bookie and merged into a single ETS
%% - L0: New keys are received from the Bookie and merged into a single ETS
%% table, until that table is the size of a SFT file, and it is then persisted
%% as a SFT file at this level. Once the persistence is completed, the ETS
%% table can be dropped. There can be only one SFT file at Level 0, so
%% the work to merge that file to the lower level must be the highest priority,
%% as otherwise the database will stall.
%% L1 TO L7: May contain multiple non-overlapping PIDs managing sft files.
%% Compaction work should be sheduled if the number of files exceeds the target
%% size of the level, where the target size is 8 ^ n.
%% as a SFT file at this level. L0 SFT files can be larger than the normal
%% maximum size - so we don't have to consider problems of either having more
%% than one L0 file (and handling what happens on a crash between writing the
%% files when the second may have overlapping sequence numbers), or having a
%% remainder with overlapping in sequence numbers in memory after the file is
%% written. Once the persistence is completed, the ETS table can be erased.
%% There can be only one SFT file at Level 0, so the work to merge that file
%% to the lower level must be the highest priority, as otherwise writes to the
%% ledger will stall, when there is next a need to persist.
%% - L1 TO L7: May contain multiple processes managing non-overlapping sft
%% files. Compaction work should be scheduled if the number of files exceeds
%% the target size of the level, where the target size is 8 ^ n.
%%
%% The most recent revision of a Key can be found by checking each level until
%% the key is found. To check a level the correct file must be sought from the
@ -33,28 +39,30 @@
%% If a compaction change takes the size of a level beyond the target size,
%% then compaction work for that level + 1 should be added to the compaction
%% work queue.
%% Compaction work is fetched by the Pencllier's Clerk because:
%% Compaction work is fetched by the Penciller's Clerk because:
%% - it has timed out due to a period of inactivity
%% - it has been triggered by the a cast to indicate the arrival of high
%% priority compaction work
%% The Penciller's Clerk (which performs compaction worker) will always call
%% the Penciller to find out the highest priority work currently in the queue
%% the Penciller to find out the highest priority work currently required
%% whenever it has either completed work, or a timeout has occurred since it
%% was informed there was no work to do.
%%
%% When the clerk picks work off the queue it will take the current manifest
%% for the level and level - 1. The clerk will choose which file to compact
%% from level - 1, and once the compaction is complete will call to the
%% Penciller with the new version of the manifest to be written.
%% When the clerk picks work it will take the current manifest, and the
%% Penciller assumes the manifest sequence number is to be incremented.
%% When the clerk has completed the work it can request that the manifest
%% change be committed by the Penciller. The commit is made through changing
%% the filename of the new manifest - so the Penciller is not held up by the
%% process of writing a file, just altering file system metadata.
%%
%% Once the new version of the manifest had been persisted, the state of any
%% deleted files will be changed to pending deletion. In pending deletion they
%% will call the Penciller on a timeout to confirm that they are no longer in
%% use (by any iterators).
%% The manifest is locked by a clerk taking work, or by there being a need to
%% write a file to Level 0. If the manifest is locked, then new keys can still
%% be added in memory - however, the response to that push will be to "pause",
%% that is to say the Penciller will ask the Bookie to slowdown.
%%
%% ---------- PUSH ----------
%%
%% The Penciller must support the PUSH of an ETS table from the Bookie. The
%% The Penciller must support the PUSH of a dump of keys from the Bookie. The
%% call to PUSH should be immediately acknowledged, and then work should be
%% completed to merge the ETS table into the L0 ETS table.
%%
@ -177,7 +185,8 @@
-define(ARCHIVE_FILEX, "arc").
-define(MEMTABLE, mem).
-define(MAX_TABLESIZE, 32000).
-define(L0PEND_RESET, {false, [], none}).
-define(PROMPT_WAIT_ONL0, 5).
-define(L0PEND_RESET, {false, null, null}).
-record(state, {manifest = [] :: list(),
ongoing_work = [] :: list(),
@ -188,7 +197,7 @@
root_path = "../test" :: string(),
table_size = 0 :: integer(),
clerk :: pid(),
levelzero_pending = {false, [], none} :: tuple(),
levelzero_pending = ?L0PEND_RESET :: tuple(),
memtable,
backlog = false :: boolean()}).
@ -230,13 +239,18 @@ pcl_close(Pid) ->
%%%============================================================================
init([RootPath]) ->
TID = ets:new(?MEMTABLE, [ordered_set, private]),
TID = ets:new(?MEMTABLE, [ordered_set]),
{ok, Clerk} = leveled_clerk:clerk_new(self()),
InitState = #state{memtable=TID, clerk=Clerk, root_path=RootPath},
%% Open manifest
ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
{ok, Filenames} = file:list_dir(ManifestPath),
{ok, Filenames} = case filelib:is_dir(ManifestPath) of
true ->
file:list_dir(ManifestPath);
false ->
{ok, []}
end,
CurrRegex = "nonzero_(?<MSN>[0-9]+)\\." ++ ?CURRENT_FILEX,
ValidManSQNs = lists:foldl(fun(FN, Acc) ->
case re:run(FN,
@ -270,8 +284,7 @@ init([RootPath]) ->
++ "found in nonzero levels~n",
[MaxSQN]),
%% TODO
%% Find any L0 File left outstanding
%% Find any L0 files
L0FN = filepath(RootPath,
TopManSQN + 1,
new_merge_files) ++ "_0_0.sft",
@ -311,6 +324,8 @@ handle_call({push_mem, DumpList}, _From, State) ->
Response = case assess_sqn(DumpList) of
{MinSQN, MaxSQN} when MaxSQN > MinSQN,
MinSQN >= State#state.ledger_sqn ->
io:format("SQN check completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(),StartWatch)]),
case push_to_memory(DumpList, State) of
{ok, UpdState} ->
{reply, ok, UpdState};
@ -346,12 +361,31 @@ handle_call({confirm_delete, FileName}, _From, State) ->
{reply, Reply, State}
end;
handle_call(prompt_compaction, _From, State) ->
case push_to_memory([], State) of
{ok, UpdState} ->
{reply, ok, UpdState#state{backlog=false}};
{{pause, Reason, Details}, UpdState} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, UpdState#state{backlog=true}}
%% If there is a prompt immediately after a L0 async write event then
%% there exists the potential for the prompt to stall the database.
%% Should only accept prompts if there has been a safe wait from the
%% last L0 write event.
Proceed = case State#state.levelzero_pending of
{true, _Pid, TS} ->
TD = timer:now_diff(os:timestamp(),TS),
if
TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false;
true -> true
end;
?L0PEND_RESET ->
true
end,
if
Proceed ->
case push_to_memory([], State) of
{ok, UpdState} ->
{reply, ok, UpdState#state{backlog=false}};
{{pause, Reason, Details}, UpdState} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, UpdState#state{backlog=true}}
end;
true ->
{reply, ok, State#state{backlog=false}}
end;
handle_call({manifest_change, WI}, _From, State) ->
{ok, UpdState} = commit_manifest_change(WI, State),
@ -388,22 +422,21 @@ terminate(_Reason, State) ->
Dump = ets:tab2list(State#state.memtable),
case {State#state.levelzero_pending,
get_item(0, State#state.manifest, []), length(Dump)} of
{{false, _, _}, [], L} when L > 0 ->
{?L0PEND_RESET, [], L} when L > 0 ->
MSN = State#state.manifest_sqn + 1,
FileName = State#state.root_path
++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0",
{ok,
L0Pid,
{{KR1, _}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd",
{{[], []}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd",
Dump,
[],
0),
io:format("Dump of memory on close to filename ~s with"
++ " remainder ~w~n", [FileName, length(KR1)]),
io:format("Dump of memory on close to filename ~s~n", [FileName]),
leveled_sft:sft_close(L0Pid),
file:rename(FileName ++ ".pnd", FileName ++ ".sft");
{{false, _, _}, [], L} when L == 0 ->
{?L0PEND_RESET, [], L} when L == 0 ->
io:format("No keys to dump from memory when closing~n");
_ ->
io:format("No opportunity to persist memory before closing "
@ -424,31 +457,36 @@ code_change(_OldVsn, State, _Extra) ->
push_to_memory(DumpList, State) ->
{TableSize, UpdState} = case State#state.levelzero_pending of
{true, Remainder, {StartKey, EndKey, Pid}} ->
{true, Pid, _TS} ->
%% Need to handle error scenarios?
%% N.B. Sync call - so will be ready
{ok, SrcFN} = leveled_sft:sft_checkready(Pid),
%% Reset ETS, but re-insert any remainder
{ok, SrcFN, StartKey, EndKey} = leveled_sft:sft_checkready(Pid),
true = ets:delete_all_objects(State#state.memtable),
true = ets:insert(State#state.memtable, Remainder),
ManifestEntry = #manifest_entry{start_key=StartKey,
end_key=EndKey,
owner=Pid,
filename=SrcFN},
{length(Remainder),
{0,
State#state{manifest=lists:keystore(0,
1,
State#state.manifest,
{0, [ManifestEntry]}),
levelzero_pending=?L0PEND_RESET}};
{false, _, _} ->
?L0PEND_RESET ->
{State#state.table_size, State}
end,
%% Prompt clerk to ask about work - do this for every push_mem
ok = leveled_clerk:clerk_prompt(UpdState#state.clerk, penciller),
case do_push_to_mem(DumpList, TableSize, UpdState#state.memtable) of
SW2 = os:timestamp(),
MemoryInsertion = do_push_to_mem(DumpList,
TableSize,
UpdState#state.memtable),
io:format("Push into memory timed at ~w microseconds~n",
[timer:now_diff(os:timestamp(),SW2)]),
case MemoryInsertion of
{twist, ApproxTableSize} ->
{ok, UpdState#state{table_size=ApproxTableSize}};
{roll, ApproxTableSize} ->
@ -459,30 +497,16 @@ push_to_memory(DumpList, State) ->
FileName = UpdState#state.root_path
++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0",
Dump = ets:tab2list(UpdState#state.memtable),
L0_SFT = leveled_sft:sft_new(FileName,
Dump,
[],
0,
#sft_options{wait=false}),
{ok, L0Pid, Reply} = L0_SFT,
{{KL1Rem, []}, L0StartKey, L0EndKey} = Reply,
Backlog = length(KL1Rem),
Rsp =
if
Backlog > ?MAX_TABLESIZE ->
{pause,
"Backlog of ~w in memory table~n",
[Backlog]};
true ->
ok
end,
{Rsp,
Opts = #sft_options{wait=false},
{ok, L0Pid} = leveled_sft:sft_new(FileName,
UpdState#state.memtable,
[],
0,
Opts),
{ok,
UpdState#state{levelzero_pending={true,
KL1Rem,
{L0StartKey,
L0EndKey,
L0Pid}},
L0Pid,
os:timestamp()},
table_size=ApproxTableSize,
manifest_sqn=MSN}};
{[], true} ->
@ -558,20 +582,20 @@ manifest_locked(State) ->
true;
true ->
case State#state.levelzero_pending of
{true, _, _} ->
{true, _Pid, _TS} ->
true;
_ ->
false
end
end.
%% Work out what the current work queue should be
%%
%% The work queue should have a lower level work at the front, and no work
%% should be added to the queue if a compaction worker has already been asked
%% to look at work at that level
%%
%% The full queue is calculated for logging purposes only
return_work(State, From) ->
WorkQueue = assess_workqueue([],

View file

@ -184,8 +184,8 @@
-define(HEADER_LEN, 56).
-define(ITERATOR_SCANWIDTH, 1).
-define(MERGE_SCANWIDTH, 8).
-define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE).
-define(DELETE_TIMEOUT, 60000).
-define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE).
-record(state, {version = ?CURRENT_VERSION :: tuple(),
@ -202,8 +202,9 @@
summ_length :: integer(),
filename :: string(),
handle :: file:fd(),
background_complete=false :: boolean(),
background_failure="Unknown" :: string(),
background_complete = false :: boolean(),
background_failure = "Unknown" :: string(),
oversized_file = false :: boolean(),
ready_for_delete = false ::boolean(),
penciller :: pid()}).
@ -217,17 +218,17 @@ sft_new(Filename, KL1, KL2, Level) ->
sft_new(Filename, KL1, KL2, Level, Options) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
Reply = case Options#sft_options.wait of
case Options#sft_options.wait of
true ->
gen_server:call(Pid,
{sft_new, Filename, KL1, KL2, Level},
infinity);
Reply = gen_server:call(Pid,
{sft_new, Filename, KL1, KL2, Level},
infinity),
{ok, Pid, Reply};
false ->
gen_server:call(Pid,
{sft_new, Filename, KL1, KL2, Level, background},
infinity)
end,
{ok, Pid, Reply}.
gen_server:cast(Pid,
{sft_new, Filename, KL1, KL2, Level}),
{ok, Pid}
end.
sft_open(Filename) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
@ -278,47 +279,13 @@ sft_getmaxsequencenumber(Pid) ->
%% gen_server callback: start with an empty file-metadata state record.
%% Real metadata is filled in later by the sft_new / sft_open calls.
init([]) ->
    {ok, #state{}}.
handle_call({sft_new, Filename, KL1, [], Level, background}, From, State) ->
{ListForFile, KL1Rem} = case length(KL1) of
L when L >= ?MAX_KEYS ->
lists:split(?MAX_KEYS, KL1);
_ ->
{KL1, []}
end,
StartKey = strip_to_keyonly(lists:nth(1, ListForFile)),
EndKey = strip_to_keyonly(lists:last(ListForFile)),
Ext = filename:extension(Filename),
Components = filename:split(Filename),
{TmpFilename, PrmFilename} = case Ext of
[] ->
{filename:join(Components) ++ ".pnd", filename:join(Components) ++ ".sft"};
Ext ->
%% This seems unnecessarily hard
DN = filename:dirname(Filename),
FP = lists:last(Components),
FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)),
{DN ++ "/" ++ FP_NOEXT ++ ".pnd", DN ++ "/" ++ FP_NOEXT ++ ".sft"}
end,
gen_server:reply(From, {{KL1Rem, []}, StartKey, EndKey}),
case create_file(TmpFilename) of
{error, Reason} ->
{noreply, State#state{background_complete=false,
background_failure=Reason}};
{Handle, FileMD} ->
io:format("Creating file in background with input of size ~w~n",
[length(ListForFile)]),
% Key remainders must match to empty
Rename = {true, TmpFilename, PrmFilename},
{ReadHandle, UpdFileMD, {[], []}} = complete_file(Handle,
FileMD,
ListForFile,
[],
Level,
Rename),
{noreply, UpdFileMD#state{handle=ReadHandle,
filename=PrmFilename,
background_complete=true}}
end;
handle_call({sft_new, Filename, KL1, [], 0}, _From, _State) ->
{ok, State} = create_levelzero(KL1, Filename),
{reply,
{{[], []},
State#state.smallest_key,
State#state.highest_key},
State};
handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) ->
case create_file(Filename) of
{error, Reason} ->
@ -365,7 +332,12 @@ handle_call(clear, _From, State) ->
handle_call(background_complete, _From, State) ->
case State#state.background_complete of
true ->
{reply, {ok, State#state.filename}, State};
{reply,
{ok,
State#state.filename,
State#state.smallest_key,
State#state.highest_key},
State};
false ->
{reply, {error, State#state.background_failure}, State}
end;
@ -380,6 +352,9 @@ handle_call({set_for_delete, Penciller}, _From, State) ->
handle_call(get_maxsqn, _From, State) ->
{reply, State#state.highest_sqn, State}.
%% Background (asynchronous) creation of a Level-0 file. sft_new/5 with
%% wait=false casts this after returning {ok, Pid} to the caller, so the
%% caller is not blocked while the (potentially oversized) L0 file is
%% written. A failed write crashes this process via the {ok, _} match;
%% NOTE(review): that discards the {error, State} return of
%% create_levelzero/2 rather than recording the failure - confirm intended.
handle_cast({sft_new, Filename, Inp1, [], 0}, _State) ->
    {ok, State} = create_levelzero(Inp1, Filename),
    {noreply, State};
%% Ignore any other cast.
handle_cast(_Msg, State) ->
    {noreply, State}.
@ -423,6 +398,47 @@ code_change(_OldVsn, State, _Extra) ->
%%%============================================================================
%% Create a Level-0 SFT file from memory contents.
%%
%% Inp1 is either a list of keys or an ETS table identifier (the Penciller's
%% memtable), in which case the table is dumped to a list first.
%% The file is written under a ".pnd" (pending) name; the {true, Tmp, Prm}
%% Rename instruction passed to complete_file/6 requests the final rename to
%% ".sft" - presumably so a crash mid-write never leaves a partial ".sft"
%% visible on restart (TODO confirm rename happens inside complete_file).
%%
%% Returns {ok, UpdatedFileMD} on success, or {error, StateWithFailure} if
%% the pending file could not be created. Note the synchronous caller
%% (handle_call for {sft_new, _, _, [], 0}) matches {ok, _} and will crash
%% on the error return.
create_levelzero(Inp1, Filename) ->
    %% Accept either a ready-made key list or an ETS table to dump.
    ListForFile = case is_list(Inp1) of
                        true ->
                            Inp1;
                        false ->
                            ets:tab2list(Inp1)
                    end,
    Ext = filename:extension(Filename),
    Components = filename:split(Filename),
    %% Derive the temporary (.pnd) and permanent (.sft) names. When the
    %% input already carries an extension, it is stripped by taking the
    %% first (length - length(Ext) + 1) characters of the final path
    %% component - NOTE(review): that slice retains the trailing dot, so
    %% "file.sft" yields "file." and hence "file..pnd"/"file..sft";
    %% confirm callers rely on extensionless names (e.g. "_0_0").
    {TmpFilename, PrmFilename} = case Ext of
        [] ->
            {filename:join(Components) ++ ".pnd",
                filename:join(Components) ++ ".sft"};
        Ext ->
            %% This seems unnecessarily hard
            DN = filename:dirname(Filename),
            FP = lists:last(Components),
            FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)),
            {DN ++ "/" ++ FP_NOEXT ++ ".pnd", DN ++ "/" ++ FP_NOEXT ++ ".sft"}
    end,
    case create_file(TmpFilename) of
        {error, Reason} ->
            %% Record the failure in the returned state so a later
            %% background_complete call can report it.
            {error,
                #state{background_complete=false, background_failure=Reason}};
        {Handle, FileMD} ->
            InputSize = length(ListForFile),
            io:format("Creating file with input of size ~w~n", [InputSize]),
            Rename = {true, TmpFilename, PrmFilename},
            %% The {[], []} match asserts there is NO key remainder: L0
            %% files are of unbounded size (maxslots_bylevel returns
            %% 'continue' for Level 0), so every key must fit in this file.
            {ReadHandle, UpdFileMD, {[], []}} = complete_file(Handle,
                                                                FileMD,
                                                                ListForFile,
                                                                [],
                                                                0,
                                                                Rename),
            {ok,
                UpdFileMD#state{handle=ReadHandle,
                                filename=PrmFilename,
                                background_complete=true,
                                %% Flag when the L0 file exceeded the normal
                                %% per-file key limit.
                                oversized_file=InputSize>?MAX_KEYS}}
    end.
%% Start a bare file with an initial header and no further details
%% Return the {Handle, metadata record}
create_file(FileName) when is_list(FileName) ->
@ -446,7 +462,9 @@ create_file(FileName) when is_list(FileName) ->
create_header(initial) ->
{Major, Minor} = ?CURRENT_VERSION,
Version = <<Major:5, Minor:3>>,
Options = <<0:8>>, % Not thought of any options
%% Not thought of any options - options are ignored
Options = <<0:8>>,
%% Settings are currently ignored
{BlSize, BlCount, SlCount} = {?BLOCK_COUNT, ?BLOCK_SIZE, ?SLOT_COUNT},
Settings = <<BlSize:8, BlCount:8, SlCount:16>>,
{SpareO, SpareL} = {<<0:48>>, <<0:192>>},
@ -871,6 +889,10 @@ sftwrite_function(finalise,
SNExtremes,
KeyExtremes}).
%% Level 0 files are of variable (infinite) size to avoid issues with having
%% any remainders when flushing from memory
maxslots_bylevel(_SlotTotal, 0) ->
continue;
maxslots_bylevel(SlotTotal, _Level) ->
case SlotTotal of
?SLOT_COUNT ->