Penciller Memory Refactor

Plugged the ne wpencille rmemory into the Penciller, and took advantage
of the increased speed to simplify the callbacks involved.

The outcome is much simpler code
This commit is contained in:
martinsumner 2016-10-30 18:25:30 +00:00
parent c7a56068c5
commit 95609702bd
6 changed files with 214 additions and 278 deletions

View file

@ -645,7 +645,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
case leveled_penciller:pcl_pushmem(Penciller, Cache) of
ok ->
{ok, gb_trees:empty()};
{returned, _Reason} ->
returned ->
{ok, Cache}
end;
true ->

View file

@ -645,7 +645,7 @@ push_to_penciller(Penciller, KeyTree) ->
% the list by order before becoming a de-duplicated list for loading
R = leveled_penciller:pcl_pushmem(Penciller, KeyTree),
case R of
{returned, _Reason} ->
returned ->
timer:sleep(?LOADING_PAUSE),
push_to_penciller(Penciller, KeyTree);
ok ->

View file

@ -59,11 +59,9 @@
handle_cast/2,
handle_info/2,
terminate/2,
clerk_new/2,
mergeclerk_prompt/1,
mergeclerk_manifestchange/3,
rollclerk_levelzero/5,
rollclerk_close/1,
clerk_new/1,
clerk_prompt/1,
clerk_manifestchange/3,
code_change/3]).
-include_lib("eunit/include/eunit.hrl").
@ -73,32 +71,25 @@
-record(state, {owner :: pid(),
change_pending=false :: boolean(),
work_item :: #penciller_work{}|null,
merge_clerk = false :: boolean(),
roll_clerk = false ::boolean()}).
work_item :: #penciller_work{}|null}).
%%%============================================================================
%%% API
%%%============================================================================
clerk_new(Owner, Type) ->
clerk_new(Owner) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
ok = gen_server:call(Pid, {register, Owner, Type}, infinity),
ok = gen_server:call(Pid, {register, Owner}, infinity),
io:format("Penciller's clerk ~w started with owner ~w~n", [Pid, Owner]),
{ok, Pid}.
mergeclerk_manifestchange(Pid, Action, Closing) ->
clerk_manifestchange(Pid, Action, Closing) ->
gen_server:call(Pid, {manifest_change, Action, Closing}, infinity).
mergeclerk_prompt(Pid) ->
clerk_prompt(Pid) ->
gen_server:cast(Pid, prompt).
rollclerk_levelzero(Pid, LevelZero, LevelMinus1, LedgerSQN, PCL) ->
gen_server:cast(Pid,
{roll_levelzero, LevelZero, LevelMinus1, LedgerSQN, PCL}).
rollclerk_close(Pid) ->
gen_server:call(Pid, close, infinity).
%%%============================================================================
%%% gen_server callbacks
@ -107,18 +98,11 @@ rollclerk_close(Pid) ->
init([]) ->
{ok, #state{}}.
handle_call({register, Owner, Type}, _From, State) ->
case Type of
merge ->
handle_call({register, Owner}, _From, State) ->
{reply,
ok,
State#state{owner=Owner, merge_clerk = true},
State#state{owner=Owner},
?MIN_TIMEOUT};
roll ->
{reply,
ok,
State#state{owner=Owner, roll_clerk = true}}
end;
handle_call({manifest_change, return, true}, _From, State) ->
io:format("Request for manifest change from clerk on closing~n"),
case State#state.change_pending of
@ -150,20 +134,9 @@ handle_call(close, _From, State) ->
{stop, normal, ok, State}.
handle_cast(prompt, State) ->
{noreply, State, ?MIN_TIMEOUT};
handle_cast({roll_levelzero, LevelZero, LevelMinus1, LedgerSQN, PCL}, State) ->
SW = os:timestamp(),
{NewL0, Size, MaxSQN} = leveled_penciller:roll_new_tree(LevelZero,
LevelMinus1,
LedgerSQN),
ok = leveled_penciller:pcl_updatelevelzero(PCL, NewL0, Size, MaxSQN),
io:format("Rolled tree to size ~w in ~w microseconds~n",
[Size, timer:now_diff(os:timestamp(), SW)]),
{noreply, State}.
{noreply, State, ?MIN_TIMEOUT}.
handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false ->
if
State#state.merge_clerk ->
case requestandhandle_work(State) of
{false, Timeout} ->
{noreply, State, Timeout};
@ -172,9 +145,9 @@ handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false ->
% change
{noreply,
State#state{change_pending=true, work_item=WI}}
end
end.
terminate(Reason, _State) ->
io:format("Penciller's Clerk ~w shutdown now complete for reason ~w~n",
[self(), Reason]).

View file

@ -185,21 +185,29 @@
%% path. However, an ETS table is mutable, so it does complicate the
%% snapshotting of the Ledger.
%%
%% Originally the solution had used an ETS tbale for insertion speed as the L0
%% Originally the solution had used an ETS table for insertion speed as the L0
%% cache. Insertion speed was an order or magnitude faster than gb_trees. To
%% resolving issues of trying to have fast start-up snapshots though led to
%% keeping a seperate set of trees alongside the ETS table to be used by
%% snapshots.
%%
%% The current strategy is to perform the expensive operation (merging the
%% The next strategy was to perform the expensive operation (merging the
%% Ledger cache into the Level0 cache), within a dedicated Penciller's clerk,
%% known as the roll_clerk. This may take 30-40ms, but during this period
%% the Penciller will keep a Level -1 cache of the unmerged elements which
%% it will wipe once the roll_clerk returns with an updated L0 cache.
%%
%% This means that the in-memory cache is now using immutable objects, and
%% so can be shared with snapshots.
%% This was still a bit complicated, and did a lot of processing to
%% making updates to the large L0 cache - which will have created a lot of GC
%% effort required. The processing was inefficient
%%
%% The current paproach is to simply append each new tree pushed to a list, and
%% use an array of hashes to index for the presence of objects in the list.
%% When attempting to iterate, the caches are all merged for the range relevant
%% to the given iterator only. The main downside to the approahc is that the
%% Penciller cna no longer accurately measure the size of the L0 cache (as it
%% cannot determine how many replacements there are in the Cache - so it may
%% prematurely write a smaller than necessary L0 file.
-module(leveled_penciller).
@ -224,11 +232,8 @@
pcl_close/1,
pcl_registersnapshot/2,
pcl_releasesnapshot/2,
pcl_updatelevelzero/4,
pcl_loadsnapshot/2,
pcl_getstartupsequencenumber/1,
roll_new_tree/3,
roll_into_list/1,
clean_testdir/1]).
-include_lib("eunit/include/eunit.hrl").
@ -249,23 +254,21 @@
-record(state, {manifest = [] :: list(),
manifest_sqn = 0 :: integer(),
ledger_sqn = 0 :: integer(),
ledger_sqn = 0 :: integer(), % The highest SQN added to L0
persisted_sqn = 0 :: integer(), % The highest SQN persisted
registered_snapshots = [] :: list(),
unreferenced_files = [] :: list(),
root_path = "../test" :: string(),
merge_clerk :: pid(),
roll_clerk :: pid(),
clerk :: pid(),
levelzero_pending = false :: boolean(),
levelzero_constructor :: pid(),
levelzero_cache = gb_trees:empty() :: gb_trees:tree(),
levelzero_cachesize = 0 :: integer(),
levelzero_cache = [] :: list(), % a list of gb_trees
levelzero_index :: erlang:array(),
levelzero_size = 0 :: integer(),
levelzero_maxcachesize :: integer(),
levelminus1_active = false :: boolean(),
levelminus1_cache = gb_trees:empty() :: gb_trees:tree(),
is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(),
source_penciller :: pid(),
@ -317,8 +320,6 @@ pcl_releasesnapshot(Pid, Snapshot) ->
pcl_loadsnapshot(Pid, Increment) ->
gen_server:call(Pid, {load_snapshot, Increment}, infinity).
pcl_updatelevelzero(Pid, L0Cache, L0Size, L0SQN) ->
gen_server:cast(Pid, {load_levelzero, L0Cache, L0Size, L0SQN}).
pcl_close(Pid) ->
gen_server:call(Pid, close, 60000).
@ -335,7 +336,7 @@ init([PCLopts]) ->
SrcPenciller = PCLopts#penciller_options.source_penciller,
io:format("Registering ledger snapshot~n"),
{ok, State} = pcl_registersnapshot(SrcPenciller, self()),
io:format("Lesger snapshot registered~n"),
io:format("Ledger snapshot registered~n"),
{ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}};
%% Need to do something about timeout
{_RootPath, false} ->
@ -351,37 +352,26 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap})
% we mean value from the perspective of the Ledger, not the full value
% stored in the Inker)
%
% 2 - Check to see if the levelminus1 cache is active, if so the update
% cannot yet be accepted and must be returned (so the Bookie will maintain
% the cache and try again later).
%
% 3 - Check to see if there is a levelzero file pending. If so check if
% 2 - Check to see if there is a levelzero file pending. If so check if
% the levelzero file is complete. If it is complete, the levelzero tree
% can be flushed, the in-memory manifest updated, and the new tree can
% be accepted as the new levelzero cache. If not, the update must be
% returned.
%
% 3 - Make the new tree the levelminus1 cache, and mark this as active
%
% 4 - The Penciller can now reply to the Bookie to show that the push has
% 3 - The Penciller can now reply to the Bookie to show that the push has
% been accepted
%
% 5 - A background worker clerk can now be triggered to produce a new
% levelzero cache (tree) containing the level minus 1 tree. When this
% completes it will cast back the updated tree, and on receipt of this
% the Penciller may:
% a) Clear down the levelminus1 cache
% b) Determine if the cache is full and it is necessary to build a new
% persisted levelzero file
% 4 - Update the cache:
% a) Append the cache to the list
% b) Add hashes for all the elements to the index
%
% Check the approximate size of the cache. If it is over the maximum size,
% trigger a backgroun L0 file write and update state of levelzero_pending.
SW = os:timestamp(),
S = case {State#state.levelzero_pending,
State#state.levelminus1_active} of
{_, true} ->
log_pushmem_reply(From, {returned, "L-1 Active"}, SW),
State;
{true, _} ->
S = case State#state.levelzero_pending of
true ->
L0Pid = State#state.levelzero_constructor,
case checkready(L0Pid) of
timeout ->
@ -391,57 +381,59 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap})
SW),
State;
{ok, SrcFN, StartKey, EndKey} ->
log_pushmem_reply(From, ok, SW),
log_pushmem_reply(From,
{ok,
"L-0 persist completed"},
SW),
ManEntry = #manifest_entry{start_key=StartKey,
end_key=EndKey,
owner=L0Pid,
filename=SrcFN},
% Prompt clerk to ask about work - do this for
% every L0 roll
ok = leveled_pclerk:mergeclerk_prompt(State#state.merge_clerk),
UpdMan = lists:keystore(0,
1,
State#state.manifest,
{0, [ManEntry]}),
{MinSQN, MaxSQN, Size, _L} = assess_sqn(PushedTree),
if
MinSQN > State#state.ledger_sqn ->
State#state{manifest=UpdMan,
levelzero_cache=PushedTree,
levelzero_cachesize=Size,
LedgerSQN = State#state.ledger_sqn,
UpdState = State#state{manifest=UpdMan,
levelzero_pending=false,
ledger_sqn=MaxSQN}
end
persisted_sqn=LedgerSQN},
% Prompt clerk to ask about work - do this for
% every L0 roll
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
NewL0Index = leveled_pmem:new_index(),
update_levelzero(NewL0Index,
0,
PushedTree,
LedgerSQN,
[],
UpdState)
end;
{false, false} ->
log_pushmem_reply(From, ok, SW),
ok = leveled_pclerk:rollclerk_levelzero(State#state.roll_clerk,
State#state.levelzero_cache,
false ->
log_pushmem_reply(From, {ok, "L0 memory updated"}, SW),
update_levelzero(State#state.levelzero_index,
State#state.levelzero_size,
PushedTree,
State#state.ledger_sqn,
self()),
State#state{levelminus1_active=true,
levelminus1_cache=PushedTree}
State#state.levelzero_cache,
State)
end,
io:format("Handling of push completed in ~w microseconds with "
++ "L0 cache size now ~w~n",
[timer:now_diff(os:timestamp(), SW),
S#state.levelzero_cachesize]),
S#state.levelzero_size]),
{noreply, S};
handle_call({fetch, Key}, _From, State) ->
{reply,
fetch(Key,
fetch_mem(Key,
State#state.manifest,
State#state.levelminus1_active,
State#state.levelminus1_cache,
State#state.levelzero_index,
State#state.levelzero_cache),
State};
handle_call({check_sqn, Key, SQN}, _From, State) ->
{reply,
compare_to_sqn(fetch(Key,
compare_to_sqn(fetch_mem(Key,
State#state.manifest,
State#state.levelminus1_active,
State#state.levelminus1_cache,
State#state.levelzero_index,
State#state.levelzero_cache),
SQN),
State};
@ -449,7 +441,11 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc},
_From,
State=#state{snapshot_fully_loaded=Ready})
when Ready == true ->
L0iter = gb_trees:iterator_from(StartKey, State#state.levelzero_cache),
L0AsTree = leveled_pmem:merge_trees(StartKey,
EndKey,
State#state.levelzero_cache,
gb_trees:empty()),
L0iter = gb_trees:iterator(L0AsTree),
SFTiter = initiate_rangequery_frommanifest(StartKey,
EndKey,
State#state.manifest),
@ -459,29 +455,21 @@ handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From),
{reply, Work, UpdState};
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.ledger_sqn, State};
{reply, State#state.persisted_sqn, State};
handle_call({register_snapshot, Snapshot}, _From, State) ->
Rs = [{Snapshot, State#state.manifest_sqn}|State#state.registered_snapshots],
{reply, {ok, State}, State#state{registered_snapshots = Rs}};
handle_call({load_snapshot, BookieIncrTree}, _From, State) ->
{L0T0, _, L0SQN0} = case State#state.levelminus1_active of
true ->
roll_new_tree(State#state.levelzero_cache,
State#state.levelminus1_cache,
State#state.ledger_sqn);
false ->
{State#state.levelzero_cache,
0,
State#state.ledger_sqn}
end,
{L0T1, _, L0SQN1} = roll_new_tree(L0T0,
L0D = leveled_pmem:add_to_index(State#state.levelzero_index,
State#state.levelzero_size,
BookieIncrTree,
L0SQN0),
io:format("Ledger snapshot loaded with increments to start at SQN=~w~n",
[L0SQN1]),
{reply, ok, State#state{levelzero_cache=L0T1,
ledger_sqn=L0SQN1,
levelminus1_active=false,
State#state.ledger_sqn,
State#state.levelzero_cache),
{LedgerSQN, L0Size, L0Index, L0Cache} = L0D,
{reply, ok, State#state{levelzero_cache=L0Cache,
levelzero_index=L0Index,
levelzero_size=L0Size,
ledger_sqn=LedgerSQN,
snapshot_fully_loaded=true}};
handle_call(close, _From, State) ->
{stop, normal, ok, State}.
@ -489,7 +477,7 @@ handle_call(close, _From, State) ->
handle_cast({manifest_change, WI}, State) ->
{ok, UpdState} = commit_manifest_change(WI, State),
ok = leveled_pclerk:mergeclerk_manifestchange(State#state.merge_clerk,
ok = leveled_pclerk:clerk_manifestchange(State#state.clerk,
confirm,
false),
{noreply, UpdState};
@ -513,39 +501,9 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap})
{noreply, State#state{unreferenced_files=UF1}};
_ ->
{noreply, State}
end;
handle_cast({load_levelzero, L0Cache, L0Size, L0SQN}, State) ->
if
L0SQN >= State#state.ledger_sqn ->
CacheTooBig = L0Size > State#state.levelzero_maxcachesize,
Level0Free = length(get_item(0, State#state.manifest, [])) == 0,
case {CacheTooBig, Level0Free} of
{true, true} ->
L0Constructor = roll_memory(State, L0Cache),
{noreply,
State#state{levelminus1_active=false,
levelminus1_cache=gb_trees:empty(),
levelzero_cache=L0Cache,
levelzero_cachesize=L0Size,
levelzero_pending=true,
levelzero_constructor=L0Constructor,
ledger_sqn=L0SQN}};
_ ->
{noreply,
State#state{levelminus1_active=false,
levelminus1_cache=gb_trees:empty(),
levelzero_cache=L0Cache,
levelzero_cachesize=L0Size,
ledger_sqn=L0SQN}}
end;
L0Size == 0 ->
{noreply,
State#state{levelminus1_active=false,
levelminus1_cache=gb_trees:empty(),
levelzero_cache=L0Cache,
levelzero_cachesize=L0Size}}
end.
handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) ->
io:format("Orphaned reply after timeout on L0 file write ~s~n", [SrcFN]),
{noreply, State}.
@ -573,14 +531,14 @@ terminate(Reason, State) ->
%% the penciller looking for a manifest commit
%%
io:format("Penciller closing for reason - ~w~n", [Reason]),
MC = leveled_pclerk:mergeclerk_manifestchange(State#state.merge_clerk,
MC = leveled_pclerk:clerk_manifestchange(State#state.clerk,
return,
true),
UpdState = case MC of
{ok, WI} ->
{ok, NewState} = commit_manifest_change(WI, State),
Clerk = State#state.merge_clerk,
ok = leveled_pclerk:mergeclerk_manifestchange(Clerk,
Clerk = State#state.clerk,
ok = leveled_pclerk:clerk_manifestchange(Clerk,
confirm,
true),
NewState;
@ -589,21 +547,18 @@ terminate(Reason, State) ->
end,
case {UpdState#state.levelzero_pending,
get_item(0, State#state.manifest, []),
gb_trees:size(State#state.levelzero_cache)} of
State#state.levelzero_size} of
{true, [], _} ->
ok = leveled_sft:sft_close(State#state.levelzero_constructor);
{false, [], 0} ->
io:format("Level 0 cache empty at close of Penciller~n");
{false, [], _N} ->
KL = roll_into_list(State#state.levelzero_cache),
L0Pid = roll_memory(UpdState, KL, true),
L0Pid = roll_memory(UpdState, State#state.levelzero_cache, true),
ok = leveled_sft:sft_close(L0Pid);
_ ->
io:format("No level zero action on close of Penciller~n")
end,
leveled_pclerk:rollclerk_close(State#state.roll_clerk),
% Tidy shutdown of individual files
ok = close_files(0, UpdState#state.manifest),
lists:foreach(fun({_FN, Pid, _SN}) ->
@ -630,11 +585,10 @@ start_from_file(PCLopts) ->
M
end,
{ok, MergeClerk} = leveled_pclerk:clerk_new(self(), merge),
{ok, RollClerk} = leveled_pclerk:clerk_new(self(), roll),
InitState = #state{merge_clerk=MergeClerk,
roll_clerk=RollClerk,
{ok, MergeClerk} = leveled_pclerk:clerk_new(self()),
InitState = #state{clerk=MergeClerk,
root_path=RootPath,
levelzero_index = leveled_pmem:new_index(),
levelzero_maxcachesize=MaxTableSize},
%% Open manifest
@ -701,23 +655,61 @@ start_from_file(PCLopts) ->
{0, [ManifestEntry]}),
io:format("L0 file had maximum sequence number of ~w~n",
[L0SQN]),
LedgerSQN = max(MaxSQN, L0SQN),
{ok,
InitState#state{manifest=UpdManifest2,
manifest_sqn=TopManSQN,
ledger_sqn=max(MaxSQN, L0SQN)}};
ledger_sqn=LedgerSQN,
persisted_sqn=LedgerSQN}};
false ->
io:format("No L0 file found~n"),
{ok,
InitState#state{manifest=UpdManifest,
manifest_sqn=TopManSQN,
ledger_sqn=MaxSQN}}
ledger_sqn=MaxSQN,
persisted_sqn=MaxSQN}}
end.
log_pushmem_reply(From, Reply, SW) ->
io:format("Respone to push_mem of ~w took ~w microseconds~n",
[Reply, timer:now_diff(os:timestamp(), SW)]),
gen_server:reply(From, Reply).
io:format("Respone to push_mem of ~w ~s took ~w microseconds~n",
[element(1, Reply),
element(2, Reply),
timer:now_diff(os:timestamp(), SW)]),
gen_server:reply(From, element(1, Reply)).
update_levelzero(L0Index, L0Size, PushedTree, LedgerSQN, L0Cache, State) ->
Update = leveled_pmem:add_to_index(L0Index,
L0Size,
PushedTree,
LedgerSQN,
L0Cache),
{MaxSQN, NewL0Size, UpdL0Index, UpdL0Cache} = Update,
if
MaxSQN >= LedgerSQN ->
UpdState = State#state{levelzero_cache=UpdL0Cache,
levelzero_index=UpdL0Index,
levelzero_size=NewL0Size,
ledger_sqn=MaxSQN},
CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize,
Level0Free = length(get_item(0, State#state.manifest, [])) == 0,
case {CacheTooBig, Level0Free} of
{true, true} ->
L0Constructor = roll_memory(State, UpdL0Cache),
UpdState#state{levelzero_pending=true,
levelzero_constructor=L0Constructor};
_ ->
UpdState
end;
NewL0Size == L0Size ->
State#state{levelzero_cache=L0Cache,
levelzero_index=L0Index,
levelzero_size=L0Size,
ledger_sqn=LedgerSQN}
end.
checkready(Pid) ->
try
@ -727,71 +719,28 @@ checkready(Pid) ->
timeout
end.
roll_memory(State, L0Tree) ->
roll_memory(State, L0Tree, false).
roll_memory(State, L0Cache) ->
roll_memory(State, L0Cache, false).
roll_memory(State, L0Tree, Wait) ->
roll_memory(State, L0Cache, Wait) ->
MSN = State#state.manifest_sqn,
FileName = State#state.root_path
++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0",
Opts = #sft_options{wait=Wait},
{ok, Constructor, _} = leveled_sft:sft_new(FileName, L0Tree, [], 0, Opts),
{ok, Constructor, _} = leveled_sft:sft_newfroml0cache(FileName,
L0Cache,
Opts),
Constructor.
% Merge the Level Minus 1 tree to the Level 0 tree, incrementing the
% SQN, and ensuring all entries do increment the SQN
roll_new_tree(L0Cache, LMinus1Cache, LedgerSQN) ->
{MinSQN, MaxSQN, Size, LMinus1List} = assess_sqn(LMinus1Cache),
if
MinSQN >= LedgerSQN ->
UpdTree = lists:foldl(fun({Kx, Vx}, TreeAcc) ->
gb_trees:enter(Kx, Vx, TreeAcc)
end,
L0Cache,
LMinus1List),
{UpdTree, gb_trees:size(UpdTree), MaxSQN};
Size == 0 ->
{L0Cache, gb_trees:size(L0Cache), LedgerSQN}
end.
%% This takes the three parts of a memtable copy - the increments, the tree
%% and the SQN at which the tree was formed, and outputs a sorted list
roll_into_list(Tree) ->
gb_trees:to_list(Tree).
assess_sqn(Tree) ->
L = roll_into_list(Tree),
FoldFun = fun(KV, {AccMinSQN, AccMaxSQN, AccSize}) ->
SQN = leveled_codec:strip_to_seqonly(KV),
{min(SQN, AccMinSQN), max(SQN, AccMaxSQN), AccSize + 1}
end,
{MinSQN, MaxSQN, Size} = lists:foldl(FoldFun, {infinity, 0, 0}, L),
{MinSQN, MaxSQN, Size, L}.
fetch(Key, Manifest, LM1Active, LM1Tree, L0Tree) ->
case LM1Active of
true ->
case gb_trees:lookup(Key, LM1Tree) of
none ->
case gb_trees:lookup(Key, L0Tree) of
none ->
fetch_mem(Key, Manifest, L0Index, L0Cache) ->
L0Check = leveled_pmem:check_levelzero(Key, L0Index, L0Cache),
case L0Check of
{false, not_found} ->
fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2);
{value, Value} ->
{Key, Value}
end;
{value, Value} ->
{Key, Value}
end;
false ->
case gb_trees:lookup(Key, L0Tree) of
none ->
fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2);
{value, Value} ->
{Key, Value}
end
{true, KV} ->
KV
end.
fetch(_Key, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
@ -1375,7 +1324,7 @@ maybe_pause_push(PCL, KL) ->
T0,
KL),
case pcl_pushmem(PCL, T1) of
{returned, _Reason} ->
returned ->
timer:sleep(50),
maybe_pause_push(PCL, KL);
ok ->
@ -1402,7 +1351,6 @@ simple_server_test() ->
ok = maybe_pause_push(PCL, [Key2]),
?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})),
?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})),
ok = maybe_pause_push(PCL, KL2),
?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})),
@ -1686,11 +1634,12 @@ foldwithimm_simple_test() ->
create_file_test() ->
Filename = "../test/new_file.sft",
ok = file:write_file(Filename, term_to_binary("hello")),
{KL1, KL2} = {lists:sort(leveled_sft:generate_randomkeys(10000)), []},
{ok, SP, noreply} = leveled_sft:sft_new(Filename,
KL1,
KL2,
0,
KVL = lists:usort(leveled_sft:generate_randomkeys(10000)),
Tree = gb_trees:from_orddict(KVL),
{ok,
SP,
noreply} = leveled_sft:sft_newfroml0cache(Filename,
[Tree],
#sft_options{wait=false}),
lists:foreach(fun(X) ->
case checkready(SP) of

View file

@ -41,7 +41,7 @@
to_list/1,
new_index/0,
check_levelzero/3,
merge_trees/3
merge_trees/4
]).
-include_lib("eunit/include/eunit.hrl").
@ -69,7 +69,7 @@ add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) ->
{infinity, 0, L0Index},
LM1List),
NewL0Size = length(LM1List) + L0Size,
io:format("Rolled tree to size ~w in ~w microseconds~n",
io:format("Rolled L0 cache to size ~w in ~w microseconds~n",
[NewL0Size, timer:now_diff(os:timestamp(), SW)]),
if
MinSQN > LedgerSQN ->
@ -84,11 +84,11 @@ to_list(TreeList) ->
SW = os:timestamp(),
OutList = lists:foldr(fun(Tree, CompleteList) ->
L = gb_trees:to_list(Tree),
lists:umerge(CompleteList, L)
lists:ukeymerge(1, CompleteList, L)
end,
[],
TreeList),
io:format("Rolled tree to list of size ~w in ~w microseconds~n",
io:format("L0 cache converted to list of size ~w in ~w microseconds~n",
[length(OutList), timer:now_diff(os:timestamp(), SW)]),
OutList.
@ -128,11 +128,11 @@ check_levelzero(Key, L0Index, TreeList) ->
lists:reverse(lists:usort(SlotList))).
merge_trees(StartKey, EndKey, TreeList) ->
merge_trees(StartKey, EndKey, TreeList, LevelMinus1) ->
lists:foldl(fun(Tree, TreeAcc) ->
merge_nexttree(Tree, TreeAcc, StartKey, EndKey) end,
gb_trees:empty(),
TreeList).
lists:append(TreeList, [LevelMinus1])).
%%%============================================================================
%%% Internal Functions
@ -254,7 +254,7 @@ compare_method_test() ->
"size ~w~n",
[timer:now_diff(os:timestamp(), SWa), Sz0]),
SWb = os:timestamp(),
Q1 = merge_trees(StartKey, EndKey, TreeList),
Q1 = merge_trees(StartKey, EndKey, TreeList, gb_trees:empty()),
Sz1 = gb_trees:size(Q1),
io:format("Merge method took ~w microseconds resulting in tree of " ++
"size ~w~n",

View file

@ -152,7 +152,7 @@
terminate/2,
code_change/3,
sft_new/4,
sft_new/5,
sft_newfroml0cache/3,
sft_open/1,
sft_get/2,
sft_getkvrange/4,
@ -211,10 +211,8 @@
%%% API
%%%============================================================================
sft_new(Filename, KL1, KL2, LevelInfo) ->
sft_new(Filename, KL1, KL2, LevelInfo, #sft_options{}).
sft_new(Filename, KL1, KL2, LevelInfo, Options) ->
sft_new(Filename, KL1, KL2, LevelInfo) ->
LevelR = case is_integer(LevelInfo) of
true ->
#level{level=LevelInfo};
@ -225,15 +223,30 @@ sft_new(Filename, KL1, KL2, LevelInfo, Options) ->
end
end,
{ok, Pid} = gen_server:start(?MODULE, [], []),
case Options#sft_options.wait of
true ->
Reply = gen_server:call(Pid,
{sft_new, Filename, KL1, KL2, LevelR},
infinity),
{ok, Pid, Reply}.
sft_newfroml0cache(Filename, L0Cache, Options) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
case Options#sft_options.wait of
true ->
KL1 = leveled_pmem:to_list(L0Cache),
Reply = gen_server:call(Pid,
{sft_new,
Filename,
KL1,
[],
#level{level=0}},
infinity),
{ok, Pid, Reply};
false ->
gen_server:cast(Pid,
{sft_new, Filename, KL1, KL2, LevelR}),
{sft_newfromcache,
Filename,
L0Cache,
#level{level=0}}),
{ok, Pid, noreply}
end.
@ -342,9 +355,10 @@ handle_call({set_for_delete, Penciller}, _From, State) ->
handle_call(get_maxsqn, _From, State) ->
statecheck_onreply(State#state.highest_sqn, State).
handle_cast({sft_new, Filename, Inp1, [], _LevelR=#level{level=L}}, _State)
when L == 0->
handle_cast({sft_newfromcache, Filename, L0Cache, _LevelR=#level{level=L}},
_State) when L == 0->
SW = os:timestamp(),
Inp1 = leveled_pmem:to_list(L0Cache),
{ok, State} = create_levelzero(Inp1, Filename),
io:format("File creation of L0 file ~s took ~w microseconds~n",
[Filename, timer:now_diff(os:timestamp(), SW)]),