From ce0c55a2ec012b6ad179598fcafccbe56afd0475 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Tue, 16 Aug 2016 12:45:48 +0100
Subject: [PATCH] Resolve issue of Remainders

Two issues looked at
- There shouldn't be a remainder after writing the L0 file, as this could
  have overlapping sequence numbers which will be missed on restart
- There should be a safety-check to stop the Clerk from doing a fake push
  too soon after a background L0 file has been written (as the fake push
  would lock the ledger waiting for the L0 file write to finish)
---
 src/leveled_clerk.erl     |   4 +-
 src/leveled_penciller.erl | 172 ++++++++++++++++++++++----------------
 src/leveled_sft.erl       | 132 +++++++++++++++++------------
 3 files changed, 177 insertions(+), 131 deletions(-)

diff --git a/src/leveled_clerk.erl b/src/leveled_clerk.erl
index 107f12b..f423b59 100644
--- a/src/leveled_clerk.erl
+++ b/src/leveled_clerk.erl
@@ -251,7 +251,7 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) ->
     io:format("File to be created as part of MSN=~w Filename=~s~n",
                 [MSN, FileName]),
     TS1 = os:timestamp(),
-    case leveled_sft:sft_new(FileName, KL1, KL2, Level) of
+    case leveled_sft:sft_new(FileName, KL1, KL2, Level + 1) of
         {ok, _Pid, {error, Reason}} ->
             io:format("Exiting due to error~w~n", [Reason]),
             error;
@@ -263,7 +263,7 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) ->
                                             owner=Pid,
                                             filename=FileName}]),
             MTime = timer:now_diff(os:timestamp(), TS1),
-            io:format("file creation took ~w microseconds ~n", [MTime]),
+            io:format("File creation took ~w microseconds ~n", [MTime]),
             do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN},
                         FileCounter + 1, ExtMan)
     end.
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 49de7c6..c84bfe7 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -1,29 +1,35 @@
 %% -------- PENCILLER ---------
 %%
-%% The penciller is repsonsible for writing and re-writing the ledger - a
+%% The penciller is responsible for writing and re-writing the ledger - a
 %% persisted, ordered view of non-recent Keys and Metadata which have been
 %% added to the store.
 %% - The penciller maintains a manifest of all the files within the current
 %% Ledger.
-%% - The Penciller queues re-write (compaction) work up to be managed by Clerks
+%% - The Penciller provides re-write (compaction) work to be managed by
+%% the Penciller's Clerk
 %% - The Penciller mainatins a register of iterators who have requested
 %% snapshots of the Ledger
-%% - The accepts new dumps (in the form of immutable ets tables) from the
-%% Bookie, and calls the Bookie once the process of pencilling this data in
-%% the Ledger is complete - and the Bookie is free to forget about the data
+%% - The Penciller accepts new dumps (in the form of lists of keys) from the
+%% Bookie, and calls the Bookie once the process of pencilling this data in
+%% the Ledger is complete - and the Bookie is free to forget about the data
 %%
 %% -------- LEDGER ---------
 %%
 %% The Ledger is divided into many levels
-%% L0: ETS tables are received from the Bookie and merged into a single ETS
+%% - L0: New keys are received from the Bookie and merged into a single ETS
 %% table, until that table is the size of a SFT file, and it is then persisted
-%% as a SFT file at this level. Once the persistence is completed, the ETS
-%% table can be dropped. There can be only one SFT file at Level 0, so
-%% the work to merge that file to the lower level must be the highest priority,
-%% as otherwise the database will stall.
-%% L1 TO L7: May contain multiple non-overlapping PIDs managing sft files.
-%% Compaction work should be sheduled if the number of files exceeds the target
-%% size of the level, where the target size is 8 ^ n.
+%% as a SFT file at this level. L0 SFT files can be larger than the normal
+%% maximum size - so we don't have to consider problems of either having more
+%% than one L0 file (and handling what happens on a crash between writing the
+%% files when the second may have overlapping sequence numbers), or having a
+%% remainder with overlapping sequence numbers in memory after the file is
+%% written. Once the persistence is completed, the ETS table can be erased.
+%% There can be only one SFT file at Level 0, so the work to merge that file
+%% to the lower level must be the highest priority, as otherwise writes to the
+%% ledger will stall when there is next a need to persist.
+%% - L1 TO L7: May contain multiple processes managing non-overlapping sft
+%% files. Compaction work should be scheduled if the number of files exceeds
+%% the target size of the level, where the target size is 8 ^ n.
 %%
 %% The most recent revision of a Key can be found by checking each level until
 %% the key is found. To check a level the correct file must be sought from the
@@ -33,28 +39,30 @@
 %% If a compaction change takes the size of a level beyond the target size,
 %% then compaction work for that level + 1 should be added to the compaction
 %% work queue.
-%% Compaction work is fetched by the Pencllier's Clerk because:
+%% Compaction work is fetched by the Penciller's Clerk because:
 %% - it has timed out due to a period of inactivity
 %% - it has been triggered by the a cast to indicate the arrival of high
 %% priority compaction work
 %% The Penciller's Clerk (which performs compaction worker) will always call
-%% the Penciller to find out the highest priority work currently in the queue
+%% the Penciller to find out the highest priority work currently required
 %% whenever it has either completed work, or a timeout has occurred since it
 %% was informed there was no work to do.
 %%
-%% When the clerk picks work off the queue it will take the current manifest
-%% for the level and level - 1. The clerk will choose which file to compact
-%% from level - 1, and once the compaction is complete will call to the
-%% Penciller with the new version of the manifest to be written.
+%% When the clerk picks work it will take the current manifest, and the
+%% Penciller assumes the manifest sequence number is to be incremented.
+%% When the clerk has completed the work it can request that the manifest
+%% change be committed by the Penciller. The commit is made through changing
+%% the filename of the new manifest - so the Penciller is not held up by the
+%% process of writing a file, just altering file system metadata.
 %%
-%% Once the new version of the manifest had been persisted, the state of any
-%% deleted files will be changed to pending deletion. In pending deletion they
-%% will call the Penciller on a timeout to confirm that they are no longer in
-%% use (by any iterators).
+%% The manifest is locked by a clerk taking work, or by there being a need to
+%% write a file to Level 0. If the manifest is locked, then new keys can still
+%% be added in memory - however, the response to that push will be to "pause",
+%% that is to say the Penciller will ask the Bookie to slow down.
 %%
 %% ---------- PUSH ----------
 %%
-%% The Penciller must support the PUSH of an ETS table from the Bookie. 
The +%% The Penciller must support the PUSH of a dump of keys from the Bookie. The %% call to PUSH should be immediately acknowledged, and then work should be %% completed to merge the ETS table into the L0 ETS table. %% @@ -177,7 +185,8 @@ -define(ARCHIVE_FILEX, "arc"). -define(MEMTABLE, mem). -define(MAX_TABLESIZE, 32000). --define(L0PEND_RESET, {false, [], none}). +-define(PROMPT_WAIT_ONL0, 5). +-define(L0PEND_RESET, {false, null, null}). -record(state, {manifest = [] :: list(), ongoing_work = [] :: list(), @@ -188,7 +197,7 @@ root_path = "../test" :: string(), table_size = 0 :: integer(), clerk :: pid(), - levelzero_pending = {false, [], none} :: tuple(), + levelzero_pending = ?L0PEND_RESET :: tuple(), memtable, backlog = false :: boolean()}). @@ -230,13 +239,18 @@ pcl_close(Pid) -> %%%============================================================================ init([RootPath]) -> - TID = ets:new(?MEMTABLE, [ordered_set, private]), + TID = ets:new(?MEMTABLE, [ordered_set]), {ok, Clerk} = leveled_clerk:clerk_new(self()), InitState = #state{memtable=TID, clerk=Clerk, root_path=RootPath}, %% Open manifest ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/", - {ok, Filenames} = file:list_dir(ManifestPath), + {ok, Filenames} = case filelib:is_dir(ManifestPath) of + true -> + file:list_dir(ManifestPath); + false -> + {ok, []} + end, CurrRegex = "nonzero_(?[0-9]+)\\." ++ ?CURRENT_FILEX, ValidManSQNs = lists:foldl(fun(FN, Acc) -> case re:run(FN, @@ -270,8 +284,7 @@ init([RootPath]) -> ++ "found in nonzero levels~n", [MaxSQN]), - %% TODO - %% Find any L0 File left outstanding + %% Find any L0 files L0FN = filepath(RootPath, TopManSQN + 1, new_merge_files) ++ "_0_0.sft", @@ -311,6 +324,8 @@ handle_call({push_mem, DumpList}, _From, State) -> Response = case assess_sqn(DumpList) of {MinSQN, MaxSQN} when MaxSQN > MinSQN, MinSQN >= State#state.ledger_sqn -> + io:format("SQN check completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(),StartWatch)]), case push_to_memory(DumpList, State) of {ok, UpdState} -> {reply, ok, UpdState}; @@ -346,12 +361,31 @@ handle_call({confirm_delete, FileName}, _From, State) -> {reply, Reply, State} end; handle_call(prompt_compaction, _From, State) -> - case push_to_memory([], State) of - {ok, UpdState} -> - {reply, ok, UpdState#state{backlog=false}}; - {{pause, Reason, Details}, UpdState} -> - io:format("Excess work due to - " ++ Reason, Details), - {reply, pause, UpdState#state{backlog=true}} + %% If there is a prompt immediately after a L0 async write event then + %% there exists the potential for the prompt to stall the database. + %% Should only accept prompts if there has been a safe wait from the + %% last L0 write event. 
+ Proceed = case State#state.levelzero_pending of + {true, _Pid, TS} -> + TD = timer:now_diff(os:timestamp(),TS), + if + TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false; + true -> true + end; + ?L0PEND_RESET -> + true + end, + if + Proceed -> + case push_to_memory([], State) of + {ok, UpdState} -> + {reply, ok, UpdState#state{backlog=false}}; + {{pause, Reason, Details}, UpdState} -> + io:format("Excess work due to - " ++ Reason, Details), + {reply, pause, UpdState#state{backlog=true}} + end; + true -> + {reply, ok, State#state{backlog=false}} end; handle_call({manifest_change, WI}, _From, State) -> {ok, UpdState} = commit_manifest_change(WI, State), @@ -388,22 +422,21 @@ terminate(_Reason, State) -> Dump = ets:tab2list(State#state.memtable), case {State#state.levelzero_pending, get_item(0, State#state.manifest, []), length(Dump)} of - {{false, _, _}, [], L} when L > 0 -> + {?L0PEND_RESET, [], L} when L > 0 -> MSN = State#state.manifest_sqn + 1, FileName = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(MSN) ++ "_0_0", {ok, L0Pid, - {{KR1, _}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd", + {{[], []}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd", Dump, [], 0), - io:format("Dump of memory on close to filename ~s with" - ++ " remainder ~w~n", [FileName, length(KR1)]), + io:format("Dump of memory on close to filename ~s~n", [FileName]), leveled_sft:sft_close(L0Pid), file:rename(FileName ++ ".pnd", FileName ++ ".sft"); - {{false, _, _}, [], L} when L == 0 -> + {?L0PEND_RESET, [], L} when L == 0 -> io:format("No keys to dump from memory when closing~n"); _ -> io:format("No opportunity to persist memory before closing " @@ -424,31 +457,36 @@ code_change(_OldVsn, State, _Extra) -> push_to_memory(DumpList, State) -> {TableSize, UpdState} = case State#state.levelzero_pending of - {true, Remainder, {StartKey, EndKey, Pid}} -> + {true, Pid, _TS} -> %% Need to handle error scenarios? %% N.B. 
Sync call - so will be ready - {ok, SrcFN} = leveled_sft:sft_checkready(Pid), - %% Reset ETS, but re-insert any remainder + {ok, SrcFN, StartKey, EndKey} = leveled_sft:sft_checkready(Pid), true = ets:delete_all_objects(State#state.memtable), - true = ets:insert(State#state.memtable, Remainder), ManifestEntry = #manifest_entry{start_key=StartKey, end_key=EndKey, owner=Pid, filename=SrcFN}, - {length(Remainder), + {0, State#state{manifest=lists:keystore(0, 1, State#state.manifest, {0, [ManifestEntry]}), levelzero_pending=?L0PEND_RESET}}; - {false, _, _} -> + ?L0PEND_RESET -> {State#state.table_size, State} end, %% Prompt clerk to ask about work - do this for every push_mem ok = leveled_clerk:clerk_prompt(UpdState#state.clerk, penciller), - case do_push_to_mem(DumpList, TableSize, UpdState#state.memtable) of + SW2 = os:timestamp(), + MemoryInsertion = do_push_to_mem(DumpList, + TableSize, + UpdState#state.memtable), + io:format("Push into memory timed at ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW2)]), + + case MemoryInsertion of {twist, ApproxTableSize} -> {ok, UpdState#state{table_size=ApproxTableSize}}; {roll, ApproxTableSize} -> @@ -459,30 +497,16 @@ push_to_memory(DumpList, State) -> FileName = UpdState#state.root_path ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(MSN) ++ "_0_0", - Dump = ets:tab2list(UpdState#state.memtable), - L0_SFT = leveled_sft:sft_new(FileName, - Dump, - [], - 0, - #sft_options{wait=false}), - {ok, L0Pid, Reply} = L0_SFT, - {{KL1Rem, []}, L0StartKey, L0EndKey} = Reply, - Backlog = length(KL1Rem), - Rsp = - if - Backlog > ?MAX_TABLESIZE -> - {pause, - "Backlog of ~w in memory table~n", - [Backlog]}; - true -> - ok - end, - {Rsp, + Opts = #sft_options{wait=false}, + {ok, L0Pid} = leveled_sft:sft_new(FileName, + UpdState#state.memtable, + [], + 0, + Opts), + {ok, UpdState#state{levelzero_pending={true, - KL1Rem, - {L0StartKey, - L0EndKey, - L0Pid}}, + L0Pid, + os:timestamp()}, table_size=ApproxTableSize, manifest_sqn=MSN}}; {[], true} -> @@ -558,20 +582,20 @@ manifest_locked(State) -> true; true -> case State#state.levelzero_pending of - {true, _, _} -> + {true, _Pid, _TS} -> true; _ -> false end end. - - %% Work out what the current work queue should be %% %% The work queue should have a lower level work at the front, and no work %% should be added to the queue if a compaction worker has already been asked %% to look at work at that level +%% +%% The full queue is calculated for logging purposes only return_work(State, From) -> WorkQueue = assess_workqueue([], diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index fe4d331..12e36e5 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -184,8 +184,8 @@ -define(HEADER_LEN, 56). -define(ITERATOR_SCANWIDTH, 1). -define(MERGE_SCANWIDTH, 8). --define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE). -define(DELETE_TIMEOUT, 60000). +-define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE). -record(state, {version = ?CURRENT_VERSION :: tuple(), @@ -202,8 +202,9 @@ summ_length :: integer(), filename :: string(), handle :: file:fd(), - background_complete=false :: boolean(), - background_failure="Unknown" :: string(), + background_complete = false :: boolean(), + background_failure = "Unknown" :: string(), + oversized_file = false :: boolean(), ready_for_delete = false ::boolean(), penciller :: pid()}). 
@@ -217,17 +218,17 @@ sft_new(Filename, KL1, KL2, Level) -> sft_new(Filename, KL1, KL2, Level, Options) -> {ok, Pid} = gen_server:start(?MODULE, [], []), - Reply = case Options#sft_options.wait of + case Options#sft_options.wait of true -> - gen_server:call(Pid, - {sft_new, Filename, KL1, KL2, Level}, - infinity); + Reply = gen_server:call(Pid, + {sft_new, Filename, KL1, KL2, Level}, + infinity), + {ok, Pid, Reply}; false -> - gen_server:call(Pid, - {sft_new, Filename, KL1, KL2, Level, background}, - infinity) - end, - {ok, Pid, Reply}. + gen_server:cast(Pid, + {sft_new, Filename, KL1, KL2, Level}), + {ok, Pid} + end. sft_open(Filename) -> {ok, Pid} = gen_server:start(?MODULE, [], []), @@ -278,47 +279,13 @@ sft_getmaxsequencenumber(Pid) -> init([]) -> {ok, #state{}}. -handle_call({sft_new, Filename, KL1, [], Level, background}, From, State) -> - {ListForFile, KL1Rem} = case length(KL1) of - L when L >= ?MAX_KEYS -> - lists:split(?MAX_KEYS, KL1); - _ -> - {KL1, []} - end, - StartKey = strip_to_keyonly(lists:nth(1, ListForFile)), - EndKey = strip_to_keyonly(lists:last(ListForFile)), - Ext = filename:extension(Filename), - Components = filename:split(Filename), - {TmpFilename, PrmFilename} = case Ext of - [] -> - {filename:join(Components) ++ ".pnd", filename:join(Components) ++ ".sft"}; - Ext -> - %% This seems unnecessarily hard - DN = filename:dirname(Filename), - FP = lists:last(Components), - FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)), - {DN ++ "/" ++ FP_NOEXT ++ ".pnd", DN ++ "/" ++ FP_NOEXT ++ ".sft"} - end, - gen_server:reply(From, {{KL1Rem, []}, StartKey, EndKey}), - case create_file(TmpFilename) of - {error, Reason} -> - {noreply, State#state{background_complete=false, - background_failure=Reason}}; - {Handle, FileMD} -> - io:format("Creating file in background with input of size ~w~n", - [length(ListForFile)]), - % Key remainders must match to empty - Rename = {true, TmpFilename, PrmFilename}, - {ReadHandle, UpdFileMD, {[], []}} = complete_file(Handle, - FileMD, - ListForFile, - [], - Level, - Rename), - {noreply, UpdFileMD#state{handle=ReadHandle, - filename=PrmFilename, - background_complete=true}} - end; +handle_call({sft_new, Filename, KL1, [], 0}, _From, _State) -> + {ok, State} = create_levelzero(KL1, Filename), + {reply, + {{[], []}, + State#state.smallest_key, + State#state.highest_key}, + State}; handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) -> case create_file(Filename) of {error, Reason} -> @@ -365,7 +332,12 @@ handle_call(clear, _From, State) -> handle_call(background_complete, _From, State) -> case State#state.background_complete of true -> - {reply, {ok, State#state.filename}, State}; + {reply, + {ok, + State#state.filename, + State#state.smallest_key, + State#state.highest_key}, + State}; false -> {reply, {error, State#state.background_failure}, State} end; @@ -380,6 +352,9 @@ handle_call({set_for_delete, Penciller}, _From, State) -> handle_call(get_maxsqn, _From, State) -> {reply, State#state.highest_sqn, State}. +handle_cast({sft_new, Filename, Inp1, [], 0}, _State) -> + {ok, State} = create_levelzero(Inp1, Filename), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. 
@@ -423,6 +398,47 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ +create_levelzero(Inp1, Filename) -> + ListForFile = case is_list(Inp1) of + true -> + Inp1; + false -> + ets:tab2list(Inp1) + end, + Ext = filename:extension(Filename), + Components = filename:split(Filename), + {TmpFilename, PrmFilename} = case Ext of + [] -> + {filename:join(Components) ++ ".pnd", + filename:join(Components) ++ ".sft"}; + Ext -> + %% This seems unnecessarily hard + DN = filename:dirname(Filename), + FP = lists:last(Components), + FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)), + {DN ++ "/" ++ FP_NOEXT ++ ".pnd", DN ++ "/" ++ FP_NOEXT ++ ".sft"} + end, + case create_file(TmpFilename) of + {error, Reason} -> + {error, + #state{background_complete=false, background_failure=Reason}}; + {Handle, FileMD} -> + InputSize = length(ListForFile), + io:format("Creating file with input of size ~w~n", [InputSize]), + Rename = {true, TmpFilename, PrmFilename}, + {ReadHandle, UpdFileMD, {[], []}} = complete_file(Handle, + FileMD, + ListForFile, + [], + 0, + Rename), + {ok, + UpdFileMD#state{handle=ReadHandle, + filename=PrmFilename, + background_complete=true, + oversized_file=InputSize>?MAX_KEYS}} + end. + %% Start a bare file with an initial header and no further details %% Return the {Handle, metadata record} create_file(FileName) when is_list(FileName) -> @@ -446,7 +462,9 @@ create_file(FileName) when is_list(FileName) -> create_header(initial) -> {Major, Minor} = ?CURRENT_VERSION, Version = <>, - Options = <<0:8>>, % Not thought of any options + %% Not thought of any options - options are ignored + Options = <<0:8>>, + %% Settings are currently ignored {BlSize, BlCount, SlCount} = {?BLOCK_COUNT, ?BLOCK_SIZE, ?SLOT_COUNT}, Settings = <>, {SpareO, SpareL} = {<<0:48>>, <<0:192>>}, @@ -871,6 +889,10 @@ sftwrite_function(finalise, SNExtremes, KeyExtremes}). +%% Level 0 files are of variable (infinite) size to avoid issues with having +%% any remainders when flushing from memory +maxslots_bylevel(_SlotTotal, 0) -> + continue; maxslots_bylevel(SlotTotal, _Level) -> case SlotTotal of ?SLOT_COUNT ->
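As a reading aid only (not part of the patch), the safety-check added to the
prompt_compaction handler above can be reduced to a small standalone guard.
This is a minimal sketch: the module name l0_prompt_guard and the function
safe_to_prompt/2 are invented for illustration, while the ?PROMPT_WAIT_ONL0
value and the two levelzero_pending shapes ({true, Pid, TS} and
{false, null, null}) follow the patch. timer:now_diff/2 returns microseconds.

-module(l0_prompt_guard).
-export([safe_to_prompt/2]).

-define(PROMPT_WAIT_ONL0, 5).

%% A fake push prompted by the Clerk may proceed only if there is no pending
%% background L0 write, or if the pending L0 write started at least
%% ?PROMPT_WAIT_ONL0 seconds before Now.
safe_to_prompt({true, _L0Pid, TS}, Now) ->
    timer:now_diff(Now, TS) >= ?PROMPT_WAIT_ONL0 * 1000000;
safe_to_prompt({false, null, null}, _Now) ->
    true.

In the handler this would be evaluated as
safe_to_prompt(State#state.levelzero_pending, os:timestamp()); a false result
takes the branch that simply replies ok with backlog=false, rather than
pushing to memory and risking a wait on the in-flight L0 file write.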