From f16f71ae81c4637fd488829eed585e8e307e8787 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Wed, 19 Oct 2016 00:10:48 +0100 Subject: [PATCH] Revert ominshambles performance refactoring To try and improve performance index entries had been removed from the Ledger Cache, and a shadow list of the LedgerCache (in SQN order) was kept to avoid gb_trees:to_list on push_mem. This did not go well. The issue was that ets does not deal with duplicate keys in the list when inserting (it will only insert one, but it is not clear which one). This has been reverted back out. The ETS parameters have been changed to [set, private]. It is not used as an iterator, and is no longer passed out of the process (the memtable_copy is sent instead). This also avoids the tab2list function being called. --- src/leveled_bookie.erl | 47 +++++++++-------------- src/leveled_inker.erl | 5 ++- src/leveled_penciller.erl | 68 +++++++++++++++++++-------------- src/leveled_sft.erl | 17 ++++++++- test/end_to_end/basic_SUITE.erl | 2 + test/end_to_end/testutil.erl | 12 +++++- 6 files changed, 89 insertions(+), 62 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 3794dab..18b018f 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -160,7 +160,7 @@ penciller :: pid(), cache_size :: integer(), back_pressure :: boolean(), - ledger_cache :: {gb_trees:tree(), list()}, + ledger_cache :: gb_trees:tree(), is_snapshot :: boolean()}). @@ -242,7 +242,7 @@ init([Opts]) -> {ok, #state{inker=Inker, penciller=Penciller, cache_size=CacheSize, - ledger_cache={gb_trees:empty(), []}, + ledger_cache=gb_trees:empty(), is_snapshot=false}}; Bookie -> {ok, @@ -397,15 +397,16 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -bucket_stats(Penciller, {_ObjTree, ChangeList}, Bucket, Tag) -> +bucket_stats(Penciller, LedgerCache, Bucket, Tag) -> PCLopts = #penciller_options{start_snapshot=true, source_penciller=Penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), Folder = fun() -> + Increment = gb_trees:to_list(LedgerCache), io:format("Length of increment in snapshot is ~w~n", - [length(ChangeList)]), + [length(Increment)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - {infinity, ChangeList}), + {infinity, Increment}), StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, @@ -418,7 +419,7 @@ bucket_stats(Penciller, {_ObjTree, ChangeList}, Bucket, Tag) -> end, {async, Folder}. -index_query(Penciller, {_ObjTree, ChangeList}, +index_query(Penciller, LedgerCache, Bucket, {IdxField, StartValue, EndValue}, {ReturnTerms, TermRegex}) -> @@ -426,10 +427,11 @@ index_query(Penciller, {_ObjTree, ChangeList}, source_penciller=Penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), Folder = fun() -> + Increment = gb_trees:to_list(LedgerCache), io:format("Length of increment in snapshot is ~w~n", - [length(ChangeList)]), + [length(Increment)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - {infinity, ChangeList}), + {infinity, Increment}), StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, IdxField, StartValue), EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, @@ -491,9 +493,8 @@ startup(InkerOpts, PencillerOpts) -> {Inker, Penciller}. -fetch_head(Key, Penciller, {ObjTree, _ChangeList}) -> - - case gb_trees:lookup(Key, ObjTree) of +fetch_head(Key, Penciller, LedgerCache) -> + case gb_trees:lookup(Key, LedgerCache) of {value, Head} -> Head; none -> @@ -569,30 +570,20 @@ preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) -> [PrimaryChange] ++ ConvSpecs. addto_ledgercache(Changes, Cache) -> - {ObjectTree, ChangeList} = Cache, - {lists:foldl(fun({K, V}, Acc) -> - case leveled_codec:is_indexkey(K) of - false -> - gb_trees:enter(K, V, Acc); - true -> - Acc - end - end, - ObjectTree, - Changes), - ChangeList ++ Changes}. + lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end, + Cache, + Changes). maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> - {_ObjectTree, ChangeList} = Cache, - CacheSize = length(ChangeList), + CacheSize = gb_trees:size(Cache), if CacheSize > MaxCacheSize -> case leveled_penciller:pcl_pushmem(Penciller, - ChangeList) of + gb_trees:to_list(Cache)) of ok -> - {ok, {gb_trees:empty(), []}}; + {ok, gb_trees:empty()}; pause -> - {pause, {gb_trees:empty(), []}}; + {pause, gb_trees:empty()}; refused -> {ok, Cache} end; diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 0ddc6d7..c346cc1 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -724,9 +724,10 @@ manifest_printer(Manifest) -> initiate_penciller_snapshot(Bookie) -> {ok, - {LedgerSnap, {_ObjTree, ChangeList}}, + {LedgerSnap, LedgerCache}, _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, ChangeList), + ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, + gb_trees:to_list(LedgerCache)), MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 32c70a3..2aa99c9 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -246,6 +246,7 @@ pcl_loadsnapshot/2, pcl_getstartupsequencenumber/1, roll_new_tree/3, + roll_into_list/1, clean_testdir/1]). -include_lib("eunit/include/eunit.hrl"). @@ -377,7 +378,7 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap}) % then add to memory in the background before updating the loop state % - Push the update into memory (do_pushtomem/3) % - If we haven't got through quickcheck now need to check if there is a - % definite need to write a new L0 file (roll_memory/2). If all clear this + % definite need to write a new L0 file (roll_memory/3). If all clear this % will write the file in the background and allow a response to the user. % If not the change has still been made but the the L0 file will not have % been prompted - so the reply does not indicate failure but returns the @@ -414,7 +415,7 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap}) State1#state.memtable_copy, MaxSQN), - case roll_memory(State1, MaxTableSize) of + case roll_memory(State1, MaxTableSize, L0Snap) of {ok, L0Pend, ManSN, TableSize2} -> io:format("Push completed in ~w microseconds~n", [timer:now_diff(os:timestamp(), StartWatch)]), @@ -587,9 +588,16 @@ terminate(Reason, State) -> no_change -> State end, - Dump = ets:tab2list(UpdState#state.memtable), + % TODO: + % This next section (to the end of the case clause), appears to be + % pointless. It will persist the in-memory state to a SFT file, but on + % startup that file will be ignored as the manifest has not bene updated + % + % Should we update the manifest, or stop trying to persist on closure? + Dump = roll_into_list(State#state.memtable_copy), case {UpdState#state.levelzero_pending, - get_item(0, UpdState#state.manifest, []), length(Dump)} of + get_item(0, UpdState#state.manifest, []), + length(Dump)} of {?L0PEND_RESET, [], L} when L > 0 -> MSN = UpdState#state.manifest_sqn + 1, FileName = UpdState#state.root_path @@ -616,6 +624,8 @@ terminate(Reason, State) -> ++ " with ~w keys discarded~n", [length(Dump)]) end, + + % Tidy shutdown of individual files ok = close_files(0, UpdState#state.manifest), lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end, @@ -639,15 +649,9 @@ start_from_file(PCLopts) -> M -> M end, - % Options (not) chosen here: - % - As we pass the ETS table to the sft file when the L0 file is created - % then this cannot be private. - % - There is no use of iterator, so a set could be used, but then the - % output of tab2list would need to be sorted - % TODO: - % - Test switching to [set, private] and sending the L0 snapshots to the - % sft_new cast - TID = ets:new(?MEMTABLE, [ordered_set]), + % There is no need to export this ets table (hence private) or iterate + % over it (hence set not ordered_set) + TID = ets:new(?MEMTABLE, [set, private]), {ok, Clerk} = leveled_pclerk:clerk_new(self()), InitState = #state{memtable=TID, clerk=Clerk, @@ -767,12 +771,17 @@ quickcheck_pushtomem(DumpList, TableSize, MaxSize) -> do_pushtomem(DumpList, MemTable, Snapshot, MaxSQN) -> SW = os:timestamp(), UpdSnapshot = add_increment_to_memcopy(Snapshot, MaxSQN, DumpList), + % Note that the DumpList must have been taken from a source which + % naturally de-duplicates the keys. It is not possible just to cache + % changes in a list (in the Bookie for example), as the insert method does + % not apply the list in order, and so it is not clear which of a duplicate + % key will be applied ets:insert(MemTable, DumpList), io:format("Push into memory timed at ~w microseconds~n", [timer:now_diff(os:timestamp(), SW)]), UpdSnapshot. -roll_memory(State, MaxSize) -> +roll_memory(State, MaxSize, MemTableCopy) -> case ets:info(State#state.memtable, size) of Size when Size > MaxSize -> L0 = get_item(0, State#state.manifest, []), @@ -784,7 +793,7 @@ roll_memory(State, MaxSize) -> ++ integer_to_list(MSN) ++ "_0_0", Opts = #sft_options{wait=false}, {ok, L0Pid} = leveled_sft:sft_new(FileName, - State#state.memtable, + MemTableCopy, [], 0, Opts), @@ -938,7 +947,6 @@ return_work(State, From) -> %% This takes the three parts of a memtable copy - the increments, the tree %% and the SQN at which the tree was formed, and outputs a new tree - roll_new_tree(Tree, [], HighSQN) -> {Tree, HighSQN}; roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> @@ -954,6 +962,14 @@ roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> roll_new_tree(Tree, [_H|TailIncs], HighSQN) -> roll_new_tree(Tree, TailIncs, HighSQN). +%% This takes the three parts of a memtable copy - the increments, the tree +%% and the SQN at which the tree was formed, and outputs a sorted list +roll_into_list(MemTableCopy) -> + {Tree, _SQN} = roll_new_tree(MemTableCopy#l0snapshot.tree, + MemTableCopy#l0snapshot.increments, + MemTableCopy#l0snapshot.ledger_sqn), + gb_trees:to_list(Tree). + %% Update the memtable copy if the tree created advances the SQN cache_tree_in_memcopy(MemCopy, Tree, SQN) -> case MemCopy#l0snapshot.ledger_sqn of @@ -1331,7 +1347,7 @@ rename_manifest_files(RootPath, NewMSN) -> filelib:is_file(OldFN), NewFN, filelib:is_file(NewFN)]), - file:rename(OldFN,NewFN). + ok = file:rename(OldFN,NewFN). filepath(RootPath, manifest) -> RootPath ++ "/" ++ ?MANIFEST_FP; @@ -1379,13 +1395,14 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) -> assess_sqn([]) -> empty; -assess_sqn([HeadKV|[]]) -> - {leveled_codec:strip_to_seqonly(HeadKV), - leveled_codec:strip_to_seqonly(HeadKV)}; -assess_sqn([HeadKV|DumpList]) -> - {leveled_codec:strip_to_seqonly(HeadKV), - leveled_codec:strip_to_seqonly(lists:last(DumpList))}. +assess_sqn(DumpList) -> + assess_sqn(DumpList, infinity, 0). +assess_sqn([], MinSQN, MaxSQN) -> + {MinSQN, MaxSQN}; +assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) -> + SQN = leveled_codec:strip_to_seqonly(HeadKey), + assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)). %%%============================================================================ %%% Test @@ -1499,11 +1516,6 @@ simple_server_test() -> max_inmemory_tablesize=1000}), TopSQN = pcl_getstartupsequencenumber(PCLr), Check = case TopSQN of - 2001 -> - %% Last push not persisted - S3a = pcl_pushmem(PCL, [Key3]), - if S3a == pause -> timer:sleep(1000); true -> ok end, - ok; 2002 -> %% everything got persisted ok; diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 1e846e2..af56fa9 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -183,7 +183,7 @@ -define(MERGE_SCANWIDTH, 8). -define(DELETE_TIMEOUT, 60000). -define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE). - +-define(DISCARD_EXT, ".discarded"). -record(state, {version = ?CURRENT_VERSION :: tuple(), slot_index :: list(), @@ -387,7 +387,7 @@ create_levelzero(Inp1, Filename) -> true -> Inp1; false -> - ets:tab2list(Inp1) + leveled_penciller:roll_into_list(Inp1) end, {TmpFilename, PrmFilename} = generate_filenames(Filename), case create_file(TmpFilename) of @@ -510,6 +510,19 @@ complete_file(Handle, FileMD, KL1, KL2, Level, Rename) -> open_file(FileMD); {true, OldName, NewName} -> io:format("Renaming file from ~s to ~s~n", [OldName, NewName]), + case filelib:is_file(NewName) of + true -> + io:format("Filename ~s already exists~n", + [NewName]), + AltName = filename:join(filename:dirname(NewName), + filename:basename(NewName)) + ++ ?DISCARD_EXT, + io:format("Rename rogue filename ~s to ~s~n", + [NewName, AltName]), + ok = file:rename(NewName, AltName); + false -> + ok + end, ok = file:rename(OldName, NewName), open_file(FileMD#state{filename=NewName}) end, diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index bf5a700..dac295f 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -192,6 +192,7 @@ fetchput_snapshot(_Config) -> io:format("Checked for replacement objects in active bookie" ++ ", old objects in snapshot~n"), + ok = filelib:ensure_dir(RootPath ++ "/ledger/ledger_files"), {ok, FNsA} = file:list_dir(RootPath ++ "/ledger/ledger_files"), ObjList3 = testutil:generate_objects(15000, 5002), lists:foreach(fun({_RN, Obj, Spc}) -> @@ -212,6 +213,7 @@ fetchput_snapshot(_Config) -> testutil:check_forlist(SnapBookie3, lists:nth(length(CLs2), CLs2)), testutil:check_formissinglist(SnapBookie2, ChkList3), testutil:check_formissinglist(SnapBookie2, lists:nth(length(CLs2), CLs2)), + testutil:check_forlist(Bookie2, ChkList2), testutil:check_forlist(SnapBookie3, ChkList2), testutil:check_forlist(SnapBookie2, ChkList1), io:format("Started new snapshot and check for new objects~n"), diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 0d472cc..c938d6a 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -53,14 +53,22 @@ check_forlist(Bookie, ChkList, Log) -> lists:foreach(fun({_RN, Obj, _Spc}) -> if Log == true -> - io:format("Fetching Key ~w~n", [Obj#r_object.key]); + io:format("Fetching Key ~s~n", [Obj#r_object.key]); true -> ok end, R = leveled_bookie:book_riakget(Bookie, Obj#r_object.bucket, Obj#r_object.key), - R = {ok, Obj} end, + ok = case R of + {ok, Obj} -> + ok; + not_found -> + io:format("Object not found for key ~s~n", + [Obj#r_object.key]), + error + end + end, ChkList), io:format("Fetch check took ~w microseconds checking list of length ~w~n", [timer:now_diff(os:timestamp(), SW), length(ChkList)]).