From 7d3a04428b1927f3a46941d958d18440c458fd80 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Mon, 31 Oct 2016 17:26:28 +0000 Subject: [PATCH] Refactor snapshot Better reuse snapshotting functions in the Bookie, and use it to support doing Inker clone checks --- src/leveled_bookie.erl | 140 +++++++++++++++++++++-------------------- src/leveled_codec.erl | 3 + src/leveled_inker.erl | 13 ++++ 3 files changed, 88 insertions(+), 68 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 889be9e..294b11a 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -343,69 +343,36 @@ handle_call({head, Bucket, Key, Tag}, _From, State) -> end end; handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) -> - PCLopts = #penciller_options{start_snapshot=true, - source_penciller=State#state.penciller}, - {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), - case SnapType of - store -> - InkerOpts = #inker_options{start_snapshot=true, - source_inker=State#state.inker}, - {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts), - {reply, - {ok, - {LedgerSnapshot, - State#state.ledger_cache}, - JournalSnapshot}, - State}; - ledger -> - {reply, - {ok, - {LedgerSnapshot, - State#state.ledger_cache}, - null}, - State} - end; + Reply = snapshot_store(State, SnapType), + {reply, Reply, State}; handle_call({return_folder, FolderType}, _From, State) -> case FolderType of {bucket_stats, Bucket} -> {reply, - bucket_stats(State#state.penciller, - State#state.ledger_cache, - Bucket, - ?STD_TAG), + bucket_stats(State, Bucket, ?STD_TAG), State}; {riakbucket_stats, Bucket} -> {reply, - bucket_stats(State#state.penciller, - State#state.ledger_cache, - Bucket, - ?RIAK_TAG), + bucket_stats(State, Bucket, ?RIAK_TAG), State}; {index_query, Bucket, {IdxField, StartValue, EndValue}, {ReturnTerms, TermRegex}} -> {reply, - index_query(State#state.penciller, - State#state.ledger_cache, + index_query(State, Bucket, {IdxField, StartValue, 
EndValue}, {ReturnTerms, TermRegex}), State}; {keylist, Tag} -> - {reply, - allkey_query(State#state.penciller, - State#state.ledger_cache, - Tag), - State}; + {reply, + allkey_query(State, Tag), + State}; {hashtree_query, Tag, JournalCheck} -> - {reply, - hashtree_query(State#state.penciller, - State#state.ledger_cache, - State#state.inker, - Tag, - JournalCheck), - State} + {reply, + hashtree_query(State, Tag, JournalCheck), + State} end; handle_call({compact_journal, Timeout}, _From, State) -> ok = leveled_inker:ink_compactjournal(State#state.inker, @@ -437,10 +404,10 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -bucket_stats(Penciller, LedgerCache, Bucket, Tag) -> - PCLopts = #penciller_options{start_snapshot=true, - source_penciller=Penciller}, - {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), +bucket_stats(State, Bucket, Tag) -> + {ok, + {LedgerSnapshot, LedgerCache}, + _JournalSnapshot} = snapshot_store(State, ledger), Folder = fun() -> io:format("Length of increment in snapshot is ~w~n", [gb_trees:size(LedgerCache)]), @@ -459,13 +426,13 @@ bucket_stats(Penciller, LedgerCache, Bucket, Tag) -> end, {async, Folder}. -index_query(Penciller, LedgerCache, +index_query(State, Bucket, {IdxField, StartValue, EndValue}, {ReturnTerms, TermRegex}) -> - PCLopts = #penciller_options{start_snapshot=true, - source_penciller=Penciller}, - {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), + {ok, + {LedgerSnapshot, LedgerCache}, + _JournalSnapshot} = snapshot_store(State, ledger), Folder = fun() -> io:format("Length of increment in snapshot is ~w~n", [gb_trees:size(LedgerCache)]), @@ -493,15 +460,16 @@ index_query(Penciller, LedgerCache, {async, Folder}. 
-hashtree_query(Penciller, LedgerCache, _Inker, - Tag, JournalCheck) -> - PCLopts = #penciller_options{start_snapshot=true, - source_penciller=Penciller}, - {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), - JournalSnapshot = case JournalCheck of +hashtree_query(State, Tag, JournalCheck) -> + SnapType = case JournalCheck of false -> - null + ledger; + check_presence -> + store end, + {ok, + {LedgerSnapshot, LedgerCache}, + JournalSnapshot} = snapshot_store(State, SnapType), Folder = fun() -> io:format("Length of increment in snapshot is ~w~n", [gb_trees:size(LedgerCache)]), @@ -509,7 +477,7 @@ hashtree_query(Penciller, LedgerCache, _Inker, LedgerCache), StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), - AccFun = accumulate_hashes(), + AccFun = accumulate_hashes(JournalCheck, JournalSnapshot), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, StartKey, EndKey, @@ -520,10 +488,10 @@ hashtree_query(Penciller, LedgerCache, _Inker, end, {async, Folder}. -allkey_query(Penciller, LedgerCache, Tag) -> - PCLopts = #penciller_options{start_snapshot=true, - source_penciller=Penciller}, - {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), +allkey_query(State, Tag) -> + {ok, + {LedgerSnapshot, LedgerCache}, + _JournalSnapshot} = snapshot_store(State, ledger), Folder = fun() -> io:format("Length of increment in snapshot is ~w~n", [gb_trees:size(LedgerCache)]), @@ -543,6 +511,22 @@ allkey_query(Penciller, LedgerCache, Tag) -> {async, Folder}. 
+snapshot_store(State, SnapType) -> + PCLopts = #penciller_options{start_snapshot=true, + source_penciller=State#state.penciller}, + {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), + case SnapType of + store -> + InkerOpts = #inker_options{start_snapshot=true, + source_inker=State#state.inker}, + {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts), + {ok, {LedgerSnapshot, State#state.ledger_cache}, + JournalSnapshot}; + ledger -> + {ok, {LedgerSnapshot, State#state.ledger_cache}, + null} + end. + shutdown_wait([], _Inker) -> false; shutdown_wait([TopPause|Rest], Inker) -> @@ -625,18 +609,38 @@ accumulate_size() -> end, AccFun. -accumulate_hashes() -> +accumulate_hashes(JournalCheck, InkerClone) -> Now = leveled_codec:integer_now(), - AccFun = fun(Key, Value, KHList) -> - case leveled_codec:is_active(Key, Value, Now) of + AccFun = fun(LK, V, KHList) -> + case leveled_codec:is_active(LK, V, Now) of true -> - [leveled_codec:get_keyandhash(Key, Value)|KHList]; + {B, K, H} = leveled_codec:get_keyandhash(LK, V), + case JournalCheck of + false -> + [{B, K, H}|KHList]; + check_presence -> + case check_presence(LK, V, InkerClone) of + true -> + [{B, K, H}|KHList]; + false -> + KHList + end + end; false -> KHList end end, AccFun. +check_presence(Key, Value, InkerClone) -> + {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}), + case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of + probably -> + true; + missing -> + false + end. + accumulate_keys() -> Now = leveled_codec:integer_now(), AccFun = fun(Key, Value, KeyList) -> diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 8015f23..384d12b 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -38,6 +38,7 @@ strip_to_seqonly/1, strip_to_statusonly/1, strip_to_keyseqstatusonly/1, + strip_to_keyseqonly/1, striphead_to_details/1, is_active/3, endkey_passed/2, @@ -89,6 +90,8 @@ strip_to_statusonly({_, {_, St, _}}) -> St. 
strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN. +strip_to_keyseqonly({LK, {SeqN, _, _}}) -> {LK, SeqN}. + striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}. key_dominates(LeftKey, RightKey) -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 23b1f49..b91976c 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -97,6 +97,7 @@ ink_put/4, ink_get/3, ink_fetch/3, + ink_keycheck/3, ink_loadpcl/4, ink_registersnapshot/2, ink_confirmdelete/2, @@ -154,6 +155,9 @@ ink_get(Pid, PrimaryKey, SQN) -> ink_fetch(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity). +ink_keycheck(Pid, PrimaryKey, SQN) -> + gen_server:call(Pid, {key_check, PrimaryKey, SQN}, infinity). + ink_registersnapshot(Pid, Requestor) -> gen_server:call(Pid, {register_snapshot, Requestor}, infinity). @@ -250,6 +254,8 @@ handle_call({fetch, Key, SQN}, _From, State) -> end; handle_call({get, Key, SQN}, _From, State) -> {reply, get_object(Key, SQN, State#state.manifest), State}; +handle_call({key_check, Key, SQN}, _From, State) -> + {reply, key_check(Key, SQN, State#state.manifest), State}; handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> Manifest = lists:reverse(State#state.manifest), Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), @@ -440,6 +446,13 @@ get_object(LedgerKey, SQN, Manifest) -> Obj = leveled_cdb:cdb_get(JournalP, InkerKey), leveled_codec:from_inkerkv(Obj). +key_check(LedgerKey, SQN, Manifest) -> + JournalP = find_in_manifest(SQN, Manifest), + {InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, + SQN, + to_fetch, + null), + leveled_cdb:cdb_keycheck(JournalP, InkerKey). build_manifest(ManifestFilenames, RootPath,