diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 82abb86..04d4a5c 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -161,12 +161,14 @@ -define(LEDGER_FP, "ledger"). -define(SNAPSHOT_TIMEOUT, 300000). -define(CHECKJOURNAL_PROB, 0.2). +-define(SLOWOFFER_DELAY, 10). -record(state, {inker :: pid(), penciller :: pid(), cache_size :: integer(), ledger_cache :: gb_trees:tree(), - is_snapshot :: boolean()}). + is_snapshot :: boolean(), + slow_offer = false :: boolean()}). @@ -286,11 +288,24 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> ObjSize, {IndexSpecs, TTL}), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), + % If the previous push to memory was returned then punish this PUT with a + % delay. If the back-pressure in the Penciller continues, these delays + % will beocme more frequent + case State#state.slow_offer of + true -> + timer:sleep(?SLOWOFFER_DELAY); + false -> + ok + end, gen_server:reply(From, ok), - {ok, NewCache} = maybepush_ledgercache(State#state.cache_size, + case maybepush_ledgercache(State#state.cache_size, Cache0, - State#state.penciller), - {noreply, State#state{ledger_cache=NewCache}}; + State#state.penciller) of + {ok, NewCache} -> + {noreply, State#state{ledger_cache=NewCache, slow_offer=false}}; + {returned, NewCache} -> + {noreply, State#state{ledger_cache=NewCache, slow_offer=true}} + end; handle_call({get, Bucket, Key, Tag}, _From, State) -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), case fetch_head(LedgerKey, @@ -799,7 +814,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> ok -> {ok, gb_trees:empty()}; returned -> - {ok, Cache} + {returned, Cache} end; true -> {ok, Cache} @@ -809,10 +824,9 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> maybe_withjitter(CacheSize, MaxCacheSize) -> if CacheSize > MaxCacheSize -> - T = 2 * MaxCacheSize - CacheSize, - R = random:uniform(CacheSize), + R = random:uniform(7 * MaxCacheSize), if - R > T -> + (CacheSize - MaxCacheSize) > R -> true; true -> false diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index f96800a..5689274 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -122,7 +122,7 @@ -define(JOURNAL_FILEX, "cdb"). -define(MANIFEST_FILEX, "man"). -define(PENDING_FILEX, "pnd"). --define(LOADING_PAUSE, 5000). +-define(LOADING_PAUSE, 1000). -define(LOADING_BATCH, 1000). -record(state, {manifest = [] :: list(),