Merge pull request #41 from martinsumner/mas-2iphase2-i34
Mas 2iphase2 i34
commit 3c5740e7bf

7 changed files with 198 additions and 124 deletions
@@ -54,6 +54,8 @@
 {root_path :: string(),
 max_inmemory_tablesize :: integer(),
 start_snapshot = false :: boolean(),
+snapshot_query,
+bookies_mem :: tuple(),
 source_penciller :: pid(),
 levelzero_cointoss = false :: boolean()}).
 
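The two fields added to #penciller_options{} above carry an optional query and the bookie's in-memory ledger cache to a penciller clone at start-up. A minimal sketch of how a clone request is assembled, following the snapshot_store/5 function added further down (the bound variables Penciller, Loader, Index, MinSQN and MaxSQN are placeholders for values a real caller would already hold):

    %% Sketch only: snapshot_query may be no_lookup, a {StartKey, EndKey}
    %% pair, or undefined; bookies_mem mirrors the #ledger_cache{} fields.
    BookiesMem = {Loader, Index, MinSQN, MaxSQN},
    PCLopts = #penciller_options{start_snapshot = true,
                                 source_penciller = Penciller,
                                 snapshot_query = no_lookup,
                                 bookies_mem = BookiesMem},
    {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts).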
@@ -66,10 +66,10 @@
 
 -export([get_opt/2,
 get_opt/3,
-load_snapshot/2,
 empty_ledgercache/0,
 loadqueue_ledgercache/1,
-push_ledgercache/2]).
+push_ledgercache/2,
+snapshot_store/5]).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -358,6 +358,8 @@ book_destroy(Pid) ->
 %%%============================================================================
 
 init([Opts]) ->
+SW = os:timestamp(),
+random:seed(erlang:phash2(self()), element(2, SW), element(3, SW)),
 case get_opt(snapshot_bookie, Opts) of
 undefined ->
 % Start from file not snapshot
@@ -374,10 +376,9 @@ init([Opts]) ->
 ledger_cache=#ledger_cache{mem = NewETS},
 is_snapshot=false}};
 Bookie ->
-{ok,
-{Penciller, LedgerCache},
-Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
-ok = load_snapshot(Penciller, LedgerCache),
+{ok, Penciller, Inker} = book_snapshotstore(Bookie,
+self(),
+?SNAPSHOT_TIMEOUT),
 leveled_log:log("B0002", [Inker, Penciller]),
 {ok, #state{penciller=Penciller,
 inker=Inker,
@@ -588,17 +589,6 @@ code_change(_OldVsn, State, _Extra) ->
 %%% External functions
 %%%============================================================================
 
-%% @doc Load a snapshot of the penciller with the contents of the ledger cache
-%% If the snapshot is to be loaded for a query then #ledger_cache.index may
-%% be empty_index (As no need to update lookups), also #ledger_cache.loader
-%% may also be empty_cache if there are no relevant results in the LedgerCache
-load_snapshot(LedgerSnapshot, LedgerCache) ->
-CacheToLoad = {LedgerCache#ledger_cache.loader,
-LedgerCache#ledger_cache.index,
-LedgerCache#ledger_cache.min_sqn,
-LedgerCache#ledger_cache.max_sqn},
-ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
-
 %% @doc Empty the ledger cache table following a push
 empty_ledgercache() ->
 #ledger_cache{mem = ets:new(empty, [ordered_set])}.
@@ -624,6 +614,51 @@ loadqueue_ledgercache(Cache) ->
 T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
 Cache#ledger_cache{load_queue = [], loader = T}.
 
+%% @doc Allow all a snapshot to be created from part of the store, preferably
+%% passing in a query filter so that all of the LoopState does not need to
+%% be copied from the real actor to the clone
+%%
+%% SnapType can be store (requires journal and ledger) or ledger (requires
+%% ledger only)
+%%
+%% Query can be no_lookup, indicating the snapshot will be used for non-specific
+%% range queries and not direct fetch requests. {StartKey, EndKey} if the the
+%% snapshot is to be used for one specific query only (this is much quicker to
+%% setup, assuming the range is a small subset of the overall key space).
+snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) ->
+SW = os:timestamp(),
+LedgerCache = readycache_forsnapshot(LedgerCache0, Query),
+BookiesMem = {LedgerCache#ledger_cache.loader,
+LedgerCache#ledger_cache.index,
+LedgerCache#ledger_cache.min_sqn,
+LedgerCache#ledger_cache.max_sqn},
+PCLopts = #penciller_options{start_snapshot = true,
+source_penciller = Penciller,
+snapshot_query = Query,
+bookies_mem = BookiesMem},
+{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
+leveled_log:log_randomtimer("B0004", [cache_size(LedgerCache)], SW, 0.02),
+case SnapType of
+store ->
+InkerOpts = #inker_options{start_snapshot=true,
+source_inker=Inker},
+{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
+{ok, LedgerSnapshot, JournalSnapshot};
+ledger ->
+{ok, LedgerSnapshot, null}
+end.
+
+snapshot_store(State, SnapType) ->
+snapshot_store(State, SnapType, undefined).
+
+snapshot_store(State, SnapType, Query) ->
+snapshot_store(State#state.ledger_cache,
+State#state.penciller,
+State#state.inker,
+SnapType,
+Query).
+
+
 %%%============================================================================
 %%% Internal functions
 %%%============================================================================
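With the wrappers above, callers inside the bookie keep the two- and three-argument forms, while external callers can use the exported arity-five function directly; the reworked simple_server_test/0 further down does exactly that. A hedged example based on that test (PCLr is a running penciller pid, the inker argument is null because only a ledger snapshot is wanted):

    %% Sketch drawn from the penciller unit test below: a ledger-only
    %% snapshot with an empty ledger cache and no pre-registered query.
    {ok, PclSnap, null} =
        leveled_bookie:snapshot_store(leveled_bookie:empty_ledgercache(),
                                      PCLr,
                                      null,
                                      ledger,
                                      undefined),
    %% ... run queries against PclSnap ...
    ok = leveled_penciller:pcl_close(PclSnap).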
@@ -640,12 +675,10 @@ cache_size(LedgerCache) ->
 ets:info(LedgerCache#ledger_cache.mem, size).
 
 bucket_stats(State, Bucket, Tag) ->
-{ok,
-{LedgerSnapshot, LedgerCache},
-_JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
+{ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
+ledger,
+no_lookup),
 Folder = fun() ->
-leveled_log:log("B0004", [cache_size(LedgerCache)]),
-load_snapshot(LedgerSnapshot, LedgerCache),
 StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
 EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
 AccFun = accumulate_size(),
@@ -662,12 +695,10 @@ bucket_stats(State, Bucket, Tag) ->
 
 binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
 % List buckets for tag, assuming bucket names are all binary type
-{ok,
-{LedgerSnapshot, LedgerCache},
-_JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
+{ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
+ledger,
+no_lookup),
 Folder = fun() ->
-leveled_log:log("B0004", [cache_size(LedgerCache)]),
-load_snapshot(LedgerSnapshot, LedgerCache),
 BucketAcc = get_nextbucket(null,
 Tag,
 LedgerSnapshot,
@@ -720,12 +751,11 @@ index_query(State,
 ?IDX_TAG,
 IdxField,
 EndValue),
-{ok,
-{LedgerSnapshot, LedgerCache},
-_JournalSnapshot} = snapshot_store(State, ledger, {StartKey, EndKey}),
+{ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
+ledger,
+{StartKey,
+EndKey}),
 Folder = fun() ->
-leveled_log:log("B0004", [cache_size(LedgerCache)]),
-load_snapshot(LedgerSnapshot, LedgerCache),
 AddFun = case ReturnTerms of
 true ->
 fun add_terms/2;
@@ -751,12 +781,10 @@ hashtree_query(State, Tag, JournalCheck) ->
 check_presence ->
 store
 end,
-{ok,
-{LedgerSnapshot, LedgerCache},
-JournalSnapshot} = snapshot_store(State, SnapType, no_lookup),
+{ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State,
+SnapType,
+no_lookup),
 Folder = fun() ->
-leveled_log:log("B0004", [cache_size(LedgerCache)]),
-load_snapshot(LedgerSnapshot, LedgerCache),
 StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
 EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
 AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
@@ -795,9 +823,7 @@ foldobjects_byindex(State, Tag, Bucket, Field, FromTerm, ToTerm, FoldObjectsFun)
 foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
 
 foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
-{ok,
-{LedgerSnapshot, LedgerCache},
-JournalSnapshot} = snapshot_store(State, store),
+{ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, store),
 {FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of
 true ->
 FoldObjectsFun;
@@ -805,8 +831,6 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
 {FoldObjectsFun, []}
 end,
 Folder = fun() ->
-leveled_log:log("B0004", [cache_size(LedgerCache)]),
-load_snapshot(LedgerSnapshot, LedgerCache),
 AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag),
 Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
 StartKey,
@@ -821,12 +845,10 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
 
 
 bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
-{ok,
-{LedgerSnapshot, LedgerCache},
-_JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
+{ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
+ledger,
+no_lookup),
 Folder = fun() ->
-leveled_log:log("B0004", [cache_size(LedgerCache)]),
-load_snapshot(LedgerSnapshot, LedgerCache),
 SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
 EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
 AccFun = accumulate_keys(FoldKeysFun),
@@ -844,24 +866,6 @@ allkey_query(State, Tag, {FoldKeysFun, InitAcc}) ->
 bucketkey_query(State, Tag, null, {FoldKeysFun, InitAcc}).
 
 
-snapshot_store(State, SnapType) ->
-snapshot_store(State, SnapType, undefined).
-
-snapshot_store(State, SnapType, Query) ->
-PCLopts = #penciller_options{start_snapshot=true,
-source_penciller=State#state.penciller},
-{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
-LedgerCache = readycache_forsnapshot(State#state.ledger_cache, Query),
-case SnapType of
-store ->
-InkerOpts = #inker_options{start_snapshot=true,
-source_inker=State#state.inker},
-{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
-{ok, {LedgerSnapshot, LedgerCache}, JournalSnapshot};
-ledger ->
-{ok, {LedgerSnapshot, LedgerCache}, null}
-end.
-
 readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
 {KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem,
 StartKey,
@@ -222,6 +222,8 @@ ink_print_manifest(Pid) ->
 %%%============================================================================
 
 init([InkerOpts]) ->
+SW = os:timestamp(),
+random:seed(erlang:phash2(self()), element(2, SW), element(3, SW)),
 case {InkerOpts#inker_options.root_path,
 InkerOpts#inker_options.start_snapshot} of
 {undefined, true} ->
@@ -711,10 +713,7 @@ filepath(CompactFilePath, NewSQN, compact_journal) ->
 
 
 initiate_penciller_snapshot(Bookie) ->
-{ok,
-{LedgerSnap, LedgerCache},
-_} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
-leveled_bookie:load_snapshot(LedgerSnap, LedgerCache),
+{ok, LedgerSnap, _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
 MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
 {LedgerSnap, MaxSQN}.
 
@@ -9,6 +9,7 @@
 
 -export([log/2,
 log_timer/3,
+log_randomtimer/4,
 put_timing/4,
 head_timing/4,
 get_timing/3,
@@ -25,6 +26,8 @@
 
 {"G0001",
 {info, "Generic log point"}},
+{"G0002",
+{info, "Generic log point with term ~w"}},
 {"D0001",
 {debug, "Generic debug log"}},
 
@@ -35,7 +38,7 @@
 {"B0003",
 {info, "Bookie closing for reason ~w"}},
 {"B0004",
-{info, "Length of increment in snapshot is ~w"}},
+{info, "Initialised PCL clone and length of increment in snapshot is ~w"}},
 {"B0005",
 {info, "LedgerSQN=~w at startup"}},
 {"B0006",
@@ -59,9 +62,9 @@
 {info, "Get timing for result ~w is sample ~w total ~w and max ~w"}},
 
 {"P0001",
-{info, "Ledger snapshot ~w registered"}},
+{debug, "Ledger snapshot ~w registered"}},
 {"P0003",
-{info, "Ledger snapshot ~w released"}},
+{debug, "Ledger snapshot ~w released"}},
 {"P0004",
 {info, "Remaining ledger snapshots are ~w"}},
 {"P0005",
@@ -129,7 +132,9 @@
 {"P0035",
 {info, "Startup with Manifest SQN of ~w"}},
 {"P0036",
-{info, "Garbage collection on mnaifest removes key for filename ~s"}},
+{info, "Garbage collection on manifest removes key for filename ~s"}},
+{"P0037",
+{info, "Merging of penciller L0 tree from size ~w complete"}},
 
 {"PC001",
 {info, "Penciller's clerk ~w started with owner ~w"}},
@@ -347,6 +352,15 @@ log_timer(LogReference, Subs, StartTime) ->
 ok
 end.
 
+log_randomtimer(LogReference, Subs, StartTime, RandomProb) ->
+R = random:uniform(),
+case R < RandomProb of
+true ->
+log_timer(LogReference, Subs, StartTime);
+false ->
+ok
+end.
+
 %% Make a log of put timings split out by actor - one log for every
 %% PUT_LOGPOINT puts
 
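log_randomtimer/4 samples: it forwards to log_timer/3 with probability RandomProb and otherwise does nothing, so timing logs on hot paths only cost a fraction of calls. The new call sites in this commit use it as in this hedged sketch (arguments copied from the snapshot_store/5 hunk above; cache_size/1 is a bookie-internal helper and SW is a timestamp taken at the start of the operation):

    SW = os:timestamp(),
    %% ... build the snapshot ...
    %% Emit the B0004 timing for roughly 2% of snapshot set-ups.
    leveled_log:log_randomtimer("B0004", [cache_size(LedgerCache)], SW, 0.02).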
@@ -184,9 +184,8 @@
 pcl_confirmdelete/3,
 pcl_close/1,
 pcl_doom/1,
-pcl_registersnapshot/2,
 pcl_releasesnapshot/2,
-pcl_loadsnapshot/2,
+pcl_registersnapshot/4,
 pcl_getstartupsequencenumber/1]).
 
 -export([
@@ -303,15 +302,14 @@ pcl_confirmdelete(Pid, FileName, FilePid) ->
 pcl_getstartupsequencenumber(Pid) ->
 gen_server:call(Pid, get_startup_sqn, infinity).
 
-pcl_registersnapshot(Pid, Snapshot) ->
-gen_server:call(Pid, {register_snapshot, Snapshot}, infinity).
+pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem) ->
+gen_server:call(Pid,
+{register_snapshot, Snapshot, Query, BookiesMem},
+infinity).
 
 pcl_releasesnapshot(Pid, Snapshot) ->
 gen_server:cast(Pid, {release_snapshot, Snapshot}).
 
-pcl_loadsnapshot(Pid, Increment) ->
-gen_server:call(Pid, {load_snapshot, Increment}, infinity).
-
 pcl_close(Pid) ->
 gen_server:call(Pid, close, 60000).
 
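Registration and cache hand-over are now a single gen_server round trip: the clone passes its query and the bookie's cache when it registers, rather than registering first and then calling the removed pcl_loadsnapshot/2. A hedged sketch of the new call as the clone's init/1 below uses it (SrcPenciller is the source penciller pid; Query is no_lookup, a {StartKey, EndKey} pair, or undefined; BookiesMem is the {Loader, Index, MinSQN, MaxSQN} tuple built by the bookie):

    %% The reply is the clone's #state{}, with the manifest copy and
    %% level-zero data already in place, so no separate load step is needed.
    {ok, CloneState} =
        leveled_penciller:pcl_registersnapshot(SrcPenciller,
                                               self(),
                                               Query,
                                               BookiesMem).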
@@ -323,18 +321,19 @@ pcl_doom(Pid) ->
 %%%============================================================================
 
 init([PCLopts]) ->
+SW = os:timestamp(),
+random:seed(erlang:phash2(self()), element(2, SW), element(3, SW)),
 case {PCLopts#penciller_options.root_path,
-PCLopts#penciller_options.start_snapshot} of
-{undefined, true} ->
+PCLopts#penciller_options.start_snapshot,
+PCLopts#penciller_options.snapshot_query,
+PCLopts#penciller_options.bookies_mem} of
+{undefined, true, Query, BookiesMem} ->
 SrcPenciller = PCLopts#penciller_options.source_penciller,
-{ok, State} = pcl_registersnapshot(SrcPenciller, self()),
-ManifestClone = leveled_pmanifest:copy_manifest(State#state.manifest),
+{ok, State} = pcl_registersnapshot(SrcPenciller, self(), Query, BookiesMem),
 leveled_log:log("P0001", [self()]),
 {ok, State#state{is_snapshot=true,
-source_penciller=SrcPenciller,
-manifest=ManifestClone}};
-%% Need to do something about timeout
-{_RootPath, false} ->
+source_penciller=SrcPenciller}};
+{_RootPath, false, _Q, _BM} ->
 start_from_file(PCLopts)
 end.
 
|
||||||
_From,
|
_From,
|
||||||
State=#state{snapshot_fully_loaded=Ready})
|
State=#state{snapshot_fully_loaded=Ready})
|
||||||
when Ready == true ->
|
when Ready == true ->
|
||||||
|
SW = os:timestamp(),
|
||||||
L0AsList =
|
L0AsList =
|
||||||
case State#state.levelzero_astree of
|
case State#state.levelzero_astree of
|
||||||
undefined ->
|
undefined ->
|
||||||
|
@ -407,7 +407,10 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
|
||||||
List ->
|
List ->
|
||||||
List
|
List
|
||||||
end,
|
end,
|
||||||
|
leveled_log:log_randomtimer("P0037",
|
||||||
|
[State#state.levelzero_size],
|
||||||
|
SW,
|
||||||
|
0.01),
|
||||||
SetupFoldFun =
|
SetupFoldFun =
|
||||||
fun(Level, Acc) ->
|
fun(Level, Acc) ->
|
||||||
Pointers = leveled_pmanifest:range_lookup(State#state.manifest,
|
Pointers = leveled_pmanifest:range_lookup(State#state.manifest,
|
||||||
|
@ -429,39 +432,79 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
|
||||||
{reply, Acc, State#state{levelzero_astree = L0AsList}};
|
{reply, Acc, State#state{levelzero_astree = L0AsList}};
|
||||||
handle_call(get_startup_sqn, _From, State) ->
|
handle_call(get_startup_sqn, _From, State) ->
|
||||||
{reply, State#state.persisted_sqn, State};
|
{reply, State#state.persisted_sqn, State};
|
||||||
handle_call({register_snapshot, Snapshot}, _From, State) ->
|
handle_call({register_snapshot, Snapshot, Query, BookiesMem}, _From, State) ->
|
||||||
|
% Register and load a snapshot
|
||||||
|
%
|
||||||
|
% For setup of the snapshot to be efficient should pass a query
|
||||||
|
% of (StartKey, EndKey) - this will avoid a fully copy of the penciller's
|
||||||
|
% memory being required to be trasnferred to the clone. However, this
|
||||||
|
% will not be a valid clone for fetch
|
||||||
Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest,
|
Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest,
|
||||||
Snapshot,
|
Snapshot,
|
||||||
?SNAPSHOT_TIMEOUT),
|
?SNAPSHOT_TIMEOUT),
|
||||||
{reply, {ok, State}, State#state{manifest = Manifest0}};
|
|
||||||
handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
|
{BookieIncrTree, BookieIdx, MinSQN, MaxSQN} = BookiesMem,
|
||||||
_From, State) ->
|
LM1Cache =
|
||||||
{LedgerSQN, L0Size, L0Cache} =
|
|
||||||
case BookieIncrTree of
|
case BookieIncrTree of
|
||||||
empty_cache ->
|
empty_cache ->
|
||||||
{State#state.ledger_sqn,
|
leveled_tree:empty(?CACHE_TYPE);
|
||||||
State#state.levelzero_size,
|
|
||||||
State#state.levelzero_cache};
|
|
||||||
_ ->
|
_ ->
|
||||||
leveled_pmem:add_to_cache(State#state.levelzero_size,
|
BookieIncrTree
|
||||||
{BookieIncrTree, MinSQN, MaxSQN},
|
|
||||||
State#state.ledger_sqn,
|
|
||||||
State#state.levelzero_cache)
|
|
||||||
end,
|
end,
|
||||||
L0Index =
|
|
||||||
case BookieIdx of
|
CloneState =
|
||||||
empty_index ->
|
case Query of
|
||||||
State#state.levelzero_index;
|
no_lookup ->
|
||||||
_ ->
|
{UpdMaxSQN, UpdSize, L0Cache} =
|
||||||
leveled_pmem:add_to_index(BookieIdx,
|
leveled_pmem:add_to_cache(State#state.levelzero_size,
|
||||||
State#state.levelzero_index,
|
{LM1Cache, MinSQN, MaxSQN},
|
||||||
length(L0Cache))
|
State#state.ledger_sqn,
|
||||||
|
State#state.levelzero_cache),
|
||||||
|
#state{levelzero_cache = L0Cache,
|
||||||
|
ledger_sqn = UpdMaxSQN,
|
||||||
|
levelzero_size = UpdSize,
|
||||||
|
persisted_sqn = State#state.persisted_sqn};
|
||||||
|
{StartKey, EndKey} ->
|
||||||
|
SW = os:timestamp(),
|
||||||
|
L0AsTree =
|
||||||
|
leveled_pmem:merge_trees(StartKey,
|
||||||
|
EndKey,
|
||||||
|
State#state.levelzero_cache,
|
||||||
|
LM1Cache),
|
||||||
|
leveled_log:log_randomtimer("P0037",
|
||||||
|
[State#state.levelzero_size],
|
||||||
|
SW,
|
||||||
|
0.1),
|
||||||
|
#state{levelzero_astree = L0AsTree,
|
||||||
|
ledger_sqn = MaxSQN,
|
||||||
|
persisted_sqn = State#state.persisted_sqn};
|
||||||
|
undefined ->
|
||||||
|
{UpdMaxSQN, UpdSize, L0Cache} =
|
||||||
|
leveled_pmem:add_to_cache(State#state.levelzero_size,
|
||||||
|
{LM1Cache, MinSQN, MaxSQN},
|
||||||
|
State#state.ledger_sqn,
|
||||||
|
State#state.levelzero_cache),
|
||||||
|
L0Index =
|
||||||
|
case BookieIdx of
|
||||||
|
empty_index ->
|
||||||
|
State#state.levelzero_index;
|
||||||
|
_ ->
|
||||||
|
leveled_pmem:add_to_index(BookieIdx,
|
||||||
|
State#state.levelzero_index,
|
||||||
|
length(L0Cache))
|
||||||
|
end,
|
||||||
|
#state{levelzero_cache = L0Cache,
|
||||||
|
levelzero_index = L0Index,
|
||||||
|
levelzero_size = UpdSize,
|
||||||
|
ledger_sqn = UpdMaxSQN,
|
||||||
|
persisted_sqn = State#state.persisted_sqn}
|
||||||
end,
|
end,
|
||||||
{reply, ok, State#state{levelzero_cache=L0Cache,
|
ManifestClone = leveled_pmanifest:copy_manifest(State#state.manifest),
|
||||||
levelzero_size=L0Size,
|
{reply,
|
||||||
levelzero_index=L0Index,
|
{ok,
|
||||||
ledger_sqn=LedgerSQN,
|
CloneState#state{snapshot_fully_loaded=true,
|
||||||
snapshot_fully_loaded=true}};
|
manifest=ManifestClone}},
|
||||||
|
State#state{manifest = Manifest0}};
|
||||||
handle_call({fetch_levelzero, Slot}, _From, State) ->
|
handle_call({fetch_levelzero, Slot}, _From, State) ->
|
||||||
{reply, lists:nth(Slot, State#state.levelzero_cache), State};
|
{reply, lists:nth(Slot, State#state.levelzero_cache), State};
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State) ->
|
||||||
|
@ -594,7 +637,13 @@ terminate(Reason, State) ->
|
||||||
% Tidy shutdown of individual files
|
% Tidy shutdown of individual files
|
||||||
EntryCloseFun =
|
EntryCloseFun =
|
||||||
fun(ME) ->
|
fun(ME) ->
|
||||||
ok = leveled_sst:sst_close(ME#manifest_entry.owner)
|
case is_record(ME, manifest_entry) of
|
||||||
|
true ->
|
||||||
|
ok = leveled_sst:sst_close(ME#manifest_entry.owner);
|
||||||
|
false ->
|
||||||
|
{_SK, ME0} = ME,
|
||||||
|
ok = leveled_sst:sst_close(ME0#manifest_entry.owner)
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
leveled_pmanifest:close_manifest(State#state.manifest, EntryCloseFun),
|
leveled_pmanifest:close_manifest(State#state.manifest, EntryCloseFun),
|
||||||
leveled_log:log("P0011", []),
|
leveled_log:log("P0011", []),
|
||||||
|
@ -1189,11 +1238,12 @@ simple_server_test() ->
|
||||||
?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})),
|
?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})),
|
||||||
?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004", null})),
|
?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004", null})),
|
||||||
|
|
||||||
SnapOpts = #penciller_options{start_snapshot = true,
|
{ok, PclSnap, null} =
|
||||||
source_penciller = PCLr},
|
leveled_bookie:snapshot_store(leveled_bookie:empty_ledgercache(),
|
||||||
{ok, PclSnap} = pcl_start(SnapOpts),
|
PCLr,
|
||||||
leveled_bookie:load_snapshot(PclSnap,
|
null,
|
||||||
leveled_bookie:empty_ledgercache()),
|
ledger,
|
||||||
|
undefined),
|
||||||
|
|
||||||
?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
|
?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
|
||||||
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
|
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
|
||||||
|
@ -1241,8 +1291,13 @@ simple_server_test() ->
|
||||||
1)),
|
1)),
|
||||||
ok = pcl_close(PclSnap),
|
ok = pcl_close(PclSnap),
|
||||||
|
|
||||||
{ok, PclSnap2} = pcl_start(SnapOpts),
|
{ok, PclSnap2, null} =
|
||||||
leveled_bookie:load_snapshot(PclSnap2, leveled_bookie:empty_ledgercache()),
|
leveled_bookie:snapshot_store(leveled_bookie:empty_ledgercache(),
|
||||||
|
PCLr,
|
||||||
|
null,
|
||||||
|
ledger,
|
||||||
|
undefined),
|
||||||
|
|
||||||
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
|
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
|
||||||
{o,
|
{o,
|
||||||
"Bucket0001",
|
"Bucket0001",
|
||||||
|
|
|
@@ -289,12 +289,12 @@ release_snapshot(Manifest, Pid) ->
 {SnapList0, MinSnapSQN} = lists:foldl(FilterFun,
 {[], infinity},
 Manifest#manifest.snapshots),
-leveled_log:log("P0004", [SnapList0]),
 case SnapList0 of
 [] ->
 Manifest#manifest{snapshots = SnapList0,
 min_snapshot_sqn = 0};
 _ ->
+leveled_log:log("P0004", [SnapList0]),
 Manifest#manifest{snapshots = SnapList0,
 min_snapshot_sqn = MinSnapSQN}
 end.
@@ -155,7 +155,7 @@ journal_compaction(_Config) ->
 testutil:wait_for_compaction(Bookie1),
 % Start snapshot - should not stop deletions
 {ok,
-{PclClone, _LdgCache},
+PclClone,
 InkClone} = leveled_bookie:book_snapshotstore(Bookie1,
 self(),
 300000),