diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index f95b37c..9d61661 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -156,6 +156,7 @@ -define(SHUTDOWN_WAITS, 60). -define(SHUTDOWN_PAUSE, 10000). -define(SNAPSHOT_TIMEOUT, 300000). +-define(JITTER_PROBABILITY, 0.1). -record(state, {inker :: pid(), penciller :: pid(), @@ -582,23 +583,41 @@ addto_ledgercache(Changes, Cache) -> maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> CacheSize = gb_trees:size(Cache), + TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), if - CacheSize > MaxCacheSize -> - case leveled_penciller:pcl_pushmem(Penciller, - gb_trees:to_list(Cache)) of + TimeToPush -> + Dump = gb_trees:to_list(Cache), + case leveled_penciller:pcl_pushmem(Penciller, Dump) of ok -> {ok, gb_trees:empty()}; pause -> {pause, gb_trees:empty()}; - refused -> + returned -> {ok, Cache} end; true -> - {ok, Cache} + {ok, Cache} + end. + + +maybe_withjitter(CacheSize, MaxCacheSize) -> + if + CacheSize > 2 * MaxCacheSize -> + true; + CacheSize > MaxCacheSize -> + R = random:uniform(), + if + R < ?JITTER_PROBABILITY -> + true; + true -> + false + end; + true -> + false end. load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> - {MinSQN, MaxSQN, Output} = Acc0, + {MinSQN, MaxSQN, OutputTree} = Acc0, {SQN, PK} = KeyInLedger, % VBin may already be a term {VBin, VSize} = ExtractFun(ValueInLedger), @@ -613,11 +632,11 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> {loop, Acc0}; SQN when SQN < MaxSQN -> Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs), - {loop, {MinSQN, MaxSQN, Output ++ Changes}}; + {loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; MaxSQN -> io:format("Reached end of load batch with SQN ~w~n", [SQN]), Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs), - {stop, {MinSQN, MaxSQN, Output ++ Changes}}; + {stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; SQN when SQN > MaxSQN -> io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n", [MaxSQN, SQN]), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 5be590b..79a0cd8 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -609,7 +609,7 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) -> load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos, FN, Rest) -> io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]), - InitAcc = {MinSQN, MaxSQN, []}, + InitAcc = {MinSQN, MaxSQN, gb_trees:empty()}, Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of {eof, {AccMinSQN, _AccMaxSQN, AccKL}} -> ok = push_to_penciller(Penciller, AccKL), @@ -633,12 +633,18 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, ok end. -push_to_penciller(Penciller, KeyList) -> +push_to_penciller(Penciller, KeyTree) -> + % The push to penciller must start as a tree to correctly de-duplicate + % the list by order before becoming a de-duplicated list for loading + KeyList = gb_trees:to_list(KeyTree), R = leveled_penciller:pcl_pushmem(Penciller, KeyList), if R == pause -> timer:sleep(?LOADING_PAUSE); - true -> + R == returned -> + timer:sleep(?LOADING_PAUSE), + push_to_penciller(Penciller, KeyTree); + R == ok -> ok end. diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index cac5d6e..2a915ae 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -67,9 +67,8 @@ -include_lib("eunit/include/eunit.hrl"). --define(INACTIVITY_TIMEOUT, 5000). --define(QUICK_TIMEOUT, 500). --define(HAPPYTIME_MULTIPLIER, 2). +-define(MAX_TIMEOUT, 2000). +-define(MIN_TIMEOUT, 200). -record(state, {owner :: pid(), change_pending=false :: boolean(), @@ -98,7 +97,7 @@ init([]) -> {ok, #state{}}. handle_call({register, Owner}, _From, State) -> - {reply, ok, State#state{owner=Owner}, ?INACTIVITY_TIMEOUT}; + {reply, ok, State#state{owner=Owner}, ?MIN_TIMEOUT}; handle_call({manifest_change, return, true}, _From, State) -> io:format("Request for manifest change from clerk on closing~n"), case State#state.change_pending of @@ -124,12 +123,11 @@ handle_call({manifest_change, confirm, Closing}, From, State) -> State#state.owner), {noreply, State#state{work_item=null, change_pending=false}, - ?QUICK_TIMEOUT} + ?MIN_TIMEOUT} end. handle_cast(prompt, State) -> - io:format("Clerk reducing timeout due to prompt~n"), - {noreply, State, ?QUICK_TIMEOUT}. + {noreply, State, ?MIN_TIMEOUT}. handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false -> case requestandhandle_work(State) of @@ -155,15 +153,10 @@ code_change(_OldVsn, State, _Extra) -> requestandhandle_work(State) -> case leveled_penciller:pcl_workforclerk(State#state.owner) of - {none, Backlog} -> + none -> io:format("Work prompted but none needed~n"), - case Backlog of - false -> - {false, ?INACTIVITY_TIMEOUT * ?HAPPYTIME_MULTIPLIER}; - _ -> - {false, ?INACTIVITY_TIMEOUT} - end; - {WI, _} -> + {false, ?MAX_TIMEOUT}; + WI -> {NewManifest, FilesToDelete} = merge(WI), UpdWI = WI#penciller_work{new_manifest=NewManifest, unreferenced_files=FilesToDelete}, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 0ea83c8..39c2626 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -306,7 +306,6 @@ memtable_copy = #l0snapshot{} :: #l0snapshot{}, levelzero_snapshot = gb_trees:empty() :: gb_trees:tree(), memtable, - backlog = false :: boolean(), memtable_maxsize :: integer(), is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), @@ -412,58 +411,25 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap}) case assess_sqn(DumpList) of {MinSQN, MaxSQN} when MaxSQN >= MinSQN, MinSQN >= State#state.ledger_sqn -> - MaxTableSize = State#state.memtable_maxsize, - {TableSize0, State1} = checkready_pushtomem(State), - case quickcheck_pushtomem(DumpList, - TableSize0, - MaxTableSize) of - {twist, TableSize1} -> - gen_server:reply(From, ok), - io:format("Reply made on push in ~w microseconds~n", + case checkready_pushtomem(State) of + {ok, TableSize0, State1} -> + push_and_roll(DumpList, + TableSize0, + State#state.memtable_maxsize, + MaxSQN, + StartWatch, + From, + State1); + timeout -> + io:format("Timeout of ~w microseconds awaiting " ++ + "L0 SFT write~n", [timer:now_diff(os:timestamp(), StartWatch)]), - L0Snap = do_pushtomem(DumpList, - State1#state.memtable, - State1#state.memtable_copy, - MaxSQN), - io:format("Push completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - {noreply, - State1#state{memtable_copy=L0Snap, - table_size=TableSize1, - ledger_sqn=MaxSQN}}; - {maybe_roll, TableSize1} -> - L0Snap = do_pushtomem(DumpList, - State1#state.memtable, - State1#state.memtable_copy, - MaxSQN), - - case roll_memory(State1, MaxTableSize, L0Snap) of - {ok, L0Pend, ManSN, TableSize2} -> - io:format("Push completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - {reply, - ok, - State1#state{levelzero_pending=L0Pend, - table_size=TableSize2, - manifest_sqn=ManSN, - memtable_copy=L0Snap, - ledger_sqn=MaxSQN, - backlog=false}}; - {pause, Reason, Details} -> - io:format("Excess work due to - " ++ Reason, - Details), - {reply, - pause, - State1#state{backlog=true, - memtable_copy=L0Snap, - table_size=TableSize1, - ledger_sqn=MaxSQN}} - end + {reply, returned, State} end; {MinSQN, MaxSQN} -> io:format("Mismatch of sequence number expectations with push " - ++ "having sequence numbers between ~w and ~w " - ++ "but current sequence number is ~w~n", + ++ "having sequence numbers between ~w and ~w " + ++ "but current sequence number is ~w~n", [MinSQN, MaxSQN, State#state.ledger_sqn]), {reply, refused, State}; empty -> @@ -520,7 +486,7 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc}, {reply, Acc, State}; handle_call(work_for_clerk, From, State) -> {UpdState, Work} = return_work(State, From), - {reply, {Work, UpdState#state.backlog}, UpdState}; + {reply, Work, UpdState}; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.ledger_sqn, State}; handle_call({register_snapshot, Snapshot}, _From, State) -> @@ -568,7 +534,8 @@ handle_cast({release_snapshot, Snapshot}, State) -> io:format("Penciller snapshot ~w released~n", [Snapshot]), {noreply, State#state{registered_snapshots=Rs}}. -handle_info(_Info, State) -> +handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) -> + io:format("Orphaned reply after timeout on L0 file write ~s~n", [SrcFN]), {noreply, State}. terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true -> @@ -749,28 +716,87 @@ start_from_file(PCLopts) -> checkready_pushtomem(State) -> - {TableSize, UpdState} = case State#state.levelzero_pending of + case State#state.levelzero_pending of {true, Pid, _TS} -> - % N.B. Sync call - so will be ready - {ok, SrcFN, StartKey, EndKey} = leveled_sft:sft_checkready(Pid), - true = ets:delete_all_objects(State#state.memtable), - ManifestEntry = #manifest_entry{start_key=StartKey, - end_key=EndKey, - owner=Pid, - filename=SrcFN}, - % Prompt clerk to ask about work - do this for every L0 roll - ok = leveled_pclerk:clerk_prompt(State#state.clerk), - {0, - State#state{manifest=lists:keystore(0, + case checkready(Pid) of + timeout -> + timeout; + {ok, SrcFN, StartKey, EndKey} -> + true = ets:delete_all_objects(State#state.memtable), + ManifestEntry = #manifest_entry{start_key=StartKey, + end_key=EndKey, + owner=Pid, + filename=SrcFN}, + % Prompt clerk to ask about work - do this for every + % L0 roll + ok = leveled_pclerk:clerk_prompt(State#state.clerk), + UpdManifest = lists:keystore(0, 1, State#state.manifest, {0, [ManifestEntry]}), - levelzero_pending=?L0PEND_RESET, - memtable_copy=#l0snapshot{}}}; + {ok, + 0, + State#state{manifest=UpdManifest, + levelzero_pending=?L0PEND_RESET, + memtable_copy=#l0snapshot{}}} + end; ?L0PEND_RESET -> - {State#state.table_size, State} - end, - {TableSize, UpdState}. + {ok, State#state.table_size, State} + end. + + +checkready(Pid) -> + try + leveled_sft:sft_checkready(Pid) + catch + exit:{timeout, _} -> + timeout + end. + + +push_and_roll(DumpList, TableSize, MaxTableSize, MaxSQN, StartWatch, From, State) -> + case quickcheck_pushtomem(DumpList, TableSize, MaxTableSize) of + {twist, TableSize1} -> + gen_server:reply(From, ok), + io:format("Reply made on push in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + L0Snap = do_pushtomem(DumpList, + State#state.memtable, + State#state.memtable_copy, + MaxSQN), + io:format("Push completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + {noreply, + State#state{memtable_copy=L0Snap, + table_size=TableSize1, + ledger_sqn=MaxSQN}}; + {maybe_roll, TableSize1} -> + L0Snap = do_pushtomem(DumpList, + State#state.memtable, + State#state.memtable_copy, + MaxSQN), + + case roll_memory(State, MaxTableSize, L0Snap) of + {ok, L0Pend, ManSN, TableSize2} -> + io:format("Push completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + {reply, + ok, + State#state{levelzero_pending=L0Pend, + table_size=TableSize2, + manifest_sqn=ManSN, + memtable_copy=L0Snap, + ledger_sqn=MaxSQN}}; + {pause, Reason, Details} -> + io:format("Excess work due to - " ++ Reason, + Details), + {reply, + pause, + State#state{memtable_copy=L0Snap, + table_size=TableSize1, + ledger_sqn=MaxSQN}} + end + end. quickcheck_pushtomem(DumpList, TableSize, MaxSize) -> case TableSize + length(DumpList) of @@ -894,9 +920,10 @@ return_work(State, From) -> case length(WorkQueue) of L when L > 0 -> [{SrcLevel, Manifest}|OtherWork] = WorkQueue, + Backlog = length(OtherWork), io:format("Work at Level ~w to be scheduled for ~w with ~w " ++ "queue items outstanding~n", - [SrcLevel, From, length(OtherWork)]), + [SrcLevel, From, Backlog]), case element(1, State#state.levelzero_pending) of true -> % Once the L0 file is completed there will be more work @@ -1557,7 +1584,7 @@ simple_server_test() -> "Key0004", null}, 3002)), - % Add some more keys and confirm that chekc sequence number still + % Add some more keys and confirm that check sequence number still % sees the old version in the previous snapshot, but will see the new version % in a new snapshot Key1A = {{o,"Bucket0001", "Key0001", null}, {4002, {active, infinity}, null}}, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 7669660..f292339 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -252,7 +252,7 @@ sft_close(Pid) -> gen_server:call(Pid, close, infinity). sft_checkready(Pid) -> - gen_server:call(Pid, background_complete, infinity). + gen_server:call(Pid, background_complete, 50). sft_getmaxsequencenumber(Pid) -> gen_server:call(Pid, get_maxsqn, infinity).