diff --git a/include/leveled.hrl b/include/leveled.hrl index 0e62cf3..25216f6 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -64,7 +64,8 @@ {root_path :: string(), max_inmemory_tablesize :: integer(), start_snapshot = false :: boolean(), - source_penciller :: pid()}). + source_penciller :: pid(), + levelzero_cointoss = false :: boolean}). -record(iclerk_options, {inker :: pid(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 891d6a4..ce444fb 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -146,7 +146,7 @@ -define(SNAPSHOT_TIMEOUT, 300000). -define(CHECKJOURNAL_PROB, 0.2). -define(CACHE_SIZE_JITTER, 25). --define(JOURNAL_SIZE_JITTER, 10). +-define(JOURNAL_SIZE_JITTER, 20). -record(state, {inker :: pid(), penciller :: pid(), @@ -692,7 +692,8 @@ set_options(Opts) -> binary_mode=true, sync_strategy=SyncStrat}}, #penciller_options{root_path = LedgerFP, - max_inmemory_tablesize = PCLL0CacheSize}}. + max_inmemory_tablesize = PCLL0CacheSize, + levelzero_cointoss = true}}. startup(InkerOpts, PencillerOpts) -> {ok, Inker} = leveled_inker:ink_start(InkerOpts), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 94bac54..be62cf9 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -198,7 +198,7 @@ -define(MAX_TABLESIZE, 32000). -define(PROMPT_WAIT_ONL0, 5). -define(WORKQUEUE_BACKLOG_TOLERANCE, 4). - +-define(COIN_SIDECOUNT, 4). -record(state, {manifest = [] :: list(), manifest_sqn = 0 :: integer(), @@ -217,6 +217,7 @@ % is an array - but cannot specif due to OTP compatability levelzero_size = 0 :: integer(), levelzero_maxcachesize :: integer(), + levelzero_cointoss = false :: boolean(), is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), @@ -537,10 +538,17 @@ start_from_file(PCLopts) -> end, {ok, MergeClerk} = leveled_pclerk:clerk_new(self()), + + CoinToss = PCLopts#penciller_options.levelzero_cointoss, + % Used to randomly defer the writing of L0 file. Intended to help with + % vnode syncronisation issues (e.g. stop them all by default merging to + % level zero concurrently) + InitState = #state{clerk=MergeClerk, root_path=RootPath, levelzero_index = leveled_pmem:new_index(), - levelzero_maxcachesize=MaxTableSize}, + levelzero_maxcachesize=MaxTableSize, + levelzero_cointoss=CoinToss}, %% Open manifest ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/", @@ -629,8 +637,20 @@ update_levelzero(L0Index, L0Size, PushedTree, LedgerSQN, L0Cache, State) -> ledger_sqn=MaxSQN}, CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize, Level0Free = length(get_item(0, State#state.manifest, [])) == 0, - case {CacheTooBig, Level0Free} of - {true, true} -> + RandomFactor = + case State#state.levelzero_cointoss of + true -> + case random:uniform(?COIN_SIDECOUNT) of + 1 -> + true; + _ -> + false + end; + false -> + true + end, + case {CacheTooBig, Level0Free, RandomFactor} of + {true, true, true} -> L0Constructor = roll_memory(UpdState, false), UpdState#state{levelzero_pending=true, levelzero_constructor=L0Constructor}; diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 70b0b0f..9c67721 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -192,6 +192,8 @@ -define(DELETE_TIMEOUT, 10000). -define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE). -define(DISCARD_EXT, ".discarded"). +-define(WRITE_OPS, [binary, raw, read, write, delayed_write]). +-define(READ_OPS, [binary, raw, read]). -record(state, {version = ?CURRENT_VERSION :: tuple(), slot_index :: list(), @@ -469,7 +471,7 @@ generate_filenames(RootFilename) -> create_file(FileName) when is_list(FileName) -> leveled_log:log("SFT01", [FileName]), ok = filelib:ensure_dir(FileName), - {ok, Handle} = file:open(FileName, [binary, raw, read, write]), + {ok, Handle} = file:open(FileName, ?WRITE_OPS), Header = create_header(initial), {ok, _} = file:position(Handle, bof), ok = file:write(Handle, Header),