From c64d67d9fbcc07063257492983d851adc5423a3d Mon Sep 17 00:00:00 2001 From: martinsumner Date: Fri, 23 Sep 2016 18:50:29 +0100 Subject: [PATCH] Snapshot Work - Interim Commit Some initial work to get snapshots going. Changes were required, as snapshots need to be taken through the Bookie to ensure that there is no race between extracting the Bookie's in-memory view and the Penciller's view if a push_to_mem has occurred in between. A lot is still outstanding, especially around Inker snapshots and handling timeouts. --- include/leveled.hrl | 11 +++-- src/leveled_bookie.erl | 21 +++++++++ src/leveled_iclerk.erl | 13 ------ src/leveled_inker.erl | 94 ++++++++++++++++++++++---------------- src/leveled_penciller.erl | 96 +++++++++++++++++++++++++++++++-------- 5 files changed, 160 insertions(+), 75 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 0f929cc..232aed7 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -26,12 +26,17 @@ -record(inker_options, {cdb_max_size :: integer(), root_path :: string(), - cdb_options :: #cdb_options{}}). + cdb_options :: #cdb_options{}, + start_snapshot = false :: boolean, + source_inker :: pid(), + requestor :: pid()}). -record(penciller_options, {root_path :: string(), - penciller :: pid(), - max_inmemory_tablesize :: integer()}). + max_inmemory_tablesize :: integer(), + start_snapshot = false :: boolean(), + source_penciller :: pid(), + requestor :: pid()}). -record(bookie_options, {root_path :: string(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 0ae31e8..948ea66 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -138,6 +138,8 @@ book_riakput/3, book_riakget/3, book_riakhead/3, + book_snapshotstore/3, + book_snapshotledger/3, book_close/1, strip_to_keyonly/1, strip_to_keyseqonly/1, @@ -180,6 +182,12 @@ book_riakhead(Pid, Bucket, Key) -> PrimaryKey = {o, Bucket, Key}, gen_server:call(Pid, {head, PrimaryKey}, infinity). 
+book_snapshotstore(Pid, Requestor, Timeout) -> + gen_server:call(Pid, {snapshot, Requestor, store, Timeout}, infinity). + +book_snapshotledger(Pid, Requestor, Timeout) -> + gen_server:call(Pid, {snapshot, Requestor, ledger, Timeout}, infinity). + book_close(Pid) -> gen_server:call(Pid, close, infinity). @@ -268,6 +276,19 @@ handle_call({head, Key}, _From, State) -> {reply, {ok, OMD}, State} end end; +handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) -> + PCLopts = #penciller_options{start_snapshot=true, + source_penciller=State#state.penciller, + requestor=Requestor}, + {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), + case SnapType of + store -> + InkerOpts = #inker_options{}, + {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts), + {reply, {ok, LedgerSnapshot, JournalSnapshot}, State}; + ledger -> + {reply, {ok, LedgerSnapshot, null}, State} + end; handle_call(close, _From, State) -> {stop, normal, ok, State}. diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 335a649..1fe049d 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -12,7 +12,6 @@ handle_info/2, terminate/2, clerk_new/1, - clerk_compact/3, clerk_remove/2, clerk_stop/1, code_change/3]). @@ -33,10 +32,6 @@ clerk_new(Owner) -> ok = gen_server:call(Pid, {register, Owner}, infinity), {ok, Pid}. -clerk_compact(Pid, InkerManifest, Penciller) -> - gen_server:cast(Pid, {compact, InkerManifest, Penciller}), - ok. - clerk_remove(Pid, Removals) -> gen_server:cast(Pid, {remove, Removals}), ok. @@ -54,9 +49,6 @@ init([]) -> handle_call({register, Owner}, _From, State) -> {reply, ok, State#state{owner=Owner}}. 
-handle_cast({compact, InkerManifest, Penciller, Timeout}, State) -> - ok = journal_compact(InkerManifest, Penciller, Timeout, State#state.owner), - {noreply, State}; handle_cast({remove, _Removals}, State) -> {noreply, State}; handle_cast(stop, State) -> @@ -76,11 +68,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -journal_compact(_InkerManifest, _Penciller, _Timeout, _Owner) -> - ok. - -check_all_files(_InkerManifest) -> - ok. check_single_file(CDB, _PencilSnapshot, SampleSize, BatchSize) -> PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 9d150f4..c5ddf94 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -103,7 +103,7 @@ ink_get/3, ink_fetch/3, ink_loadpcl/4, - ink_snap/1, + ink_registersnapshot/2, ink_close/1, ink_print_manifest/1, build_dummy_journal/0, @@ -146,8 +146,8 @@ ink_get(Pid, PrimaryKey, SQN) -> ink_fetch(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity). -ink_snap(Pid) -> - gen_server:call(Pid, snapshot, infinity). +ink_registersnapshot(Pid, Requestor) -> + gen_server:call(Pid, {snapshot, Requestor}, infinity). ink_close(Pid) -> gen_server:call(Pid, close, infinity). 
@@ -163,39 +163,20 @@ ink_print_manifest(Pid) -> %%%============================================================================ init([InkerOpts]) -> - RootPath = InkerOpts#inker_options.root_path, - CDBopts = InkerOpts#inker_options.cdb_options, - JournalFP = filepath(RootPath, journal_dir), - {ok, JournalFilenames} = case filelib:is_dir(JournalFP) of - true -> - file:list_dir(JournalFP); - false -> - filelib:ensure_dir(JournalFP), - {ok, []} - end, - ManifestFP = filepath(RootPath, manifest_dir), - {ok, ManifestFilenames} = case filelib:is_dir(ManifestFP) of - true -> - file:list_dir(ManifestFP); - false -> - filelib:ensure_dir(ManifestFP), - {ok, []} - end, - {Manifest, - {ActiveJournal, LowActiveSQN}, - JournalSQN, - ManifestSQN} = build_manifest(ManifestFilenames, - JournalFilenames, - fun simple_manifest_reader/2, - RootPath, - CDBopts), - {ok, #state{manifest = lists:reverse(lists:keysort(1, Manifest)), - manifest_sqn = ManifestSQN, - journal_sqn = JournalSQN, - active_journaldb = ActiveJournal, - active_journaldb_sqn = LowActiveSQN, - root_path = RootPath, - cdb_options = CDBopts}}. + case {InkerOpts#inker_options.root_path, + InkerOpts#inker_options.start_snapshot} of + {undefined, true} -> + SrcInker = InkerOpts#inker_options.source_inker, + Requestor = InkerOpts#inker_options.requestor, + {ok, + {ActiveJournalDB, + Manifest}} = ink_registersnapshot(SrcInker, Requestor), + {ok, #state{manifest=Manifest, + active_journaldb=ActiveJournalDB}}; + %% Need to do something about timeout + {_RootPath, false} -> + start_from_file(InkerOpts) + end. 
handle_call({put, Key, Object, KeyChanges}, From, State) -> @@ -240,12 +221,11 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> State#state.active_journaldb}], Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), {reply, Reply, State}; -handle_call(snapshot, _From , State) -> +handle_call({register_snapshot, _Requestor}, _From , State) -> %% TODO: Not yet implemented registration of snapshot %% Should return manifest and register the snapshot {reply, {State#state.manifest, - State#state.active_journaldb, - State#state.active_journaldb_sqn}, + State#state.active_journaldb}, State}; handle_call(print_manifest, _From, State) -> manifest_printer(State#state.manifest), @@ -275,6 +255,42 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ +start_from_file(InkerOpts) -> + RootPath = InkerOpts#inker_options.root_path, + CDBopts = InkerOpts#inker_options.cdb_options, + JournalFP = filepath(RootPath, journal_dir), + {ok, JournalFilenames} = case filelib:is_dir(JournalFP) of + true -> + file:list_dir(JournalFP); + false -> + filelib:ensure_dir(JournalFP), + {ok, []} + end, + ManifestFP = filepath(RootPath, manifest_dir), + {ok, ManifestFilenames} = case filelib:is_dir(ManifestFP) of + true -> + file:list_dir(ManifestFP); + false -> + filelib:ensure_dir(ManifestFP), + {ok, []} + end, + {Manifest, + {ActiveJournal, LowActiveSQN}, + JournalSQN, + ManifestSQN} = build_manifest(ManifestFilenames, + JournalFilenames, + fun simple_manifest_reader/2, + RootPath, + CDBopts), + {ok, #state{manifest = lists:reverse(lists:keysort(1, Manifest)), + manifest_sqn = ManifestSQN, + journal_sqn = JournalSQN, + active_journaldb = ActiveJournal, + active_journaldb_sqn = LowActiveSQN, + root_path = RootPath, + cdb_options = CDBopts}}. 
+ + put_object(PrimaryKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, %% TODO: The term goes through a double binary_to_term conversion diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 225431f..53c48fe 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -235,6 +235,7 @@ pcl_close/1, pcl_registersnapshot/2, pcl_updatesnapshotcache/3, + pcl_loadsnapshot/2, pcl_getstartupsequencenumber/1, roll_new_tree/3, clean_testdir/1]). @@ -254,7 +255,7 @@ -define(PROMPT_WAIT_ONL0, 5). -define(L0PEND_RESET, {false, null, null}). --record(l0snapshot, {increments = [] :: list, +-record(l0snapshot, {increments = [] :: list(), tree = gb_trees:empty() :: gb_trees:tree(), ledger_sqn = 0 :: integer()}). @@ -269,9 +270,13 @@ clerk :: pid(), levelzero_pending = ?L0PEND_RESET :: tuple(), memtable_copy = #l0snapshot{} :: #l0snapshot{}, + levelzero_snapshot = gb_trees:empty() :: gb_trees:tree(), memtable, backlog = false :: boolean(), - memtable_maxsize :: integer()}). + memtable_maxsize :: integer(), + is_snapshot = false :: boolean(), + snapshot_fully_loaded = false :: boolean(), + source_penciller :: pid()}). @@ -313,6 +318,9 @@ pcl_registersnapshot(Pid, Snapshot) -> pcl_updatesnapshotcache(Pid, Tree, SQN) -> gen_server:cast(Pid, {update_snapshotcache, Tree, SQN}). +pcl_loadsnapshot(Pid, Increment) -> + gen_server:call(Pid, {load_snapshot, Increment}, infinity). + pcl_close(Pid) -> gen_server:call(Pid, close). 
@@ -322,10 +330,21 @@ pcl_close(Pid) -> %%%============================================================================ init([PCLopts]) -> - case PCLopts#penciller_options.root_path of - undefined -> - {ok, #state{}}; - _RootPath -> + case {PCLopts#penciller_options.root_path, + PCLopts#penciller_options.start_snapshot} of + {undefined, true} -> + SrcPenciller = PCLopts#penciller_options.source_penciller, + {ok, {LedgerSQN, + MemTableCopy, + Manifest}} = pcl_registersnapshot(SrcPenciller, self()), + + {ok, #state{memtable_copy=MemTableCopy, + is_snapshot=true, + source_penciller=SrcPenciller, + manifest=Manifest, + ledger_sqn=LedgerSQN}}; + %% Need to do something about timeout + {_RootPath, false} -> start_from_file(PCLopts) end. @@ -474,6 +493,22 @@ handle_call({register_snapshot, Snapshot}, _From, State) -> State#state.manifest, State#state.memtable_copy}, State#state{registered_snapshots = Rs}}; +handle_call({load_snapshot, Increment}, _From, State) -> + MemTableCopy = State#state.memtable_copy, + {Tree0, TreeSQN0} = roll_new_tree(MemTableCopy#l0snapshot.tree, + MemTableCopy#l0snapshot.increments, + MemTableCopy#l0snapshot.ledger_sqn), + if + TreeSQN0 > MemTableCopy#l0snapshot.ledger_sqn -> + pcl_updatesnapshotcache(State#state.source_penciller, + Tree0, + TreeSQN0) + end, + {Tree1, TreeSQN1} = roll_new_tree(Tree0, [Increment], TreeSQN0), + io:format("Snapshot loaded to start at SQN~w~n", [TreeSQN1]), + {reply, ok, State#state{levelzero_snapshot=Tree1, + ledger_sqn=TreeSQN1, + snapshot_fully_loaded=true}}; handle_call(close, _From, State) -> {stop, normal, ok, State}. @@ -539,19 +574,6 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. 
-%%%============================================================================ -%%% External functions -%%%============================================================================ - -roll_new_tree(Tree, [], HighSQN) -> - {Tree, HighSQN}; -roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> - UpdTree = lists:foldl(fun({K, V}, TreeAcc) -> - gb_trees:enter(K, V, TreeAcc) end, - Tree, - KVList), - roll_new_tree(UpdTree, TailIncs, SQN). - %%%============================================================================ %%% Internal functions @@ -829,6 +851,19 @@ return_work(State, From) -> {State, none} end. + +%% This takes the three parts of a memtable copy - the increments, the tree +%% and the SQN at which the tree was formed, and outputs a new tree + +roll_new_tree(Tree, [], HighSQN) -> + {Tree, HighSQN}; +roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> + UpdTree = lists:foldl(fun({K, V}, TreeAcc) -> + gb_trees:enter(K, V, TreeAcc) end, + Tree, + KVList), + roll_new_tree(UpdTree, TailIncs, SQN). + %% Update the memtable copy if the tree created advances the SQN cache_tree_in_memcopy(MemCopy, Tree, SQN) -> case MemCopy#l0snapshot.ledger_sqn of @@ -855,7 +890,6 @@ add_increment_to_memcopy(MemCopy, SQN, KVList) -> Incs = MemCopy#l0snapshot.increments ++ [{SQN, KVList}], MemCopy#l0snapshot{increments=Incs}. - close_files(?MAX_LEVELS - 1, _Manifest) -> ok; close_files(Level, Manifest) -> @@ -1182,5 +1216,27 @@ memcopy_test() -> Size1 = gb_trees:size(Tree1), ?assertMatch(2000, Size1), ?assertMatch(3000, HighSQN1). 
+ +memcopy_updatecache_test() -> + KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), + "Value" ++ integer_to_list(X) ++ "A"} end, + lists:seq(1, 1000)), + KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), + "Value" ++ integer_to_list(X) ++ "B"} end, + lists:seq(1001, 2000)), + KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), + "Value" ++ integer_to_list(X) ++ "C"} end, + lists:seq(1, 1000)), + MemCopy0 = #l0snapshot{}, + MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1), + MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2), + MemCopy3 = add_increment_to_memcopy(MemCopy2, 3000, KVL3), + ?assertMatch(0, MemCopy3#l0snapshot.ledger_sqn), + {Tree1, HighSQN1} = roll_new_tree(gb_trees:empty(), MemCopy3#l0snapshot.increments, 0), + MemCopy4 = cache_tree_in_memcopy(MemCopy3, Tree1, HighSQN1), + ?assertMatch(0, length(MemCopy4#l0snapshot.increments)), + Size2 = gb_trees:size(MemCopy4#l0snapshot.tree), + ?assertMatch(2000, Size2), + ?assertMatch(3000, MemCopy4#l0snapshot.ledger_sqn). -endif. \ No newline at end of file