Handle gen_server:cast slowness
There was some unpredictable performance in tests, that was related to the amount of time it took the sft gen_server to accept a cast which passed the levelzero_cache. The response time looked to be broadly proportional to the size of the cache - so it appeared to be an issue with passing the large object to the process queue. To avoid this, the penciller now instructs the SFT gen_server to callback to the server for each tree in the cache in turn as it is building the list from the cache. Each of these requests should be relatively short, and the processing in-between should space out the requests so the Penciller is not blocked from answering queries when prompting a L0 write.
This commit is contained in:
parent
311179964a
commit
4cffecf2ca
6 changed files with 73 additions and 32 deletions
|
@ -66,6 +66,7 @@
|
||||||
{root_path :: string(),
|
{root_path :: string(),
|
||||||
cache_size :: integer(),
|
cache_size :: integer(),
|
||||||
max_journalsize :: integer(),
|
max_journalsize :: integer(),
|
||||||
|
max_pencillercachesize :: integer(),
|
||||||
snapshot_bookie :: pid(),
|
snapshot_bookie :: pid(),
|
||||||
reload_strategy = [] :: list(),
|
reload_strategy = [] :: list(),
|
||||||
max_run_length :: integer()}).
|
max_run_length :: integer()}).
|
||||||
|
|
|
@ -514,7 +514,7 @@ set_options(Opts) ->
|
||||||
|
|
||||||
AltStrategy = Opts#bookie_options.reload_strategy,
|
AltStrategy = Opts#bookie_options.reload_strategy,
|
||||||
ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy),
|
ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy),
|
||||||
|
PCLL0CacheSize = Opts#bookie_options.max_pencillercachesize,
|
||||||
JournalFP = Opts#bookie_options.root_path ++ "/" ++ ?JOURNAL_FP,
|
JournalFP = Opts#bookie_options.root_path ++ "/" ++ ?JOURNAL_FP,
|
||||||
LedgerFP = Opts#bookie_options.root_path ++ "/" ++ ?LEDGER_FP,
|
LedgerFP = Opts#bookie_options.root_path ++ "/" ++ ?LEDGER_FP,
|
||||||
{#inker_options{root_path = JournalFP,
|
{#inker_options{root_path = JournalFP,
|
||||||
|
@ -522,7 +522,8 @@ set_options(Opts) ->
|
||||||
max_run_length = Opts#bookie_options.max_run_length,
|
max_run_length = Opts#bookie_options.max_run_length,
|
||||||
cdb_options = #cdb_options{max_size=MaxJournalSize,
|
cdb_options = #cdb_options{max_size=MaxJournalSize,
|
||||||
binary_mode=true}},
|
binary_mode=true}},
|
||||||
#penciller_options{root_path = LedgerFP}}.
|
#penciller_options{root_path = LedgerFP,
|
||||||
|
max_inmemory_tablesize = PCLL0CacheSize}}.
|
||||||
|
|
||||||
startup(InkerOpts, PencillerOpts) ->
|
startup(InkerOpts, PencillerOpts) ->
|
||||||
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
|
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
|
||||||
|
|
|
@ -223,6 +223,7 @@
|
||||||
code_change/3,
|
code_change/3,
|
||||||
pcl_start/1,
|
pcl_start/1,
|
||||||
pcl_pushmem/2,
|
pcl_pushmem/2,
|
||||||
|
pcl_fetchlevelzero/2,
|
||||||
pcl_fetch/2,
|
pcl_fetch/2,
|
||||||
pcl_fetchkeys/5,
|
pcl_fetchkeys/5,
|
||||||
pcl_checksequencenumber/3,
|
pcl_checksequencenumber/3,
|
||||||
|
@ -287,6 +288,9 @@ pcl_start(PCLopts) ->
|
||||||
pcl_pushmem(Pid, DumpList) ->
|
pcl_pushmem(Pid, DumpList) ->
|
||||||
%% Bookie to dump memory onto penciller
|
%% Bookie to dump memory onto penciller
|
||||||
gen_server:call(Pid, {push_mem, DumpList}, infinity).
|
gen_server:call(Pid, {push_mem, DumpList}, infinity).
|
||||||
|
|
||||||
|
pcl_fetchlevelzero(Pid, Slot) ->
|
||||||
|
gen_server:call(Pid, {fetch_levelzero, Slot}, infinity).
|
||||||
|
|
||||||
pcl_fetch(Pid, Key) ->
|
pcl_fetch(Pid, Key) ->
|
||||||
gen_server:call(Pid, {fetch, Key}, infinity).
|
gen_server:call(Pid, {fetch, Key}, infinity).
|
||||||
|
@ -471,6 +475,8 @@ handle_call({load_snapshot, BookieIncrTree}, _From, State) ->
|
||||||
levelzero_size=L0Size,
|
levelzero_size=L0Size,
|
||||||
ledger_sqn=LedgerSQN,
|
ledger_sqn=LedgerSQN,
|
||||||
snapshot_fully_loaded=true}};
|
snapshot_fully_loaded=true}};
|
||||||
|
handle_call({fetch_levelzero, Slot}, _From, State) ->
|
||||||
|
{reply, lists:nth(Slot, State#state.levelzero_cache), State};
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State) ->
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
|
@ -553,7 +559,7 @@ terminate(Reason, State) ->
|
||||||
{false, [], 0} ->
|
{false, [], 0} ->
|
||||||
io:format("Level 0 cache empty at close of Penciller~n");
|
io:format("Level 0 cache empty at close of Penciller~n");
|
||||||
{false, [], _N} ->
|
{false, [], _N} ->
|
||||||
L0Pid = roll_memory(UpdState, State#state.levelzero_cache, true),
|
L0Pid = roll_memory(UpdState, true),
|
||||||
ok = leveled_sft:sft_close(L0Pid);
|
ok = leveled_sft:sft_close(L0Pid);
|
||||||
_ ->
|
_ ->
|
||||||
io:format("No level zero action on close of Penciller~n")
|
io:format("No level zero action on close of Penciller~n")
|
||||||
|
@ -696,7 +702,7 @@ update_levelzero(L0Index, L0Size, PushedTree, LedgerSQN, L0Cache, State) ->
|
||||||
Level0Free = length(get_item(0, State#state.manifest, [])) == 0,
|
Level0Free = length(get_item(0, State#state.manifest, [])) == 0,
|
||||||
case {CacheTooBig, Level0Free} of
|
case {CacheTooBig, Level0Free} of
|
||||||
{true, true} ->
|
{true, true} ->
|
||||||
L0Constructor = roll_memory(State, UpdL0Cache),
|
L0Constructor = roll_memory(UpdState, false),
|
||||||
UpdState#state{levelzero_pending=true,
|
UpdState#state{levelzero_pending=true,
|
||||||
levelzero_constructor=L0Constructor};
|
levelzero_constructor=L0Constructor};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -719,19 +725,46 @@ checkready(Pid) ->
|
||||||
timeout
|
timeout
|
||||||
end.
|
end.
|
||||||
|
|
||||||
roll_memory(State, L0Cache) ->
|
%% Casting a large object (the levelzero cache) to the gen_server did not lead
|
||||||
roll_memory(State, L0Cache, false).
|
%% to an immediate return as expected. With 32K keys in the TreeList it could
|
||||||
|
%% take around 35-40ms.
|
||||||
|
%%
|
||||||
|
%% To avoid blocking this gen_server, the SFT file can request each item of the
|
||||||
|
%% cache one at a time.
|
||||||
|
%%
|
||||||
|
%% The Wait is set to false to use a cast when calling this in normal operation
|
||||||
|
%% where as the Wait of true is used at shutdown
|
||||||
|
|
||||||
roll_memory(State, L0Cache, Wait) ->
|
roll_memory(State, false) ->
|
||||||
|
FileName = levelzero_filename(State),
|
||||||
|
io:format("Rolling level zero to file ~s~n", [FileName]),
|
||||||
|
Opts = #sft_options{wait=false},
|
||||||
|
PCL = self(),
|
||||||
|
FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
|
||||||
|
% FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end,
|
||||||
|
R = leveled_sft:sft_newfroml0cache(FileName,
|
||||||
|
length(State#state.levelzero_cache),
|
||||||
|
FetchFun,
|
||||||
|
Opts),
|
||||||
|
{ok, Constructor, _} = R,
|
||||||
|
Constructor;
|
||||||
|
roll_memory(State, true) ->
|
||||||
|
FileName = levelzero_filename(State),
|
||||||
|
Opts = #sft_options{wait=true},
|
||||||
|
FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end,
|
||||||
|
R = leveled_sft:sft_newfroml0cache(FileName,
|
||||||
|
length(State#state.levelzero_cache),
|
||||||
|
FetchFun,
|
||||||
|
Opts),
|
||||||
|
{ok, Constructor, _} = R,
|
||||||
|
Constructor.
|
||||||
|
|
||||||
|
levelzero_filename(State) ->
|
||||||
MSN = State#state.manifest_sqn,
|
MSN = State#state.manifest_sqn,
|
||||||
FileName = State#state.root_path
|
FileName = State#state.root_path
|
||||||
++ "/" ++ ?FILES_FP ++ "/"
|
++ "/" ++ ?FILES_FP ++ "/"
|
||||||
++ integer_to_list(MSN) ++ "_0_0",
|
++ integer_to_list(MSN) ++ "_0_0",
|
||||||
Opts = #sft_options{wait=Wait},
|
FileName.
|
||||||
{ok, Constructor, _} = leveled_sft:sft_newfroml0cache(FileName,
|
|
||||||
L0Cache,
|
|
||||||
Opts),
|
|
||||||
Constructor.
|
|
||||||
|
|
||||||
|
|
||||||
fetch_mem(Key, Manifest, L0Index, L0Cache) ->
|
fetch_mem(Key, Manifest, L0Index, L0Cache) ->
|
||||||
|
@ -1636,10 +1669,12 @@ create_file_test() ->
|
||||||
ok = file:write_file(Filename, term_to_binary("hello")),
|
ok = file:write_file(Filename, term_to_binary("hello")),
|
||||||
KVL = lists:usort(leveled_sft:generate_randomkeys(10000)),
|
KVL = lists:usort(leveled_sft:generate_randomkeys(10000)),
|
||||||
Tree = gb_trees:from_orddict(KVL),
|
Tree = gb_trees:from_orddict(KVL),
|
||||||
|
FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
|
||||||
{ok,
|
{ok,
|
||||||
SP,
|
SP,
|
||||||
noreply} = leveled_sft:sft_newfroml0cache(Filename,
|
noreply} = leveled_sft:sft_newfroml0cache(Filename,
|
||||||
[Tree],
|
1,
|
||||||
|
FetchFun,
|
||||||
#sft_options{wait=false}),
|
#sft_options{wait=false}),
|
||||||
lists:foreach(fun(X) ->
|
lists:foreach(fun(X) ->
|
||||||
case checkready(SP) of
|
case checkready(SP) of
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
add_to_index/5,
|
add_to_index/5,
|
||||||
to_list/1,
|
to_list/2,
|
||||||
new_index/0,
|
new_index/0,
|
||||||
check_levelzero/3,
|
check_levelzero/3,
|
||||||
merge_trees/4
|
merge_trees/4
|
||||||
|
@ -80,17 +80,19 @@ add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
to_list(TreeList) ->
|
to_list(Slots, FetchFun) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
OutList = lists:foldr(fun(Tree, CompleteList) ->
|
SlotList = lists:reverse(lists:seq(1, Slots)),
|
||||||
|
FullList = lists:foldl(fun(Slot, Acc) ->
|
||||||
|
Tree = FetchFun(Slot),
|
||||||
L = gb_trees:to_list(Tree),
|
L = gb_trees:to_list(Tree),
|
||||||
lists:ukeymerge(1, CompleteList, L)
|
lists:ukeymerge(1, Acc, L)
|
||||||
end,
|
end,
|
||||||
[],
|
[],
|
||||||
TreeList),
|
SlotList),
|
||||||
io:format("L0 cache converted to list of size ~w in ~w microseconds~n",
|
io:format("L0 cache converted to list of size ~w in ~w microseconds~n",
|
||||||
[length(OutList), timer:now_diff(os:timestamp(), SW)]),
|
[length(FullList), timer:now_diff(os:timestamp(), SW)]),
|
||||||
OutList.
|
FullList.
|
||||||
|
|
||||||
|
|
||||||
new_index() ->
|
new_index() ->
|
||||||
|
@ -237,7 +239,8 @@ compare_method_test() ->
|
||||||
StartKey = {o, "Bucket0100", null, null},
|
StartKey = {o, "Bucket0100", null, null},
|
||||||
EndKey = {o, "Bucket0200", null, null},
|
EndKey = {o, "Bucket0200", null, null},
|
||||||
SWa = os:timestamp(),
|
SWa = os:timestamp(),
|
||||||
DumpList = to_list(TreeList),
|
FetchFun = fun(Slot) -> lists:nth(Slot, TreeList) end,
|
||||||
|
DumpList = to_list(length(TreeList), FetchFun),
|
||||||
Q0 = lists:foldl(fun({K, V}, Acc) ->
|
Q0 = lists:foldl(fun({K, V}, Acc) ->
|
||||||
P = leveled_codec:endkey_passed(EndKey, K),
|
P = leveled_codec:endkey_passed(EndKey, K),
|
||||||
case {K, P} of
|
case {K, P} of
|
||||||
|
|
|
@ -152,7 +152,7 @@
|
||||||
terminate/2,
|
terminate/2,
|
||||||
code_change/3,
|
code_change/3,
|
||||||
sft_new/4,
|
sft_new/4,
|
||||||
sft_newfroml0cache/3,
|
sft_newfroml0cache/4,
|
||||||
sft_open/1,
|
sft_open/1,
|
||||||
sft_get/2,
|
sft_get/2,
|
||||||
sft_getkvrange/4,
|
sft_getkvrange/4,
|
||||||
|
@ -228,11 +228,11 @@ sft_new(Filename, KL1, KL2, LevelInfo) ->
|
||||||
infinity),
|
infinity),
|
||||||
{ok, Pid, Reply}.
|
{ok, Pid, Reply}.
|
||||||
|
|
||||||
sft_newfroml0cache(Filename, L0Cache, Options) ->
|
sft_newfroml0cache(Filename, Slots, FetchFun, Options) ->
|
||||||
{ok, Pid} = gen_server:start(?MODULE, [], []),
|
{ok, Pid} = gen_server:start(?MODULE, [], []),
|
||||||
case Options#sft_options.wait of
|
case Options#sft_options.wait of
|
||||||
true ->
|
true ->
|
||||||
KL1 = leveled_pmem:to_list(L0Cache),
|
KL1 = leveled_pmem:to_list(Slots, FetchFun),
|
||||||
Reply = gen_server:call(Pid,
|
Reply = gen_server:call(Pid,
|
||||||
{sft_new,
|
{sft_new,
|
||||||
Filename,
|
Filename,
|
||||||
|
@ -243,10 +243,10 @@ sft_newfroml0cache(Filename, L0Cache, Options) ->
|
||||||
{ok, Pid, Reply};
|
{ok, Pid, Reply};
|
||||||
false ->
|
false ->
|
||||||
gen_server:cast(Pid,
|
gen_server:cast(Pid,
|
||||||
{sft_newfromcache,
|
{sft_newfroml0cache,
|
||||||
Filename,
|
Filename,
|
||||||
L0Cache,
|
Slots,
|
||||||
#level{level=0}}),
|
FetchFun}),
|
||||||
{ok, Pid, noreply}
|
{ok, Pid, noreply}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -355,10 +355,9 @@ handle_call({set_for_delete, Penciller}, _From, State) ->
|
||||||
handle_call(get_maxsqn, _From, State) ->
|
handle_call(get_maxsqn, _From, State) ->
|
||||||
statecheck_onreply(State#state.highest_sqn, State).
|
statecheck_onreply(State#state.highest_sqn, State).
|
||||||
|
|
||||||
handle_cast({sft_newfromcache, Filename, L0Cache, _LevelR=#level{level=L}},
|
handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun}, _State) ->
|
||||||
_State) when L == 0->
|
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
Inp1 = leveled_pmem:to_list(L0Cache),
|
Inp1 = leveled_pmem:to_list(Slots, FetchFun),
|
||||||
{ok, State} = create_levelzero(Inp1, Filename),
|
{ok, State} = create_levelzero(Inp1, Filename),
|
||||||
io:format("File creation of L0 file ~s took ~w microseconds~n",
|
io:format("File creation of L0 file ~s took ~w microseconds~n",
|
||||||
[Filename, timer:now_diff(os:timestamp(), SW)]),
|
[Filename, timer:now_diff(os:timestamp(), SW)]),
|
||||||
|
|
|
@ -69,14 +69,16 @@ simple_put_fetch_head_delete(_Config) ->
|
||||||
|
|
||||||
many_put_fetch_head(_Config) ->
|
many_put_fetch_head(_Config) ->
|
||||||
RootPath = testutil:reset_filestructure(),
|
RootPath = testutil:reset_filestructure(),
|
||||||
StartOpts1 = #bookie_options{root_path=RootPath},
|
StartOpts1 = #bookie_options{root_path=RootPath,
|
||||||
|
max_pencillercachesize=16000},
|
||||||
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
||||||
{TestObject, TestSpec} = testutil:generate_testobject(),
|
{TestObject, TestSpec} = testutil:generate_testobject(),
|
||||||
ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
|
ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
|
||||||
testutil:check_forobject(Bookie1, TestObject),
|
testutil:check_forobject(Bookie1, TestObject),
|
||||||
ok = leveled_bookie:book_close(Bookie1),
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
StartOpts2 = #bookie_options{root_path=RootPath,
|
StartOpts2 = #bookie_options{root_path=RootPath,
|
||||||
max_journalsize=1000000000},
|
max_journalsize=1000000000,
|
||||||
|
max_pencillercachesize=32000},
|
||||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
|
||||||
testutil:check_forobject(Bookie2, TestObject),
|
testutil:check_forobject(Bookie2, TestObject),
|
||||||
GenList = [2, 20002, 40002, 60002, 80002,
|
GenList = [2, 20002, 40002, 60002, 80002,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue