From 20cc17f9160711a30c7f0481f0fc491e81c3541d Mon Sep 17 00:00:00 2001 From: martinsumner Date: Thu, 27 Oct 2016 20:56:18 +0100 Subject: [PATCH] Penciller Refactor Removed o(100) lines of code by refactoring the Penciller to no longer use ETS tables. The code is less confusing, and probably not an awful lot slower. --- src/leveled_bookie.erl | 27 +- src/leveled_inker.erl | 20 +- src/leveled_pclerk.erl | 75 ++- src/leveled_penciller.erl | 929 +++++++++++++++----------------------- src/leveled_sft.erl | 36 +- 5 files changed, 465 insertions(+), 622 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 5808690..f89e399 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -268,7 +268,8 @@ init([Opts]) -> {ok, {Penciller, LedgerCache}, Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT), - ok = leveled_penciller:pcl_loadsnapshot(Penciller, []), + ok = leveled_penciller:pcl_loadsnapshot(Penciller, + gb_trees:empty()), io:format("Snapshot starting with Pcl ~w Ink ~w~n", [Penciller, Inker]), {ok, #state{penciller=Penciller, @@ -431,11 +432,10 @@ bucket_stats(Penciller, LedgerCache, Bucket, Tag) -> source_penciller=Penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), Folder = fun() -> - Increment = gb_trees:to_list(LedgerCache), io:format("Length of increment in snapshot is ~w~n", - [length(Increment)]), + [gb_trees:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - {infinity, Increment}), + LedgerCache), StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, @@ -456,11 +456,10 @@ index_query(Penciller, LedgerCache, source_penciller=Penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), Folder = fun() -> - Increment = gb_trees:to_list(LedgerCache), io:format("Length of increment in snapshot is ~w~n", - [length(Increment)]), + 
[gb_trees:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - {infinity, Increment}), + LedgerCache), StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, IdxField, StartValue), EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, @@ -487,11 +486,10 @@ allkey_query(Penciller, LedgerCache, Tag) -> source_penciller=Penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), Folder = fun() -> - Increment = gb_trees:to_list(LedgerCache), io:format("Length of increment in snapshot is ~w~n", - [length(Increment)]), + [gb_trees:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - {infinity, Increment}), + LedgerCache), SK = leveled_codec:to_ledgerkey(null, null, Tag), EK = leveled_codec:to_ledgerkey(null, null, Tag), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, @@ -648,13 +646,10 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), if TimeToPush -> - Dump = gb_trees:to_list(Cache), - case leveled_penciller:pcl_pushmem(Penciller, Dump) of + case leveled_penciller:pcl_pushmem(Penciller, Cache) of ok -> {ok, gb_trees:empty()}; - pause -> - {pause, gb_trees:empty()}; - returned -> + {returned, _Reason} -> {ok, Cache} end; true -> @@ -794,7 +789,7 @@ multi_key_test() -> {ok, F2B} = book_riakget(Bookie1, B2, K2), ?assertMatch(F2B, Obj2), ok = book_close(Bookie1), - %% Now reopen the file, and confirm that a fetch is still possible + % Now reopen the file, and confirm that a fetch is still possible {ok, Bookie2} = book_start(#bookie_options{root_path=RootPath}), {ok, F1C} = book_riakget(Bookie2, B1, K1), ?assertMatch(F1C, Obj1), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 7ef5cf4..b929c91 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -261,15 +261,15 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> handle_call({register_snapshot, Requestor}, _From , State) -> Rs 
= [{Requestor, State#state.manifest_sqn}|State#state.registered_snapshots], - io:format("Inker snapshot ~w registered at SQN ~w~n", + io:format("Journal snapshot ~w registered at SQN ~w~n", [Requestor, State#state.manifest_sqn]), {reply, {State#state.manifest, State#state.active_journaldb}, State#state{registered_snapshots=Rs}}; handle_call({release_snapshot, Snapshot}, _From , State) -> Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), - io:format("Ledger snapshot ~w released~n", [Snapshot]), - io:format("Remaining ledger snapshots are ~w~n", [Rs]), + io:format("Journal snapshot ~w released~n", [Snapshot]), + io:format("Remaining journal snapshots are ~w~n", [Rs]), {reply, ok, State#state{registered_snapshots=Rs}}; handle_call({confirm_delete, ManSQN}, _From, State) -> Reply = lists:foldl(fun({_R, SnapSQN}, Bool) -> @@ -646,15 +646,12 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, push_to_penciller(Penciller, KeyTree) -> % The push to penciller must start as a tree to correctly de-duplicate % the list by order before becoming a de-duplicated list for loading - KeyList = gb_trees:to_list(KeyTree), - R = leveled_penciller:pcl_pushmem(Penciller, KeyList), - if - R == pause -> - timer:sleep(?LOADING_PAUSE); - R == returned -> + R = leveled_penciller:pcl_pushmem(Penciller, KeyTree), + case R of + {returned, _Reason} -> timer:sleep(?LOADING_PAUSE), push_to_penciller(Penciller, KeyTree); - R == ok -> + ok -> ok end. @@ -742,8 +739,7 @@ initiate_penciller_snapshot(Bookie) -> {ok, {LedgerSnap, LedgerCache}, _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, - gb_trees:to_list(LedgerCache)), + ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, LedgerCache), MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. 
diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 7ccab3f..792acb4 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -59,9 +59,11 @@ handle_cast/2, handle_info/2, terminate/2, - clerk_new/1, - clerk_prompt/1, - clerk_manifestchange/3, + clerk_new/2, + mergeclerk_prompt/1, + mergeclerk_manifestchange/3, + rollclerk_levelzero/5, + rollclerk_close/1, code_change/3]). -include_lib("eunit/include/eunit.hrl"). @@ -71,24 +73,33 @@ -record(state, {owner :: pid(), change_pending=false :: boolean(), - work_item :: #penciller_work{}|null}). + work_item :: #penciller_work{}|null, + merge_clerk = false :: boolean(), + roll_clerk = false ::boolean()}). %%%============================================================================ %%% API %%%============================================================================ -clerk_new(Owner) -> +clerk_new(Owner, Type) -> {ok, Pid} = gen_server:start(?MODULE, [], []), - ok = gen_server:call(Pid, {register, Owner}, infinity), + ok = gen_server:call(Pid, {register, Owner, Type}, infinity), io:format("Penciller's clerk ~w started with owner ~w~n", [Pid, Owner]), {ok, Pid}. -clerk_manifestchange(Pid, Action, Closing) -> +mergeclerk_manifestchange(Pid, Action, Closing) -> gen_server:call(Pid, {manifest_change, Action, Closing}, infinity). -clerk_prompt(Pid) -> +mergeclerk_prompt(Pid) -> gen_server:cast(Pid, prompt). +rollclerk_levelzero(Pid, LevelZero, LevelMinus1, LedgerSQN, PCL) -> + gen_server:cast(Pid, + {roll_levelzero, LevelZero, LevelMinus1, LedgerSQN, PCL}). + +rollclerk_close(Pid) -> + gen_server:call(Pid, close, infinity). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -96,8 +107,18 @@ clerk_prompt(Pid) -> init([]) -> {ok, #state{}}. 
-handle_call({register, Owner}, _From, State) -> - {reply, ok, State#state{owner=Owner}, ?MIN_TIMEOUT}; +handle_call({register, Owner, Type}, _From, State) -> + case Type of + merge -> + {reply, + ok, + State#state{owner=Owner, merge_clerk = true}, + ?MIN_TIMEOUT}; + roll -> + {reply, + ok, + State#state{owner=Owner, roll_clerk = true}} + end; handle_call({manifest_change, return, true}, _From, State) -> io:format("Request for manifest change from clerk on closing~n"), case State#state.change_pending of @@ -124,20 +145,34 @@ handle_call({manifest_change, confirm, Closing}, From, State) -> {noreply, State#state{work_item=null, change_pending=false}, ?MIN_TIMEOUT} - end. + end; +handle_call(close, _From, State) -> + {stop, normal, ok, State}. handle_cast(prompt, State) -> - {noreply, State, ?MIN_TIMEOUT}. + {noreply, State, ?MIN_TIMEOUT}; +handle_cast({roll_levelzero, LevelZero, LevelMinus1, LedgerSQN, PCL}, State) -> + SW = os:timestamp(), + {NewL0, Size, MaxSQN} = leveled_penciller:roll_new_tree(LevelZero, + LevelMinus1, + LedgerSQN), + ok = leveled_penciller:pcl_updatelevelzero(PCL, NewL0, Size, MaxSQN), + io:format("Rolled tree to size ~w in ~w microseconds~n", + [Size, timer:now_diff(os:timestamp(), SW)]), + {noreply, State}. handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false -> - case requestandhandle_work(State) of - {false, Timeout} -> - {noreply, State, Timeout}; - {true, WI} -> - % No timeout now as will wait for call to return manifest - % change - {noreply, - State#state{change_pending=true, work_item=WI}} + if + State#state.merge_clerk -> + case requestandhandle_work(State) of + {false, Timeout} -> + {noreply, State, Timeout}; + {true, WI} -> + % No timeout now as will wait for call to return manifest + % change + {noreply, + State#state{change_pending=true, work_item=WI}} + end end. 
terminate(Reason, _State) -> diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index b30aa25..2509ad2 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -9,7 +9,7 @@ %% the Penciller's Clerk %% - The Penciller can be cloned and maintains a register of clones who have %% requested snapshots of the Ledger -%% - The accepts new dumps (in the form of lists of keys) from the Bookie, and +%% - The accepts new dumps (in the form of a gb_tree) from the Bookie, and %% calls the Bookie once the process of pencilling this data in the Ledger is %% complete - and the Bookie is free to forget about the data %% - The Penciller's persistence of the ledger may not be reliable, in that it @@ -21,20 +21,24 @@ %% -------- LEDGER --------- %% %% The Ledger is divided into many levels -%% - L0: New keys are received from the Bookie and merged into a single ETS -%% table, until that table is the size of a SFT file, and it is then persisted +%% - L0: New keys are received from the Bookie and merged into a single +%% gb_tree, until that tree is the size of a SFT file, and it is then persisted %% as a SFT file at this level. L0 SFT files can be larger than the normal %% maximum size - so we don't have to consider problems of either having more %% than one L0 file (and handling what happens on a crash between writing the %% files when the second may have overlapping sequence numbers), or having a %% remainder with overlapping in sequence numbers in memory after the file is -%% written. Once the persistence is completed, the ETS table can be erased. +%% written. Once the persistence is completed, the L0 tree can be erased. %% There can be only one SFT file at Level 0, so the work to merge that file %% to the lower level must be the highest priority, as otherwise writes to the %% ledger will stall, when there is next a need to persist. %% - L1 TO L7: May contain multiple processes managing non-overlapping sft %% files. 
Compaction work should be sheduled if the number of files exceeds %% the target size of the level, where the target size is 8 ^ n. +%% - L Minus 1: Used to cache the last ledger cache push for use in queries +%% whilst the Penciller awaits a callback from the roll_clerk with the new +%% merged L0 tree containing the L-1 updates. +%% %% %% The most recent revision of a Key can be found by checking each level until %% the key is found. To check a level the correct file must be sought from the @@ -69,7 +73,8 @@ %% %% The Penciller must support the PUSH of a dump of keys from the Bookie. The %% call to PUSH should be immediately acknowledged, and then work should be -%% completed to merge the ETS table into the L0 ETS table. +%% completed to merge the tree into the L0 tree (with the tree being cached as +%% a Level -1 tree so as not to block reads whilst it waits). %% %% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the %% conversion of the current ETS table into a SFT file, but not completed this @@ -85,15 +90,8 @@ %% ---------- SNAPSHOT ---------- %% %% Iterators may request a snapshot of the database. A snapshot is a cloned -%% Penciller seeded not from disk, but by the in-memory ETS table and the -%% in-memory manifest. - -%% To provide a snapshot the Penciller must snapshot the ETS table. The -%% snapshot of the ETS table is managed by the Penciller storing a list of the -%% batches of Keys which have been pushed to the Penciller, and it is expected -%% that this will be converted by the clone into a gb_tree. The clone may -%% then update the master Penciller with the gb_tree to be cached and used by -%% other cloned processes. +%% Penciller seeded not from disk, but by the in-memory L0 gb_tree and the +%% in-memory manifest, allowing for direct reference for the SFT file processes.
%% %% Clones formed to support snapshots are registered by the Penciller, so that %% SFT files valid at the point of the snapshot until either the iterator is @@ -154,12 +152,11 @@ %% allowed to again reach capacity %% %% The writing of L0 files do not require the involvement of the clerk. -%% The L0 files are prompted directly by the penciller when the in-memory ets -%% table has reached capacity. When there is a next push into memory the -%% penciller calls to check that the file is now active (which may pause if the -%% write is ongoing the acceptence of the push), and if so it can clear the ets -%% table and build a new table starting with the remainder, and the keys from -%% the latest push. +%% The L0 files are prompted directly by the penciller when the in-memory tree +%% has reached capacity. When there is a next push into memory the Penciller +%% calls to check that the file is now active (which may pause if the write is +%% ongoing the acceptence of the push), and if so it can clear the L0 tree +%% and build a new tree from an empty tree and the keys from the latest push. %% %% Only a single L0 file may exist at any one moment in time. 
If pushes are %% received when memory is over the maximum size, the pushes must be kept into @@ -175,73 +172,33 @@ %% manifest SQN n+1 %% 5 - The clerk will prompt the penciller about the change, and the Penciller %% will then commit the change (by renaming the manifest file to be active, and -%% advancing th ein-memory state of the manifest and manifest SQN) +%% advancing the in-memory state of the manifest and manifest SQN) %% 6 - The Penciller having committed the change will cast back to the Clerk %% to inform the Clerk that the chnage has been committed, and so it can carry %% on requetsing new work %% 7 - If the Penciller now receives a Push to over the max size, a new L0 file %% can now be created with the ManifestSQN of n+1 %% -%% ---------- NOTES ON THE USE OF ETS ---------- +%% ---------- NOTES ON THE (NON) USE OF ETS ---------- %% %% Insertion into ETS is very fast, and so using ETS does not slow the PUT %% path. However, an ETS table is mutable, so it does complicate the %% snapshotting of the Ledger. %% -%% Some alternatives have been considered: +%% Originally the solution had used an ETS table for insertion speed as the L0 +%% cache. Insertion speed was an order of magnitude faster than gb_trees. +%% Resolving issues of trying to have fast start-up snapshots, though, led to +%% keeping a separate set of trees alongside the ETS table to be used by +%% snapshots. %% -%% A1 - Use gb_trees not ETS table -%% * Speed of inserts are too slow especially as the Bookie is blocked until -%% the insert is complete. Inserting 32K very simple keys takes 250ms. Only -%% the naive commands can be used, as Keys may be present - so not easy to -%% optimise. There is a lack of bulk operations +%% The current strategy is to perform the expensive operation (merging the +%% Ledger cache into the Level0 cache), within a dedicated Penciller's clerk, +%% known as the roll_clerk.
This may take 30-40ms, but during this period +%% the Penciller will keep a Level -1 cache of the unmerged elements which +%% it will wipe once the roll_clerk returns with an updated L0 cache. %% -%% A2 - Use some other structure other than gb_trees or ETS tables -%% * There is nothing else that will support iterators, so snapshots would -%% either need to do a conversion when they request the snapshot if -%% they need to iterate, or iterate through map functions scanning all the -%% keys. The conversion may not be expensive, as we know loading into an ETS -%% table is fast - but there may be some hidden overheads with creating and -%% destroying many ETS tables. -%% -%% A3 - keep a parallel list of lists of things that have gone in the ETS -%% table in the format they arrived in -%% * There is doubling up of memory, and the snapshot must do some work to -%% make use of these lists. This combines the continued use of fast ETS -%% with the solution of A2 at a memory cost. -%% -%% A4 - Try and cache the conversion to be shared between snapshots registered -%% at the same Ledger SQN -%% * This is a rif on A2/A3, but if generally there is o(10) or o(100) seconds -%% between memory pushes, but much more frequent snapshots this may be more -%% efficient -%% -%% A5 - Produce a specific snapshot of the ETS table via an iterator on demand -%% for each snapshot -%% * So if a snapshot was required for na iterator, the Penciller would block -%% whilst it iterated over the ETS table first to produce a snapshot-specific -%% immutbale view. If the snapshot was required for a long-lived complete view -%% of the database the Penciller would block for a tab2list. -%% -%% A6 - Have snapshots incrementally create and share immutable trees, from a -%% parallel cache of changes -%% * This is a variance on A3. As changes are pushed to the Penciller in the -%% form of lists the Penciller updates a cache of the lists that are contained -%% in the current ETS table. 
These lists are returned to the snapshot when -%% the snapshot is registered. All snapshots it is assumed will convert these -%% lists into a gb_tree to use, but following that conversion they can cast -%% to the Penciller to refine the cache, so that the cache will become a -%% gb_tree up the ledger SQN at which the snapshot is registered, and now only -%% store new lists for subsequent updates. Future snapshot requests (before -%% the ets table is flushed) will now receive the array (if no changes have) -%% been made, or the array and only the lists needed to incrementally change -%% the array. If changes are infrequent, each snapshot request will pay the -%% full 20ms to 250ms cost of producing the array (although perhaps the clerk -%% could also update periodiclaly to avoid this). If changes are frequent, -%% the snapshot will generally not require to do a conversion, or will only -%% be required to do a small conversion -%% -%% A6 is the preferred option +%% This means that the in-memory cache is now using immutable objects, and +%% so can be shared with snapshots. -module(leveled_penciller). @@ -267,7 +224,7 @@ pcl_close/1, pcl_registersnapshot/2, pcl_releasesnapshot/2, - pcl_updatesnapshotcache/3, + pcl_updatelevelzero/4, pcl_loadsnapshot/2, pcl_getstartupsequencenumber/1, roll_new_tree/3, @@ -288,29 +245,32 @@ -define(MEMTABLE, mem). -define(MAX_TABLESIZE, 32000). -define(PROMPT_WAIT_ONL0, 5). --define(L0PEND_RESET, {false, null, null}). --record(l0snapshot, {increments = [] :: list(), - tree = gb_trees:empty() :: gb_trees:tree(), - ledger_sqn = 0 :: integer()}). 
-record(state, {manifest = [] :: list(), - ongoing_work = [] :: list(), manifest_sqn = 0 :: integer(), ledger_sqn = 0 :: integer(), registered_snapshots = [] :: list(), unreferenced_files = [] :: list(), root_path = "../test" :: string(), - table_size = 0 :: integer(), - clerk :: pid(), - levelzero_pending = ?L0PEND_RESET :: tuple(), - memtable_copy = #l0snapshot{} :: #l0snapshot{}, - levelzero_snapshot = gb_trees:empty() :: gb_trees:tree(), - memtable, - memtable_maxsize :: integer(), + + merge_clerk :: pid(), + roll_clerk :: pid(), + + levelzero_pending = false :: boolean(), + levelzero_constructor :: pid(), + levelzero_cache = gb_trees:empty() :: gb_trees:tree(), + levelzero_cachesize = 0 :: integer(), + levelzero_maxcachesize :: integer(), + + levelminus1_active = false :: boolean(), + levelminus1_cache = gb_trees:empty() :: gb_trees:tree(), + is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), - source_penciller :: pid()}). + source_penciller :: pid(), + + ongoing_work = [] :: list()}). %%%============================================================================ @@ -354,12 +314,12 @@ pcl_registersnapshot(Pid, Snapshot) -> pcl_releasesnapshot(Pid, Snapshot) -> gen_server:cast(Pid, {release_snapshot, Snapshot}). -pcl_updatesnapshotcache(Pid, Tree, SQN) -> - gen_server:cast(Pid, {update_snapshotcache, Tree, SQN}). - pcl_loadsnapshot(Pid, Increment) -> gen_server:call(Pid, {load_snapshot, Increment}, infinity). +pcl_updatelevelzero(Pid, L0Cache, L0Size, L0SQN) -> + gen_server:cast(Pid, {load_levelzero, L0Cache, L0Size, L0SQN}). + pcl_close(Pid) -> gen_server:call(Pid, close, 60000). 
@@ -373,101 +333,123 @@ init([PCLopts]) -> PCLopts#penciller_options.start_snapshot} of {undefined, true} -> SrcPenciller = PCLopts#penciller_options.source_penciller, - {ok, - LedgerSQN, - Manifest, - MemTableCopy} = pcl_registersnapshot(SrcPenciller, self()), - - {ok, #state{memtable_copy=MemTableCopy, - is_snapshot=true, - source_penciller=SrcPenciller, - manifest=Manifest, - ledger_sqn=LedgerSQN}}; + io:format("Registering ledger snapshot~n"), + {ok, State} = pcl_registersnapshot(SrcPenciller, self()), + io:format("Lesger snapshot registered~n"), + {ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}}; %% Need to do something about timeout {_RootPath, false} -> start_from_file(PCLopts) end. -handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap}) - when Snap == false -> - % The process for pushing to memory is as follows - % - Check that the inbound list does not contain any Keys with a lower - % sequence number than any existing keys (assess_sqn/1) - % - Check that any file that had been sent to be written to L0 previously - % is now completed. If it is wipe out the in-memory view as this is now - % safely persisted. This will block waiting for this to complete if it - % hasn't (checkready_pushmem/1). - % - Quick check to see if there is a need to write a L0 file - % (quickcheck_pushmem/3). If there clearly isn't, then we can reply, and - % then add to memory in the background before updating the loop state - % - Push the update into memory (do_pushtomem/3) - % - If we haven't got through quickcheck now need to check if there is a - % definite need to write a new L0 file (roll_memory/3). If all clear this - % will write the file in the background and allow a response to the user. - % If not the change has still been made but the the L0 file will not have - % been prompted - so the reply does not indicate failure but returns the - % atom 'pause' to signal a loose desire for back-pressure to be applied. 
- StartWatch = os:timestamp(), - case assess_sqn(DumpList) of - {MinSQN, MaxSQN} when MaxSQN >= MinSQN, - MinSQN >= State#state.ledger_sqn -> - case checkready_pushtomem(State) of - {ok, TableSize0, State1} -> - push_and_roll(DumpList, - TableSize0, - State#state.memtable_maxsize, - MaxSQN, - StartWatch, - From, - State1); - timeout -> - io:format("Timeout of ~w microseconds awaiting " ++ - "L0 SFT write~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - {reply, returned, State} - end; - {MinSQN, MaxSQN} -> - io:format("Mismatch of sequence number expectations with push " - ++ "having sequence numbers between ~w and ~w " - ++ "but current sequence number is ~w~n", - [MinSQN, MaxSQN, State#state.ledger_sqn]), - {reply, refused, State}; - empty -> - io:format("Empty request pushed to Penciller~n"), - {reply, ok, State} - end; -handle_call({fetch, Key}, _From, State=#state{is_snapshot=Snap}) +handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap}) when Snap == false -> + % The push_mem process is as follows: + % + % 1 - Receive a gb_tree containing the latest Key/Value pairs (note that + % we mean value from the perspective of the Ledger, not the full value + % stored in the Inker) + % + % 2 - Check to see if the levelminus1 cache is active, if so the update + % cannot yet be accepted and must be returned (so the Bookie will maintain + % the cache and try again later). + % + % 3 - Check to see if there is a levelzero file pending. If so check if + % the levelzero file is complete. If it is complete, the levelzero tree + % can be flushed, the in-memory manifest updated, and the new tree can + % be accepted as the new levelzero cache. If not, the update must be + % returned. 
+ % + % 3 - Make the new tree the levelminus1 cache, and mark this as active + % + % 4 - The Penciller can now reply to the Bookie to show that the push has + % been accepted + % + % 5 - A background worker clerk can now be triggered to produce a new + % levelzero cache (tree) containing the level minus 1 tree. When this + % completes it will cast back the updated tree, and on receipt of this + % the Penciller may: + % a) Clear down the levelminus1 cache + % b) Determine if the cache is full and it is necessary to build a new + % persisted levelzero file + + SW = os:timestamp(), + + S = case {State#state.levelzero_pending, + State#state.levelminus1_active} of + {_, true} -> + log_pushmem_reply(From, {returned, "L-1 Active"}, SW), + State; + {true, _} -> + L0Pid = State#state.levelzero_constructor, + case checkready(L0Pid) of + timeout -> + log_pushmem_reply(From, + {returned, + "L-0 persist pending"}, + SW), + State; + {ok, SrcFN, StartKey, EndKey} -> + log_pushmem_reply(From, ok, SW), + ManEntry = #manifest_entry{start_key=StartKey, + end_key=EndKey, + owner=L0Pid, + filename=SrcFN}, + % Prompt clerk to ask about work - do this for + % every L0 roll + ok = leveled_pclerk:mergeclerk_prompt(State#state.merge_clerk), + UpdMan = lists:keystore(0, + 1, + State#state.manifest, + {0, [ManEntry]}), + {MinSQN, MaxSQN, Size, _L} = assess_sqn(PushedTree), + if + MinSQN > State#state.ledger_sqn -> + State#state{manifest=UpdMan, + levelzero_cache=PushedTree, + levelzero_cachesize=Size, + levelzero_pending=false, + ledger_sqn=MaxSQN} + end + end; + {false, false} -> + log_pushmem_reply(From, ok, SW), + ok = leveled_pclerk:rollclerk_levelzero(State#state.roll_clerk, + State#state.levelzero_cache, + PushedTree, + State#state.ledger_sqn, + self()), + State#state{levelminus1_active=true, + levelminus1_cache=PushedTree} + end, + io:format("Handling of push completed in ~w microseconds with " + ++ "L0 cache size now ~w~n", + [timer:now_diff(os:timestamp(), SW), + 
S#state.levelzero_cachesize]), + {noreply, S}; +handle_call({fetch, Key}, _From, State) -> {reply, fetch(Key, State#state.manifest, - State#state.memtable), + State#state.levelminus1_active, + State#state.levelminus1_cache, + State#state.levelzero_cache), State}; -handle_call({fetch, Key}, - _From, - State=#state{snapshot_fully_loaded=Ready}) - when Ready == true -> +handle_call({check_sqn, Key, SQN}, _From, State) -> {reply, - fetch_snap(Key, - State#state.manifest, - State#state.levelzero_snapshot), - State}; -handle_call({check_sqn, Key, SQN}, - _From, - State=#state{snapshot_fully_loaded=Ready}) - when Ready == true -> - {reply, - compare_to_sqn(fetch_snap(Key, - State#state.manifest, - State#state.levelzero_snapshot), + compare_to_sqn(fetch(Key, + State#state.manifest, + State#state.levelminus1_active, + State#state.levelminus1_cache, + State#state.levelzero_cache), SQN), State}; handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc}, _From, State=#state{snapshot_fully_loaded=Ready}) when Ready == true -> - L0iter = gb_trees:iterator_from(StartKey, State#state.levelzero_snapshot), + L0iter = gb_trees:iterator_from(StartKey, State#state.levelzero_cache), SFTiter = initiate_rangequery_frommanifest(StartKey, EndKey, State#state.manifest), @@ -480,47 +462,41 @@ handle_call(get_startup_sqn, _From, State) -> {reply, State#state.ledger_sqn, State}; handle_call({register_snapshot, Snapshot}, _From, State) -> Rs = [{Snapshot, State#state.manifest_sqn}|State#state.registered_snapshots], - {reply, - {ok, - State#state.ledger_sqn, - State#state.manifest, - State#state.memtable_copy}, - State#state{registered_snapshots = Rs}}; -handle_call({load_snapshot, Increment}, _From, State) -> - MemTableCopy = State#state.memtable_copy, - {Tree0, TreeSQN0} = roll_new_tree(MemTableCopy#l0snapshot.tree, - MemTableCopy#l0snapshot.increments, - MemTableCopy#l0snapshot.ledger_sqn), - if - TreeSQN0 > MemTableCopy#l0snapshot.ledger_sqn -> - 
pcl_updatesnapshotcache(State#state.source_penciller, - Tree0, - TreeSQN0); - true -> - io:format("No update required to snapshot cache~n"), - ok - end, - {Tree1, TreeSQN1} = roll_new_tree(Tree0, [Increment], TreeSQN0), - io:format("Snapshot loaded with increments to start at SQN=~w~n", - [TreeSQN1]), - {reply, ok, State#state{levelzero_snapshot=Tree1, - ledger_sqn=TreeSQN1, + {reply, {ok, State}, State#state{registered_snapshots = Rs}}; +handle_call({load_snapshot, BookieIncrTree}, _From, State) -> + {L0T0, _, L0SQN0} = case State#state.levelminus1_active of + true -> + roll_new_tree(State#state.levelzero_cache, + State#state.levelminus1_cache, + State#state.ledger_sqn); + false -> + {State#state.levelzero_cache, + 0, + State#state.ledger_sqn} + end, + {L0T1, _, L0SQN1} = roll_new_tree(L0T0, + BookieIncrTree, + L0SQN0), + io:format("Ledger snapshot loaded with increments to start at SQN=~w~n", + [L0SQN1]), + {reply, ok, State#state{levelzero_cache=L0T1, + ledger_sqn=L0SQN1, + levelminus1_active=false, snapshot_fully_loaded=true}}; handle_call(close, _From, State) -> {stop, normal, ok, State}. 
-handle_cast({update_snapshotcache, Tree, SQN}, State) -> - MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN), - {noreply, State#state{memtable_copy=MemTableC}}; + handle_cast({manifest_change, WI}, State) -> {ok, UpdState} = commit_manifest_change(WI, State), - ok = leveled_pclerk:clerk_manifestchange(State#state.clerk, - confirm, - false), + ok = leveled_pclerk:mergeclerk_manifestchange(State#state.merge_clerk, + confirm, + false), {noreply, UpdState}; handle_cast({release_snapshot, Snapshot}, State) -> Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), - io:format("Penciller snapshot ~w released~n", [Snapshot]), + io:format("Ledger snapshot ~w released~n", [Snapshot]), + io:format("Remaining ledger snapshots are ~w~n", [Rs]), {noreply, State#state{registered_snapshots=Rs}}; handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap}) when Snap == false -> @@ -537,6 +513,37 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap}) {noreply, State#state{unreferenced_files=UF1}}; _ -> {noreply, State} + end; +handle_cast({load_levelzero, L0Cache, L0Size, L0SQN}, State) -> + if + L0SQN >= State#state.ledger_sqn -> + CacheTooBig = L0Size > State#state.levelzero_maxcachesize, + Level0Free = length(get_item(0, State#state.manifest, [])) == 0, + case {CacheTooBig, Level0Free} of + {true, true} -> + L0Constructor = roll_memory(State, L0Cache), + {noreply, + State#state{levelminus1_active=false, + levelminus1_cache=gb_trees:empty(), + levelzero_cache=L0Cache, + levelzero_cachesize=L0Size, + levelzero_pending=true, + levelzero_constructor=L0Constructor, + ledger_sqn=L0SQN}}; + _ -> + {noreply, + State#state{levelminus1_active=false, + levelminus1_cache=gb_trees:empty(), + levelzero_cache=L0Cache, + levelzero_cachesize=L0Size, + ledger_sqn=L0SQN}} + end; + L0Size == 0 -> + {noreply, + State#state{levelminus1_active=false, + levelminus1_cache=gb_trees:empty(), + levelzero_cache=L0Cache, + 
levelzero_cachesize=L0Size}} end. handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) -> @@ -566,51 +573,34 @@ terminate(Reason, State) -> %% the penciller looking for a manifest commit %% io:format("Penciller closing for reason - ~w~n", [Reason]), - MC = leveled_pclerk:clerk_manifestchange(State#state.clerk, - return, - true), + MC = leveled_pclerk:mergeclerk_manifestchange(State#state.merge_clerk, + return, + true), UpdState = case MC of {ok, WI} -> {ok, NewState} = commit_manifest_change(WI, State), - Clerk = State#state.clerk, - ok = leveled_pclerk:clerk_manifestchange(Clerk, - confirm, - true), + Clerk = State#state.merge_clerk, + ok = leveled_pclerk:mergeclerk_manifestchange(Clerk, + confirm, + true), NewState; no_change -> State end, - Dump = roll_into_list(State#state.memtable_copy), case {UpdState#state.levelzero_pending, - get_item(0, UpdState#state.manifest, []), - length(Dump)} of - {?L0PEND_RESET, [], L} when L > 0 -> - MSN = UpdState#state.manifest_sqn, - FileName = UpdState#state.root_path - ++ "/" ++ ?FILES_FP ++ "/" - ++ integer_to_list(MSN) ++ "_0_0", - NewSFT = leveled_sft:sft_new(FileName ++ ".pnd", - Dump, - [], - 0), - {ok, L0Pid, {{[], []}, _SK, _HK}} = NewSFT, - io:format("Dump of memory on close to filename ~s~n", - [FileName]), - leveled_sft:sft_close(L0Pid), - file:rename(FileName ++ ".pnd", FileName ++ ".sft"); - {?L0PEND_RESET, [], L} when L == 0 -> - io:format("No keys to dump from memory when closing~n"); - {{true, L0Pid, _TS}, _, _} -> - leveled_sft:sft_close(L0Pid), - io:format("No opportunity to persist memory before closing" - ++ " with ~w keys discarded~n", - [length(Dump)]); + get_item(0, State#state.manifest, [])} of + {true, []} -> + ok = leveled_sft:sft_close(State#state.levelzero_constructor); + {false, []} -> + KL = roll_into_list(State#state.levelzero_cache), + L0Pid = roll_memory(UpdState, KL, true), + ok = leveled_sft:sft_close(L0Pid); _ -> - io:format("No opportunity to persist memory before closing" - ++ " with 
~w keys discarded~n", - [length(Dump)]) + ok end, + leveled_pclerk:rollclerk_close(State#state.roll_clerk), + % Tidy shutdown of individual files ok = close_files(0, UpdState#state.manifest), lists:foreach(fun({_FN, Pid, _SN}) -> @@ -636,14 +626,13 @@ start_from_file(PCLopts) -> M -> M end, - % There is no need to export this ets table (hence private) or iterate - % over it (hence set not ordered_set) - TID = ets:new(?MEMTABLE, [set, private]), - {ok, Clerk} = leveled_pclerk:clerk_new(self()), - InitState = #state{memtable=TID, - clerk=Clerk, + + {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), merge), + {ok, RollClerk} = leveled_pclerk:clerk_new(self(), roll), + InitState = #state{merge_clerk=MergeClerk, + roll_clerk=RollClerk, root_path=RootPath, - memtable_maxsize=MaxTableSize}, + levelzero_maxcachesize=MaxTableSize}, %% Open manifest ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/", @@ -672,84 +661,60 @@ start_from_file(PCLopts) -> ValidManSQNs), io:format("Store to be started based on " ++ "manifest sequence number of ~w~n", [TopManSQN]), - case TopManSQN of - 0 -> - io:format("Seqence number of 0 indicates no valid manifest~n"), - {ok, InitState}; - _ -> - {ok, Bin} = file:read_file(filepath(InitState#state.root_path, - TopManSQN, - current_manifest)), - Manifest = binary_to_term(Bin), - {UpdManifest, MaxSQN} = open_all_filesinmanifest(Manifest), - io:format("Maximum sequence number of ~w " - ++ "found in nonzero levels~n", - [MaxSQN]), + ManUpdate = case TopManSQN of + 0 -> + io:format("Seqence number of 0 indicates no valid " ++ + "manifest~n"), + {[], 0}; + _ -> + CurrManFile = filepath(InitState#state.root_path, + TopManSQN, + current_manifest), + {ok, Bin} = file:read_file(CurrManFile), + Manifest = binary_to_term(Bin), + open_all_filesinmanifest(Manifest) + end, + + {UpdManifest, MaxSQN} = ManUpdate, + io:format("Maximum sequence number of ~w found in nonzero levels~n", + [MaxSQN]), - %% Find any L0 files - L0FN = 
filepath(RootPath, - TopManSQN, - new_merge_files) ++ "_0_0.sft", - case filelib:is_file(L0FN) of - true -> - io:format("L0 file found ~s~n", [L0FN]), - {ok, - L0Pid, - {L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN), - L0SQN = leveled_sft:sft_getmaxsequencenumber(L0Pid), - ManifestEntry = #manifest_entry{start_key=L0StartKey, - end_key=L0EndKey, - owner=L0Pid, - filename=L0FN}, - UpdManifest2 = lists:keystore(0, - 1, - UpdManifest, - {0, [ManifestEntry]}), - io:format("L0 file had maximum sequence number of ~w~n", - [L0SQN]), - {ok, - InitState#state{manifest=UpdManifest2, - manifest_sqn=TopManSQN, - ledger_sqn=max(MaxSQN, L0SQN)}}; - false -> - io:format("No L0 file found~n"), - {ok, - InitState#state{manifest=UpdManifest, - manifest_sqn=TopManSQN, - ledger_sqn=MaxSQN}} - end + %% Find any L0 files + L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sft", + case filelib:is_file(L0FN) of + true -> + io:format("L0 file found ~s~n", [L0FN]), + {ok, + L0Pid, + {L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN), + L0SQN = leveled_sft:sft_getmaxsequencenumber(L0Pid), + ManifestEntry = #manifest_entry{start_key=L0StartKey, + end_key=L0EndKey, + owner=L0Pid, + filename=L0FN}, + UpdManifest2 = lists:keystore(0, + 1, + UpdManifest, + {0, [ManifestEntry]}), + io:format("L0 file had maximum sequence number of ~w~n", + [L0SQN]), + {ok, + InitState#state{manifest=UpdManifest2, + manifest_sqn=TopManSQN, + ledger_sqn=max(MaxSQN, L0SQN)}}; + false -> + io:format("No L0 file found~n"), + {ok, + InitState#state{manifest=UpdManifest, + manifest_sqn=TopManSQN, + ledger_sqn=MaxSQN}} end. 
-checkready_pushtomem(State) -> - case State#state.levelzero_pending of - {true, Pid, _TS} -> - case checkready(Pid) of - timeout -> - timeout; - {ok, SrcFN, StartKey, EndKey} -> - true = ets:delete_all_objects(State#state.memtable), - ManifestEntry = #manifest_entry{start_key=StartKey, - end_key=EndKey, - owner=Pid, - filename=SrcFN}, - % Prompt clerk to ask about work - do this for every - % L0 roll - ok = leveled_pclerk:clerk_prompt(State#state.clerk), - UpdManifest = lists:keystore(0, - 1, - State#state.manifest, - {0, [ManifestEntry]}), - {ok, - 0, - State#state{manifest=UpdManifest, - levelzero_pending=?L0PEND_RESET, - memtable_copy=#l0snapshot{}}} - end; - ?L0PEND_RESET -> - {ok, State#state.table_size, State} - end. - +log_pushmem_reply(From, Reply, SW) -> + io:format("Respone to push_mem of ~w took ~w microseconds~n", + [Reply, timer:now_diff(os:timestamp(), SW)]), + gen_server:reply(From, Reply). checkready(Pid) -> try @@ -759,114 +724,71 @@ checkready(Pid) -> timeout end. +roll_memory(State, L0Tree) -> + roll_memory(State, L0Tree, false). 
-push_and_roll(DumpList, TableSize, MaxTableSize, MaxSQN, StartWatch, From, State) -> - case quickcheck_pushtomem(DumpList, TableSize, MaxTableSize) of - {twist, TableSize1} -> - gen_server:reply(From, ok), - io:format("Reply made on push in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - L0Snap = do_pushtomem(DumpList, - State#state.memtable, - State#state.memtable_copy, - MaxSQN), - io:format("Push completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - {noreply, - State#state{memtable_copy=L0Snap, - table_size=TableSize1, - ledger_sqn=MaxSQN}}; - {maybe_roll, TableSize1} -> - L0Snap = do_pushtomem(DumpList, - State#state.memtable, - State#state.memtable_copy, - MaxSQN), +roll_memory(State, L0Tree, Wait) -> + MSN = State#state.manifest_sqn, + FileName = State#state.root_path + ++ "/" ++ ?FILES_FP ++ "/" + ++ integer_to_list(MSN) ++ "_0_0", + Opts = #sft_options{wait=Wait}, + {ok, Constructor, _} = leveled_sft:sft_new(FileName, L0Tree, [], 0, Opts), + Constructor. 
- case roll_memory(State, MaxTableSize, L0Snap) of - {ok, L0Pend, ManSN, TableSize2} -> - io:format("Push completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - {reply, - ok, - State#state{levelzero_pending=L0Pend, - table_size=TableSize2, - manifest_sqn=ManSN, - memtable_copy=L0Snap, - ledger_sqn=MaxSQN}}; - {pause, Reason, Details} -> - io:format("Excess work due to - " ++ Reason, - Details), - {reply, - pause, - State#state{memtable_copy=L0Snap, - table_size=TableSize1, - ledger_sqn=MaxSQN}} - end +% Merge the Level Minus 1 tree to the Level 0 tree, incrementing the +% SQN, and ensuring all entries do increment the SQN + +roll_new_tree(L0Cache, LMinus1Cache, LedgerSQN) -> + {MinSQN, MaxSQN, Size, LMinus1List} = assess_sqn(LMinus1Cache), + if + MinSQN >= LedgerSQN -> + UpdTree = lists:foldl(fun({Kx, Vx}, TreeAcc) -> + gb_trees:enter(Kx, Vx, TreeAcc) + end, + L0Cache, + LMinus1List), + {UpdTree, gb_trees:size(UpdTree), MaxSQN}; + Size == 0 -> + {L0Cache, gb_trees:size(L0Cache), LedgerSQN} end. -quickcheck_pushtomem(DumpList, TableSize, MaxSize) -> - case TableSize + length(DumpList) of - ApproxTableSize when ApproxTableSize > MaxSize -> - {maybe_roll, ApproxTableSize}; - ApproxTableSize -> - io:format("Table size is approximately ~w~n", [ApproxTableSize]), - {twist, ApproxTableSize} - end. +%% This takes the three parts of a memtable copy - the increments, the tree +%% and the SQN at which the tree was formed, and outputs a sorted list +roll_into_list(Tree) -> + gb_trees:to_list(Tree). -do_pushtomem(DumpList, MemTable, Snapshot, MaxSQN) -> - SW = os:timestamp(), - UpdSnapshot = add_increment_to_memcopy(Snapshot, MaxSQN, DumpList), - % Note that the DumpList must have been taken from a source which - % naturally de-duplicates the keys. 
It is not possible just to cache - % changes in a list (in the Bookie for example), as the insert method does - % not apply the list in order, and so it is not clear which of a duplicate - % key will be applied - ets:insert(MemTable, DumpList), - io:format("Push into memory timed at ~w microseconds~n", - [timer:now_diff(os:timestamp(), SW)]), - UpdSnapshot. +assess_sqn(Tree) -> + L = roll_into_list(Tree), + FoldFun = fun(KV, {AccMinSQN, AccMaxSQN, AccSize}) -> + SQN = leveled_codec:strip_to_seqonly(KV), + {min(SQN, AccMinSQN), max(SQN, AccMaxSQN), AccSize + 1} + end, + {MinSQN, MaxSQN, Size} = lists:foldl(FoldFun, {infinity, 0, 0}, L), + {MinSQN, MaxSQN, Size, L}. -roll_memory(State, MaxSize, MemTableCopy) -> - case ets:info(State#state.memtable, size) of - Size when Size > MaxSize -> - L0 = get_item(0, State#state.manifest, []), - case L0 of - [] -> - MSN = State#state.manifest_sqn, - FileName = State#state.root_path - ++ "/" ++ ?FILES_FP ++ "/" - ++ integer_to_list(MSN) ++ "_0_0", - Opts = #sft_options{wait=false}, - {ok, L0Pid} = leveled_sft:sft_new(FileName, - MemTableCopy, - [], - 0, - Opts), - {ok, {true, L0Pid, os:timestamp()}, MSN, Size}; - _ -> - {pause, - "L0 file write blocked by L0 file in manifest~n", - []} + +fetch(Key, Manifest, LM1Active, LM1Tree, L0Tree) -> + case LM1Active of + true -> + case gb_trees:lookup(Key, LM1Tree) of + none -> + case gb_trees:lookup(Key, L0Tree) of + none -> + fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2); + {value, Value} -> + {Key, Value} + end; + {value, Value} -> + {Key, Value} end; - Size -> - {ok, ?L0PEND_RESET, State#state.manifest_sqn, Size} - end. - - -fetch_snap(Key, Manifest, Tree) -> - case gb_trees:lookup(Key, Tree) of - {value, Value} -> - {Key, Value}; - none -> - fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2) - end. 
- -fetch(Key, Manifest, TID) -> - case ets:lookup(TID, Key) of - [Object] -> - Object; - [] -> - fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2) + false -> + case gb_trees:lookup(Key, L0Tree) of + none -> + fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2); + {value, Value} -> + {Key, Value} + end end. fetch(_Key, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> @@ -934,13 +856,13 @@ return_work(State, From) -> true -> false end, - case element(1, State#state.levelzero_pending) of + case State#state.levelzero_pending of true -> % Once the L0 file is completed there will be more work % - so don't be busy doing other work now io:format("Allocation of work blocked as L0 pending~n"), {State, none}; - _ -> + false -> %% No work currently outstanding %% Can allocate work NextSQN = State#state.manifest_sqn + 1, @@ -965,57 +887,6 @@ return_work(State, From) -> end. -%% This takes the three parts of a memtable copy - the increments, the tree -%% and the SQN at which the tree was formed, and outputs a new tree -roll_new_tree(Tree, [], HighSQN) -> - {Tree, HighSQN}; -roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> - R = lists:foldl(fun({Kx, Vx}, {TreeAcc, MaxSQN}) -> - UpdTree = gb_trees:enter(Kx, Vx, TreeAcc), - SQNx = leveled_codec:strip_to_seqonly({Kx, Vx}), - {UpdTree, max(SQNx, MaxSQN)} - end, - {Tree, HighSQN}, - KVList), - {UpdTree, UpdSQN} = R, - roll_new_tree(UpdTree, TailIncs, UpdSQN); -roll_new_tree(Tree, [_H|TailIncs], HighSQN) -> - roll_new_tree(Tree, TailIncs, HighSQN). - -%% This takes the three parts of a memtable copy - the increments, the tree -%% and the SQN at which the tree was formed, and outputs a sorted list -roll_into_list(MemTableCopy) -> - {Tree, _SQN} = roll_new_tree(MemTableCopy#l0snapshot.tree, - MemTableCopy#l0snapshot.increments, - MemTableCopy#l0snapshot.ledger_sqn), - gb_trees:to_list(Tree). 
- -%% Update the memtable copy if the tree created advances the SQN -cache_tree_in_memcopy(MemCopy, Tree, SQN) -> - case MemCopy#l0snapshot.ledger_sqn of - CurrentSQN when SQN > CurrentSQN -> - % Discard any merged increments - io:format("Updating cache with new tree at SQN=~w~n", [SQN]), - Incs = lists:foldl(fun({PushSQN, PushL}, Acc) -> - if - SQN >= PushSQN -> - Acc; - true -> - Acc ++ [{PushSQN, PushL}] - end end, - [], - MemCopy#l0snapshot.increments), - #l0snapshot{ledger_sqn = SQN, - increments = Incs, - tree = Tree}; - _CurrentSQN -> - MemCopy - end. - -add_increment_to_memcopy(MemCopy, SQN, KVList) -> - Incs = MemCopy#l0snapshot.increments ++ [{SQN, KVList}], - MemCopy#l0snapshot{increments=Incs}. - close_files(?MAX_LEVELS - 1, _Manifest) -> ok; close_files(Level, Manifest) -> @@ -1371,7 +1242,6 @@ commit_manifest_change(ReturnedWorkItem, State) -> NewManifest, {0, [L0ManEntry]}) end, - print_manifest(RevisedManifest), {ok, State#state{ongoing_work=[], manifest_sqn=NewMSN, manifest=RevisedManifest, @@ -1437,16 +1307,6 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) -> end. -assess_sqn([]) -> - empty; -assess_sqn(DumpList) -> - assess_sqn(DumpList, infinity, 0). - -assess_sqn([], MinSQN, MaxSQN) -> - {MinSQN, MaxSQN}; -assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) -> - SQN = leveled_codec:strip_to_seqonly(HeadKey), - assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)). %%%============================================================================ %%% Test @@ -1473,12 +1333,6 @@ clean_subdir(DirPath) -> ok end. -assess_sqn_test() -> - L1 = [{{}, {5, active, {}}}, {{}, {6, active, {}}}], - ?assertMatch({5, 6}, assess_sqn(L1)), - L2 = [{{}, {5, active, {}}}], - ?assertMatch({5, 5}, assess_sqn(L2)), - ?assertMatch(empty, assess_sqn([])). 
compaction_work_assessment_test() -> L0 = [{{o, "B1", "K1", null}, {o, "B3", "K3", null}, dummy_pid}], @@ -1522,15 +1376,15 @@ confirm_delete_test() -> maybe_pause_push(PCL, KL) -> - R = pcl_pushmem(PCL, KL), - if - R == pause -> - io:format("Pausing push~n"), - timer:sleep(500), - ok; - R == returned -> + T0 = gb_trees:empty(), + T1 = lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end, + T0, + KL), + case pcl_pushmem(PCL, T1) of + {returned, _Reason} -> + timer:sleep(50), maybe_pause_push(PCL, KL); - true -> + ok -> ok end. @@ -1542,28 +1396,35 @@ simple_server_test() -> Key1 = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}}, KL1 = leveled_sft:generate_randomkeys({1000, 2}), Key2 = {{o,"Bucket0002", "Key0002", null}, {1002, {active, infinity}, null}}, - KL2 = leveled_sft:generate_randomkeys({1000, 1002}), - Key3 = {{o,"Bucket0003", "Key0003", null}, {2002, {active, infinity}, null}}, - KL3 = leveled_sft:generate_randomkeys({1000, 2002}), - Key4 = {{o,"Bucket0004", "Key0004", null}, {3002, {active, infinity}, null}}, - KL4 = leveled_sft:generate_randomkeys({1000, 3002}), - ok = pcl_pushmem(PCL, [Key1]), + KL2 = leveled_sft:generate_randomkeys({1000, 1003}), + Key3 = {{o,"Bucket0003", "Key0003", null}, {2003, {active, infinity}, null}}, + KL3 = leveled_sft:generate_randomkeys({1000, 2004}), + Key4 = {{o,"Bucket0004", "Key0004", null}, {3004, {active, infinity}, null}}, + KL4 = leveled_sft:generate_randomkeys({1000, 3005}), + ok = maybe_pause_push(PCL, [Key1]), ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), - ok = pcl_pushmem(PCL, KL1), + ok = maybe_pause_push(PCL, KL1), ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), ok = maybe_pause_push(PCL, [Key2]), ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), + ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), + ok = 
maybe_pause_push(PCL, KL2), + ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), ok = maybe_pause_push(PCL, [Key3]), ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key3, pcl_fetch(PCL, {o,"Bucket0003", "Key0003", null})), ok = pcl_close(PCL), + {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000}), - ?assertMatch(2002, pcl_getstartupsequencenumber(PCLr)), + ?assertMatch(1001, pcl_getstartupsequencenumber(PCLr)), + ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]), + io:format("Back to starting position with lost data recovered~n"), + ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})), @@ -1577,7 +1438,7 @@ simple_server_test() -> SnapOpts = #penciller_options{start_snapshot = true, source_penciller = PCLr}, {ok, PclSnap} = pcl_start(SnapOpts), - ok = pcl_loadsnapshot(PclSnap, []), + ok = pcl_loadsnapshot(PclSnap, gb_trees:empty()), ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})), @@ -1599,18 +1460,18 @@ simple_server_test() -> "Bucket0003", "Key0003", null}, - 2002)), + 2003)), ?assertMatch(true, pcl_checksequencenumber(PclSnap, {o, "Bucket0004", "Key0004", null}, - 3002)), + 3004)), % Add some more keys and confirm that check sequence number still % sees the old version in the previous snapshot, but will see the new version % in a new snapshot - Key1A = {{o,"Bucket0001", "Key0001", null}, {4002, {active, infinity}, null}}, - KL1A = leveled_sft:generate_randomkeys({4002, 2}), + Key1A = {{o,"Bucket0001", "Key0001", null}, {4005, {active, infinity}, null}}, + KL1A = 
leveled_sft:generate_randomkeys({2000, 4006}), ok = maybe_pause_push(PCLr, [Key1A]), ok = maybe_pause_push(PCLr, KL1A), ?assertMatch(true, pcl_checksequencenumber(PclSnap, @@ -1620,8 +1481,9 @@ simple_server_test() -> null}, 1)), ok = pcl_close(PclSnap), + {ok, PclSnap2} = pcl_start(SnapOpts), - ok = pcl_loadsnapshot(PclSnap2, []), + ok = pcl_loadsnapshot(PclSnap2, gb_trees:empty()), ?assertMatch(false, pcl_checksequencenumber(PclSnap2, {o, "Bucket0001", @@ -1633,7 +1495,7 @@ simple_server_test() -> "Bucket0001", "Key0001", null}, - 4002)), + 4005)), ?assertMatch(true, pcl_checksequencenumber(PclSnap2, {o, "Bucket0002", @@ -1644,55 +1506,6 @@ simple_server_test() -> ok = pcl_close(PCLr), clean_testdir(RootPath). -memcopy_updatecache1_test() -> - KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - {X, null, "Val" ++ integer_to_list(X) ++ "A"}} - end, - lists:seq(1, 1000)), - KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - {X, null, "Val" ++ integer_to_list(X) ++ "B"}} - end, - lists:seq(1001, 2000)), - KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - {X, null, "Val" ++ integer_to_list(X) ++ "C"}} - end, - lists:seq(2001, 3000)), - MemCopy0 = #l0snapshot{}, - MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1), - MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2), - MemCopy3 = add_increment_to_memcopy(MemCopy2, 3000, KVL3), - ?assertMatch(0, MemCopy3#l0snapshot.ledger_sqn), - {Tree1, HighSQN1} = roll_new_tree(gb_trees:empty(), MemCopy3#l0snapshot.increments, 0), - MemCopy4 = cache_tree_in_memcopy(MemCopy3, Tree1, HighSQN1), - ?assertMatch(0, length(MemCopy4#l0snapshot.increments)), - Size2 = gb_trees:size(MemCopy4#l0snapshot.tree), - ?assertMatch(3000, Size2), - ?assertMatch(3000, MemCopy4#l0snapshot.ledger_sqn). 
- -memcopy_updatecache2_test() -> - KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - {X, null, "Val" ++ integer_to_list(X) ++ "A"}} - end, - lists:seq(1, 1000)), - KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - {X, null, "Val" ++ integer_to_list(X) ++ "B"}} - end, - lists:seq(1001, 2000)), - KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - {X, null, "Val" ++ integer_to_list(X) ++ "C"}} - end, - lists:seq(1, 1000)), - MemCopy0 = #l0snapshot{}, - MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1), - MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2), - MemCopy3 = add_increment_to_memcopy(MemCopy2, 3000, KVL3), - ?assertMatch(0, MemCopy3#l0snapshot.ledger_sqn), - {Tree1, HighSQN1} = roll_new_tree(gb_trees:empty(), MemCopy3#l0snapshot.increments, 0), - MemCopy4 = cache_tree_in_memcopy(MemCopy3, Tree1, HighSQN1), - ?assertMatch(1, length(MemCopy4#l0snapshot.increments)), - Size2 = gb_trees:size(MemCopy4#l0snapshot.tree), - ?assertMatch(2000, Size2), - ?assertMatch(2000, MemCopy4#l0snapshot.ledger_sqn). rangequery_manifest_test() -> {E1, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 4649f5d..9cebef2 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -234,7 +234,7 @@ sft_new(Filename, KL1, KL2, LevelInfo, Options) -> false -> gen_server:cast(Pid, {sft_new, Filename, KL1, KL2, LevelR}), - {ok, Pid} + {ok, Pid, noreply} end. 
sft_open(Filename) -> @@ -532,25 +532,29 @@ complete_file(Handle, FileMD, KL1, KL2, LevelR, Rename) -> false -> open_file(FileMD); {true, OldName, NewName} -> - io:format("Renaming file from ~s to ~s~n", [OldName, NewName]), - case filelib:is_file(NewName) of - true -> - io:format("Filename ~s already exists~n", - [NewName]), - AltName = filename:join(filename:dirname(NewName), - filename:basename(NewName)) - ++ ?DISCARD_EXT, - io:format("Rename rogue filename ~s to ~s~n", - [NewName, AltName]), - ok = file:rename(NewName, AltName); - false -> - ok - end, - ok = file:rename(OldName, NewName), + ok = rename_file(OldName, NewName), open_file(FileMD#state{filename=NewName}) end, {ReadHandle, UpdFileMD, KeyRemainders}. +rename_file(OldName, NewName) -> + io:format("Renaming file from ~s to ~s~n", [OldName, NewName]), + case filelib:is_file(NewName) of + true -> + io:format("Filename ~s already exists~n", + [NewName]), + AltName = filename:join(filename:dirname(NewName), + filename:basename(NewName)) + ++ ?DISCARD_EXT, + io:format("Rename rogue filename ~s to ~s~n", + [NewName, AltName]), + ok = file:rename(NewName, AltName); + false -> + ok + end, + file:rename(OldName, NewName). + + %% Fetch a Key and Value from a file, returns %% {value, KV} or not_present %% The key must be pre-checked to ensure it is in the valid range for the file