Revert ominshambles performance refactoring
To try and improve performance index entries had been removed from the Ledger Cache, and a shadow list of the LedgerCache (in SQN order) was kept to avoid gb_trees:to_list on push_mem. This did not go well. The issue was that ets does not deal with duplicate keys in the list when inserting (it will only insert one, but it is not clear which one). This has been reverted back out. The ETS parameters have been changed to [set, private]. It is not used as an iterator, and is no longer passed out of the process (the memtable_copy is sent instead). This also avoids the tab2list function being called.
This commit is contained in:
parent
8f29a6c40f
commit
f16f71ae81
6 changed files with 89 additions and 62 deletions
|
@ -160,7 +160,7 @@
|
|||
penciller :: pid(),
|
||||
cache_size :: integer(),
|
||||
back_pressure :: boolean(),
|
||||
ledger_cache :: {gb_trees:tree(), list()},
|
||||
ledger_cache :: gb_trees:tree(),
|
||||
is_snapshot :: boolean()}).
|
||||
|
||||
|
||||
|
@ -242,7 +242,7 @@ init([Opts]) ->
|
|||
{ok, #state{inker=Inker,
|
||||
penciller=Penciller,
|
||||
cache_size=CacheSize,
|
||||
ledger_cache={gb_trees:empty(), []},
|
||||
ledger_cache=gb_trees:empty(),
|
||||
is_snapshot=false}};
|
||||
Bookie ->
|
||||
{ok,
|
||||
|
@ -397,15 +397,16 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%% Internal functions
|
||||
%%%============================================================================
|
||||
|
||||
bucket_stats(Penciller, {_ObjTree, ChangeList}, Bucket, Tag) ->
|
||||
bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
|
||||
PCLopts = #penciller_options{start_snapshot=true,
|
||||
source_penciller=Penciller},
|
||||
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
|
||||
Folder = fun() ->
|
||||
Increment = gb_trees:to_list(LedgerCache),
|
||||
io:format("Length of increment in snapshot is ~w~n",
|
||||
[length(ChangeList)]),
|
||||
[length(Increment)]),
|
||||
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
||||
{infinity, ChangeList}),
|
||||
{infinity, Increment}),
|
||||
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
||||
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
||||
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
||||
|
@ -418,7 +419,7 @@ bucket_stats(Penciller, {_ObjTree, ChangeList}, Bucket, Tag) ->
|
|||
end,
|
||||
{async, Folder}.
|
||||
|
||||
index_query(Penciller, {_ObjTree, ChangeList},
|
||||
index_query(Penciller, LedgerCache,
|
||||
Bucket,
|
||||
{IdxField, StartValue, EndValue},
|
||||
{ReturnTerms, TermRegex}) ->
|
||||
|
@ -426,10 +427,11 @@ index_query(Penciller, {_ObjTree, ChangeList},
|
|||
source_penciller=Penciller},
|
||||
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
|
||||
Folder = fun() ->
|
||||
Increment = gb_trees:to_list(LedgerCache),
|
||||
io:format("Length of increment in snapshot is ~w~n",
|
||||
[length(ChangeList)]),
|
||||
[length(Increment)]),
|
||||
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
||||
{infinity, ChangeList}),
|
||||
{infinity, Increment}),
|
||||
StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
|
||||
IdxField, StartValue),
|
||||
EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
|
||||
|
@ -491,9 +493,8 @@ startup(InkerOpts, PencillerOpts) ->
|
|||
{Inker, Penciller}.
|
||||
|
||||
|
||||
fetch_head(Key, Penciller, {ObjTree, _ChangeList}) ->
|
||||
|
||||
case gb_trees:lookup(Key, ObjTree) of
|
||||
fetch_head(Key, Penciller, LedgerCache) ->
|
||||
case gb_trees:lookup(Key, LedgerCache) of
|
||||
{value, Head} ->
|
||||
Head;
|
||||
none ->
|
||||
|
@ -569,30 +570,20 @@ preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
|
|||
[PrimaryChange] ++ ConvSpecs.
|
||||
|
||||
addto_ledgercache(Changes, Cache) ->
|
||||
{ObjectTree, ChangeList} = Cache,
|
||||
{lists:foldl(fun({K, V}, Acc) ->
|
||||
case leveled_codec:is_indexkey(K) of
|
||||
false ->
|
||||
gb_trees:enter(K, V, Acc);
|
||||
true ->
|
||||
Acc
|
||||
end
|
||||
end,
|
||||
ObjectTree,
|
||||
Changes),
|
||||
ChangeList ++ Changes}.
|
||||
lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end,
|
||||
Cache,
|
||||
Changes).
|
||||
|
||||
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
|
||||
{_ObjectTree, ChangeList} = Cache,
|
||||
CacheSize = length(ChangeList),
|
||||
CacheSize = gb_trees:size(Cache),
|
||||
if
|
||||
CacheSize > MaxCacheSize ->
|
||||
case leveled_penciller:pcl_pushmem(Penciller,
|
||||
ChangeList) of
|
||||
gb_trees:to_list(Cache)) of
|
||||
ok ->
|
||||
{ok, {gb_trees:empty(), []}};
|
||||
{ok, gb_trees:empty()};
|
||||
pause ->
|
||||
{pause, {gb_trees:empty(), []}};
|
||||
{pause, gb_trees:empty()};
|
||||
refused ->
|
||||
{ok, Cache}
|
||||
end;
|
||||
|
|
|
@ -724,9 +724,10 @@ manifest_printer(Manifest) ->
|
|||
|
||||
initiate_penciller_snapshot(Bookie) ->
|
||||
{ok,
|
||||
{LedgerSnap, {_ObjTree, ChangeList}},
|
||||
{LedgerSnap, LedgerCache},
|
||||
_} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
|
||||
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, ChangeList),
|
||||
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap,
|
||||
gb_trees:to_list(LedgerCache)),
|
||||
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
|
||||
{LedgerSnap, MaxSQN}.
|
||||
|
||||
|
|
|
@ -246,6 +246,7 @@
|
|||
pcl_loadsnapshot/2,
|
||||
pcl_getstartupsequencenumber/1,
|
||||
roll_new_tree/3,
|
||||
roll_into_list/1,
|
||||
clean_testdir/1]).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
@ -377,7 +378,7 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap})
|
|||
% then add to memory in the background before updating the loop state
|
||||
% - Push the update into memory (do_pushtomem/3)
|
||||
% - If we haven't got through quickcheck now need to check if there is a
|
||||
% definite need to write a new L0 file (roll_memory/2). If all clear this
|
||||
% definite need to write a new L0 file (roll_memory/3). If all clear this
|
||||
% will write the file in the background and allow a response to the user.
|
||||
% If not the change has still been made but the the L0 file will not have
|
||||
% been prompted - so the reply does not indicate failure but returns the
|
||||
|
@ -414,7 +415,7 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap})
|
|||
State1#state.memtable_copy,
|
||||
MaxSQN),
|
||||
|
||||
case roll_memory(State1, MaxTableSize) of
|
||||
case roll_memory(State1, MaxTableSize, L0Snap) of
|
||||
{ok, L0Pend, ManSN, TableSize2} ->
|
||||
io:format("Push completed in ~w microseconds~n",
|
||||
[timer:now_diff(os:timestamp(), StartWatch)]),
|
||||
|
@ -587,9 +588,16 @@ terminate(Reason, State) ->
|
|||
no_change ->
|
||||
State
|
||||
end,
|
||||
Dump = ets:tab2list(UpdState#state.memtable),
|
||||
% TODO:
|
||||
% This next section (to the end of the case clause), appears to be
|
||||
% pointless. It will persist the in-memory state to a SFT file, but on
|
||||
% startup that file will be ignored as the manifest has not bene updated
|
||||
%
|
||||
% Should we update the manifest, or stop trying to persist on closure?
|
||||
Dump = roll_into_list(State#state.memtable_copy),
|
||||
case {UpdState#state.levelzero_pending,
|
||||
get_item(0, UpdState#state.manifest, []), length(Dump)} of
|
||||
get_item(0, UpdState#state.manifest, []),
|
||||
length(Dump)} of
|
||||
{?L0PEND_RESET, [], L} when L > 0 ->
|
||||
MSN = UpdState#state.manifest_sqn + 1,
|
||||
FileName = UpdState#state.root_path
|
||||
|
@ -616,6 +624,8 @@ terminate(Reason, State) ->
|
|||
++ " with ~w keys discarded~n",
|
||||
[length(Dump)])
|
||||
end,
|
||||
|
||||
% Tidy shutdown of individual files
|
||||
ok = close_files(0, UpdState#state.manifest),
|
||||
lists:foreach(fun({_FN, Pid, _SN}) ->
|
||||
leveled_sft:sft_close(Pid) end,
|
||||
|
@ -639,15 +649,9 @@ start_from_file(PCLopts) ->
|
|||
M ->
|
||||
M
|
||||
end,
|
||||
% Options (not) chosen here:
|
||||
% - As we pass the ETS table to the sft file when the L0 file is created
|
||||
% then this cannot be private.
|
||||
% - There is no use of iterator, so a set could be used, but then the
|
||||
% output of tab2list would need to be sorted
|
||||
% TODO:
|
||||
% - Test switching to [set, private] and sending the L0 snapshots to the
|
||||
% sft_new cast
|
||||
TID = ets:new(?MEMTABLE, [ordered_set]),
|
||||
% There is no need to export this ets table (hence private) or iterate
|
||||
% over it (hence set not ordered_set)
|
||||
TID = ets:new(?MEMTABLE, [set, private]),
|
||||
{ok, Clerk} = leveled_pclerk:clerk_new(self()),
|
||||
InitState = #state{memtable=TID,
|
||||
clerk=Clerk,
|
||||
|
@ -767,12 +771,17 @@ quickcheck_pushtomem(DumpList, TableSize, MaxSize) ->
|
|||
do_pushtomem(DumpList, MemTable, Snapshot, MaxSQN) ->
|
||||
SW = os:timestamp(),
|
||||
UpdSnapshot = add_increment_to_memcopy(Snapshot, MaxSQN, DumpList),
|
||||
% Note that the DumpList must have been taken from a source which
|
||||
% naturally de-duplicates the keys. It is not possible just to cache
|
||||
% changes in a list (in the Bookie for example), as the insert method does
|
||||
% not apply the list in order, and so it is not clear which of a duplicate
|
||||
% key will be applied
|
||||
ets:insert(MemTable, DumpList),
|
||||
io:format("Push into memory timed at ~w microseconds~n",
|
||||
[timer:now_diff(os:timestamp(), SW)]),
|
||||
UpdSnapshot.
|
||||
|
||||
roll_memory(State, MaxSize) ->
|
||||
roll_memory(State, MaxSize, MemTableCopy) ->
|
||||
case ets:info(State#state.memtable, size) of
|
||||
Size when Size > MaxSize ->
|
||||
L0 = get_item(0, State#state.manifest, []),
|
||||
|
@ -784,7 +793,7 @@ roll_memory(State, MaxSize) ->
|
|||
++ integer_to_list(MSN) ++ "_0_0",
|
||||
Opts = #sft_options{wait=false},
|
||||
{ok, L0Pid} = leveled_sft:sft_new(FileName,
|
||||
State#state.memtable,
|
||||
MemTableCopy,
|
||||
[],
|
||||
0,
|
||||
Opts),
|
||||
|
@ -938,7 +947,6 @@ return_work(State, From) ->
|
|||
|
||||
%% This takes the three parts of a memtable copy - the increments, the tree
|
||||
%% and the SQN at which the tree was formed, and outputs a new tree
|
||||
|
||||
roll_new_tree(Tree, [], HighSQN) ->
|
||||
{Tree, HighSQN};
|
||||
roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN ->
|
||||
|
@ -954,6 +962,14 @@ roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN ->
|
|||
roll_new_tree(Tree, [_H|TailIncs], HighSQN) ->
|
||||
roll_new_tree(Tree, TailIncs, HighSQN).
|
||||
|
||||
%% This takes the three parts of a memtable copy - the increments, the tree
|
||||
%% and the SQN at which the tree was formed, and outputs a sorted list
|
||||
roll_into_list(MemTableCopy) ->
|
||||
{Tree, _SQN} = roll_new_tree(MemTableCopy#l0snapshot.tree,
|
||||
MemTableCopy#l0snapshot.increments,
|
||||
MemTableCopy#l0snapshot.ledger_sqn),
|
||||
gb_trees:to_list(Tree).
|
||||
|
||||
%% Update the memtable copy if the tree created advances the SQN
|
||||
cache_tree_in_memcopy(MemCopy, Tree, SQN) ->
|
||||
case MemCopy#l0snapshot.ledger_sqn of
|
||||
|
@ -1331,7 +1347,7 @@ rename_manifest_files(RootPath, NewMSN) ->
|
|||
filelib:is_file(OldFN),
|
||||
NewFN,
|
||||
filelib:is_file(NewFN)]),
|
||||
file:rename(OldFN,NewFN).
|
||||
ok = file:rename(OldFN,NewFN).
|
||||
|
||||
filepath(RootPath, manifest) ->
|
||||
RootPath ++ "/" ++ ?MANIFEST_FP;
|
||||
|
@ -1379,13 +1395,14 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
|
|||
|
||||
assess_sqn([]) ->
|
||||
empty;
|
||||
assess_sqn([HeadKV|[]]) ->
|
||||
{leveled_codec:strip_to_seqonly(HeadKV),
|
||||
leveled_codec:strip_to_seqonly(HeadKV)};
|
||||
assess_sqn([HeadKV|DumpList]) ->
|
||||
{leveled_codec:strip_to_seqonly(HeadKV),
|
||||
leveled_codec:strip_to_seqonly(lists:last(DumpList))}.
|
||||
assess_sqn(DumpList) ->
|
||||
assess_sqn(DumpList, infinity, 0).
|
||||
|
||||
assess_sqn([], MinSQN, MaxSQN) ->
|
||||
{MinSQN, MaxSQN};
|
||||
assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) ->
|
||||
SQN = leveled_codec:strip_to_seqonly(HeadKey),
|
||||
assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)).
|
||||
|
||||
%%%============================================================================
|
||||
%%% Test
|
||||
|
@ -1499,11 +1516,6 @@ simple_server_test() ->
|
|||
max_inmemory_tablesize=1000}),
|
||||
TopSQN = pcl_getstartupsequencenumber(PCLr),
|
||||
Check = case TopSQN of
|
||||
2001 ->
|
||||
%% Last push not persisted
|
||||
S3a = pcl_pushmem(PCL, [Key3]),
|
||||
if S3a == pause -> timer:sleep(1000); true -> ok end,
|
||||
ok;
|
||||
2002 ->
|
||||
%% everything got persisted
|
||||
ok;
|
||||
|
|
|
@ -183,7 +183,7 @@
|
|||
-define(MERGE_SCANWIDTH, 8).
|
||||
-define(DELETE_TIMEOUT, 60000).
|
||||
-define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE).
|
||||
|
||||
-define(DISCARD_EXT, ".discarded").
|
||||
|
||||
-record(state, {version = ?CURRENT_VERSION :: tuple(),
|
||||
slot_index :: list(),
|
||||
|
@ -387,7 +387,7 @@ create_levelzero(Inp1, Filename) ->
|
|||
true ->
|
||||
Inp1;
|
||||
false ->
|
||||
ets:tab2list(Inp1)
|
||||
leveled_penciller:roll_into_list(Inp1)
|
||||
end,
|
||||
{TmpFilename, PrmFilename} = generate_filenames(Filename),
|
||||
case create_file(TmpFilename) of
|
||||
|
@ -510,6 +510,19 @@ complete_file(Handle, FileMD, KL1, KL2, Level, Rename) ->
|
|||
open_file(FileMD);
|
||||
{true, OldName, NewName} ->
|
||||
io:format("Renaming file from ~s to ~s~n", [OldName, NewName]),
|
||||
case filelib:is_file(NewName) of
|
||||
true ->
|
||||
io:format("Filename ~s already exists~n",
|
||||
[NewName]),
|
||||
AltName = filename:join(filename:dirname(NewName),
|
||||
filename:basename(NewName))
|
||||
++ ?DISCARD_EXT,
|
||||
io:format("Rename rogue filename ~s to ~s~n",
|
||||
[NewName, AltName]),
|
||||
ok = file:rename(NewName, AltName);
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
ok = file:rename(OldName, NewName),
|
||||
open_file(FileMD#state{filename=NewName})
|
||||
end,
|
||||
|
|
|
@ -192,6 +192,7 @@ fetchput_snapshot(_Config) ->
|
|||
io:format("Checked for replacement objects in active bookie" ++
|
||||
", old objects in snapshot~n"),
|
||||
|
||||
ok = filelib:ensure_dir(RootPath ++ "/ledger/ledger_files"),
|
||||
{ok, FNsA} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
||||
ObjList3 = testutil:generate_objects(15000, 5002),
|
||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
||||
|
@ -212,6 +213,7 @@ fetchput_snapshot(_Config) ->
|
|||
testutil:check_forlist(SnapBookie3, lists:nth(length(CLs2), CLs2)),
|
||||
testutil:check_formissinglist(SnapBookie2, ChkList3),
|
||||
testutil:check_formissinglist(SnapBookie2, lists:nth(length(CLs2), CLs2)),
|
||||
testutil:check_forlist(Bookie2, ChkList2),
|
||||
testutil:check_forlist(SnapBookie3, ChkList2),
|
||||
testutil:check_forlist(SnapBookie2, ChkList1),
|
||||
io:format("Started new snapshot and check for new objects~n"),
|
||||
|
|
|
@ -53,14 +53,22 @@ check_forlist(Bookie, ChkList, Log) ->
|
|||
lists:foreach(fun({_RN, Obj, _Spc}) ->
|
||||
if
|
||||
Log == true ->
|
||||
io:format("Fetching Key ~w~n", [Obj#r_object.key]);
|
||||
io:format("Fetching Key ~s~n", [Obj#r_object.key]);
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
R = leveled_bookie:book_riakget(Bookie,
|
||||
Obj#r_object.bucket,
|
||||
Obj#r_object.key),
|
||||
R = {ok, Obj} end,
|
||||
ok = case R of
|
||||
{ok, Obj} ->
|
||||
ok;
|
||||
not_found ->
|
||||
io:format("Object not found for key ~s~n",
|
||||
[Obj#r_object.key]),
|
||||
error
|
||||
end
|
||||
end,
|
||||
ChkList),
|
||||
io:format("Fetch check took ~w microseconds checking list of length ~w~n",
|
||||
[timer:now_diff(os:timestamp(), SW), length(ChkList)]).
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue