From 95609702bda09966cfc31f0918db5c053dae0b82 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Sun, 30 Oct 2016 18:25:30 +0000 Subject: [PATCH] Penciller Memory Refactor Plugged the ne wpencille rmemory into the Penciller, and took advantage of the increased speed to simplify the callbacks involved. The outcome is much simpler code --- src/leveled_bookie.erl | 2 +- src/leveled_inker.erl | 2 +- src/leveled_pclerk.erl | 73 +++----- src/leveled_penciller.erl | 371 ++++++++++++++++---------------------- src/leveled_pmem.erl | 14 +- src/leveled_sft.erl | 30 ++- 6 files changed, 214 insertions(+), 278 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 206e859..50a886d 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -645,7 +645,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> case leveled_penciller:pcl_pushmem(Penciller, Cache) of ok -> {ok, gb_trees:empty()}; - {returned, _Reason} -> + returned -> {ok, Cache} end; true -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 1c8ed17..f3b3075 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -645,7 +645,7 @@ push_to_penciller(Penciller, KeyTree) -> % the list by order before becoming a de-duplicated list for loading R = leveled_penciller:pcl_pushmem(Penciller, KeyTree), case R of - {returned, _Reason} -> + returned -> timer:sleep(?LOADING_PAUSE), push_to_penciller(Penciller, KeyTree); ok -> diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 792acb4..635d661 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -59,11 +59,9 @@ handle_cast/2, handle_info/2, terminate/2, - clerk_new/2, - mergeclerk_prompt/1, - mergeclerk_manifestchange/3, - rollclerk_levelzero/5, - rollclerk_close/1, + clerk_new/1, + clerk_prompt/1, + clerk_manifestchange/3, code_change/3]). -include_lib("eunit/include/eunit.hrl"). @@ -73,32 +71,25 @@ -record(state, {owner :: pid(), change_pending=false :: boolean(), - work_item :: #penciller_work{}|null, - merge_clerk = false :: boolean(), - roll_clerk = false ::boolean()}). + work_item :: #penciller_work{}|null}). %%%============================================================================ %%% API %%%============================================================================ -clerk_new(Owner, Type) -> +clerk_new(Owner) -> {ok, Pid} = gen_server:start(?MODULE, [], []), - ok = gen_server:call(Pid, {register, Owner, Type}, infinity), + ok = gen_server:call(Pid, {register, Owner}, infinity), io:format("Penciller's clerk ~w started with owner ~w~n", [Pid, Owner]), {ok, Pid}. -mergeclerk_manifestchange(Pid, Action, Closing) -> +clerk_manifestchange(Pid, Action, Closing) -> gen_server:call(Pid, {manifest_change, Action, Closing}, infinity). -mergeclerk_prompt(Pid) -> +clerk_prompt(Pid) -> gen_server:cast(Pid, prompt). -rollclerk_levelzero(Pid, LevelZero, LevelMinus1, LedgerSQN, PCL) -> - gen_server:cast(Pid, - {roll_levelzero, LevelZero, LevelMinus1, LedgerSQN, PCL}). -rollclerk_close(Pid) -> - gen_server:call(Pid, close, infinity). %%%============================================================================ %%% gen_server callbacks @@ -107,18 +98,11 @@ rollclerk_close(Pid) -> init([]) -> {ok, #state{}}. -handle_call({register, Owner, Type}, _From, State) -> - case Type of - merge -> - {reply, - ok, - State#state{owner=Owner, merge_clerk = true}, - ?MIN_TIMEOUT}; - roll -> - {reply, - ok, - State#state{owner=Owner, roll_clerk = true}} - end; +handle_call({register, Owner}, _From, State) -> + {reply, + ok, + State#state{owner=Owner}, + ?MIN_TIMEOUT}; handle_call({manifest_change, return, true}, _From, State) -> io:format("Request for manifest change from clerk on closing~n"), case State#state.change_pending of @@ -150,31 +134,20 @@ handle_call(close, _From, State) -> {stop, normal, ok, State}. handle_cast(prompt, State) -> - {noreply, State, ?MIN_TIMEOUT}; -handle_cast({roll_levelzero, LevelZero, LevelMinus1, LedgerSQN, PCL}, State) -> - SW = os:timestamp(), - {NewL0, Size, MaxSQN} = leveled_penciller:roll_new_tree(LevelZero, - LevelMinus1, - LedgerSQN), - ok = leveled_penciller:pcl_updatelevelzero(PCL, NewL0, Size, MaxSQN), - io:format("Rolled tree to size ~w in ~w microseconds~n", - [Size, timer:now_diff(os:timestamp(), SW)]), - {noreply, State}. + {noreply, State, ?MIN_TIMEOUT}. handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false -> - if - State#state.merge_clerk -> - case requestandhandle_work(State) of - {false, Timeout} -> - {noreply, State, Timeout}; - {true, WI} -> - % No timeout now as will wait for call to return manifest - % change - {noreply, - State#state{change_pending=true, work_item=WI}} - end + case requestandhandle_work(State) of + {false, Timeout} -> + {noreply, State, Timeout}; + {true, WI} -> + % No timeout now as will wait for call to return manifest + % change + {noreply, + State#state{change_pending=true, work_item=WI}} end. + terminate(Reason, _State) -> io:format("Penciller's Clerk ~w shutdown now complete for reason ~w~n", [self(), Reason]). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 8522a30..17554d9 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -185,21 +185,29 @@ %% path. However, an ETS table is mutable, so it does complicate the %% snapshotting of the Ledger. %% -%% Originally the solution had used an ETS tbale for insertion speed as the L0 +%% Originally the solution had used an ETS table for insertion speed as the L0 %% cache. Insertion speed was an order or magnitude faster than gb_trees. To %% resolving issues of trying to have fast start-up snapshots though led to %% keeping a seperate set of trees alongside the ETS table to be used by %% snapshots. %% -%% The current strategy is to perform the expensive operation (merging the +%% The next strategy was to perform the expensive operation (merging the %% Ledger cache into the Level0 cache), within a dedicated Penciller's clerk, %% known as the roll_clerk. This may take 30-40ms, but during this period %% the Penciller will keep a Level -1 cache of the unmerged elements which %% it will wipe once the roll_clerk returns with an updated L0 cache. %% -%% This means that the in-memory cache is now using immutable objects, and -%% so can be shared with snapshots. - +%% This was still a bit complicated, and did a lot of processing to +%% making updates to the large L0 cache - which will have created a lot of GC +%% effort required. The processing was inefficient +%% +%% The current paproach is to simply append each new tree pushed to a list, and +%% use an array of hashes to index for the presence of objects in the list. +%% When attempting to iterate, the caches are all merged for the range relevant +%% to the given iterator only. The main downside to the approahc is that the +%% Penciller cna no longer accurately measure the size of the L0 cache (as it +%% cannot determine how many replacements there are in the Cache - so it may +%% prematurely write a smaller than necessary L0 file. -module(leveled_penciller). @@ -224,11 +232,8 @@ pcl_close/1, pcl_registersnapshot/2, pcl_releasesnapshot/2, - pcl_updatelevelzero/4, pcl_loadsnapshot/2, pcl_getstartupsequencenumber/1, - roll_new_tree/3, - roll_into_list/1, clean_testdir/1]). -include_lib("eunit/include/eunit.hrl"). @@ -249,23 +254,21 @@ -record(state, {manifest = [] :: list(), manifest_sqn = 0 :: integer(), - ledger_sqn = 0 :: integer(), + ledger_sqn = 0 :: integer(), % The highest SQN added to L0 + persisted_sqn = 0 :: integer(), % The highest SQN persisted registered_snapshots = [] :: list(), unreferenced_files = [] :: list(), root_path = "../test" :: string(), - merge_clerk :: pid(), - roll_clerk :: pid(), + clerk :: pid(), levelzero_pending = false :: boolean(), levelzero_constructor :: pid(), - levelzero_cache = gb_trees:empty() :: gb_trees:tree(), - levelzero_cachesize = 0 :: integer(), + levelzero_cache = [] :: list(), % a list of gb_trees + levelzero_index :: erlang:array(), + levelzero_size = 0 :: integer(), levelzero_maxcachesize :: integer(), - levelminus1_active = false :: boolean(), - levelminus1_cache = gb_trees:empty() :: gb_trees:tree(), - is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), source_penciller :: pid(), @@ -317,8 +320,6 @@ pcl_releasesnapshot(Pid, Snapshot) -> pcl_loadsnapshot(Pid, Increment) -> gen_server:call(Pid, {load_snapshot, Increment}, infinity). -pcl_updatelevelzero(Pid, L0Cache, L0Size, L0SQN) -> - gen_server:cast(Pid, {load_levelzero, L0Cache, L0Size, L0SQN}). pcl_close(Pid) -> gen_server:call(Pid, close, 60000). @@ -335,7 +336,7 @@ init([PCLopts]) -> SrcPenciller = PCLopts#penciller_options.source_penciller, io:format("Registering ledger snapshot~n"), {ok, State} = pcl_registersnapshot(SrcPenciller, self()), - io:format("Lesger snapshot registered~n"), + io:format("Ledger snapshot registered~n"), {ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}}; %% Need to do something about timeout {_RootPath, false} -> @@ -351,37 +352,26 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap}) % we mean value from the perspective of the Ledger, not the full value % stored in the Inker) % - % 2 - Check to see if the levelminus1 cache is active, if so the update - % cannot yet be accepted and must be returned (so the Bookie will maintain - % the cache and try again later). - % - % 3 - Check to see if there is a levelzero file pending. If so check if + % 2 - Check to see if there is a levelzero file pending. If so check if % the levelzero file is complete. If it is complete, the levelzero tree % can be flushed, the in-memory manifest updated, and the new tree can % be accepted as the new levelzero cache. If not, the update must be % returned. % - % 3 - Make the new tree the levelminus1 cache, and mark this as active - % - % 4 - The Penciller can now reply to the Bookie to show that the push has + % 3 - The Penciller can now reply to the Bookie to show that the push has % been accepted % - % 5 - A background worker clerk can now be triggered to produce a new - % levelzero cache (tree) containing the level minus 1 tree. When this - % completes it will cast back the updated tree, and on receipt of this - % the Penciller may: - % a) Clear down the levelminus1 cache - % b) Determine if the cache is full and it is necessary to build a new - % persisted levelzero file + % 4 - Update the cache: + % a) Append the cache to the list + % b) Add hashes for all the elements to the index + % + % Check the approximate size of the cache. If it is over the maximum size, + % trigger a backgroun L0 file write and update state of levelzero_pending. SW = os:timestamp(), - S = case {State#state.levelzero_pending, - State#state.levelminus1_active} of - {_, true} -> - log_pushmem_reply(From, {returned, "L-1 Active"}, SW), - State; - {true, _} -> + S = case State#state.levelzero_pending of + true -> L0Pid = State#state.levelzero_constructor, case checkready(L0Pid) of timeout -> @@ -391,65 +381,71 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap}) SW), State; {ok, SrcFN, StartKey, EndKey} -> - log_pushmem_reply(From, ok, SW), + log_pushmem_reply(From, + {ok, + "L-0 persist completed"}, + SW), ManEntry = #manifest_entry{start_key=StartKey, end_key=EndKey, owner=L0Pid, filename=SrcFN}, - % Prompt clerk to ask about work - do this for - % every L0 roll - ok = leveled_pclerk:mergeclerk_prompt(State#state.merge_clerk), UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}), - {MinSQN, MaxSQN, Size, _L} = assess_sqn(PushedTree), - if - MinSQN > State#state.ledger_sqn -> - State#state{manifest=UpdMan, - levelzero_cache=PushedTree, - levelzero_cachesize=Size, + LedgerSQN = State#state.ledger_sqn, + UpdState = State#state{manifest=UpdMan, levelzero_pending=false, - ledger_sqn=MaxSQN} - end + persisted_sqn=LedgerSQN}, + % Prompt clerk to ask about work - do this for + % every L0 roll + ok = leveled_pclerk:clerk_prompt(State#state.clerk), + NewL0Index = leveled_pmem:new_index(), + update_levelzero(NewL0Index, + 0, + PushedTree, + LedgerSQN, + [], + UpdState) end; - {false, false} -> - log_pushmem_reply(From, ok, SW), - ok = leveled_pclerk:rollclerk_levelzero(State#state.roll_clerk, - State#state.levelzero_cache, - PushedTree, - State#state.ledger_sqn, - self()), - State#state{levelminus1_active=true, - levelminus1_cache=PushedTree} + false -> + log_pushmem_reply(From, {ok, "L0 memory updated"}, SW), + update_levelzero(State#state.levelzero_index, + State#state.levelzero_size, + PushedTree, + State#state.ledger_sqn, + State#state.levelzero_cache, + State) end, io:format("Handling of push completed in ~w microseconds with " ++ "L0 cache size now ~w~n", [timer:now_diff(os:timestamp(), SW), - S#state.levelzero_cachesize]), + S#state.levelzero_size]), {noreply, S}; handle_call({fetch, Key}, _From, State) -> {reply, - fetch(Key, - State#state.manifest, - State#state.levelminus1_active, - State#state.levelminus1_cache, - State#state.levelzero_cache), + fetch_mem(Key, + State#state.manifest, + State#state.levelzero_index, + State#state.levelzero_cache), State}; handle_call({check_sqn, Key, SQN}, _From, State) -> {reply, - compare_to_sqn(fetch(Key, - State#state.manifest, - State#state.levelminus1_active, - State#state.levelminus1_cache, - State#state.levelzero_cache), + compare_to_sqn(fetch_mem(Key, + State#state.manifest, + State#state.levelzero_index, + State#state.levelzero_cache), SQN), State}; handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc}, _From, State=#state{snapshot_fully_loaded=Ready}) when Ready == true -> - L0iter = gb_trees:iterator_from(StartKey, State#state.levelzero_cache), + L0AsTree = leveled_pmem:merge_trees(StartKey, + EndKey, + State#state.levelzero_cache, + gb_trees:empty()), + L0iter = gb_trees:iterator(L0AsTree), SFTiter = initiate_rangequery_frommanifest(StartKey, EndKey, State#state.manifest), @@ -459,39 +455,31 @@ handle_call(work_for_clerk, From, State) -> {UpdState, Work} = return_work(State, From), {reply, Work, UpdState}; handle_call(get_startup_sqn, _From, State) -> - {reply, State#state.ledger_sqn, State}; + {reply, State#state.persisted_sqn, State}; handle_call({register_snapshot, Snapshot}, _From, State) -> Rs = [{Snapshot, State#state.manifest_sqn}|State#state.registered_snapshots], {reply, {ok, State}, State#state{registered_snapshots = Rs}}; handle_call({load_snapshot, BookieIncrTree}, _From, State) -> - {L0T0, _, L0SQN0} = case State#state.levelminus1_active of - true -> - roll_new_tree(State#state.levelzero_cache, - State#state.levelminus1_cache, - State#state.ledger_sqn); - false -> - {State#state.levelzero_cache, - 0, - State#state.ledger_sqn} - end, - {L0T1, _, L0SQN1} = roll_new_tree(L0T0, + L0D = leveled_pmem:add_to_index(State#state.levelzero_index, + State#state.levelzero_size, BookieIncrTree, - L0SQN0), - io:format("Ledger snapshot loaded with increments to start at SQN=~w~n", - [L0SQN1]), - {reply, ok, State#state{levelzero_cache=L0T1, - ledger_sqn=L0SQN1, - levelminus1_active=false, - snapshot_fully_loaded=true}}; + State#state.ledger_sqn, + State#state.levelzero_cache), + {LedgerSQN, L0Size, L0Index, L0Cache} = L0D, + {reply, ok, State#state{levelzero_cache=L0Cache, + levelzero_index=L0Index, + levelzero_size=L0Size, + ledger_sqn=LedgerSQN, + snapshot_fully_loaded=true}}; handle_call(close, _From, State) -> {stop, normal, ok, State}. handle_cast({manifest_change, WI}, State) -> {ok, UpdState} = commit_manifest_change(WI, State), - ok = leveled_pclerk:mergeclerk_manifestchange(State#state.merge_clerk, - confirm, - false), + ok = leveled_pclerk:clerk_manifestchange(State#state.clerk, + confirm, + false), {noreply, UpdState}; handle_cast({release_snapshot, Snapshot}, State) -> Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), @@ -513,39 +501,9 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap}) {noreply, State#state{unreferenced_files=UF1}}; _ -> {noreply, State} - end; -handle_cast({load_levelzero, L0Cache, L0Size, L0SQN}, State) -> - if - L0SQN >= State#state.ledger_sqn -> - CacheTooBig = L0Size > State#state.levelzero_maxcachesize, - Level0Free = length(get_item(0, State#state.manifest, [])) == 0, - case {CacheTooBig, Level0Free} of - {true, true} -> - L0Constructor = roll_memory(State, L0Cache), - {noreply, - State#state{levelminus1_active=false, - levelminus1_cache=gb_trees:empty(), - levelzero_cache=L0Cache, - levelzero_cachesize=L0Size, - levelzero_pending=true, - levelzero_constructor=L0Constructor, - ledger_sqn=L0SQN}}; - _ -> - {noreply, - State#state{levelminus1_active=false, - levelminus1_cache=gb_trees:empty(), - levelzero_cache=L0Cache, - levelzero_cachesize=L0Size, - ledger_sqn=L0SQN}} - end; - L0Size == 0 -> - {noreply, - State#state{levelminus1_active=false, - levelminus1_cache=gb_trees:empty(), - levelzero_cache=L0Cache, - levelzero_cachesize=L0Size}} end. + handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) -> io:format("Orphaned reply after timeout on L0 file write ~s~n", [SrcFN]), {noreply, State}. @@ -573,37 +531,34 @@ terminate(Reason, State) -> %% the penciller looking for a manifest commit %% io:format("Penciller closing for reason - ~w~n", [Reason]), - MC = leveled_pclerk:mergeclerk_manifestchange(State#state.merge_clerk, - return, - true), + MC = leveled_pclerk:clerk_manifestchange(State#state.clerk, + return, + true), UpdState = case MC of {ok, WI} -> {ok, NewState} = commit_manifest_change(WI, State), - Clerk = State#state.merge_clerk, - ok = leveled_pclerk:mergeclerk_manifestchange(Clerk, - confirm, - true), + Clerk = State#state.clerk, + ok = leveled_pclerk:clerk_manifestchange(Clerk, + confirm, + true), NewState; no_change -> State end, case {UpdState#state.levelzero_pending, get_item(0, State#state.manifest, []), - gb_trees:size(State#state.levelzero_cache)} of + State#state.levelzero_size} of {true, [], _} -> ok = leveled_sft:sft_close(State#state.levelzero_constructor); {false, [], 0} -> io:format("Level 0 cache empty at close of Penciller~n"); {false, [], _N} -> - KL = roll_into_list(State#state.levelzero_cache), - L0Pid = roll_memory(UpdState, KL, true), + L0Pid = roll_memory(UpdState, State#state.levelzero_cache, true), ok = leveled_sft:sft_close(L0Pid); _ -> io:format("No level zero action on close of Penciller~n") end, - leveled_pclerk:rollclerk_close(State#state.roll_clerk), - % Tidy shutdown of individual files ok = close_files(0, UpdState#state.manifest), lists:foreach(fun({_FN, Pid, _SN}) -> @@ -630,11 +585,10 @@ start_from_file(PCLopts) -> M end, - {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), merge), - {ok, RollClerk} = leveled_pclerk:clerk_new(self(), roll), - InitState = #state{merge_clerk=MergeClerk, - roll_clerk=RollClerk, + {ok, MergeClerk} = leveled_pclerk:clerk_new(self()), + InitState = #state{clerk=MergeClerk, root_path=RootPath, + levelzero_index = leveled_pmem:new_index(), levelzero_maxcachesize=MaxTableSize}, %% Open manifest @@ -701,23 +655,61 @@ start_from_file(PCLopts) -> {0, [ManifestEntry]}), io:format("L0 file had maximum sequence number of ~w~n", [L0SQN]), + LedgerSQN = max(MaxSQN, L0SQN), {ok, InitState#state{manifest=UpdManifest2, manifest_sqn=TopManSQN, - ledger_sqn=max(MaxSQN, L0SQN)}}; + ledger_sqn=LedgerSQN, + persisted_sqn=LedgerSQN}}; false -> io:format("No L0 file found~n"), {ok, InitState#state{manifest=UpdManifest, manifest_sqn=TopManSQN, - ledger_sqn=MaxSQN}} + ledger_sqn=MaxSQN, + persisted_sqn=MaxSQN}} end. log_pushmem_reply(From, Reply, SW) -> - io:format("Respone to push_mem of ~w took ~w microseconds~n", - [Reply, timer:now_diff(os:timestamp(), SW)]), - gen_server:reply(From, Reply). + io:format("Respone to push_mem of ~w ~s took ~w microseconds~n", + [element(1, Reply), + element(2, Reply), + timer:now_diff(os:timestamp(), SW)]), + gen_server:reply(From, element(1, Reply)). + + +update_levelzero(L0Index, L0Size, PushedTree, LedgerSQN, L0Cache, State) -> + Update = leveled_pmem:add_to_index(L0Index, + L0Size, + PushedTree, + LedgerSQN, + L0Cache), + {MaxSQN, NewL0Size, UpdL0Index, UpdL0Cache} = Update, + if + MaxSQN >= LedgerSQN -> + UpdState = State#state{levelzero_cache=UpdL0Cache, + levelzero_index=UpdL0Index, + levelzero_size=NewL0Size, + ledger_sqn=MaxSQN}, + CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize, + Level0Free = length(get_item(0, State#state.manifest, [])) == 0, + case {CacheTooBig, Level0Free} of + {true, true} -> + L0Constructor = roll_memory(State, UpdL0Cache), + UpdState#state{levelzero_pending=true, + levelzero_constructor=L0Constructor}; + _ -> + UpdState + end; + NewL0Size == L0Size -> + State#state{levelzero_cache=L0Cache, + levelzero_index=L0Index, + levelzero_size=L0Size, + ledger_sqn=LedgerSQN} + end. + + checkready(Pid) -> try @@ -727,71 +719,28 @@ checkready(Pid) -> timeout end. -roll_memory(State, L0Tree) -> - roll_memory(State, L0Tree, false). +roll_memory(State, L0Cache) -> + roll_memory(State, L0Cache, false). -roll_memory(State, L0Tree, Wait) -> +roll_memory(State, L0Cache, Wait) -> MSN = State#state.manifest_sqn, FileName = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(MSN) ++ "_0_0", Opts = #sft_options{wait=Wait}, - {ok, Constructor, _} = leveled_sft:sft_new(FileName, L0Tree, [], 0, Opts), + {ok, Constructor, _} = leveled_sft:sft_newfroml0cache(FileName, + L0Cache, + Opts), Constructor. -% Merge the Level Minus 1 tree to the Level 0 tree, incrementing the -% SQN, and ensuring all entries do increment the SQN -roll_new_tree(L0Cache, LMinus1Cache, LedgerSQN) -> - {MinSQN, MaxSQN, Size, LMinus1List} = assess_sqn(LMinus1Cache), - if - MinSQN >= LedgerSQN -> - UpdTree = lists:foldl(fun({Kx, Vx}, TreeAcc) -> - gb_trees:enter(Kx, Vx, TreeAcc) - end, - L0Cache, - LMinus1List), - {UpdTree, gb_trees:size(UpdTree), MaxSQN}; - Size == 0 -> - {L0Cache, gb_trees:size(L0Cache), LedgerSQN} - end. - -%% This takes the three parts of a memtable copy - the increments, the tree -%% and the SQN at which the tree was formed, and outputs a sorted list -roll_into_list(Tree) -> - gb_trees:to_list(Tree). - -assess_sqn(Tree) -> - L = roll_into_list(Tree), - FoldFun = fun(KV, {AccMinSQN, AccMaxSQN, AccSize}) -> - SQN = leveled_codec:strip_to_seqonly(KV), - {min(SQN, AccMinSQN), max(SQN, AccMaxSQN), AccSize + 1} - end, - {MinSQN, MaxSQN, Size} = lists:foldl(FoldFun, {infinity, 0, 0}, L), - {MinSQN, MaxSQN, Size, L}. - - -fetch(Key, Manifest, LM1Active, LM1Tree, L0Tree) -> - case LM1Active of - true -> - case gb_trees:lookup(Key, LM1Tree) of - none -> - case gb_trees:lookup(Key, L0Tree) of - none -> - fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2); - {value, Value} -> - {Key, Value} - end; - {value, Value} -> - {Key, Value} - end; - false -> - case gb_trees:lookup(Key, L0Tree) of - none -> - fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2); - {value, Value} -> - {Key, Value} - end +fetch_mem(Key, Manifest, L0Index, L0Cache) -> + L0Check = leveled_pmem:check_levelzero(Key, L0Index, L0Cache), + case L0Check of + {false, not_found} -> + fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2); + {true, KV} -> + KV end. fetch(_Key, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> @@ -1375,7 +1324,7 @@ maybe_pause_push(PCL, KL) -> T0, KL), case pcl_pushmem(PCL, T1) of - {returned, _Reason} -> + returned -> timer:sleep(50), maybe_pause_push(PCL, KL); ok -> @@ -1402,7 +1351,6 @@ simple_server_test() -> ok = maybe_pause_push(PCL, [Key2]), ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), - ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), ok = maybe_pause_push(PCL, KL2), ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), @@ -1686,12 +1634,13 @@ foldwithimm_simple_test() -> create_file_test() -> Filename = "../test/new_file.sft", ok = file:write_file(Filename, term_to_binary("hello")), - {KL1, KL2} = {lists:sort(leveled_sft:generate_randomkeys(10000)), []}, - {ok, SP, noreply} = leveled_sft:sft_new(Filename, - KL1, - KL2, - 0, - #sft_options{wait=false}), + KVL = lists:usort(leveled_sft:generate_randomkeys(10000)), + Tree = gb_trees:from_orddict(KVL), + {ok, + SP, + noreply} = leveled_sft:sft_newfroml0cache(Filename, + [Tree], + #sft_options{wait=false}), lists:foreach(fun(X) -> case checkready(SP) of timeout -> diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index 9cf498b..ae7bf09 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -41,7 +41,7 @@ to_list/1, new_index/0, check_levelzero/3, - merge_trees/3 + merge_trees/4 ]). -include_lib("eunit/include/eunit.hrl"). @@ -69,7 +69,7 @@ add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) -> {infinity, 0, L0Index}, LM1List), NewL0Size = length(LM1List) + L0Size, - io:format("Rolled tree to size ~w in ~w microseconds~n", + io:format("Rolled L0 cache to size ~w in ~w microseconds~n", [NewL0Size, timer:now_diff(os:timestamp(), SW)]), if MinSQN > LedgerSQN -> @@ -84,11 +84,11 @@ to_list(TreeList) -> SW = os:timestamp(), OutList = lists:foldr(fun(Tree, CompleteList) -> L = gb_trees:to_list(Tree), - lists:umerge(CompleteList, L) + lists:ukeymerge(1, CompleteList, L) end, [], TreeList), - io:format("Rolled tree to list of size ~w in ~w microseconds~n", + io:format("L0 cache converted to list of size ~w in ~w microseconds~n", [length(OutList), timer:now_diff(os:timestamp(), SW)]), OutList. @@ -128,11 +128,11 @@ check_levelzero(Key, L0Index, TreeList) -> lists:reverse(lists:usort(SlotList))). -merge_trees(StartKey, EndKey, TreeList) -> +merge_trees(StartKey, EndKey, TreeList, LevelMinus1) -> lists:foldl(fun(Tree, TreeAcc) -> merge_nexttree(Tree, TreeAcc, StartKey, EndKey) end, gb_trees:empty(), - TreeList). + lists:append(TreeList, [LevelMinus1])). %%%============================================================================ %%% Internal Functions @@ -254,7 +254,7 @@ compare_method_test() -> "size ~w~n", [timer:now_diff(os:timestamp(), SWa), Sz0]), SWb = os:timestamp(), - Q1 = merge_trees(StartKey, EndKey, TreeList), + Q1 = merge_trees(StartKey, EndKey, TreeList, gb_trees:empty()), Sz1 = gb_trees:size(Q1), io:format("Merge method took ~w microseconds resulting in tree of " ++ "size ~w~n", diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index c9cfd7f..64b903c 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -152,7 +152,7 @@ terminate/2, code_change/3, sft_new/4, - sft_new/5, + sft_newfroml0cache/3, sft_open/1, sft_get/2, sft_getkvrange/4, @@ -211,10 +211,8 @@ %%% API %%%============================================================================ -sft_new(Filename, KL1, KL2, LevelInfo) -> - sft_new(Filename, KL1, KL2, LevelInfo, #sft_options{}). -sft_new(Filename, KL1, KL2, LevelInfo, Options) -> +sft_new(Filename, KL1, KL2, LevelInfo) -> LevelR = case is_integer(LevelInfo) of true -> #level{level=LevelInfo}; @@ -225,15 +223,30 @@ sft_new(Filename, KL1, KL2, LevelInfo, Options) -> end end, {ok, Pid} = gen_server:start(?MODULE, [], []), + Reply = gen_server:call(Pid, + {sft_new, Filename, KL1, KL2, LevelR}, + infinity), + {ok, Pid, Reply}. + +sft_newfroml0cache(Filename, L0Cache, Options) -> + {ok, Pid} = gen_server:start(?MODULE, [], []), case Options#sft_options.wait of true -> + KL1 = leveled_pmem:to_list(L0Cache), Reply = gen_server:call(Pid, - {sft_new, Filename, KL1, KL2, LevelR}, + {sft_new, + Filename, + KL1, + [], + #level{level=0}}, infinity), {ok, Pid, Reply}; false -> gen_server:cast(Pid, - {sft_new, Filename, KL1, KL2, LevelR}), + {sft_newfromcache, + Filename, + L0Cache, + #level{level=0}}), {ok, Pid, noreply} end. @@ -342,9 +355,10 @@ handle_call({set_for_delete, Penciller}, _From, State) -> handle_call(get_maxsqn, _From, State) -> statecheck_onreply(State#state.highest_sqn, State). -handle_cast({sft_new, Filename, Inp1, [], _LevelR=#level{level=L}}, _State) - when L == 0-> +handle_cast({sft_newfromcache, Filename, L0Cache, _LevelR=#level{level=L}}, + _State) when L == 0-> SW = os:timestamp(), + Inp1 = leveled_pmem:to_list(L0Cache), {ok, State} = create_levelzero(Inp1, Filename), io:format("File creation of L0 file ~s took ~w microseconds~n", [Filename, timer:now_diff(os:timestamp(), SW)]),