Refactor Penciller Push

Two aspects of pushing to the penciller have been refactored:
1 - Allow the penciller to respond before the ETS table has been updated
to unlock the Bookie sooner.
2 - Change the way the copy of the memtable is stored to work more
effectively with snapshots wihtout locking the Penciller any further on
a snapshot or push request
This commit is contained in:
martinsumner 2016-09-21 18:31:42 +01:00
parent 66d6db4e11
commit d3e985ed80
5 changed files with 433 additions and 211 deletions

Binary file not shown.

View file

@ -30,6 +30,7 @@
-record(penciller_options,
{root_path :: string(),
penciller :: pid(),
max_inmemory_tablesize :: integer()}).
-record(bookie_options,

View file

@ -19,7 +19,8 @@
-include_lib("eunit/include/eunit.hrl").
-define(KEYS_TO_CHECK, 100).
-define(BATCH_SIZE, 16)
-define(BATCHES_TO_CHECK, 8).
-record(state, {owner :: pid()}).
@ -81,6 +82,27 @@ journal_compact(_InkerManifest, _Penciller, _Timeout, _Owner) ->
check_all_files(_InkerManifest) ->
ok.
check_single_file(CDB, _PencilSnapshot, SampleSize, BatchSize) ->
PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize),
KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []),
KeySizeList.
%% TODO:
%% Need to check the penciller snapshot to see if these keys are at the
%% right sequence number
%%
%% The calculate the proportion (by value size) of the CDB which is at the
%% wrong sequence number to help determine eligibility for compaction
%%
%% BIG TODO:
%% Need to snapshot a penciller
fetch_inbatches([], _BatchSize, _CDB, CheckedList) ->
CheckedList;
fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
{Batch, Tail} = lists:split(BatchSize, PositionList),
KL_List = leveled_cdb:direct_fetch(CDB, Batch, key_size),
fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List).
window_closed(_Timeout) ->
true.

View file

@ -59,12 +59,12 @@ handle_cast(stop, State) ->
{stop, normal, State}.
handle_info(timeout, State) ->
%% The pcl prompt will cause a penciller_prompt, to re-trigger timeout
case leveled_penciller:pcl_prompt(State#state.owner) of
ok ->
{noreply, State};
Timeout = requestandhandle_work(State),
{noreply, State, Timeout};
pause ->
{noreply, State}
{noreply, State, ?INACTIVITY_TIMEOUT}
end;
handle_info(_Info, State) ->
{noreply, State}.

View file

@ -86,6 +86,10 @@
%% files valid at the point of the snapshot until either the iterator is
%% completed or has timed out.
%%
%% Snapshot requests may request a filtered view of the ETS table (whihc may
%% be quicker than requesting the full table), or requets a snapshot of only
%% the persisted part of the Ledger
%%
%% ---------- ON STARTUP ----------
%%
%% On Startup the Bookie with ask the Penciller to initiate the Ledger first.
@ -146,6 +150,66 @@
%% table and build a new table starting with the remainder, and the keys from
%% the latest push.
%%
%% ---------- NOTES ON THE USE OF ETS ----------
%%
%% Insertion into ETS is very fast, and so using ETS does not slow the PUT
%% path. However, an ETS table is mutable, so it does complicate the
%% snapshotting of the Ledger.
%%
%% Some alternatives have been considered:
%%
%% A1 - Use gb_trees not ETS table
%% * Speed of inserts are too slow especially as the Bookie is blocked until
%% the insert is complete. Inserting 32K very simple keys takes 250ms. Only
%% the naive commands can be used, as Keys may be present - so not easy to
%% optimise. There is a lack of bulk operations
%%
%% A2 - Use some other structure other than gb_trees or ETS tables
%% * There is nothing else that will support iterators, so snapshots would
%% either need to do a conversion when they request the snapshot if
%% they need to iterate, or iterate through map functions scanning all the
%% keys. The conversion may not be expensive, as we know loading into an ETS
%% table is fast - but there may be some hidden overheads with creating and
%5 destroying many ETS tables.
%%
%% A3 - keep a parallel list of lists of things that have gone in the ETS
%% table in the format they arrived in
%% * There is doubling up of memory, and the snapshot must do some work to
%% make use of these lists. This combines the continued use of fast ETS
%% with the solution of A2 at a memory cost.
%%
%% A4 - Try and cache the conversion to be shared between snapshots registered
%% at the same Ledger SQN
%% * This is a rif on A2/A3, but if generally there is o(10) or o(100) seconds
%% between memory pushes, but much more frequent snapshots this may be more
%% efficient
%%
%% A5 - Produce a specific snapshot of the ETS table via an iterator on demand
%% for each snapshot
%% * So if a snapshot was required for na iterator, the Penciller would block
%% whilst it iterated over the ETS table first to produce a snapshot-specific
%% immutbale view. If the snapshot was required for a long-lived complete view
%% of the database the Penciller would block for a tab2list.
%%
%% A6 - Have snapshots incrementally create and share immutable trees, from a
%% parallel cache of changes
%% * This is a variance on A3. As changes are pushed to the Penciller in the
%% form of lists the Penciller updates a cache of the lists that are contained
%% in the current ETS table. These lists are returned to the snapshot when
%% the snapshot is registered. All snapshots it is assumed will convert these
%% lists into a gb_tree to use, but following that conversion they can cast
%% to the Penciller to refine the cache, so that the cache will become a
%% gb_tree up the ledger SQN at which the snapshot is registered, and now only
%% store new lists for subsequent updates. Future snapshot requests (before
%% the ets table is flushed) will now receive the array (if no changes have)
%% been made, or the array and only the lists needed to incrementally change
%% the array. If changes are infrequent, each snapshot request will pay the
%% full 20ms to 250ms cost of producing the array (although perhaps the clerk
%% could also update periodiclaly to avoid this). If changes are frequent,
%% the snapshot will generally not require to do a conversion, or will only
%% be required to do a small conversion
%%
%% A6 is the preferred option
-module(leveled_penciller).
@ -169,7 +233,10 @@
pcl_confirmdelete/2,
pcl_prompt/1,
pcl_close/1,
pcl_registersnapshot/2,
pcl_updatesnapshotcache/3,
pcl_getstartupsequencenumber/1,
roll_new_tree/3,
clean_testdir/1]).
-include_lib("eunit/include/eunit.hrl").
@ -187,22 +254,27 @@
-define(PROMPT_WAIT_ONL0, 5).
-define(L0PEND_RESET, {false, null, null}).
-record(l0snapshot, {increments = [] :: list,
tree = gb_trees:empty() :: gb_trees:tree(),
ledger_sqn = 0 :: integer()}).
-record(state, {manifest = [] :: list(),
ongoing_work = [] :: list(),
manifest_sqn = 0 :: integer(),
ledger_sqn = 0 :: integer(),
registered_iterators = [] :: list(),
registered_snapshots = [] :: list(),
unreferenced_files = [] :: list(),
root_path = "../test" :: string(),
table_size = 0 :: integer(),
clerk :: pid(),
levelzero_pending = ?L0PEND_RESET :: tuple(),
levelzero_snapshot = [] :: list(),
memtable_copy = #l0snapshot{} :: #l0snapshot{},
memtable,
backlog = false :: boolean(),
memtable_maxsize :: integer()}).
%%%============================================================================
%%% API
%%%============================================================================
@ -227,22 +299,265 @@ pcl_requestmanifestchange(Pid, WorkItem) ->
gen_server:call(Pid, {manifest_change, WorkItem}, infinity).
pcl_confirmdelete(Pid, FileName) ->
gen_server:call(Pid, {confirm_delete, FileName}).
gen_server:call(Pid, {confirm_delete, FileName}, infinity).
pcl_prompt(Pid) ->
gen_server:call(Pid, prompt_compaction).
gen_server:call(Pid, prompt_compaction, infinity).
pcl_getstartupsequencenumber(Pid) ->
gen_server:call(Pid, get_startup_sqn).
gen_server:call(Pid, get_startup_sqn, infinity).
pcl_registersnapshot(Pid, Snapshot) ->
gen_server:call(Pid, {register_snapshot, Snapshot}, infinity).
pcl_updatesnapshotcache(Pid, Tree, SQN) ->
gen_server:cast(Pid, {update_snapshotcache, Tree, SQN}).
pcl_close(Pid) ->
gen_server:call(Pid, close).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
init([PCLopts]) ->
case PCLopts#penciller_options.root_path of
undefined ->
{ok, #state{}};
_RootPath ->
start_from_file(PCLopts)
end.
handle_call({push_mem, DumpList}, From, State0) ->
% The process for pushing to memory is as follows
% - Check that the inbound list does not contain any Keys with a lower
% sequence number than any existing keys (assess_sqn/1)
% - Check that any file that had been sent to be written to L0 previously
% is now completed. If it is wipe out the in-memory view as this is now
% safely persisted. This will block waiting for this to complete if it
% hasn't (checkready_pushmem/1).
% - Quick check to see if there is a need to write a L0 file
% (quickcheck_pushmem/3). If there clearly isn't, then we can reply, and
% then add to memory in the background before updating the loop state
% - Push the update into memory (do_pushtomem/3)
% - If we haven't got through quickcheck now need to check if there is a
% definite need to write a new L0 file (roll_memory/2). If all clear this
% will write the file in the background and allow a response to the user.
% If not the change has still been made but the the L0 file will not have
% been prompted - so the reply does not indicate failure but returns the
% atom 'pause' to signal a loose desire for back-pressure to be applied.
% The only reason in this case why there should be a pause is if the
% manifest is locked pending completion of a manifest change - so reacting
% to the pause signal may not be sensible
StartWatch = os:timestamp(),
case assess_sqn(DumpList) of
{MinSQN, MaxSQN} when MaxSQN >= MinSQN,
MinSQN >= State0#state.ledger_sqn ->
MaxTableSize = State0#state.memtable_maxsize,
{TableSize0, State1} = checkready_pushtomem(State0),
case quickcheck_pushtomem(DumpList,
TableSize0,
MaxTableSize) of
{twist, TableSize1} ->
gen_server:reply(From, ok),
io:format("Reply made on push in ~w microseconds~n",
[timer:now_diff(os:timestamp(), StartWatch)]),
L0Snap = do_pushtomem(DumpList,
State1#state.memtable,
State1#state.memtable_copy,
MaxSQN),
io:format("Push completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(), StartWatch)]),
{noreply,
State1#state{memtable_copy=L0Snap,
table_size=TableSize1,
ledger_sqn=MaxSQN}};
{maybe_roll, TableSize1} ->
L0Snap = do_pushtomem(DumpList,
State1#state.memtable,
State1#state.memtable_copy,
MaxSQN),
case roll_memory(State1, MaxTableSize) of
{ok, L0Pend, ManSN, TableSize2} ->
io:format("Push completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(), StartWatch)]),
{reply,
ok,
State1#state{levelzero_pending=L0Pend,
table_size=TableSize2,
manifest_sqn=ManSN,
memtable_copy=L0Snap,
ledger_sqn=MaxSQN,
backlog=false}};
{pause, Reason, Details} ->
io:format("Excess work due to - " ++ Reason,
Details),
{reply,
pause,
State1#state{backlog=true,
memtable_copy=L0Snap,
table_size=TableSize1,
ledger_sqn=MaxSQN}}
end
end;
{MinSQN, MaxSQN} ->
io:format("Mismatch of sequence number expectations with push "
++ "having sequence numbers between ~w and ~w "
++ "but current sequence number is ~w~n",
[MinSQN, MaxSQN, State0#state.ledger_sqn]),
{reply, refused, State0};
empty ->
io:format("Empty request pushed to Penciller~n"),
{reply, ok, State0}
end;
handle_call({fetch, Key}, _From, State) ->
{reply, fetch(Key, State#state.manifest, State#state.memtable), State};
handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From),
{reply, {Work, UpdState#state.backlog}, UpdState};
handle_call({confirm_delete, FileName}, _From, State) ->
Reply = confirm_delete(FileName,
State#state.unreferenced_files,
State#state.registered_snapshots),
case Reply of
true ->
UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
{reply, true, State#state{unreferenced_files=UF1}};
_ ->
{reply, Reply, State}
end;
handle_call(prompt_compaction, _From, State) ->
%% If there is a prompt immediately after a L0 async write event then
%% there exists the potential for the prompt to stall the database.
%% Should only accept prompts if there has been a safe wait from the
%% last L0 write event.
Proceed = case State#state.levelzero_pending of
{true, _Pid, TS} ->
TD = timer:now_diff(os:timestamp(),TS),
if
TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false;
true -> true
end;
?L0PEND_RESET ->
true
end,
if
Proceed ->
{_TableSize, State1} = checkready_pushtomem(State),
case roll_memory(State1, State1#state.memtable_maxsize) of
{ok, L0Pend, MSN, TableSize} ->
io:format("Prompted push completed~n"),
{reply, ok, State1#state{levelzero_pending=L0Pend,
table_size=TableSize,
manifest_sqn=MSN,
backlog=false}};
{pause, Reason, Details} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, State1#state{backlog=true}}
end;
true ->
{reply, ok, State#state{backlog=false}}
end;
handle_call({manifest_change, WI}, _From, State) ->
{ok, UpdState} = commit_manifest_change(WI, State),
{reply, ok, UpdState};
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.ledger_sqn, State};
handle_call({register_snapshot, Snapshot}, _From, State) ->
Rs = [{Snapshot, State#state.ledger_sqn}|State#state.registered_snapshots],
{reply,
{ok,
State#state.ledger_sqn,
State#state.manifest,
State#state.memtable_copy},
State#state{registered_snapshots = Rs}};
handle_call(close, _From, State) ->
{stop, normal, ok, State}.
handle_cast({update_snapshotcache, Tree, SQN}, State) ->
MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN),
{noreply, State#state{memtable_copy=MemTableC}};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State) ->
%% When a Penciller shuts down it isn't safe to try an manage the safe
%% finishing of any outstanding work. The last commmitted manifest will
%% be used.
%%
%% Level 0 files lie outside of the manifest, and so if there is no L0
%% file present it is safe to write the current contents of memory. If
%% there is a L0 file present - then the memory can be dropped (it is
%% recoverable from the ledger, and there should not be a lot to recover
%% as presumably the ETS file has been recently flushed, hence the presence
%% of a L0 file).
%%
%% The penciller should close each file in the unreferenced files, and
%% then each file in the manifest, and cast a close on the clerk.
%% The cast may not succeed as the clerk could be synchronously calling
%% the penciller looking for a manifest commit
%%
leveled_pclerk:clerk_stop(State#state.clerk),
Dump = ets:tab2list(State#state.memtable),
case {State#state.levelzero_pending,
get_item(0, State#state.manifest, []), length(Dump)} of
{?L0PEND_RESET, [], L} when L > 0 ->
MSN = State#state.manifest_sqn + 1,
FileName = State#state.root_path
++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0",
{ok,
L0Pid,
{{[], []}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd",
Dump,
[],
0),
io:format("Dump of memory on close to filename ~s~n", [FileName]),
leveled_sft:sft_close(L0Pid),
file:rename(FileName ++ ".pnd", FileName ++ ".sft");
{?L0PEND_RESET, [], L} when L == 0 ->
io:format("No keys to dump from memory when closing~n");
{{true, L0Pid, _TS}, _, _} ->
leveled_sft:sft_close(L0Pid),
io:format("No opportunity to persist memory before closing "
++ "with ~w keys discarded~n", [length(Dump)]);
_ ->
io:format("No opportunity to persist memory before closing "
++ "with ~w keys discarded~n", [length(Dump)])
end,
ok = close_files(0, State#state.manifest),
lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end,
State#state.unreferenced_files),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%============================================================================
%%% External functions
%%%============================================================================
roll_new_tree(Tree, [], HighSQN) ->
{Tree, HighSQN};
roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN ->
UpdTree = lists:foldl(fun({K, V}, TreeAcc) ->
gb_trees:enter(K, V, TreeAcc) end,
Tree,
KVList),
roll_new_tree(UpdTree, TailIncs, SQN).
%%%============================================================================
%%% Internal functions
%%%============================================================================
start_from_file(PCLopts) ->
RootPath = PCLopts#penciller_options.root_path,
MaxTableSize = case PCLopts#penciller_options.max_inmemory_tablesize of
undefined ->
@ -333,150 +648,7 @@ init([PCLopts]) ->
end.
handle_call({push_mem, DumpList}, _From, State) ->
StartWatch = os:timestamp(),
Response = case assess_sqn(DumpList) of
{MinSQN, MaxSQN} when MaxSQN >= MinSQN,
MinSQN >= State#state.ledger_sqn ->
io:format("SQN check completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(),StartWatch)]),
case push_to_memory(DumpList, State) of
{ok, UpdState} ->
{reply, ok, UpdState};
{{pause, Reason, Details}, UpdState} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, UpdState#state{backlog=true,
ledger_sqn=MaxSQN}}
end;
{MinSQN, MaxSQN} ->
io:format("Mismatch of sequence number expectations with push "
++ "having sequence numbers between ~w and ~w "
++ "but current sequence number is ~w~n",
[MinSQN, MaxSQN, State#state.ledger_sqn]),
{reply, refused, State};
empty ->
io:format("Empty request pushed to Penciller~n"),
{reply, ok, State}
end,
io:format("Push completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(),StartWatch)]),
Response;
handle_call({fetch, Key}, _From, State) ->
{reply, fetch(Key, State#state.manifest, State#state.memtable), State};
handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From),
{reply, {Work, UpdState#state.backlog}, UpdState};
handle_call({confirm_delete, FileName}, _From, State) ->
Reply = confirm_delete(FileName,
State#state.unreferenced_files,
State#state.registered_iterators),
case Reply of
true ->
UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
{reply, true, State#state{unreferenced_files=UF1}};
_ ->
{reply, Reply, State}
end;
handle_call(prompt_compaction, _From, State) ->
%% If there is a prompt immediately after a L0 async write event then
%% there exists the potential for the prompt to stall the database.
%% Should only accept prompts if there has been a safe wait from the
%% last L0 write event.
Proceed = case State#state.levelzero_pending of
{true, _Pid, TS} ->
TD = timer:now_diff(os:timestamp(),TS),
if
TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false;
true -> true
end;
?L0PEND_RESET ->
true
end,
if
Proceed ->
case push_to_memory([], State) of
{ok, UpdState} ->
{reply, ok, UpdState#state{backlog=false}};
{{pause, Reason, Details}, UpdState} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, UpdState#state{backlog=true}}
end;
true ->
{reply, ok, State#state{backlog=false}}
end;
handle_call({manifest_change, WI}, _From, State) ->
{ok, UpdState} = commit_manifest_change(WI, State),
{reply, ok, UpdState};
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.ledger_sqn, State};
handle_call(close, _From, State) ->
{stop, normal, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State) ->
%% When a Penciller shuts down it isn't safe to try an manage the safe
%% finishing of any outstanding work. The last commmitted manifest will
%% be used.
%%
%% Level 0 files lie outside of the manifest, and so if there is no L0
%% file present it is safe to write the current contents of memory. If
%% there is a L0 file present - then the memory can be dropped (it is
%% recoverable from the ledger, and there should not be a lot to recover
%% as presumably the ETS file has been recently flushed, hence the presence
%% of a L0 file).
%%
%% The penciller should close each file in the unreferenced files, and
%% then each file in the manifest, and cast a close on the clerk.
%% The cast may not succeed as the clerk could be synchronously calling
%% the penciller looking for a manifest commit
%%
leveled_pclerk:clerk_stop(State#state.clerk),
Dump = ets:tab2list(State#state.memtable),
case {State#state.levelzero_pending,
get_item(0, State#state.manifest, []), length(Dump)} of
{?L0PEND_RESET, [], L} when L > 0 ->
MSN = State#state.manifest_sqn + 1,
FileName = State#state.root_path
++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0",
{ok,
L0Pid,
{{[], []}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd",
Dump,
[],
0),
io:format("Dump of memory on close to filename ~s~n", [FileName]),
leveled_sft:sft_close(L0Pid),
file:rename(FileName ++ ".pnd", FileName ++ ".sft");
{?L0PEND_RESET, [], L} when L == 0 ->
io:format("No keys to dump from memory when closing~n");
{{true, L0Pid, _TS}, _, _} ->
leveled_sft:sft_close(L0Pid),
io:format("No opportunity to persist memory before closing "
++ "with ~w keys discarded~n", [length(Dump)]);
_ ->
io:format("No opportunity to persist memory before closing "
++ "with ~w keys discarded~n", [length(Dump)])
end,
ok = close_files(0, State#state.manifest),
lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end,
State#state.unreferenced_files),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%============================================================================
%%% Internal functions
%%%============================================================================
push_to_memory(DumpList, State) ->
checkready_pushtomem(State) ->
{TableSize, UpdState} = case State#state.levelzero_pending of
{true, Pid, _TS} ->
%% Need to handle error scenarios?
@ -493,60 +665,63 @@ push_to_memory(DumpList, State) ->
State#state.manifest,
{0, [ManifestEntry]}),
levelzero_pending=?L0PEND_RESET,
levelzero_snapshot=[]}};
memtable_copy=#l0snapshot{}}};
?L0PEND_RESET ->
{State#state.table_size, State}
end,
%% Prompt clerk to ask about work - do this for every push_mem
ok = leveled_pclerk:clerk_prompt(UpdState#state.clerk, penciller),
{TableSize, UpdState}.
MemoryInsertion = do_push_to_mem(DumpList,
TableSize,
UpdState#state.memtable,
UpdState#state.levelzero_snapshot,
UpdState#state.memtable_maxsize),
quickcheck_pushtomem(DumpList, TableSize, MaxSize) ->
case TableSize + length(DumpList) of
ApproxTableSize when ApproxTableSize > MaxSize ->
{maybe_roll, ApproxTableSize};
ApproxTableSize ->
io:format("Table size is approximately ~w~n", [ApproxTableSize]),
{twist, ApproxTableSize}
end.
case MemoryInsertion of
{twist, ApproxTableSize, UpdSnapshot} ->
{ok, UpdState#state{table_size=ApproxTableSize,
levelzero_snapshot=UpdSnapshot}};
{roll, ApproxTableSize, UpdSnapshot} ->
L0 = get_item(0, UpdState#state.manifest, []),
case {L0, manifest_locked(UpdState)} of
do_pushtomem(DumpList, MemTable, Snapshot, MaxSQN) ->
SW = os:timestamp(),
UpdSnapshot = add_increment_to_memcopy(Snapshot, MaxSQN, DumpList),
ets:insert(MemTable, DumpList),
io:format("Push into memory timed at ~w microseconds~n",
[timer:now_diff(os:timestamp(), SW)]),
UpdSnapshot.
roll_memory(State, MaxSize) ->
case ets:info(State#state.memtable, size) of
Size when Size > MaxSize ->
L0 = get_item(0, State#state.manifest, []),
case {L0, manifest_locked(State)} of
{[], false} ->
MSN = UpdState#state.manifest_sqn + 1,
FileName = UpdState#state.root_path
MSN = State#state.manifest_sqn + 1,
FileName = State#state.root_path
++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0",
Opts = #sft_options{wait=false},
{ok, L0Pid} = leveled_sft:sft_new(FileName,
UpdState#state.memtable,
State#state.memtable,
[],
0,
Opts),
{ok,
UpdState#state{levelzero_pending={true,
L0Pid,
os:timestamp()},
table_size=ApproxTableSize,
manifest_sqn=MSN,
levelzero_snapshot=UpdSnapshot}};
{ok, {true, L0Pid, os:timestamp()}, MSN, Size};
{[], true} ->
{{pause,
{pause,
"L0 file write blocked by change at sqn=~w~n",
[UpdState#state.manifest_sqn]},
UpdState#state{table_size=ApproxTableSize,
levelzero_snapshot=UpdSnapshot}};
[State#state.manifest_sqn]};
_ ->
{{pause,
{pause,
"L0 file write blocked by L0 file in manifest~n",
[]},
UpdState#state{table_size=ApproxTableSize,
levelzero_snapshot=UpdSnapshot}}
end
[]}
end;
Size ->
{ok, ?L0PEND_RESET, State#state.manifest_sqn, Size}
end.
fetch(Key, Manifest, TID) ->
case ets:lookup(TID, Key) of
[Object] ->
@ -581,27 +756,6 @@ fetch(Key, Manifest, Level, FetchFun) ->
end
end.
do_push_to_mem(DumpList, TableSize, MemTable, Snapshot, MaxSize) ->
SW = os:timestamp(),
UpdSnapshot = lists:append(Snapshot, DumpList),
ets:insert(MemTable, DumpList),
io:format("Push into memory timed at ~w microseconds~n",
[timer:now_diff(os:timestamp(), SW)]),
case TableSize + length(DumpList) of
ApproxTableSize when ApproxTableSize > MaxSize ->
case ets:info(MemTable, size) of
ActTableSize when ActTableSize > MaxSize ->
{roll, ActTableSize, UpdSnapshot};
ActTableSize ->
io:format("Table size is actually ~w~n", [ActTableSize]),
{twist, ActTableSize, UpdSnapshot}
end;
ApproxTableSize ->
io:format("Table size is approximately ~w~n", [ApproxTableSize]),
{twist, ApproxTableSize, UpdSnapshot}
end.
%% Manifest lock - don't have two changes to the manifest happening
%% concurrently
@ -675,6 +829,32 @@ return_work(State, From) ->
{State, none}
end.
%% Update the memtable copy if the tree created advances the SQN
cache_tree_in_memcopy(MemCopy, Tree, SQN) ->
case MemCopy#l0snapshot.ledger_sqn of
CurrentSQN when SQN > CurrentSQN ->
% Discard any merged increments
io:format("Updating cache with new tree at SQN=~w~n", [SQN]),
Incs = lists:foldl(fun({PushSQN, PushL}, Acc) ->
if
SQN >= PushSQN ->
Acc;
true ->
Acc ++ {PushSQN, PushL}
end end,
[],
MemCopy#l0snapshot.increments),
#l0snapshot{ledger_sqn = SQN,
increments = Incs,
tree = Tree};
_ ->
MemCopy
end.
add_increment_to_memcopy(MemCopy, SQN, KVList) ->
Incs = MemCopy#l0snapshot.increments ++ [{SQN, KVList}],
MemCopy#l0snapshot{increments=Incs}.
close_files(?MAX_LEVELS - 1, _Manifest) ->
ok;
@ -819,14 +999,14 @@ update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) ->
ClearedFile#manifest_entry.owner,
MSN}])).
confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) ->
confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
case lists:keyfind(Filename, 1, UnreferencedFiles) of
false ->
false;
{Filename, _Pid, MSN} ->
LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end,
infinity,
RegisteredIterators),
RegisteredSnapshots),
if
MSN >= LowSQN ->
false;
@ -984,4 +1164,23 @@ simple_server_test() ->
ok = pcl_close(PCLr),
clean_testdir(RootPath).
memcopy_test() ->
KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X),
"Value" ++ integer_to_list(X) ++ "A"} end,
lists:seq(1, 1000)),
KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X),
"Value" ++ integer_to_list(X) ++ "B"} end,
lists:seq(1001, 2000)),
KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X),
"Value" ++ integer_to_list(X) ++ "C"} end,
lists:seq(1, 1000)),
MemCopy0 = #l0snapshot{},
MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1),
MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2),
MemCopy3 = add_increment_to_memcopy(MemCopy2, 3000, KVL3),
{Tree1, HighSQN1} = roll_new_tree(gb_trees:empty(), MemCopy3#l0snapshot.increments, 0),
Size1 = gb_trees:size(Tree1),
?assertMatch(2000, Size1),
?assertMatch(3000, HighSQN1).
-endif.