2i order of events

When running a load of mainly 2i queries, there is a huge cost in the
previous snapshot code.  The time taken to create a clone of the
Penciller (duplicating all the LoopState) varied between 1 and 200ms
depending on the size of the LoopState.

For 2i queries, most of that LoopState was then being thrown away after
running the query against the levelzero_cache, with the query itself
taking < 1ms on average.  It would be better to avoid the o(100)ms of
CPU burning and block for o(1)ms - so the order of events has been
changed to filter first, so that only the small part of the LoopState
actually required is copied to the clone.
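
As a rough illustration of the reordering, a minimal sketch - not the
leveled code itself.  LoopState here is just an ordered list of
{Key, Value} pairs, the query a {StartKey, EndKey} range, and all names
are hypothetical; spawning the clone copies whatever the fun closes
over to the new process heap, which is where the cost sits:

    -module(snap_order_sketch).
    -export([old_snapshot/2, new_snapshot/2]).

    filter(LoopState, {StartKey, EndKey}) ->
        [KV || {K, _} = KV <- LoopState, K >= StartKey, K =< EndKey].

    %% Old order of events: the clone captures (and so copies) the whole
    %% LoopState, then throws most of it away when the query runs.
    old_snapshot(LoopState, Range) ->
        spawn(fun() -> query_loop(filter(LoopState, Range)) end).

    %% New order of events: filter first, so only the small part of the
    %% LoopState actually required is copied to the clone.
    new_snapshot(LoopState, Range) ->
        Filtered = filter(LoopState, Range),
        spawn(fun() -> query_loop(Filtered) end).

    query_loop(Results) ->
        receive {fetch, From} -> From ! {results, Results} end.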
martinsumner 2017-03-06 18:42:32 +00:00
parent eb6f668fcd
commit c92107e4b4
5 changed files with 161 additions and 113 deletions


@@ -54,6 +54,8 @@
{root_path :: string(),
max_inmemory_tablesize :: integer(),
start_snapshot = false :: boolean(),
snapshot_query,
bookies_mem :: tuple(),
source_penciller :: pid(),
levelzero_cointoss = false :: boolean()}).


@@ -66,10 +66,10 @@
-export([get_opt/2,
get_opt/3,
load_snapshot/2,
empty_ledgercache/0,
loadqueue_ledgercache/1,
push_ledgercache/2]).
push_ledgercache/2,
snapshot_store/5]).
-include_lib("eunit/include/eunit.hrl").
@@ -374,10 +374,9 @@ init([Opts]) ->
ledger_cache=#ledger_cache{mem = NewETS},
is_snapshot=false}};
Bookie ->
{ok,
{Penciller, LedgerCache},
Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
ok = load_snapshot(Penciller, LedgerCache),
{ok, Penciller, Inker} = book_snapshotstore(Bookie,
self(),
?SNAPSHOT_TIMEOUT),
leveled_log:log("B0002", [Inker, Penciller]),
{ok, #state{penciller=Penciller,
inker=Inker,
@@ -588,17 +587,6 @@ code_change(_OldVsn, State, _Extra) ->
%%% External functions
%%%============================================================================
%% @doc Load a snapshot of the penciller with the contents of the ledger cache
%% If the snapshot is to be loaded for a query then #ledger_cache.index may
%% be empty_index (As no need to update lookups), also #ledger_cache.loader
%% may also be empty_cache if there are no relevant results in the LedgerCache
load_snapshot(LedgerSnapshot, LedgerCache) ->
CacheToLoad = {LedgerCache#ledger_cache.loader,
LedgerCache#ledger_cache.index,
LedgerCache#ledger_cache.min_sqn,
LedgerCache#ledger_cache.max_sqn},
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
%% @doc Empty the ledger cache table following a push
empty_ledgercache() ->
#ledger_cache{mem = ets:new(empty, [ordered_set])}.
@@ -624,6 +612,51 @@ loadqueue_ledgercache(Cache) ->
T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
Cache#ledger_cache{load_queue = [], loader = T}.
%% @doc Allow a snapshot to be created from part of the store, preferably
%% passing in a query filter so that all of the LoopState does not need to
%% be copied from the real actor to the clone
%%
%% SnapType can be store (requires journal and ledger) or ledger (requires
%% ledger only)
%%
%% Query can be no_lookup, indicating the snapshot will be used for non-specific
%% range queries and not direct fetch requests. {StartKey, EndKey} if the
%% snapshot is to be used for one specific query only (this is much quicker
%% to set up, assuming the range is a small subset of the overall key space).
snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) ->
SW = os:timestamp(),
LedgerCache = readycache_forsnapshot(LedgerCache0, Query),
BookiesMem = {LedgerCache#ledger_cache.loader,
LedgerCache#ledger_cache.index,
LedgerCache#ledger_cache.min_sqn,
LedgerCache#ledger_cache.max_sqn},
PCLopts = #penciller_options{start_snapshot = true,
source_penciller = Penciller,
snapshot_query = Query,
bookies_mem = BookiesMem},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
leveled_log:log_randomtimer("B0004", [cache_size(LedgerCache)], SW, 0.1),
case SnapType of
store ->
InkerOpts = #inker_options{start_snapshot=true,
source_inker=Inker},
{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
{ok, LedgerSnapshot, JournalSnapshot};
ledger ->
{ok, LedgerSnapshot, null}
end.
snapshot_store(State, SnapType) ->
snapshot_store(State, SnapType, undefined).
snapshot_store(State, SnapType, Query) ->
snapshot_store(State#state.ledger_cache,
State#state.penciller,
State#state.inker,
SnapType,
Query).
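
As a usage sketch only (Cache, Penciller and Inker stand in for the
values held in the Bookie's state; StartKey and EndKey are
hypothetical), the two query modes above look like:

    %% Non-specific fold: the clone is loaded with the full cache
    {ok, LS1, null} =
        snapshot_store(Cache, Penciller, Inker, ledger, no_lookup),
    %% Single 2i range query - the fast path this commit adds: only
    %% keys within the range are copied to the clone
    {ok, LS2, null} =
        snapshot_store(Cache, Penciller, Inker, ledger, {StartKey, EndKey}),
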
%%%============================================================================
%%% Internal functions
%%%============================================================================
@@ -640,11 +673,10 @@ cache_size(LedgerCache) ->
ets:info(LedgerCache#ledger_cache.mem, size).
bucket_stats(State, Bucket, Tag) ->
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
{ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
ledger,
no_lookup),
Folder = fun() ->
load_snapshot(LedgerSnapshot, LedgerCache),
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
AccFun = accumulate_size(),
@@ -661,11 +693,10 @@ bucket_stats(State, Bucket, Tag) ->
binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
% List buckets for tag, assuming bucket names are all binary type
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
{ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
ledger,
no_lookup),
Folder = fun() ->
load_snapshot(LedgerSnapshot, LedgerCache),
BucketAcc = get_nextbucket(null,
Tag,
LedgerSnapshot,
@@ -718,11 +749,11 @@ index_query(State,
?IDX_TAG,
IdxField,
EndValue),
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger, {StartKey, EndKey}),
{ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
ledger,
{StartKey,
EndKey}),
Folder = fun() ->
load_snapshot(LedgerSnapshot, LedgerCache),
AddFun = case ReturnTerms of
true ->
fun add_terms/2;
@@ -748,11 +779,10 @@ hashtree_query(State, Tag, JournalCheck) ->
check_presence ->
store
end,
{ok,
{LedgerSnapshot, LedgerCache},
JournalSnapshot} = snapshot_store(State, SnapType, no_lookup),
{ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State,
SnapType,
no_lookup),
Folder = fun() ->
load_snapshot(LedgerSnapshot, LedgerCache),
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
@@ -791,9 +821,7 @@ foldobjects_byindex(State, Tag, Bucket, Field, FromTerm, ToTerm, FoldObjectsFun)
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
{ok,
{LedgerSnapshot, LedgerCache},
JournalSnapshot} = snapshot_store(State, store),
{ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, store),
{FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of
true ->
FoldObjectsFun;
@@ -801,7 +829,6 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
{FoldObjectsFun, []}
end,
Folder = fun() ->
load_snapshot(LedgerSnapshot, LedgerCache),
AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
StartKey,
@@ -816,11 +843,10 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
{ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
ledger,
no_lookup),
Folder = fun() ->
load_snapshot(LedgerSnapshot, LedgerCache),
SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
AccFun = accumulate_keys(FoldKeysFun),
@@ -838,26 +864,6 @@ allkey_query(State, Tag, {FoldKeysFun, InitAcc}) ->
bucketkey_query(State, Tag, null, {FoldKeysFun, InitAcc}).
snapshot_store(State, SnapType) ->
snapshot_store(State, SnapType, undefined).
snapshot_store(State, SnapType, Query) ->
SW = os:timestamp(),
PCLopts = #penciller_options{start_snapshot=true,
source_penciller=State#state.penciller},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
LedgerCache = readycache_forsnapshot(State#state.ledger_cache, Query),
leveled_log:log_randomtimer("B0004", [cache_size(LedgerCache)], SW, 0.1),
case SnapType of
store ->
InkerOpts = #inker_options{start_snapshot=true,
source_inker=State#state.inker},
{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
{ok, {LedgerSnapshot, LedgerCache}, JournalSnapshot};
ledger ->
{ok, {LedgerSnapshot, LedgerCache}, null}
end.
readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
{KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem,
StartKey,


@@ -711,10 +711,7 @@ filepath(CompactFilePath, NewSQN, compact_journal) ->
initiate_penciller_snapshot(Bookie) ->
{ok,
{LedgerSnap, LedgerCache},
_} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
leveled_bookie:load_snapshot(LedgerSnap, LedgerCache),
{ok, LedgerSnap, _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
{LedgerSnap, MaxSQN}.


@@ -132,7 +132,7 @@
{"P0036",
{info, "Garbage collection on manifest removes key for filename ~s"}},
{"P0037",
{info, "Merging of penciller L0 tree to size ~w complete"}},
{info, "Merging of penciller L0 tree from size ~w complete"}},
{"PC001",
{info, "Penciller's clerk ~w started with owner ~w"}},


@@ -184,9 +184,8 @@
pcl_confirmdelete/3,
pcl_close/1,
pcl_doom/1,
pcl_registersnapshot/2,
pcl_releasesnapshot/2,
pcl_loadsnapshot/2,
pcl_registersnapshot/4,
pcl_getstartupsequencenumber/1]).
-export([
@@ -303,15 +302,14 @@ pcl_confirmdelete(Pid, FileName, FilePid) ->
pcl_getstartupsequencenumber(Pid) ->
gen_server:call(Pid, get_startup_sqn, infinity).
pcl_registersnapshot(Pid, Snapshot) ->
gen_server:call(Pid, {register_snapshot, Snapshot}, infinity).
pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem) ->
gen_server:call(Pid,
{register_snapshot, Snapshot, Query, BookiesMem},
infinity).
pcl_releasesnapshot(Pid, Snapshot) ->
gen_server:cast(Pid, {release_snapshot, Snapshot}).
pcl_loadsnapshot(Pid, Increment) ->
gen_server:call(Pid, {load_snapshot, Increment}, infinity).
pcl_close(Pid) ->
gen_server:call(Pid, close, 60000).
@@ -324,17 +322,16 @@ pcl_doom(Pid) ->
init([PCLopts]) ->
case {PCLopts#penciller_options.root_path,
PCLopts#penciller_options.start_snapshot} of
{undefined, true} ->
PCLopts#penciller_options.start_snapshot,
PCLopts#penciller_options.snapshot_query,
PCLopts#penciller_options.bookies_mem} of
{undefined, true, Query, BookiesMem} ->
SrcPenciller = PCLopts#penciller_options.source_penciller,
{ok, State} = pcl_registersnapshot(SrcPenciller, self()),
ManifestClone = leveled_pmanifest:copy_manifest(State#state.manifest),
{ok, State} = pcl_registersnapshot(SrcPenciller, self(), Query, BookiesMem),
leveled_log:log("P0001", [self()]),
{ok, State#state{is_snapshot=true,
source_penciller=SrcPenciller,
manifest=ManifestClone}};
%% Need to do something about timeout
{_RootPath, false} ->
source_penciller=SrcPenciller}};
{_RootPath, false, _Q, _BM} ->
start_from_file(PCLopts)
end.
@@ -430,25 +427,58 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
{reply, Acc, State#state{levelzero_astree = L0AsList}};
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State};
handle_call({register_snapshot, Snapshot}, _From, State) ->
handle_call({register_snapshot, Snapshot, Query, BookiesMem}, _From, State) ->
% Register and load a snapshot
%
% For setup of the snapshot to be efficient, the caller should pass a query
% of (StartKey, EndKey) - this will avoid a full copy of the penciller's
% memory having to be transferred to the clone. However, this
% will not be a valid clone for fetch
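%
% As an illustrative sketch only (names hypothetical), a range-limited
% clone will therefore have registered itself with something like:
%   pcl_registersnapshot(SrcPenciller, self(),
%                        {StartKey, EndKey},
%                        {LoaderTree, Index, MinSQN, MaxSQN})
% where the final tuple is the bookie's cache as built in
% snapshot_store/5 (BookiesMem)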
Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest,
Snapshot,
?SNAPSHOT_TIMEOUT),
{reply, {ok, State}, State#state{manifest = Manifest0}};
handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
_From, State) ->
{LedgerSQN, L0Size, L0Cache} =
{BookieIncrTree, BookieIdx, MinSQN, MaxSQN} = BookiesMem,
LM1Cache =
case BookieIncrTree of
empty_cache ->
{State#state.ledger_sqn,
State#state.levelzero_size,
State#state.levelzero_cache};
leveled_tree:empty(?CACHE_TYPE);
_ ->
leveled_pmem:add_to_cache(State#state.levelzero_size,
{BookieIncrTree, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache)
BookieIncrTree
end,
CloneState =
case Query of
no_lookup ->
{UpdMaxSQN, UpdSize, L0Cache} =
leveled_pmem:add_to_cache(State#state.levelzero_size,
{LM1Cache, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache),
#state{levelzero_cache = L0Cache,
ledger_sqn = UpdMaxSQN,
levelzero_size = UpdSize,
persisted_sqn = State#state.persisted_sqn};
{StartKey, EndKey} ->
SW = os:timestamp(),
L0AsTree =
leveled_pmem:merge_trees(StartKey,
EndKey,
State#state.levelzero_cache,
LM1Cache),
leveled_log:log_randomtimer("P0037",
[State#state.levelzero_size],
SW,
0.1),
#state{levelzero_astree = L0AsTree,
ledger_sqn = MaxSQN,
persisted_sqn = State#state.persisted_sqn};
undefined ->
{UpdMaxSQN, UpdSize, L0Cache} =
leveled_pmem:add_to_cache(State#state.levelzero_size,
{LM1Cache, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache),
L0Index =
case BookieIdx of
empty_index ->
@@ -458,11 +488,18 @@ handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
State#state.levelzero_index,
length(L0Cache))
end,
{reply, ok, State#state{levelzero_cache=L0Cache,
levelzero_size=L0Size,
#state{levelzero_cache = L0Cache,
levelzero_index = L0Index,
ledger_sqn=LedgerSQN,
snapshot_fully_loaded=true}};
levelzero_size = UpdSize,
ledger_sqn = UpdMaxSQN,
persisted_sqn = State#state.persisted_sqn}
end,
ManifestClone = leveled_pmanifest:copy_manifest(State#state.manifest),
{reply,
{ok,
CloneState#state{snapshot_fully_loaded=true,
manifest=ManifestClone}},
State#state{manifest = Manifest0}};
handle_call({fetch_levelzero, Slot}, _From, State) ->
{reply, lists:nth(Slot, State#state.levelzero_cache), State};
handle_call(close, _From, State) ->
@@ -1190,11 +1227,12 @@ simple_server_test() ->
?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})),
?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004", null})),
SnapOpts = #penciller_options{start_snapshot = true,
source_penciller = PCLr},
{ok, PclSnap} = pcl_start(SnapOpts),
leveled_bookie:load_snapshot(PclSnap,
leveled_bookie:empty_ledgercache()),
{ok, PclSnap, null} =
leveled_bookie:snapshot_store(leveled_bookie:empty_ledgercache(),
PCLr,
null,
ledger,
undefined),
?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
@@ -1242,8 +1280,13 @@ simple_server_test() ->
1)),
ok = pcl_close(PclSnap),
{ok, PclSnap2} = pcl_start(SnapOpts),
leveled_bookie:load_snapshot(PclSnap2, leveled_bookie:empty_ledgercache()),
{ok, PclSnap2, null} =
leveled_bookie:snapshot_store(leveled_bookie:empty_ledgercache(),
PCLr,
null,
ledger,
undefined),
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
{o,
"Bucket0001",