From 70dc637c97e33db52b3f991e3350227776e158bb Mon Sep 17 00:00:00 2001 From: martinsumner Date: Sat, 5 Nov 2016 14:31:10 +0000 Subject: [PATCH] Add slow offer support on write pressure When there is write pressure on the penciller and it returns to the bookie, the bookie will now punish the next PUT (and itself) with a pause. The longer the back-pressure state has been in place, the more frequent the pauses --- src/leveled_bookie.erl | 30 ++++++++++++++++++++++-------- src/leveled_inker.erl | 2 +- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 82abb86..04d4a5c 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -161,12 +161,14 @@ -define(LEDGER_FP, "ledger"). -define(SNAPSHOT_TIMEOUT, 300000). -define(CHECKJOURNAL_PROB, 0.2). +-define(SLOWOFFER_DELAY, 10). -record(state, {inker :: pid(), penciller :: pid(), cache_size :: integer(), ledger_cache :: gb_trees:tree(), - is_snapshot :: boolean()}). + is_snapshot :: boolean(), + slow_offer = false :: boolean()}). @@ -286,11 +288,24 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> ObjSize, {IndexSpecs, TTL}), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), + % If the previous push to memory was returned then punish this PUT with a + % delay. If the back-pressure in the Penciller continues, these delays + % will beocme more frequent + case State#state.slow_offer of + true -> + timer:sleep(?SLOWOFFER_DELAY); + false -> + ok + end, gen_server:reply(From, ok), - {ok, NewCache} = maybepush_ledgercache(State#state.cache_size, + case maybepush_ledgercache(State#state.cache_size, Cache0, - State#state.penciller), - {noreply, State#state{ledger_cache=NewCache}}; + State#state.penciller) of + {ok, NewCache} -> + {noreply, State#state{ledger_cache=NewCache, slow_offer=false}}; + {returned, NewCache} -> + {noreply, State#state{ledger_cache=NewCache, slow_offer=true}} + end; handle_call({get, Bucket, Key, Tag}, _From, State) -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), case fetch_head(LedgerKey, @@ -799,7 +814,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> ok -> {ok, gb_trees:empty()}; returned -> - {ok, Cache} + {returned, Cache} end; true -> {ok, Cache} @@ -809,10 +824,9 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> maybe_withjitter(CacheSize, MaxCacheSize) -> if CacheSize > MaxCacheSize -> - T = 2 * MaxCacheSize - CacheSize, - R = random:uniform(CacheSize), + R = random:uniform(7 * MaxCacheSize), if - R > T -> + (CacheSize - MaxCacheSize) > R -> true; true -> false diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index f96800a..5689274 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -122,7 +122,7 @@ -define(JOURNAL_FILEX, "cdb"). -define(MANIFEST_FILEX, "man"). -define(PENDING_FILEX, "pnd"). --define(LOADING_PAUSE, 5000). +-define(LOADING_PAUSE, 1000). -define(LOADING_BATCH, 1000). -record(state, {manifest = [] :: list(),