Add slow offer support on write pressure
When there is write pressure on the penciller and a push of the ledger cache is returned to the bookie, the bookie will now punish the next PUT (and itself) with a pause. The longer the back-pressure state stays in place, the more frequent the pauses become.
parent 7f456fa993
commit 70dc637c97
2 changed files with 23 additions and 9 deletions
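For orientation, here is a minimal, self-contained sketch of the slow-offer pattern described in the commit message, assuming a bare gen_server with a map-based state. The module name, the maybe_push/1 helper and the threshold of 100 are illustrative only and are not part of leveled; the real change is in the bookie diff below.

%% Minimal sketch of the slow-offer back-pressure pattern (illustrative only).
-module(slow_offer_sketch).
-behaviour(gen_server).

-export([start_link/0, put_object/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-define(SLOWOFFER_DELAY, 10).   % pause (ms) applied while under back-pressure

start_link() ->
    gen_server:start_link(?MODULE, [], []).

put_object(Pid, Obj) ->
    gen_server:call(Pid, {put, Obj}).

init([]) ->
    {ok, #{cache => [], slow_offer => false}}.

handle_call({put, Obj}, From, State = #{cache := Cache, slow_offer := Slow}) ->
    %% If the previous push downstream was returned, punish this PUT (and the
    %% caller waiting on the reply) with a short pause.
    case Slow of
        true -> timer:sleep(?SLOWOFFER_DELAY);
        false -> ok
    end,
    gen_server:reply(From, ok),
    %% maybe_push/1 stands in for pushing the cache to the penciller; it
    %% either accepts the cache (ok) or refuses it under pressure (returned).
    case maybe_push([Obj | Cache]) of
        {ok, NewCache} ->
            {noreply, State#{cache := NewCache, slow_offer := false}};
        {returned, NewCache} ->
            {noreply, State#{cache := NewCache, slow_offer := true}}
    end.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Msg, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.

%% Hypothetical stand-in for the real push: refuse once the cache is "big".
maybe_push(Cache) when length(Cache) > 100 ->
    {returned, Cache};
maybe_push(_Cache) ->
    {ok, []}.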
@@ -161,12 +161,14 @@
 -define(LEDGER_FP, "ledger").
 -define(SNAPSHOT_TIMEOUT, 300000).
 -define(CHECKJOURNAL_PROB, 0.2).
+-define(SLOWOFFER_DELAY, 10).
 
 -record(state, {inker :: pid(),
                 penciller :: pid(),
                 cache_size :: integer(),
                 ledger_cache :: gb_trees:tree(),
-                is_snapshot :: boolean()}).
+                is_snapshot :: boolean(),
+                slow_offer = false :: boolean()}).
 
 
 
@@ -286,11 +288,24 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
                                         ObjSize,
                                         {IndexSpecs, TTL}),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
+    % If the previous push to memory was returned then punish this PUT with a
+    % delay.  If the back-pressure in the Penciller continues, these delays
+    % will become more frequent
+    case State#state.slow_offer of
+        true ->
+            timer:sleep(?SLOWOFFER_DELAY);
+        false ->
+            ok
+    end,
     gen_server:reply(From, ok),
-    {ok, NewCache} = maybepush_ledgercache(State#state.cache_size,
-                                            Cache0,
-                                            State#state.penciller),
-    {noreply, State#state{ledger_cache=NewCache}};
+    case maybepush_ledgercache(State#state.cache_size,
+                                Cache0,
+                                State#state.penciller) of
+        {ok, NewCache} ->
+            {noreply, State#state{ledger_cache=NewCache, slow_offer=false}};
+        {returned, NewCache} ->
+            {noreply, State#state{ledger_cache=NewCache, slow_offer=true}}
+    end;
 handle_call({get, Bucket, Key, Tag}, _From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     case fetch_head(LedgerKey,
@@ -799,7 +814,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
                 ok ->
                     {ok, gb_trees:empty()};
                 returned ->
-                    {ok, Cache}
+                    {returned, Cache}
             end;
         true ->
             {ok, Cache}
@@ -809,10 +824,9 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
 maybe_withjitter(CacheSize, MaxCacheSize) ->
     if
         CacheSize > MaxCacheSize ->
-            T = 2 * MaxCacheSize - CacheSize,
-            R = random:uniform(CacheSize),
+            R = random:uniform(7 * MaxCacheSize),
             if
-                R > T ->
+                (CacheSize - MaxCacheSize) > R ->
                     true;
                 true ->
                     false
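As a rough reading of the maybe_withjitter change above (my arithmetic, not part of the commit): a PUT now triggers a push with probability of roughly (CacheSize - MaxCacheSize) / (7 * MaxCacheSize), so the chance grows linearly with how far the cache is over its limit and only becomes certain once the cache is about eight times the limit. A throwaway shell fun to get a feel for it, using an illustrative MaxCacheSize of 4000:

%% Probability that maybe_withjitter/2 returns true for a given cache size,
%% derived from (CacheSize - MaxCacheSize) > random:uniform(7 * MaxCacheSize).
PushProb = fun(CacheSize, MaxCacheSize) ->
    N = 7 * MaxCacheSize,
    Over = CacheSize - MaxCacheSize - 1,
    min(max(Over, 0), N) / N
end.
%% PushProb(4100, 4000)  -> ~0.0035  (just over the limit: pushes are rare)
%% PushProb(8000, 4000)  -> ~0.14    (twice the limit: pushes more often)
%% PushProb(32001, 4000) -> 1.0      (about 8x the limit: push is certain)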
@@ -122,7 +122,7 @@
 -define(JOURNAL_FILEX, "cdb").
 -define(MANIFEST_FILEX, "man").
 -define(PENDING_FILEX, "pnd").
--define(LOADING_PAUSE, 5000).
+-define(LOADING_PAUSE, 1000).
 -define(LOADING_BATCH, 1000).
 
 -record(state, {manifest = [] :: list(),