From d3e985ed803f046f961873c063a393ec6b360d6e Mon Sep 17 00:00:00 2001 From: martinsumner Date: Wed, 21 Sep 2016 18:31:42 +0100 Subject: [PATCH] Refactor Penciller Push Two aspects of pushing to the penciller have been refactored: 1 - Allow the penciller to respond before the ETS table has been updated to unlock the Bookie sooner. 2 - Change the way the copy of the memtable is stored to work more effectively with snapshots wihtout locking the Penciller any further on a snapshot or push request --- .rebar/erlcinfo | Bin 531 -> 0 bytes include/leveled.hrl | 1 + src/leveled_iclerk.erl | 24 +- src/leveled_pclerk.erl | 6 +- src/leveled_penciller.erl | 613 +++++++++++++++++++++++++------------- 5 files changed, 433 insertions(+), 211 deletions(-) delete mode 100644 .rebar/erlcinfo diff --git a/.rebar/erlcinfo b/.rebar/erlcinfo deleted file mode 100644 index f883b7cc3e21883e73352197d3098883ab2bc41c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 531 zcmV+u0_^>RPyhf5b9i2@Rm)D}FcdWnuhJJy3j!ow5({QQA}XPZnOVeZCCn$Dd9pcqo}XLiYYa7lxq)$MUg}^_Q77 z%>qd-X{T#WNK-1Za?%cTk@P4PzRT$=?h!NwArkTr=Bw8+J_^$HrKu@B3@CX*7&ye47)`mZg zL0>25Io}It79tqURlXWgN0d|lBe{kP>435C>w6z0Y%xXvI*wX zlA;w$ocD|&s@)uVjuUkhZsX5ZKw38xY(rDk9K5-5P)A#QW80}D-H7NbT!Ck+FsSRv ztH9CIbJW^bs^h*K;j;QRTAQKX%7*Ds#ae}J@Y&^!YCSZ!K!d5*Zu3U{jB2vgyT{b} zjmx)rO@-tuuu1ZK{toabJM*7Y3@ZQt diff --git a/include/leveled.hrl b/include/leveled.hrl index 030fdd4..0f929cc 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -30,6 +30,7 @@ -record(penciller_options, {root_path :: string(), + penciller :: pid(), max_inmemory_tablesize :: integer()}). -record(bookie_options, diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 3f95547..335a649 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -19,7 +19,8 @@ -include_lib("eunit/include/eunit.hrl"). --define(KEYS_TO_CHECK, 100). +-define(BATCH_SIZE, 16) +-define(BATCHES_TO_CHECK, 8). -record(state, {owner :: pid()}). @@ -81,6 +82,27 @@ journal_compact(_InkerManifest, _Penciller, _Timeout, _Owner) -> check_all_files(_InkerManifest) -> ok. +check_single_file(CDB, _PencilSnapshot, SampleSize, BatchSize) -> + PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), + KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), + KeySizeList. + %% TODO: + %% Need to check the penciller snapshot to see if these keys are at the + %% right sequence number + %% + %% The calculate the proportion (by value size) of the CDB which is at the + %% wrong sequence number to help determine eligibility for compaction + %% + %% BIG TODO: + %% Need to snapshot a penciller + +fetch_inbatches([], _BatchSize, _CDB, CheckedList) -> + CheckedList; +fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> + {Batch, Tail} = lists:split(BatchSize, PositionList), + KL_List = leveled_cdb:direct_fetch(CDB, Batch, key_size), + fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List). + window_closed(_Timeout) -> true. diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index d3334da..c54ba01 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -59,12 +59,12 @@ handle_cast(stop, State) -> {stop, normal, State}. handle_info(timeout, State) -> - %% The pcl prompt will cause a penciller_prompt, to re-trigger timeout case leveled_penciller:pcl_prompt(State#state.owner) of ok -> - {noreply, State}; + Timeout = requestandhandle_work(State), + {noreply, State, Timeout}; pause -> - {noreply, State} + {noreply, State, ?INACTIVITY_TIMEOUT} end; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 8ec90c5..225431f 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -86,6 +86,10 @@ %% files valid at the point of the snapshot until either the iterator is %% completed or has timed out. %% +%% Snapshot requests may request a filtered view of the ETS table (whihc may +%% be quicker than requesting the full table), or requets a snapshot of only +%% the persisted part of the Ledger +%% %% ---------- ON STARTUP ---------- %% %% On Startup the Bookie with ask the Penciller to initiate the Ledger first. @@ -146,6 +150,66 @@ %% table and build a new table starting with the remainder, and the keys from %% the latest push. %% +%% ---------- NOTES ON THE USE OF ETS ---------- +%% +%% Insertion into ETS is very fast, and so using ETS does not slow the PUT +%% path. However, an ETS table is mutable, so it does complicate the +%% snapshotting of the Ledger. +%% +%% Some alternatives have been considered: +%% +%% A1 - Use gb_trees not ETS table +%% * Speed of inserts are too slow especially as the Bookie is blocked until +%% the insert is complete. Inserting 32K very simple keys takes 250ms. Only +%% the naive commands can be used, as Keys may be present - so not easy to +%% optimise. There is a lack of bulk operations +%% +%% A2 - Use some other structure other than gb_trees or ETS tables +%% * There is nothing else that will support iterators, so snapshots would +%% either need to do a conversion when they request the snapshot if +%% they need to iterate, or iterate through map functions scanning all the +%% keys. The conversion may not be expensive, as we know loading into an ETS +%% table is fast - but there may be some hidden overheads with creating and +%5 destroying many ETS tables. +%% +%% A3 - keep a parallel list of lists of things that have gone in the ETS +%% table in the format they arrived in +%% * There is doubling up of memory, and the snapshot must do some work to +%% make use of these lists. This combines the continued use of fast ETS +%% with the solution of A2 at a memory cost. +%% +%% A4 - Try and cache the conversion to be shared between snapshots registered +%% at the same Ledger SQN +%% * This is a rif on A2/A3, but if generally there is o(10) or o(100) seconds +%% between memory pushes, but much more frequent snapshots this may be more +%% efficient +%% +%% A5 - Produce a specific snapshot of the ETS table via an iterator on demand +%% for each snapshot +%% * So if a snapshot was required for na iterator, the Penciller would block +%% whilst it iterated over the ETS table first to produce a snapshot-specific +%% immutbale view. If the snapshot was required for a long-lived complete view +%% of the database the Penciller would block for a tab2list. +%% +%% A6 - Have snapshots incrementally create and share immutable trees, from a +%% parallel cache of changes +%% * This is a variance on A3. As changes are pushed to the Penciller in the +%% form of lists the Penciller updates a cache of the lists that are contained +%% in the current ETS table. These lists are returned to the snapshot when +%% the snapshot is registered. All snapshots it is assumed will convert these +%% lists into a gb_tree to use, but following that conversion they can cast +%% to the Penciller to refine the cache, so that the cache will become a +%% gb_tree up the ledger SQN at which the snapshot is registered, and now only +%% store new lists for subsequent updates. Future snapshot requests (before +%% the ets table is flushed) will now receive the array (if no changes have) +%% been made, or the array and only the lists needed to incrementally change +%% the array. If changes are infrequent, each snapshot request will pay the +%% full 20ms to 250ms cost of producing the array (although perhaps the clerk +%% could also update periodiclaly to avoid this). If changes are frequent, +%% the snapshot will generally not require to do a conversion, or will only +%% be required to do a small conversion +%% +%% A6 is the preferred option -module(leveled_penciller). @@ -169,7 +233,10 @@ pcl_confirmdelete/2, pcl_prompt/1, pcl_close/1, + pcl_registersnapshot/2, + pcl_updatesnapshotcache/3, pcl_getstartupsequencenumber/1, + roll_new_tree/3, clean_testdir/1]). -include_lib("eunit/include/eunit.hrl"). @@ -187,21 +254,26 @@ -define(PROMPT_WAIT_ONL0, 5). -define(L0PEND_RESET, {false, null, null}). +-record(l0snapshot, {increments = [] :: list, + tree = gb_trees:empty() :: gb_trees:tree(), + ledger_sqn = 0 :: integer()}). + -record(state, {manifest = [] :: list(), ongoing_work = [] :: list(), manifest_sqn = 0 :: integer(), ledger_sqn = 0 :: integer(), - registered_iterators = [] :: list(), + registered_snapshots = [] :: list(), unreferenced_files = [] :: list(), root_path = "../test" :: string(), table_size = 0 :: integer(), clerk :: pid(), levelzero_pending = ?L0PEND_RESET :: tuple(), - levelzero_snapshot = [] :: list(), + memtable_copy = #l0snapshot{} :: #l0snapshot{}, memtable, backlog = false :: boolean(), memtable_maxsize :: integer()}). + %%%============================================================================ %%% API @@ -227,22 +299,265 @@ pcl_requestmanifestchange(Pid, WorkItem) -> gen_server:call(Pid, {manifest_change, WorkItem}, infinity). pcl_confirmdelete(Pid, FileName) -> - gen_server:call(Pid, {confirm_delete, FileName}). + gen_server:call(Pid, {confirm_delete, FileName}, infinity). pcl_prompt(Pid) -> - gen_server:call(Pid, prompt_compaction). + gen_server:call(Pid, prompt_compaction, infinity). pcl_getstartupsequencenumber(Pid) -> - gen_server:call(Pid, get_startup_sqn). + gen_server:call(Pid, get_startup_sqn, infinity). + +pcl_registersnapshot(Pid, Snapshot) -> + gen_server:call(Pid, {register_snapshot, Snapshot}, infinity). + +pcl_updatesnapshotcache(Pid, Tree, SQN) -> + gen_server:cast(Pid, {update_snapshotcache, Tree, SQN}). pcl_close(Pid) -> gen_server:call(Pid, close). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ init([PCLopts]) -> + case PCLopts#penciller_options.root_path of + undefined -> + {ok, #state{}}; + _RootPath -> + start_from_file(PCLopts) + end. + + +handle_call({push_mem, DumpList}, From, State0) -> + % The process for pushing to memory is as follows + % - Check that the inbound list does not contain any Keys with a lower + % sequence number than any existing keys (assess_sqn/1) + % - Check that any file that had been sent to be written to L0 previously + % is now completed. If it is wipe out the in-memory view as this is now + % safely persisted. This will block waiting for this to complete if it + % hasn't (checkready_pushmem/1). + % - Quick check to see if there is a need to write a L0 file + % (quickcheck_pushmem/3). If there clearly isn't, then we can reply, and + % then add to memory in the background before updating the loop state + % - Push the update into memory (do_pushtomem/3) + % - If we haven't got through quickcheck now need to check if there is a + % definite need to write a new L0 file (roll_memory/2). If all clear this + % will write the file in the background and allow a response to the user. + % If not the change has still been made but the the L0 file will not have + % been prompted - so the reply does not indicate failure but returns the + % atom 'pause' to signal a loose desire for back-pressure to be applied. + % The only reason in this case why there should be a pause is if the + % manifest is locked pending completion of a manifest change - so reacting + % to the pause signal may not be sensible + StartWatch = os:timestamp(), + case assess_sqn(DumpList) of + {MinSQN, MaxSQN} when MaxSQN >= MinSQN, + MinSQN >= State0#state.ledger_sqn -> + MaxTableSize = State0#state.memtable_maxsize, + {TableSize0, State1} = checkready_pushtomem(State0), + case quickcheck_pushtomem(DumpList, + TableSize0, + MaxTableSize) of + {twist, TableSize1} -> + gen_server:reply(From, ok), + io:format("Reply made on push in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + L0Snap = do_pushtomem(DumpList, + State1#state.memtable, + State1#state.memtable_copy, + MaxSQN), + io:format("Push completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + {noreply, + State1#state{memtable_copy=L0Snap, + table_size=TableSize1, + ledger_sqn=MaxSQN}}; + {maybe_roll, TableSize1} -> + L0Snap = do_pushtomem(DumpList, + State1#state.memtable, + State1#state.memtable_copy, + MaxSQN), + + case roll_memory(State1, MaxTableSize) of + {ok, L0Pend, ManSN, TableSize2} -> + io:format("Push completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + {reply, + ok, + State1#state{levelzero_pending=L0Pend, + table_size=TableSize2, + manifest_sqn=ManSN, + memtable_copy=L0Snap, + ledger_sqn=MaxSQN, + backlog=false}}; + {pause, Reason, Details} -> + io:format("Excess work due to - " ++ Reason, + Details), + {reply, + pause, + State1#state{backlog=true, + memtable_copy=L0Snap, + table_size=TableSize1, + ledger_sqn=MaxSQN}} + end + end; + {MinSQN, MaxSQN} -> + io:format("Mismatch of sequence number expectations with push " + ++ "having sequence numbers between ~w and ~w " + ++ "but current sequence number is ~w~n", + [MinSQN, MaxSQN, State0#state.ledger_sqn]), + {reply, refused, State0}; + empty -> + io:format("Empty request pushed to Penciller~n"), + {reply, ok, State0} + end; +handle_call({fetch, Key}, _From, State) -> + {reply, fetch(Key, State#state.manifest, State#state.memtable), State}; +handle_call(work_for_clerk, From, State) -> + {UpdState, Work} = return_work(State, From), + {reply, {Work, UpdState#state.backlog}, UpdState}; +handle_call({confirm_delete, FileName}, _From, State) -> + Reply = confirm_delete(FileName, + State#state.unreferenced_files, + State#state.registered_snapshots), + case Reply of + true -> + UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), + {reply, true, State#state{unreferenced_files=UF1}}; + _ -> + {reply, Reply, State} + end; +handle_call(prompt_compaction, _From, State) -> + %% If there is a prompt immediately after a L0 async write event then + %% there exists the potential for the prompt to stall the database. + %% Should only accept prompts if there has been a safe wait from the + %% last L0 write event. + Proceed = case State#state.levelzero_pending of + {true, _Pid, TS} -> + TD = timer:now_diff(os:timestamp(),TS), + if + TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false; + true -> true + end; + ?L0PEND_RESET -> + true + end, + if + Proceed -> + {_TableSize, State1} = checkready_pushtomem(State), + case roll_memory(State1, State1#state.memtable_maxsize) of + {ok, L0Pend, MSN, TableSize} -> + io:format("Prompted push completed~n"), + {reply, ok, State1#state{levelzero_pending=L0Pend, + table_size=TableSize, + manifest_sqn=MSN, + backlog=false}}; + {pause, Reason, Details} -> + io:format("Excess work due to - " ++ Reason, Details), + {reply, pause, State1#state{backlog=true}} + end; + true -> + {reply, ok, State#state{backlog=false}} + end; +handle_call({manifest_change, WI}, _From, State) -> + {ok, UpdState} = commit_manifest_change(WI, State), + {reply, ok, UpdState}; +handle_call(get_startup_sqn, _From, State) -> + {reply, State#state.ledger_sqn, State}; +handle_call({register_snapshot, Snapshot}, _From, State) -> + Rs = [{Snapshot, State#state.ledger_sqn}|State#state.registered_snapshots], + {reply, + {ok, + State#state.ledger_sqn, + State#state.manifest, + State#state.memtable_copy}, + State#state{registered_snapshots = Rs}}; +handle_call(close, _From, State) -> + {stop, normal, ok, State}. + +handle_cast({update_snapshotcache, Tree, SQN}, State) -> + MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN), + {noreply, State#state{memtable_copy=MemTableC}}; +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + %% When a Penciller shuts down it isn't safe to try an manage the safe + %% finishing of any outstanding work. The last commmitted manifest will + %% be used. + %% + %% Level 0 files lie outside of the manifest, and so if there is no L0 + %% file present it is safe to write the current contents of memory. If + %% there is a L0 file present - then the memory can be dropped (it is + %% recoverable from the ledger, and there should not be a lot to recover + %% as presumably the ETS file has been recently flushed, hence the presence + %% of a L0 file). + %% + %% The penciller should close each file in the unreferenced files, and + %% then each file in the manifest, and cast a close on the clerk. + %% The cast may not succeed as the clerk could be synchronously calling + %% the penciller looking for a manifest commit + %% + leveled_pclerk:clerk_stop(State#state.clerk), + Dump = ets:tab2list(State#state.memtable), + case {State#state.levelzero_pending, + get_item(0, State#state.manifest, []), length(Dump)} of + {?L0PEND_RESET, [], L} when L > 0 -> + MSN = State#state.manifest_sqn + 1, + FileName = State#state.root_path + ++ "/" ++ ?FILES_FP ++ "/" + ++ integer_to_list(MSN) ++ "_0_0", + {ok, + L0Pid, + {{[], []}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd", + Dump, + [], + 0), + io:format("Dump of memory on close to filename ~s~n", [FileName]), + leveled_sft:sft_close(L0Pid), + file:rename(FileName ++ ".pnd", FileName ++ ".sft"); + {?L0PEND_RESET, [], L} when L == 0 -> + io:format("No keys to dump from memory when closing~n"); + {{true, L0Pid, _TS}, _, _} -> + leveled_sft:sft_close(L0Pid), + io:format("No opportunity to persist memory before closing " + ++ "with ~w keys discarded~n", [length(Dump)]); + _ -> + io:format("No opportunity to persist memory before closing " + ++ "with ~w keys discarded~n", [length(Dump)]) + end, + ok = close_files(0, State#state.manifest), + lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end, + State#state.unreferenced_files), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%============================================================================ +%%% External functions +%%%============================================================================ + +roll_new_tree(Tree, [], HighSQN) -> + {Tree, HighSQN}; +roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> + UpdTree = lists:foldl(fun({K, V}, TreeAcc) -> + gb_trees:enter(K, V, TreeAcc) end, + Tree, + KVList), + roll_new_tree(UpdTree, TailIncs, SQN). + + +%%%============================================================================ +%%% Internal functions +%%%============================================================================ + +start_from_file(PCLopts) -> RootPath = PCLopts#penciller_options.root_path, MaxTableSize = case PCLopts#penciller_options.max_inmemory_tablesize of undefined -> @@ -331,152 +646,9 @@ init([PCLopts]) -> ledger_sqn=MaxSQN}} end end. - - -handle_call({push_mem, DumpList}, _From, State) -> - StartWatch = os:timestamp(), - Response = case assess_sqn(DumpList) of - {MinSQN, MaxSQN} when MaxSQN >= MinSQN, - MinSQN >= State#state.ledger_sqn -> - io:format("SQN check completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),StartWatch)]), - case push_to_memory(DumpList, State) of - {ok, UpdState} -> - {reply, ok, UpdState}; - {{pause, Reason, Details}, UpdState} -> - io:format("Excess work due to - " ++ Reason, Details), - {reply, pause, UpdState#state{backlog=true, - ledger_sqn=MaxSQN}} - end; - {MinSQN, MaxSQN} -> - io:format("Mismatch of sequence number expectations with push " - ++ "having sequence numbers between ~w and ~w " - ++ "but current sequence number is ~w~n", - [MinSQN, MaxSQN, State#state.ledger_sqn]), - {reply, refused, State}; - empty -> - io:format("Empty request pushed to Penciller~n"), - {reply, ok, State} - end, - io:format("Push completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),StartWatch)]), - Response; -handle_call({fetch, Key}, _From, State) -> - {reply, fetch(Key, State#state.manifest, State#state.memtable), State}; -handle_call(work_for_clerk, From, State) -> - {UpdState, Work} = return_work(State, From), - {reply, {Work, UpdState#state.backlog}, UpdState}; -handle_call({confirm_delete, FileName}, _From, State) -> - Reply = confirm_delete(FileName, - State#state.unreferenced_files, - State#state.registered_iterators), - case Reply of - true -> - UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), - {reply, true, State#state{unreferenced_files=UF1}}; - _ -> - {reply, Reply, State} - end; -handle_call(prompt_compaction, _From, State) -> - %% If there is a prompt immediately after a L0 async write event then - %% there exists the potential for the prompt to stall the database. - %% Should only accept prompts if there has been a safe wait from the - %% last L0 write event. - Proceed = case State#state.levelzero_pending of - {true, _Pid, TS} -> - TD = timer:now_diff(os:timestamp(),TS), - if - TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false; - true -> true - end; - ?L0PEND_RESET -> - true - end, - if - Proceed -> - case push_to_memory([], State) of - {ok, UpdState} -> - {reply, ok, UpdState#state{backlog=false}}; - {{pause, Reason, Details}, UpdState} -> - io:format("Excess work due to - " ++ Reason, Details), - {reply, pause, UpdState#state{backlog=true}} - end; - true -> - {reply, ok, State#state{backlog=false}} - end; -handle_call({manifest_change, WI}, _From, State) -> - {ok, UpdState} = commit_manifest_change(WI, State), - {reply, ok, UpdState}; -handle_call(get_startup_sqn, _From, State) -> - {reply, State#state.ledger_sqn, State}; -handle_call(close, _From, State) -> - {stop, normal, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State) -> - %% When a Penciller shuts down it isn't safe to try an manage the safe - %% finishing of any outstanding work. The last commmitted manifest will - %% be used. - %% - %% Level 0 files lie outside of the manifest, and so if there is no L0 - %% file present it is safe to write the current contents of memory. If - %% there is a L0 file present - then the memory can be dropped (it is - %% recoverable from the ledger, and there should not be a lot to recover - %% as presumably the ETS file has been recently flushed, hence the presence - %% of a L0 file). - %% - %% The penciller should close each file in the unreferenced files, and - %% then each file in the manifest, and cast a close on the clerk. - %% The cast may not succeed as the clerk could be synchronously calling - %% the penciller looking for a manifest commit - %% - leveled_pclerk:clerk_stop(State#state.clerk), - Dump = ets:tab2list(State#state.memtable), - case {State#state.levelzero_pending, - get_item(0, State#state.manifest, []), length(Dump)} of - {?L0PEND_RESET, [], L} when L > 0 -> - MSN = State#state.manifest_sqn + 1, - FileName = State#state.root_path - ++ "/" ++ ?FILES_FP ++ "/" - ++ integer_to_list(MSN) ++ "_0_0", - {ok, - L0Pid, - {{[], []}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd", - Dump, - [], - 0), - io:format("Dump of memory on close to filename ~s~n", [FileName]), - leveled_sft:sft_close(L0Pid), - file:rename(FileName ++ ".pnd", FileName ++ ".sft"); - {?L0PEND_RESET, [], L} when L == 0 -> - io:format("No keys to dump from memory when closing~n"); - {{true, L0Pid, _TS}, _, _} -> - leveled_sft:sft_close(L0Pid), - io:format("No opportunity to persist memory before closing " - ++ "with ~w keys discarded~n", [length(Dump)]); - _ -> - io:format("No opportunity to persist memory before closing " - ++ "with ~w keys discarded~n", [length(Dump)]) - end, - ok = close_files(0, State#state.manifest), - lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end, - State#state.unreferenced_files), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. -%%%============================================================================ -%%% Internal functions -%%%============================================================================ - -push_to_memory(DumpList, State) -> +checkready_pushtomem(State) -> {TableSize, UpdState} = case State#state.levelzero_pending of {true, Pid, _TS} -> %% Need to handle error scenarios? @@ -493,60 +665,63 @@ push_to_memory(DumpList, State) -> State#state.manifest, {0, [ManifestEntry]}), levelzero_pending=?L0PEND_RESET, - levelzero_snapshot=[]}}; + memtable_copy=#l0snapshot{}}}; ?L0PEND_RESET -> {State#state.table_size, State} end, %% Prompt clerk to ask about work - do this for every push_mem ok = leveled_pclerk:clerk_prompt(UpdState#state.clerk, penciller), - - MemoryInsertion = do_push_to_mem(DumpList, - TableSize, - UpdState#state.memtable, - UpdState#state.levelzero_snapshot, - UpdState#state.memtable_maxsize), - - case MemoryInsertion of - {twist, ApproxTableSize, UpdSnapshot} -> - {ok, UpdState#state{table_size=ApproxTableSize, - levelzero_snapshot=UpdSnapshot}}; - {roll, ApproxTableSize, UpdSnapshot} -> - L0 = get_item(0, UpdState#state.manifest, []), - case {L0, manifest_locked(UpdState)} of + {TableSize, UpdState}. + +quickcheck_pushtomem(DumpList, TableSize, MaxSize) -> + case TableSize + length(DumpList) of + ApproxTableSize when ApproxTableSize > MaxSize -> + {maybe_roll, ApproxTableSize}; + ApproxTableSize -> + io:format("Table size is approximately ~w~n", [ApproxTableSize]), + {twist, ApproxTableSize} + end. + +do_pushtomem(DumpList, MemTable, Snapshot, MaxSQN) -> + SW = os:timestamp(), + UpdSnapshot = add_increment_to_memcopy(Snapshot, MaxSQN, DumpList), + ets:insert(MemTable, DumpList), + io:format("Push into memory timed at ~w microseconds~n", + [timer:now_diff(os:timestamp(), SW)]), + UpdSnapshot. + +roll_memory(State, MaxSize) -> + case ets:info(State#state.memtable, size) of + Size when Size > MaxSize -> + L0 = get_item(0, State#state.manifest, []), + case {L0, manifest_locked(State)} of {[], false} -> - MSN = UpdState#state.manifest_sqn + 1, - FileName = UpdState#state.root_path + MSN = State#state.manifest_sqn + 1, + FileName = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(MSN) ++ "_0_0", Opts = #sft_options{wait=false}, {ok, L0Pid} = leveled_sft:sft_new(FileName, - UpdState#state.memtable, + State#state.memtable, [], 0, Opts), - {ok, - UpdState#state{levelzero_pending={true, - L0Pid, - os:timestamp()}, - table_size=ApproxTableSize, - manifest_sqn=MSN, - levelzero_snapshot=UpdSnapshot}}; + {ok, {true, L0Pid, os:timestamp()}, MSN, Size}; {[], true} -> - {{pause, - "L0 file write blocked by change at sqn=~w~n", - [UpdState#state.manifest_sqn]}, - UpdState#state{table_size=ApproxTableSize, - levelzero_snapshot=UpdSnapshot}}; + {pause, + "L0 file write blocked by change at sqn=~w~n", + [State#state.manifest_sqn]}; _ -> - {{pause, + {pause, "L0 file write blocked by L0 file in manifest~n", - []}, - UpdState#state{table_size=ApproxTableSize, - levelzero_snapshot=UpdSnapshot}} - end + []} + end; + Size -> + {ok, ?L0PEND_RESET, State#state.manifest_sqn, Size} end. + fetch(Key, Manifest, TID) -> case ets:lookup(TID, Key) of [Object] -> @@ -581,27 +756,6 @@ fetch(Key, Manifest, Level, FetchFun) -> end end. -do_push_to_mem(DumpList, TableSize, MemTable, Snapshot, MaxSize) -> - SW = os:timestamp(), - UpdSnapshot = lists:append(Snapshot, DumpList), - ets:insert(MemTable, DumpList), - io:format("Push into memory timed at ~w microseconds~n", - [timer:now_diff(os:timestamp(), SW)]), - case TableSize + length(DumpList) of - ApproxTableSize when ApproxTableSize > MaxSize -> - case ets:info(MemTable, size) of - ActTableSize when ActTableSize > MaxSize -> - {roll, ActTableSize, UpdSnapshot}; - ActTableSize -> - io:format("Table size is actually ~w~n", [ActTableSize]), - {twist, ActTableSize, UpdSnapshot} - end; - ApproxTableSize -> - io:format("Table size is approximately ~w~n", [ApproxTableSize]), - {twist, ApproxTableSize, UpdSnapshot} - end. - - %% Manifest lock - don't have two changes to the manifest happening %% concurrently @@ -675,6 +829,32 @@ return_work(State, From) -> {State, none} end. +%% Update the memtable copy if the tree created advances the SQN +cache_tree_in_memcopy(MemCopy, Tree, SQN) -> + case MemCopy#l0snapshot.ledger_sqn of + CurrentSQN when SQN > CurrentSQN -> + % Discard any merged increments + io:format("Updating cache with new tree at SQN=~w~n", [SQN]), + Incs = lists:foldl(fun({PushSQN, PushL}, Acc) -> + if + SQN >= PushSQN -> + Acc; + true -> + Acc ++ {PushSQN, PushL} + end end, + [], + MemCopy#l0snapshot.increments), + #l0snapshot{ledger_sqn = SQN, + increments = Incs, + tree = Tree}; + _ -> + MemCopy + end. + +add_increment_to_memcopy(MemCopy, SQN, KVList) -> + Incs = MemCopy#l0snapshot.increments ++ [{SQN, KVList}], + MemCopy#l0snapshot{increments=Incs}. + close_files(?MAX_LEVELS - 1, _Manifest) -> ok; @@ -819,14 +999,14 @@ update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> ClearedFile#manifest_entry.owner, MSN}])). -confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) -> +confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) -> case lists:keyfind(Filename, 1, UnreferencedFiles) of false -> false; {Filename, _Pid, MSN} -> LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end, infinity, - RegisteredIterators), + RegisteredSnapshots), if MSN >= LowSQN -> false; @@ -984,4 +1164,23 @@ simple_server_test() -> ok = pcl_close(PCLr), clean_testdir(RootPath). +memcopy_test() -> + KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), + "Value" ++ integer_to_list(X) ++ "A"} end, + lists:seq(1, 1000)), + KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), + "Value" ++ integer_to_list(X) ++ "B"} end, + lists:seq(1001, 2000)), + KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), + "Value" ++ integer_to_list(X) ++ "C"} end, + lists:seq(1, 1000)), + MemCopy0 = #l0snapshot{}, + MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1), + MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2), + MemCopy3 = add_increment_to_memcopy(MemCopy2, 3000, KVL3), + {Tree1, HighSQN1} = roll_new_tree(gb_trees:empty(), MemCopy3#l0snapshot.increments, 0), + Size1 = gb_trees:size(Tree1), + ?assertMatch(2000, Size1), + ?assertMatch(3000, HighSQN1). + -endif. \ No newline at end of file