diff --git a/include/leveled.hrl b/include/leveled.hrl index 2e1f2a1..5db22d3 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -54,6 +54,8 @@ {root_path :: string(), max_inmemory_tablesize :: integer(), start_snapshot = false :: boolean(), + snapshot_query, + bookies_mem :: tuple(), source_penciller :: pid(), levelzero_cointoss = false :: boolean()}). diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 725db8e..018b6d6 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -66,10 +66,10 @@ -export([get_opt/2, get_opt/3, - load_snapshot/2, empty_ledgercache/0, loadqueue_ledgercache/1, - push_ledgercache/2]). + push_ledgercache/2, + snapshot_store/5]). -include_lib("eunit/include/eunit.hrl"). @@ -374,10 +374,9 @@ init([Opts]) -> ledger_cache=#ledger_cache{mem = NewETS}, is_snapshot=false}}; Bookie -> - {ok, - {Penciller, LedgerCache}, - Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT), - ok = load_snapshot(Penciller, LedgerCache), + {ok, Penciller, Inker} = book_snapshotstore(Bookie, + self(), + ?SNAPSHOT_TIMEOUT), leveled_log:log("B0002", [Inker, Penciller]), {ok, #state{penciller=Penciller, inker=Inker, @@ -588,17 +587,6 @@ code_change(_OldVsn, State, _Extra) -> %%% External functions %%%============================================================================ -%% @doc Load a snapshot of the penciller with the contents of the ledger cache -%% If the snapshot is to be loaded for a query then #ledger_cache.index may -%% be empty_index (As no need to update lookups), also #ledger_cache.loader -%% may also be empty_cache if there are no relevant results in the LedgerCache -load_snapshot(LedgerSnapshot, LedgerCache) -> - CacheToLoad = {LedgerCache#ledger_cache.loader, - LedgerCache#ledger_cache.index, - LedgerCache#ledger_cache.min_sqn, - LedgerCache#ledger_cache.max_sqn}, - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad). 
- %% @doc Empty the ledger cache table following a push empty_ledgercache() -> #ledger_cache{mem = ets:new(empty, [ordered_set])}. @@ -624,6 +612,51 @@ loadqueue_ledgercache(Cache) -> T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE), Cache#ledger_cache{load_queue = [], loader = T}. +%% @doc Allow a snapshot to be created from part of the store, preferably +%% passing in a query filter so that all of the LoopState does not need to +%% be copied from the real actor to the clone +%% +%% SnapType can be store (requires journal and ledger) or ledger (requires +%% ledger only) +%% +%% Query can be no_lookup, indicating the snapshot will be used for non-specific +%% range queries and not direct fetch requests. {StartKey, EndKey} if the +%% snapshot is to be used for one specific query only (this is much quicker to +%% setup, assuming the range is a small subset of the overall key space). +snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) -> + SW = os:timestamp(), + LedgerCache = readycache_forsnapshot(LedgerCache0, Query), + BookiesMem = {LedgerCache#ledger_cache.loader, + LedgerCache#ledger_cache.index, + LedgerCache#ledger_cache.min_sqn, + LedgerCache#ledger_cache.max_sqn}, + PCLopts = #penciller_options{start_snapshot = true, + source_penciller = Penciller, + snapshot_query = Query, + bookies_mem = BookiesMem}, + {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), + leveled_log:log_randomtimer("B0004", [cache_size(LedgerCache)], SW, 0.1), + case SnapType of + store -> + InkerOpts = #inker_options{start_snapshot=true, + source_inker=Inker}, + {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts), + {ok, LedgerSnapshot, JournalSnapshot}; + ledger -> + {ok, LedgerSnapshot, null} + end. + +snapshot_store(State, SnapType) -> + snapshot_store(State, SnapType, undefined). + +snapshot_store(State, SnapType, Query) -> + snapshot_store(State#state.ledger_cache, + State#state.penciller, + State#state.inker, + SnapType, + Query). 
+ + %%%============================================================================ %%% Internal functions %%%============================================================================ @@ -640,11 +673,10 @@ cache_size(LedgerCache) -> ets:info(LedgerCache#ledger_cache.mem, size). bucket_stats(State, Bucket, Tag) -> - {ok, - {LedgerSnapshot, LedgerCache}, - _JournalSnapshot} = snapshot_store(State, ledger, no_lookup), + {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State, + ledger, + no_lookup), Folder = fun() -> - load_snapshot(LedgerSnapshot, LedgerCache), StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), AccFun = accumulate_size(), @@ -661,11 +693,10 @@ bucket_stats(State, Bucket, Tag) -> binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) -> % List buckets for tag, assuming bucket names are all binary type - {ok, - {LedgerSnapshot, LedgerCache}, - _JournalSnapshot} = snapshot_store(State, ledger, no_lookup), + {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State, + ledger, + no_lookup), Folder = fun() -> - load_snapshot(LedgerSnapshot, LedgerCache), BucketAcc = get_nextbucket(null, Tag, LedgerSnapshot, @@ -718,11 +749,11 @@ index_query(State, ?IDX_TAG, IdxField, EndValue), - {ok, - {LedgerSnapshot, LedgerCache}, - _JournalSnapshot} = snapshot_store(State, ledger, {StartKey, EndKey}), + {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State, + ledger, + {StartKey, + EndKey}), Folder = fun() -> - load_snapshot(LedgerSnapshot, LedgerCache), AddFun = case ReturnTerms of true -> fun add_terms/2; @@ -748,11 +779,10 @@ hashtree_query(State, Tag, JournalCheck) -> check_presence -> store end, - {ok, - {LedgerSnapshot, LedgerCache}, - JournalSnapshot} = snapshot_store(State, SnapType, no_lookup), + {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, + SnapType, + no_lookup), Folder = fun() -> - load_snapshot(LedgerSnapshot, LedgerCache), StartKey = 
leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), AccFun = accumulate_hashes(JournalCheck, JournalSnapshot), @@ -791,9 +821,7 @@ foldobjects_byindex(State, Tag, Bucket, Field, FromTerm, ToTerm, FoldObjectsFun) foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) -> - {ok, - {LedgerSnapshot, LedgerCache}, - JournalSnapshot} = snapshot_store(State, store), + {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, store), {FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of true -> FoldObjectsFun; @@ -801,7 +829,6 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) -> {FoldObjectsFun, []} end, Folder = fun() -> - load_snapshot(LedgerSnapshot, LedgerCache), AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, StartKey, @@ -816,11 +843,10 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) -> bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) -> - {ok, - {LedgerSnapshot, LedgerCache}, - _JournalSnapshot} = snapshot_store(State, ledger, no_lookup), + {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State, + ledger, + no_lookup), Folder = fun() -> - load_snapshot(LedgerSnapshot, LedgerCache), SK = leveled_codec:to_ledgerkey(Bucket, null, Tag), EK = leveled_codec:to_ledgerkey(Bucket, null, Tag), AccFun = accumulate_keys(FoldKeysFun), @@ -838,26 +864,6 @@ allkey_query(State, Tag, {FoldKeysFun, InitAcc}) -> bucketkey_query(State, Tag, null, {FoldKeysFun, InitAcc}). -snapshot_store(State, SnapType) -> - snapshot_store(State, SnapType, undefined). 
- -snapshot_store(State, SnapType, Query) -> - SW = os:timestamp(), - PCLopts = #penciller_options{start_snapshot=true, - source_penciller=State#state.penciller}, - {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), - LedgerCache = readycache_forsnapshot(State#state.ledger_cache, Query), - leveled_log:log_randomtimer("B0004", [cache_size(LedgerCache)], SW, 0.1), - case SnapType of - store -> - InkerOpts = #inker_options{start_snapshot=true, - source_inker=State#state.inker}, - {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts), - {ok, {LedgerSnapshot, LedgerCache}, JournalSnapshot}; - ledger -> - {ok, {LedgerSnapshot, LedgerCache}, null} - end. - readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) -> {KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem, StartKey, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 00ab41b..540e70f 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -711,10 +711,7 @@ filepath(CompactFilePath, NewSQN, compact_journal) -> initiate_penciller_snapshot(Bookie) -> - {ok, - {LedgerSnap, LedgerCache}, - _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined), - leveled_bookie:load_snapshot(LedgerSnap, LedgerCache), + {ok, LedgerSnap, _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined), MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. 
diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 358a820..21f2edf 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -132,7 +132,7 @@ {"P0036", {info, "Garbage collection on manifest removes key for filename ~s"}}, {"P0037", - {info, "Merging of penciller L0 tree to size ~w complete"}}, + {info, "Merging of penciller L0 tree from size ~w complete"}}, {"PC001", {info, "Penciller's clerk ~w started with owner ~w"}}, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index b5e8fea..a7685ee 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -184,9 +184,8 @@ pcl_confirmdelete/3, pcl_close/1, pcl_doom/1, - pcl_registersnapshot/2, pcl_releasesnapshot/2, - pcl_loadsnapshot/2, + pcl_registersnapshot/4, pcl_getstartupsequencenumber/1]). -export([ @@ -303,15 +302,14 @@ pcl_confirmdelete(Pid, FileName, FilePid) -> pcl_getstartupsequencenumber(Pid) -> gen_server:call(Pid, get_startup_sqn, infinity). -pcl_registersnapshot(Pid, Snapshot) -> - gen_server:call(Pid, {register_snapshot, Snapshot}, infinity). +pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem) -> + gen_server:call(Pid, + {register_snapshot, Snapshot, Query, BookiesMem}, + infinity). pcl_releasesnapshot(Pid, Snapshot) -> gen_server:cast(Pid, {release_snapshot, Snapshot}). -pcl_loadsnapshot(Pid, Increment) -> - gen_server:call(Pid, {load_snapshot, Increment}, infinity). - pcl_close(Pid) -> gen_server:call(Pid, close, 60000). 
@@ -324,17 +322,16 @@ pcl_doom(Pid) -> init([PCLopts]) -> case {PCLopts#penciller_options.root_path, - PCLopts#penciller_options.start_snapshot} of - {undefined, true} -> + PCLopts#penciller_options.start_snapshot, + PCLopts#penciller_options.snapshot_query, + PCLopts#penciller_options.bookies_mem} of + {undefined, true, Query, BookiesMem} -> SrcPenciller = PCLopts#penciller_options.source_penciller, - {ok, State} = pcl_registersnapshot(SrcPenciller, self()), - ManifestClone = leveled_pmanifest:copy_manifest(State#state.manifest), + {ok, State} = pcl_registersnapshot(SrcPenciller, self(), Query, BookiesMem), leveled_log:log("P0001", [self()]), {ok, State#state{is_snapshot=true, - source_penciller=SrcPenciller, - manifest=ManifestClone}}; - %% Need to do something about timeout - {_RootPath, false} -> + source_penciller=SrcPenciller}}; + {_RootPath, false, _Q, _BM} -> start_from_file(PCLopts) end. @@ -430,39 +427,79 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, {reply, Acc, State#state{levelzero_astree = L0AsList}}; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; -handle_call({register_snapshot, Snapshot}, _From, State) -> +handle_call({register_snapshot, Snapshot, Query, BookiesMem}, _From, State) -> + % Register and load a snapshot + % + % For setup of the snapshot to be efficient the caller should pass a query + % of (StartKey, EndKey) - this will avoid a full copy of the penciller's + % memory being required to be transferred to the clone. 
However, this + % will not be a valid clone for fetch Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest, Snapshot, ?SNAPSHOT_TIMEOUT), - {reply, {ok, State}, State#state{manifest = Manifest0}}; -handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}}, - _From, State) -> - {LedgerSQN, L0Size, L0Cache} = + + {BookieIncrTree, BookieIdx, MinSQN, MaxSQN} = BookiesMem, + LM1Cache = case BookieIncrTree of empty_cache -> - {State#state.ledger_sqn, - State#state.levelzero_size, - State#state.levelzero_cache}; + leveled_tree:empty(?CACHE_TYPE); _ -> - leveled_pmem:add_to_cache(State#state.levelzero_size, - {BookieIncrTree, MinSQN, MaxSQN}, - State#state.ledger_sqn, - State#state.levelzero_cache) + BookieIncrTree end, - L0Index = - case BookieIdx of - empty_index -> - State#state.levelzero_index; - _ -> - leveled_pmem:add_to_index(BookieIdx, - State#state.levelzero_index, - length(L0Cache)) + + CloneState = + case Query of + no_lookup -> + {UpdMaxSQN, UpdSize, L0Cache} = + leveled_pmem:add_to_cache(State#state.levelzero_size, + {LM1Cache, MinSQN, MaxSQN}, + State#state.ledger_sqn, + State#state.levelzero_cache), + #state{levelzero_cache = L0Cache, + ledger_sqn = UpdMaxSQN, + levelzero_size = UpdSize, + persisted_sqn = State#state.persisted_sqn}; + {StartKey, EndKey} -> + SW = os:timestamp(), + L0AsTree = + leveled_pmem:merge_trees(StartKey, + EndKey, + State#state.levelzero_cache, + LM1Cache), + leveled_log:log_randomtimer("P0037", + [State#state.levelzero_size], + SW, + 0.1), + #state{levelzero_astree = L0AsTree, + ledger_sqn = MaxSQN, + persisted_sqn = State#state.persisted_sqn}; + undefined -> + {UpdMaxSQN, UpdSize, L0Cache} = + leveled_pmem:add_to_cache(State#state.levelzero_size, + {LM1Cache, MinSQN, MaxSQN}, + State#state.ledger_sqn, + State#state.levelzero_cache), + L0Index = + case BookieIdx of + empty_index -> + State#state.levelzero_index; + _ -> + leveled_pmem:add_to_index(BookieIdx, + State#state.levelzero_index, + length(L0Cache)) + 
end, + #state{levelzero_cache = L0Cache, + levelzero_index = L0Index, + levelzero_size = UpdSize, + ledger_sqn = UpdMaxSQN, + persisted_sqn = State#state.persisted_sqn} end, - {reply, ok, State#state{levelzero_cache=L0Cache, - levelzero_size=L0Size, - levelzero_index=L0Index, - ledger_sqn=LedgerSQN, - snapshot_fully_loaded=true}}; + ManifestClone = leveled_pmanifest:copy_manifest(State#state.manifest), + {reply, + {ok, + CloneState#state{snapshot_fully_loaded=true, + manifest=ManifestClone}}, + State#state{manifest = Manifest0}}; handle_call({fetch_levelzero, Slot}, _From, State) -> {reply, lists:nth(Slot, State#state.levelzero_cache), State}; handle_call(close, _From, State) -> @@ -1190,11 +1227,12 @@ simple_server_test() -> ?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})), ?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004", null})), - SnapOpts = #penciller_options{start_snapshot = true, - source_penciller = PCLr}, - {ok, PclSnap} = pcl_start(SnapOpts), - leveled_bookie:load_snapshot(PclSnap, - leveled_bookie:empty_ledgercache()), + {ok, PclSnap, null} = + leveled_bookie:snapshot_store(leveled_bookie:empty_ledgercache(), + PCLr, + null, + ledger, + undefined), ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})), @@ -1242,8 +1280,13 @@ simple_server_test() -> 1)), ok = pcl_close(PclSnap), - {ok, PclSnap2} = pcl_start(SnapOpts), - leveled_bookie:load_snapshot(PclSnap2, leveled_bookie:empty_ledgercache()), + {ok, PclSnap2, null} = + leveled_bookie:snapshot_store(leveled_bookie:empty_ledgercache(), + PCLr, + null, + ledger, + undefined), + ?assertMatch(false, pcl_checksequencenumber(PclSnap2, {o, "Bucket0001",