Snapshot testing
Work to test the checking of sequence numbers in snapshots as required by the inkers clerk to calculate the percentage of a file which is compactable
This commit is contained in:
parent
c64d67d9fb
commit
e2bb09b873
3 changed files with 311 additions and 131 deletions
|
@ -12,16 +12,19 @@
|
||||||
handle_info/2,
|
handle_info/2,
|
||||||
terminate/2,
|
terminate/2,
|
||||||
clerk_new/1,
|
clerk_new/1,
|
||||||
|
clerk_compact/5,
|
||||||
clerk_remove/2,
|
clerk_remove/2,
|
||||||
clerk_stop/1,
|
clerk_stop/1,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(BATCH_SIZE, 16)
|
-define(SAMPLE_SIZE, 200).
|
||||||
|
-define(BATCH_SIZE, 16).
|
||||||
-define(BATCHES_TO_CHECK, 8).
|
-define(BATCHES_TO_CHECK, 8).
|
||||||
|
|
||||||
-record(state, {owner :: pid()}).
|
-record(state, {owner :: pid(),
|
||||||
|
penciller_snapshot :: pid()}).
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -36,6 +39,9 @@ clerk_remove(Pid, Removals) ->
|
||||||
gen_server:cast(Pid, {remove, Removals}),
|
gen_server:cast(Pid, {remove, Removals}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
clerk_compact(Pid, Manifest, ManifestSQN, Penciller, Timeout) ->
|
||||||
|
gen_server:cast(Pid, {compact, Manifest, ManifestSQN, Penciller, Timeout}).
|
||||||
|
|
||||||
clerk_stop(Pid) ->
|
clerk_stop(Pid) ->
|
||||||
gen_server:cast(Pid, stop).
|
gen_server:cast(Pid, stop).
|
||||||
|
|
||||||
|
@ -49,6 +55,15 @@ init([]) ->
|
||||||
handle_call({register, Owner}, _From, State) ->
|
handle_call({register, Owner}, _From, State) ->
|
||||||
{reply, ok, State#state{owner=Owner}}.
|
{reply, ok, State#state{owner=Owner}}.
|
||||||
|
|
||||||
|
handle_cast({compact, Manifest, _ManifestSQN, Penciller, _Timeout}, State) ->
|
||||||
|
PclOpts = #penciller_options{start_snapshot = true,
|
||||||
|
source_penciller = Penciller,
|
||||||
|
requestor = self()},
|
||||||
|
PclSnap = leveled_penciller:pcl_start(PclOpts),
|
||||||
|
ok = leveled_penciller:pcl_loadsnapshot(PclSnap, []),
|
||||||
|
_CandidateList = scan_all_files(Manifest, PclSnap),
|
||||||
|
%% TODO - Lots
|
||||||
|
{noreply, State};
|
||||||
handle_cast({remove, _Removals}, State) ->
|
handle_cast({remove, _Removals}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_cast(stop, State) ->
|
handle_cast(stop, State) ->
|
||||||
|
@ -69,20 +84,38 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
|
|
||||||
check_single_file(CDB, _PencilSnapshot, SampleSize, BatchSize) ->
|
check_single_file(CDB, PclSnap, SampleSize, BatchSize) ->
|
||||||
PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize),
|
PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize),
|
||||||
KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []),
|
KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []),
|
||||||
KeySizeList.
|
R0 = lists:foldl(fun(KS, {ActSize, RplSize}) ->
|
||||||
%% TODO:
|
{{PK, SQN}, Size} = KS,
|
||||||
%% Need to check the penciller snapshot to see if these keys are at the
|
Chk = leveled_pcl:pcl_checksequencenumber(PclSnap,
|
||||||
%% right sequence number
|
PK,
|
||||||
%%
|
SQN),
|
||||||
%% The calculate the proportion (by value size) of the CDB which is at the
|
case Chk of
|
||||||
%% wrong sequence number to help determine eligibility for compaction
|
true ->
|
||||||
%%
|
{ActSize + Size, RplSize};
|
||||||
%% BIG TODO:
|
false ->
|
||||||
%% Need to snapshot a penciller
|
{ActSize, RplSize + Size}
|
||||||
|
end end,
|
||||||
|
{0, 0},
|
||||||
|
KeySizeList),
|
||||||
|
{ActiveSize, ReplacedSize} = R0,
|
||||||
|
100 * (ActiveSize / (ActiveSize + ReplacedSize)).
|
||||||
|
|
||||||
|
scan_all_files(Manifest, Penciller) ->
|
||||||
|
scan_all_files(Manifest, Penciller, []).
|
||||||
|
|
||||||
|
scan_all_files([], _Penciller, CandidateList) ->
|
||||||
|
CandidateList;
|
||||||
|
scan_all_files([{LowSQN, FN, JournalP}|Tail], Penciller, CandidateList) ->
|
||||||
|
CompactPerc = check_single_file(JournalP,
|
||||||
|
Penciller,
|
||||||
|
?SAMPLE_SIZE,
|
||||||
|
?BATCH_SIZE),
|
||||||
|
scan_all_files(Tail, Penciller, CandidateList ++
|
||||||
|
[{LowSQN, FN, JournalP, CompactPerc}]).
|
||||||
|
|
||||||
fetch_inbatches([], _BatchSize, _CDB, CheckedList) ->
|
fetch_inbatches([], _BatchSize, _CDB, CheckedList) ->
|
||||||
CheckedList;
|
CheckedList;
|
||||||
fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
|
fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
|
||||||
|
@ -90,8 +123,8 @@ fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
|
||||||
KL_List = leveled_cdb:direct_fetch(CDB, Batch, key_size),
|
KL_List = leveled_cdb:direct_fetch(CDB, Batch, key_size),
|
||||||
fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List).
|
fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List).
|
||||||
|
|
||||||
window_closed(_Timeout) ->
|
|
||||||
true.
|
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
|
@ -104,6 +104,7 @@
|
||||||
ink_fetch/3,
|
ink_fetch/3,
|
||||||
ink_loadpcl/4,
|
ink_loadpcl/4,
|
||||||
ink_registersnapshot/2,
|
ink_registersnapshot/2,
|
||||||
|
ink_compactjournal/3,
|
||||||
ink_close/1,
|
ink_close/1,
|
||||||
ink_print_manifest/1,
|
ink_print_manifest/1,
|
||||||
build_dummy_journal/0,
|
build_dummy_journal/0,
|
||||||
|
@ -126,8 +127,10 @@
|
||||||
active_journaldb :: pid(),
|
active_journaldb :: pid(),
|
||||||
active_journaldb_sqn :: integer(),
|
active_journaldb_sqn :: integer(),
|
||||||
removed_journaldbs = [] :: list(),
|
removed_journaldbs = [] :: list(),
|
||||||
|
registered_snapshots = [] :: list(),
|
||||||
root_path :: string(),
|
root_path :: string(),
|
||||||
cdb_options :: #cdb_options{}}).
|
cdb_options :: #cdb_options{},
|
||||||
|
clerk :: pid()}).
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -155,6 +158,9 @@ ink_close(Pid) ->
|
||||||
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
|
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
|
||||||
gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity).
|
gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity).
|
||||||
|
|
||||||
|
ink_compactjournal(Pid, Penciller, Timeout) ->
|
||||||
|
gen_server:call(Pid, {compact_journal, Penciller, Timeout}, infinty).
|
||||||
|
|
||||||
ink_print_manifest(Pid) ->
|
ink_print_manifest(Pid) ->
|
||||||
gen_server:call(Pid, print_manifest, infinity).
|
gen_server:call(Pid, print_manifest, infinity).
|
||||||
|
|
||||||
|
@ -221,15 +227,18 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) ->
|
||||||
State#state.active_journaldb}],
|
State#state.active_journaldb}],
|
||||||
Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest),
|
Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest),
|
||||||
{reply, Reply, State};
|
{reply, Reply, State};
|
||||||
handle_call({register_snapshot, _Requestor}, _From , State) ->
|
handle_call({register_snapshot, Requestor}, _From , State) ->
|
||||||
%% TODO: Not yet implemented registration of snapshot
|
Rs = [{Requestor,
|
||||||
%% Should return manifest and register the snapshot
|
State#state.manifest_sqn}|State#state.registered_snapshots],
|
||||||
{reply, {State#state.manifest,
|
{reply, {State#state.manifest,
|
||||||
State#state.active_journaldb},
|
State#state.active_journaldb},
|
||||||
State};
|
State#state{registered_snapshots=Rs}};
|
||||||
handle_call(print_manifest, _From, State) ->
|
handle_call(print_manifest, _From, State) ->
|
||||||
manifest_printer(State#state.manifest),
|
manifest_printer(State#state.manifest),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
handle_call({compact_journal, Penciller, Timeout}, _From, State) ->
|
||||||
|
leveled_iclerk:clerk_compact(Penciller, Timeout),
|
||||||
|
{reply, ok, State};
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State) ->
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
|
@ -256,6 +265,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
start_from_file(InkerOpts) ->
|
start_from_file(InkerOpts) ->
|
||||||
|
{ok, Clerk} = leveled_iclerk:clerk_new(self()),
|
||||||
RootPath = InkerOpts#inker_options.root_path,
|
RootPath = InkerOpts#inker_options.root_path,
|
||||||
CDBopts = InkerOpts#inker_options.cdb_options,
|
CDBopts = InkerOpts#inker_options.cdb_options,
|
||||||
JournalFP = filepath(RootPath, journal_dir),
|
JournalFP = filepath(RootPath, journal_dir),
|
||||||
|
@ -288,7 +298,8 @@ start_from_file(InkerOpts) ->
|
||||||
active_journaldb = ActiveJournal,
|
active_journaldb = ActiveJournal,
|
||||||
active_journaldb_sqn = LowActiveSQN,
|
active_journaldb_sqn = LowActiveSQN,
|
||||||
root_path = RootPath,
|
root_path = RootPath,
|
||||||
cdb_options = CDBopts}}.
|
cdb_options = CDBopts,
|
||||||
|
clerk = Clerk}}.
|
||||||
|
|
||||||
|
|
||||||
put_object(PrimaryKey, Object, KeyChanges, State) ->
|
put_object(PrimaryKey, Object, KeyChanges, State) ->
|
||||||
|
|
|
@ -228,6 +228,7 @@
|
||||||
pcl_quickstart/1,
|
pcl_quickstart/1,
|
||||||
pcl_pushmem/2,
|
pcl_pushmem/2,
|
||||||
pcl_fetch/2,
|
pcl_fetch/2,
|
||||||
|
pcl_checksequencenumber/3,
|
||||||
pcl_workforclerk/1,
|
pcl_workforclerk/1,
|
||||||
pcl_requestmanifestchange/2,
|
pcl_requestmanifestchange/2,
|
||||||
pcl_confirmdelete/2,
|
pcl_confirmdelete/2,
|
||||||
|
@ -297,6 +298,9 @@ pcl_pushmem(Pid, DumpList) ->
|
||||||
pcl_fetch(Pid, Key) ->
|
pcl_fetch(Pid, Key) ->
|
||||||
gen_server:call(Pid, {fetch, Key}, infinity).
|
gen_server:call(Pid, {fetch, Key}, infinity).
|
||||||
|
|
||||||
|
pcl_checksequencenumber(Pid, Key, SQN) ->
|
||||||
|
gen_server:call(Pid, {check_sqn, Key, SQN}, infinity).
|
||||||
|
|
||||||
pcl_workforclerk(Pid) ->
|
pcl_workforclerk(Pid) ->
|
||||||
gen_server:call(Pid, work_for_clerk, infinity).
|
gen_server:call(Pid, work_for_clerk, infinity).
|
||||||
|
|
||||||
|
@ -334,9 +338,10 @@ init([PCLopts]) ->
|
||||||
PCLopts#penciller_options.start_snapshot} of
|
PCLopts#penciller_options.start_snapshot} of
|
||||||
{undefined, true} ->
|
{undefined, true} ->
|
||||||
SrcPenciller = PCLopts#penciller_options.source_penciller,
|
SrcPenciller = PCLopts#penciller_options.source_penciller,
|
||||||
{ok, {LedgerSQN,
|
{ok,
|
||||||
MemTableCopy,
|
LedgerSQN,
|
||||||
Manifest}} = pcl_registersnapshot(SrcPenciller, self()),
|
Manifest,
|
||||||
|
MemTableCopy} = pcl_registersnapshot(SrcPenciller, self()),
|
||||||
|
|
||||||
{ok, #state{memtable_copy=MemTableCopy,
|
{ok, #state{memtable_copy=MemTableCopy,
|
||||||
is_snapshot=true,
|
is_snapshot=true,
|
||||||
|
@ -349,7 +354,175 @@ init([PCLopts]) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
handle_call({push_mem, DumpList}, From, State0) ->
|
handle_call({push_mem, DumpList}, From, State) ->
|
||||||
|
if
|
||||||
|
State#state.is_snapshot == true ->
|
||||||
|
{reply, bad_request, State};
|
||||||
|
true ->
|
||||||
|
writer_call({push_mem, DumpList}, From, State)
|
||||||
|
end;
|
||||||
|
handle_call({confirm_delete, FileName}, _From, State) ->
|
||||||
|
if
|
||||||
|
State#state.is_snapshot == true ->
|
||||||
|
{reply, bad_request, State};
|
||||||
|
true ->
|
||||||
|
writer_call({confirm_delete, FileName}, _From, State)
|
||||||
|
end;
|
||||||
|
handle_call(prompt_compaction, _From, State) ->
|
||||||
|
if
|
||||||
|
State#state.is_snapshot == true ->
|
||||||
|
{reply, bad_request, State};
|
||||||
|
true ->
|
||||||
|
writer_call(prompt_compaction, _From, State)
|
||||||
|
end;
|
||||||
|
handle_call({manifest_change, WI}, _From, State) ->
|
||||||
|
if
|
||||||
|
State#state.is_snapshot == true ->
|
||||||
|
{reply, bad_request, State};
|
||||||
|
true ->
|
||||||
|
writer_call({manifest_change, WI}, _From, State)
|
||||||
|
end;
|
||||||
|
handle_call({check_sqn, Key, SQN}, _From, State) ->
|
||||||
|
Obj = if
|
||||||
|
State#state.is_snapshot == true ->
|
||||||
|
fetch_snap(Key,
|
||||||
|
State#state.manifest,
|
||||||
|
State#state.levelzero_snapshot);
|
||||||
|
true ->
|
||||||
|
fetch(Key,
|
||||||
|
State#state.manifest,
|
||||||
|
State#state.memtable)
|
||||||
|
end,
|
||||||
|
Reply = case Obj of
|
||||||
|
not_present ->
|
||||||
|
false;
|
||||||
|
Obj ->
|
||||||
|
SQNToCompare = leveled_bookie:strip_to_seqonly(Obj),
|
||||||
|
if
|
||||||
|
SQNToCompare > SQN ->
|
||||||
|
false;
|
||||||
|
true ->
|
||||||
|
true
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
{reply, Reply, State};
|
||||||
|
handle_call({fetch, Key}, _From, State) ->
|
||||||
|
Reply = if
|
||||||
|
State#state.is_snapshot == true ->
|
||||||
|
fetch_snap(Key,
|
||||||
|
State#state.manifest,
|
||||||
|
State#state.levelzero_snapshot);
|
||||||
|
true ->
|
||||||
|
fetch(Key,
|
||||||
|
State#state.manifest,
|
||||||
|
State#state.memtable)
|
||||||
|
end,
|
||||||
|
{reply, Reply, State};
|
||||||
|
handle_call(work_for_clerk, From, State) ->
|
||||||
|
{UpdState, Work} = return_work(State, From),
|
||||||
|
{reply, {Work, UpdState#state.backlog}, UpdState};
|
||||||
|
handle_call(get_startup_sqn, _From, State) ->
|
||||||
|
{reply, State#state.ledger_sqn, State};
|
||||||
|
handle_call({register_snapshot, Snapshot}, _From, State) ->
|
||||||
|
Rs = [{Snapshot, State#state.ledger_sqn}|State#state.registered_snapshots],
|
||||||
|
{reply,
|
||||||
|
{ok,
|
||||||
|
State#state.ledger_sqn,
|
||||||
|
State#state.manifest,
|
||||||
|
State#state.memtable_copy},
|
||||||
|
State#state{registered_snapshots = Rs}};
|
||||||
|
handle_call({load_snapshot, Increment}, _From, State) ->
|
||||||
|
MemTableCopy = State#state.memtable_copy,
|
||||||
|
{Tree0, TreeSQN0} = roll_new_tree(MemTableCopy#l0snapshot.tree,
|
||||||
|
MemTableCopy#l0snapshot.increments,
|
||||||
|
MemTableCopy#l0snapshot.ledger_sqn),
|
||||||
|
if
|
||||||
|
TreeSQN0 > MemTableCopy#l0snapshot.ledger_sqn ->
|
||||||
|
pcl_updatesnapshotcache(State#state.source_penciller,
|
||||||
|
Tree0,
|
||||||
|
TreeSQN0)
|
||||||
|
end,
|
||||||
|
{Tree1, TreeSQN1} = roll_new_tree(Tree0, [Increment], TreeSQN0),
|
||||||
|
io:format("Snapshot loaded to start at SQN~w~n", [TreeSQN1]),
|
||||||
|
{reply, ok, State#state{levelzero_snapshot=Tree1,
|
||||||
|
ledger_sqn=TreeSQN1,
|
||||||
|
snapshot_fully_loaded=true}};
|
||||||
|
handle_call(close, _From, State) ->
|
||||||
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
|
handle_cast({update_snapshotcache, Tree, SQN}, State) ->
|
||||||
|
MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN),
|
||||||
|
{noreply, State#state{memtable_copy=MemTableC}};
|
||||||
|
handle_cast(_Msg, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info(_Info, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, State) ->
|
||||||
|
%% When a Penciller shuts down it isn't safe to try an manage the safe
|
||||||
|
%% finishing of any outstanding work. The last commmitted manifest will
|
||||||
|
%% be used.
|
||||||
|
%%
|
||||||
|
%% Level 0 files lie outside of the manifest, and so if there is no L0
|
||||||
|
%% file present it is safe to write the current contents of memory. If
|
||||||
|
%% there is a L0 file present - then the memory can be dropped (it is
|
||||||
|
%% recoverable from the ledger, and there should not be a lot to recover
|
||||||
|
%% as presumably the ETS file has been recently flushed, hence the presence
|
||||||
|
%% of a L0 file).
|
||||||
|
%%
|
||||||
|
%% The penciller should close each file in the unreferenced files, and
|
||||||
|
%% then each file in the manifest, and cast a close on the clerk.
|
||||||
|
%% The cast may not succeed as the clerk could be synchronously calling
|
||||||
|
%% the penciller looking for a manifest commit
|
||||||
|
%%
|
||||||
|
if
|
||||||
|
State#state.is_snapshot == true ->
|
||||||
|
ok;
|
||||||
|
true ->
|
||||||
|
leveled_pclerk:clerk_stop(State#state.clerk),
|
||||||
|
Dump = ets:tab2list(State#state.memtable),
|
||||||
|
case {State#state.levelzero_pending,
|
||||||
|
get_item(0, State#state.manifest, []), length(Dump)} of
|
||||||
|
{?L0PEND_RESET, [], L} when L > 0 ->
|
||||||
|
MSN = State#state.manifest_sqn + 1,
|
||||||
|
FileName = State#state.root_path
|
||||||
|
++ "/" ++ ?FILES_FP ++ "/"
|
||||||
|
++ integer_to_list(MSN) ++ "_0_0",
|
||||||
|
NewSFT = leveled_sft:sft_new(FileName ++ ".pnd",
|
||||||
|
Dump,
|
||||||
|
[],
|
||||||
|
0),
|
||||||
|
{ok, L0Pid, {{[], []}, _SK, _HK}} = NewSFT,
|
||||||
|
io:format("Dump of memory on close to filename ~s~n",
|
||||||
|
[FileName]),
|
||||||
|
leveled_sft:sft_close(L0Pid),
|
||||||
|
file:rename(FileName ++ ".pnd", FileName ++ ".sft");
|
||||||
|
{?L0PEND_RESET, [], L} when L == 0 ->
|
||||||
|
io:format("No keys to dump from memory when closing~n");
|
||||||
|
{{true, L0Pid, _TS}, _, _} ->
|
||||||
|
leveled_sft:sft_close(L0Pid),
|
||||||
|
io:format("No opportunity to persist memory before closing"
|
||||||
|
++ " with ~w keys discarded~n",
|
||||||
|
[length(Dump)]);
|
||||||
|
_ ->
|
||||||
|
io:format("No opportunity to persist memory before closing"
|
||||||
|
++ " with ~w keys discarded~n",
|
||||||
|
[length(Dump)])
|
||||||
|
end,
|
||||||
|
ok = close_files(0, State#state.manifest),
|
||||||
|
lists:foreach(fun({_FN, Pid, _SN}) ->
|
||||||
|
leveled_sft:sft_close(Pid) end,
|
||||||
|
State#state.unreferenced_files),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
|
||||||
|
writer_call({push_mem, DumpList}, From, State0) ->
|
||||||
% The process for pushing to memory is as follows
|
% The process for pushing to memory is as follows
|
||||||
% - Check that the inbound list does not contain any Keys with a lower
|
% - Check that the inbound list does not contain any Keys with a lower
|
||||||
% sequence number than any existing keys (assess_sqn/1)
|
% sequence number than any existing keys (assess_sqn/1)
|
||||||
|
@ -432,12 +605,7 @@ handle_call({push_mem, DumpList}, From, State0) ->
|
||||||
io:format("Empty request pushed to Penciller~n"),
|
io:format("Empty request pushed to Penciller~n"),
|
||||||
{reply, ok, State0}
|
{reply, ok, State0}
|
||||||
end;
|
end;
|
||||||
handle_call({fetch, Key}, _From, State) ->
|
writer_call({confirm_delete, FileName}, _From, State) ->
|
||||||
{reply, fetch(Key, State#state.manifest, State#state.memtable), State};
|
|
||||||
handle_call(work_for_clerk, From, State) ->
|
|
||||||
{UpdState, Work} = return_work(State, From),
|
|
||||||
{reply, {Work, UpdState#state.backlog}, UpdState};
|
|
||||||
handle_call({confirm_delete, FileName}, _From, State) ->
|
|
||||||
Reply = confirm_delete(FileName,
|
Reply = confirm_delete(FileName,
|
||||||
State#state.unreferenced_files,
|
State#state.unreferenced_files,
|
||||||
State#state.registered_snapshots),
|
State#state.registered_snapshots),
|
||||||
|
@ -448,7 +616,7 @@ handle_call({confirm_delete, FileName}, _From, State) ->
|
||||||
_ ->
|
_ ->
|
||||||
{reply, Reply, State}
|
{reply, Reply, State}
|
||||||
end;
|
end;
|
||||||
handle_call(prompt_compaction, _From, State) ->
|
writer_call(prompt_compaction, _From, State) ->
|
||||||
%% If there is a prompt immediately after a L0 async write event then
|
%% If there is a prompt immediately after a L0 async write event then
|
||||||
%% there exists the potential for the prompt to stall the database.
|
%% there exists the potential for the prompt to stall the database.
|
||||||
%% Should only accept prompts if there has been a safe wait from the
|
%% Should only accept prompts if there has been a safe wait from the
|
||||||
|
@ -480,99 +648,10 @@ handle_call(prompt_compaction, _From, State) ->
|
||||||
true ->
|
true ->
|
||||||
{reply, ok, State#state{backlog=false}}
|
{reply, ok, State#state{backlog=false}}
|
||||||
end;
|
end;
|
||||||
handle_call({manifest_change, WI}, _From, State) ->
|
writer_call({manifest_change, WI}, _From, State) ->
|
||||||
{ok, UpdState} = commit_manifest_change(WI, State),
|
{ok, UpdState} = commit_manifest_change(WI, State),
|
||||||
{reply, ok, UpdState};
|
{reply, ok, UpdState}.
|
||||||
handle_call(get_startup_sqn, _From, State) ->
|
|
||||||
{reply, State#state.ledger_sqn, State};
|
|
||||||
handle_call({register_snapshot, Snapshot}, _From, State) ->
|
|
||||||
Rs = [{Snapshot, State#state.ledger_sqn}|State#state.registered_snapshots],
|
|
||||||
{reply,
|
|
||||||
{ok,
|
|
||||||
State#state.ledger_sqn,
|
|
||||||
State#state.manifest,
|
|
||||||
State#state.memtable_copy},
|
|
||||||
State#state{registered_snapshots = Rs}};
|
|
||||||
handle_call({load_snapshot, Increment}, _From, State) ->
|
|
||||||
MemTableCopy = State#state.memtable_copy,
|
|
||||||
{Tree0, TreeSQN0} = roll_new_tree(MemTableCopy#l0snapshot.tree,
|
|
||||||
MemTableCopy#l0snapshot.increments,
|
|
||||||
MemTableCopy#l0snapshot.ledger_sqn),
|
|
||||||
if
|
|
||||||
TreeSQN0 > MemTableCopy#l0snapshot.ledger_sqn ->
|
|
||||||
pcl_updatesnapshotcache(State#state.source_penciller,
|
|
||||||
Tree0,
|
|
||||||
TreeSQN0)
|
|
||||||
end,
|
|
||||||
{Tree1, TreeSQN1} = roll_new_tree(Tree0, [Increment], TreeSQN0),
|
|
||||||
io:format("Snapshot loaded to start at SQN~w~n", [TreeSQN1]),
|
|
||||||
{reply, ok, State#state{levelzero_snapshot=Tree1,
|
|
||||||
ledger_sqn=TreeSQN1,
|
|
||||||
snapshot_fully_loaded=true}};
|
|
||||||
handle_call(close, _From, State) ->
|
|
||||||
{stop, normal, ok, State}.
|
|
||||||
|
|
||||||
handle_cast({update_snapshotcache, Tree, SQN}, State) ->
|
|
||||||
MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN),
|
|
||||||
{noreply, State#state{memtable_copy=MemTableC}};
|
|
||||||
handle_cast(_Msg, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, State) ->
|
|
||||||
%% When a Penciller shuts down it isn't safe to try an manage the safe
|
|
||||||
%% finishing of any outstanding work. The last commmitted manifest will
|
|
||||||
%% be used.
|
|
||||||
%%
|
|
||||||
%% Level 0 files lie outside of the manifest, and so if there is no L0
|
|
||||||
%% file present it is safe to write the current contents of memory. If
|
|
||||||
%% there is a L0 file present - then the memory can be dropped (it is
|
|
||||||
%% recoverable from the ledger, and there should not be a lot to recover
|
|
||||||
%% as presumably the ETS file has been recently flushed, hence the presence
|
|
||||||
%% of a L0 file).
|
|
||||||
%%
|
|
||||||
%% The penciller should close each file in the unreferenced files, and
|
|
||||||
%% then each file in the manifest, and cast a close on the clerk.
|
|
||||||
%% The cast may not succeed as the clerk could be synchronously calling
|
|
||||||
%% the penciller looking for a manifest commit
|
|
||||||
%%
|
|
||||||
leveled_pclerk:clerk_stop(State#state.clerk),
|
|
||||||
Dump = ets:tab2list(State#state.memtable),
|
|
||||||
case {State#state.levelzero_pending,
|
|
||||||
get_item(0, State#state.manifest, []), length(Dump)} of
|
|
||||||
{?L0PEND_RESET, [], L} when L > 0 ->
|
|
||||||
MSN = State#state.manifest_sqn + 1,
|
|
||||||
FileName = State#state.root_path
|
|
||||||
++ "/" ++ ?FILES_FP ++ "/"
|
|
||||||
++ integer_to_list(MSN) ++ "_0_0",
|
|
||||||
{ok,
|
|
||||||
L0Pid,
|
|
||||||
{{[], []}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd",
|
|
||||||
Dump,
|
|
||||||
[],
|
|
||||||
0),
|
|
||||||
io:format("Dump of memory on close to filename ~s~n", [FileName]),
|
|
||||||
leveled_sft:sft_close(L0Pid),
|
|
||||||
file:rename(FileName ++ ".pnd", FileName ++ ".sft");
|
|
||||||
{?L0PEND_RESET, [], L} when L == 0 ->
|
|
||||||
io:format("No keys to dump from memory when closing~n");
|
|
||||||
{{true, L0Pid, _TS}, _, _} ->
|
|
||||||
leveled_sft:sft_close(L0Pid),
|
|
||||||
io:format("No opportunity to persist memory before closing "
|
|
||||||
++ "with ~w keys discarded~n", [length(Dump)]);
|
|
||||||
_ ->
|
|
||||||
io:format("No opportunity to persist memory before closing "
|
|
||||||
++ "with ~w keys discarded~n", [length(Dump)])
|
|
||||||
end,
|
|
||||||
ok = close_files(0, State#state.manifest),
|
|
||||||
lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end,
|
|
||||||
State#state.unreferenced_files),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -744,6 +823,14 @@ roll_memory(State, MaxSize) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
fetch_snap(Key, Manifest, Tree) ->
|
||||||
|
case gb_trees:lookup(Key, Tree) of
|
||||||
|
{value, Value} ->
|
||||||
|
{Key, Value};
|
||||||
|
none ->
|
||||||
|
fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2)
|
||||||
|
end.
|
||||||
|
|
||||||
fetch(Key, Manifest, TID) ->
|
fetch(Key, Manifest, TID) ->
|
||||||
case ets:lookup(TID, Key) of
|
case ets:lookup(TID, Key) of
|
||||||
[Object] ->
|
[Object] ->
|
||||||
|
@ -777,6 +864,7 @@ fetch(Key, Manifest, Level, FetchFun) ->
|
||||||
ObjectFound
|
ObjectFound
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
%% Manifest lock - don't have two changes to the manifest happening
|
%% Manifest lock - don't have two changes to the manifest happening
|
||||||
%% concurrently
|
%% concurrently
|
||||||
|
@ -862,7 +950,9 @@ roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN ->
|
||||||
gb_trees:enter(K, V, TreeAcc) end,
|
gb_trees:enter(K, V, TreeAcc) end,
|
||||||
Tree,
|
Tree,
|
||||||
KVList),
|
KVList),
|
||||||
roll_new_tree(UpdTree, TailIncs, SQN).
|
roll_new_tree(UpdTree, TailIncs, SQN);
|
||||||
|
roll_new_tree(Tree, [_H|TailIncs], HighSQN) ->
|
||||||
|
roll_new_tree(Tree, TailIncs, HighSQN).
|
||||||
|
|
||||||
%% Update the memtable copy if the tree created advances the SQN
|
%% Update the memtable copy if the tree created advances the SQN
|
||||||
cache_tree_in_memcopy(MemCopy, Tree, SQN) ->
|
cache_tree_in_memcopy(MemCopy, Tree, SQN) ->
|
||||||
|
@ -1146,9 +1236,9 @@ simple_server_test() ->
|
||||||
?assertMatch(R3, Key1),
|
?assertMatch(R3, Key1),
|
||||||
?assertMatch(R4, Key2),
|
?assertMatch(R4, Key2),
|
||||||
S2 = pcl_pushmem(PCL, KL2),
|
S2 = pcl_pushmem(PCL, KL2),
|
||||||
if S2 == pause -> timer:sleep(2000); true -> ok end,
|
if S2 == pause -> timer:sleep(1000); true -> ok end,
|
||||||
S3 = pcl_pushmem(PCL, [Key3]),
|
S3 = pcl_pushmem(PCL, [Key3]),
|
||||||
if S3 == pause -> timer:sleep(2000); true -> ok end,
|
if S3 == pause -> timer:sleep(1000); true -> ok end,
|
||||||
R5 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}),
|
R5 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}),
|
||||||
R6 = pcl_fetch(PCL, {o,"Bucket0002", "Key0002"}),
|
R6 = pcl_fetch(PCL, {o,"Bucket0002", "Key0002"}),
|
||||||
R7 = pcl_fetch(PCL, {o,"Bucket0003", "Key0003"}),
|
R7 = pcl_fetch(PCL, {o,"Bucket0003", "Key0003"}),
|
||||||
|
@ -1163,7 +1253,7 @@ simple_server_test() ->
|
||||||
2001 ->
|
2001 ->
|
||||||
%% Last push not persisted
|
%% Last push not persisted
|
||||||
S3a = pcl_pushmem(PCL, [Key3]),
|
S3a = pcl_pushmem(PCL, [Key3]),
|
||||||
if S3a == pause -> timer:sleep(2000); true -> ok end,
|
if S3a == pause -> timer:sleep(1000); true -> ok end,
|
||||||
ok;
|
ok;
|
||||||
2002 ->
|
2002 ->
|
||||||
%% everything got persisted
|
%% everything got persisted
|
||||||
|
@ -1182,11 +1272,11 @@ simple_server_test() ->
|
||||||
?assertMatch(R9, Key2),
|
?assertMatch(R9, Key2),
|
||||||
?assertMatch(R10, Key3),
|
?assertMatch(R10, Key3),
|
||||||
S4 = pcl_pushmem(PCLr, KL3),
|
S4 = pcl_pushmem(PCLr, KL3),
|
||||||
if S4 == pause -> timer:sleep(2000); true -> ok end,
|
if S4 == pause -> timer:sleep(1000); true -> ok end,
|
||||||
S5 = pcl_pushmem(PCLr, [Key4]),
|
S5 = pcl_pushmem(PCLr, [Key4]),
|
||||||
if S5 == pause -> timer:sleep(2000); true -> ok end,
|
if S5 == pause -> timer:sleep(1000); true -> ok end,
|
||||||
S6 = pcl_pushmem(PCLr, KL4),
|
S6 = pcl_pushmem(PCLr, KL4),
|
||||||
if S6 == pause -> timer:sleep(2000); true -> ok end,
|
if S6 == pause -> timer:sleep(1000); true -> ok end,
|
||||||
R11 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}),
|
R11 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}),
|
||||||
R12 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}),
|
R12 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}),
|
||||||
R13 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}),
|
R13 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}),
|
||||||
|
@ -1195,6 +1285,52 @@ simple_server_test() ->
|
||||||
?assertMatch(R12, Key2),
|
?assertMatch(R12, Key2),
|
||||||
?assertMatch(R13, Key3),
|
?assertMatch(R13, Key3),
|
||||||
?assertMatch(R14, Key4),
|
?assertMatch(R14, Key4),
|
||||||
|
SnapOpts = #penciller_options{start_snapshot = true,
|
||||||
|
source_penciller = PCLr,
|
||||||
|
requestor = self()},
|
||||||
|
{ok, PclSnap} = pcl_start(SnapOpts),
|
||||||
|
ok = pcl_loadsnapshot(PclSnap, []),
|
||||||
|
?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001"})),
|
||||||
|
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002"})),
|
||||||
|
?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003"})),
|
||||||
|
?assertMatch(Key4, pcl_fetch(PclSnap, {o,"Bucket0004", "Key0004"})),
|
||||||
|
?assertMatch(true, pcl_checksequencenumber(PclSnap,
|
||||||
|
{o,"Bucket0001", "Key0001"},
|
||||||
|
1)),
|
||||||
|
?assertMatch(true, pcl_checksequencenumber(PclSnap,
|
||||||
|
{o,"Bucket0002", "Key0002"},
|
||||||
|
1002)),
|
||||||
|
?assertMatch(true, pcl_checksequencenumber(PclSnap,
|
||||||
|
{o,"Bucket0003", "Key0003"},
|
||||||
|
2002)),
|
||||||
|
?assertMatch(true, pcl_checksequencenumber(PclSnap,
|
||||||
|
{o,"Bucket0004", "Key0004"},
|
||||||
|
3002)),
|
||||||
|
% Add some more keys and confirm that chekc sequence number still
|
||||||
|
% sees the old version in the previous snapshot, but will see the new version
|
||||||
|
% in a new snapshot
|
||||||
|
Key1A = {{o,"Bucket0001", "Key0001"}, {4002, {active, infinity}, null}},
|
||||||
|
KL1A = lists:sort(leveled_sft:generate_randomkeys({4002, 2})),
|
||||||
|
S7 = pcl_pushmem(PCLr, [Key1A]),
|
||||||
|
if S7 == pause -> timer:sleep(1000); true -> ok end,
|
||||||
|
S8 = pcl_pushmem(PCLr, KL1A),
|
||||||
|
if S8 == pause -> timer:sleep(1000); true -> ok end,
|
||||||
|
?assertMatch(true, pcl_checksequencenumber(PclSnap,
|
||||||
|
{o,"Bucket0001", "Key0001"},
|
||||||
|
1)),
|
||||||
|
ok = pcl_close(PclSnap),
|
||||||
|
{ok, PclSnap2} = pcl_start(SnapOpts),
|
||||||
|
ok = pcl_loadsnapshot(PclSnap2, []),
|
||||||
|
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
|
||||||
|
{o,"Bucket0001", "Key0001"},
|
||||||
|
1)),
|
||||||
|
?assertMatch(true, pcl_checksequencenumber(PclSnap2,
|
||||||
|
{o,"Bucket0001", "Key0001"},
|
||||||
|
4002)),
|
||||||
|
?assertMatch(true, pcl_checksequencenumber(PclSnap2,
|
||||||
|
{o,"Bucket0002", "Key0002"},
|
||||||
|
1002)),
|
||||||
|
ok = pcl_close(PclSnap2),
|
||||||
ok = pcl_close(PCLr),
|
ok = pcl_close(PCLr),
|
||||||
clean_testdir(RootPath).
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue