Refactor snapshot

Better reuse snapshotting fucntions in the Bookie, and use it to support
doing Inker clone checks
This commit is contained in:
martinsumner 2016-10-31 17:26:28 +00:00
parent bd6c44e9b0
commit 7d3a04428b
3 changed files with 88 additions and 68 deletions

View file

@ -343,69 +343,36 @@ handle_call({head, Bucket, Key, Tag}, _From, State) ->
end end
end; end;
handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) -> handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
PCLopts = #penciller_options{start_snapshot=true, Reply = snapshot_store(State, SnapType),
source_penciller=State#state.penciller}, {reply, Reply, State};
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
case SnapType of
store ->
InkerOpts = #inker_options{start_snapshot=true,
source_inker=State#state.inker},
{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
{reply,
{ok,
{LedgerSnapshot,
State#state.ledger_cache},
JournalSnapshot},
State};
ledger ->
{reply,
{ok,
{LedgerSnapshot,
State#state.ledger_cache},
null},
State}
end;
handle_call({return_folder, FolderType}, _From, State) -> handle_call({return_folder, FolderType}, _From, State) ->
case FolderType of case FolderType of
{bucket_stats, Bucket} -> {bucket_stats, Bucket} ->
{reply, {reply,
bucket_stats(State#state.penciller, bucket_stats(State, Bucket, ?STD_TAG),
State#state.ledger_cache,
Bucket,
?STD_TAG),
State}; State};
{riakbucket_stats, Bucket} -> {riakbucket_stats, Bucket} ->
{reply, {reply,
bucket_stats(State#state.penciller, bucket_stats(State, Bucket, ?RIAK_TAG),
State#state.ledger_cache,
Bucket,
?RIAK_TAG),
State}; State};
{index_query, {index_query,
Bucket, Bucket,
{IdxField, StartValue, EndValue}, {IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}} -> {ReturnTerms, TermRegex}} ->
{reply, {reply,
index_query(State#state.penciller, index_query(State,
State#state.ledger_cache,
Bucket, Bucket,
{IdxField, StartValue, EndValue}, {IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}), {ReturnTerms, TermRegex}),
State}; State};
{keylist, Tag} -> {keylist, Tag} ->
{reply, {reply,
allkey_query(State#state.penciller, allkey_query(State, Tag),
State#state.ledger_cache, State};
Tag),
State};
{hashtree_query, Tag, JournalCheck} -> {hashtree_query, Tag, JournalCheck} ->
{reply, {reply,
hashtree_query(State#state.penciller, hashtree_query(State, Tag, JournalCheck),
State#state.ledger_cache, State}
State#state.inker,
Tag,
JournalCheck),
State}
end; end;
handle_call({compact_journal, Timeout}, _From, State) -> handle_call({compact_journal, Timeout}, _From, State) ->
ok = leveled_inker:ink_compactjournal(State#state.inker, ok = leveled_inker:ink_compactjournal(State#state.inker,
@ -437,10 +404,10 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================ %%%============================================================================
bucket_stats(Penciller, LedgerCache, Bucket, Tag) -> bucket_stats(State, Bucket, Tag) ->
PCLopts = #penciller_options{start_snapshot=true, {ok,
source_penciller=Penciller}, {LedgerSnapshot, LedgerCache},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), _JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() -> Folder = fun() ->
io:format("Length of increment in snapshot is ~w~n", io:format("Length of increment in snapshot is ~w~n",
[gb_trees:size(LedgerCache)]), [gb_trees:size(LedgerCache)]),
@ -459,13 +426,13 @@ bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
end, end,
{async, Folder}. {async, Folder}.
index_query(Penciller, LedgerCache, index_query(State,
Bucket, Bucket,
{IdxField, StartValue, EndValue}, {IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}) -> {ReturnTerms, TermRegex}) ->
PCLopts = #penciller_options{start_snapshot=true, {ok,
source_penciller=Penciller}, {LedgerSnapshot, LedgerCache},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), _JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() -> Folder = fun() ->
io:format("Length of increment in snapshot is ~w~n", io:format("Length of increment in snapshot is ~w~n",
[gb_trees:size(LedgerCache)]), [gb_trees:size(LedgerCache)]),
@ -493,15 +460,16 @@ index_query(Penciller, LedgerCache,
{async, Folder}. {async, Folder}.
hashtree_query(Penciller, LedgerCache, _Inker, hashtree_query(State, Tag, JournalCheck) ->
Tag, JournalCheck) -> SnapType = case JournalCheck of
PCLopts = #penciller_options{start_snapshot=true,
source_penciller=Penciller},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
JournalSnapshot = case JournalCheck of
false -> false ->
null ledger;
check_presence ->
store
end, end,
{ok,
{LedgerSnapshot, LedgerCache},
JournalSnapshot} = snapshot_store(State, SnapType),
Folder = fun() -> Folder = fun() ->
io:format("Length of increment in snapshot is ~w~n", io:format("Length of increment in snapshot is ~w~n",
[gb_trees:size(LedgerCache)]), [gb_trees:size(LedgerCache)]),
@ -509,7 +477,7 @@ hashtree_query(Penciller, LedgerCache, _Inker,
LedgerCache), LedgerCache),
StartKey = leveled_codec:to_ledgerkey(null, null, Tag), StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
AccFun = accumulate_hashes(), AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
StartKey, StartKey,
EndKey, EndKey,
@ -520,10 +488,10 @@ hashtree_query(Penciller, LedgerCache, _Inker,
end, end,
{async, Folder}. {async, Folder}.
allkey_query(Penciller, LedgerCache, Tag) -> allkey_query(State, Tag) ->
PCLopts = #penciller_options{start_snapshot=true, {ok,
source_penciller=Penciller}, {LedgerSnapshot, LedgerCache},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), _JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() -> Folder = fun() ->
io:format("Length of increment in snapshot is ~w~n", io:format("Length of increment in snapshot is ~w~n",
[gb_trees:size(LedgerCache)]), [gb_trees:size(LedgerCache)]),
@ -543,6 +511,22 @@ allkey_query(Penciller, LedgerCache, Tag) ->
{async, Folder}. {async, Folder}.
snapshot_store(State, SnapType) ->
PCLopts = #penciller_options{start_snapshot=true,
source_penciller=State#state.penciller},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
case SnapType of
store ->
InkerOpts = #inker_options{start_snapshot=true,
source_inker=State#state.inker},
{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
{ok, {LedgerSnapshot, State#state.ledger_cache},
JournalSnapshot};
ledger ->
{ok, {LedgerSnapshot, State#state.ledger_cache},
null}
end.
shutdown_wait([], _Inker) -> shutdown_wait([], _Inker) ->
false; false;
shutdown_wait([TopPause|Rest], Inker) -> shutdown_wait([TopPause|Rest], Inker) ->
@ -625,18 +609,38 @@ accumulate_size() ->
end, end,
AccFun. AccFun.
accumulate_hashes() -> accumulate_hashes(JournalCheck, InkerClone) ->
Now = leveled_codec:integer_now(), Now = leveled_codec:integer_now(),
AccFun = fun(Key, Value, KHList) -> AccFun = fun(LK, V, KHList) ->
case leveled_codec:is_active(Key, Value, Now) of case leveled_codec:is_active(LK, V, Now) of
true -> true ->
[leveled_codec:get_keyandhash(Key, Value)|KHList]; {B, K, H} = leveled_codec:get_keyandhash(LK, V),
case JournalCheck of
false ->
[{B, K, H}|KHList];
check_presence ->
case check_presence(LK, V, InkerClone) of
true ->
[{B, K, H}|KHList];
false ->
KHList
end
end;
false -> false ->
KHList KHList
end end
end, end,
AccFun. AccFun.
check_presence(Key, Value, InkerClone) ->
{LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
probably ->
true;
missing ->
false
end.
accumulate_keys() -> accumulate_keys() ->
Now = leveled_codec:integer_now(), Now = leveled_codec:integer_now(),
AccFun = fun(Key, Value, KeyList) -> AccFun = fun(Key, Value, KeyList) ->

View file

@ -38,6 +38,7 @@
strip_to_seqonly/1, strip_to_seqonly/1,
strip_to_statusonly/1, strip_to_statusonly/1,
strip_to_keyseqstatusonly/1, strip_to_keyseqstatusonly/1,
strip_to_keyseqonly/1,
striphead_to_details/1, striphead_to_details/1,
is_active/3, is_active/3,
endkey_passed/2, endkey_passed/2,
@ -89,6 +90,8 @@ strip_to_statusonly({_, {_, St, _}}) -> St.
strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN. strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN.
strip_to_keyseqonly({LK, {SeqN, _, _}}) -> {LK, SeqN}.
striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}. striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}.
key_dominates(LeftKey, RightKey) -> key_dominates(LeftKey, RightKey) ->

View file

@ -97,6 +97,7 @@
ink_put/4, ink_put/4,
ink_get/3, ink_get/3,
ink_fetch/3, ink_fetch/3,
ink_keycheck/3,
ink_loadpcl/4, ink_loadpcl/4,
ink_registersnapshot/2, ink_registersnapshot/2,
ink_confirmdelete/2, ink_confirmdelete/2,
@ -154,6 +155,9 @@ ink_get(Pid, PrimaryKey, SQN) ->
ink_fetch(Pid, PrimaryKey, SQN) -> ink_fetch(Pid, PrimaryKey, SQN) ->
gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity). gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity).
ink_keycheck(Pid, PrimaryKey, SQN) ->
gen_server:call(Pid, {key_check, PrimaryKey, SQN}, infinity).
ink_registersnapshot(Pid, Requestor) -> ink_registersnapshot(Pid, Requestor) ->
gen_server:call(Pid, {register_snapshot, Requestor}, infinity). gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
@ -250,6 +254,8 @@ handle_call({fetch, Key, SQN}, _From, State) ->
end; end;
handle_call({get, Key, SQN}, _From, State) -> handle_call({get, Key, SQN}, _From, State) ->
{reply, get_object(Key, SQN, State#state.manifest), State}; {reply, get_object(Key, SQN, State#state.manifest), State};
handle_call({key_check, Key, SQN}, _From, State) ->
{reply, key_check(Key, SQN, State#state.manifest), State};
handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) ->
Manifest = lists:reverse(State#state.manifest), Manifest = lists:reverse(State#state.manifest),
Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest),
@ -440,6 +446,13 @@ get_object(LedgerKey, SQN, Manifest) ->
Obj = leveled_cdb:cdb_get(JournalP, InkerKey), Obj = leveled_cdb:cdb_get(JournalP, InkerKey),
leveled_codec:from_inkerkv(Obj). leveled_codec:from_inkerkv(Obj).
key_check(LedgerKey, SQN, Manifest) ->
JournalP = find_in_manifest(SQN, Manifest),
{InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey,
SQN,
to_fetch,
null),
leveled_cdb:cdb_keycheck(JournalP, InkerKey).
build_manifest(ManifestFilenames, build_manifest(ManifestFilenames,
RootPath, RootPath,