diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index fcb0657..3402aa8 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -232,7 +232,6 @@ terminate/2, code_change/3, pcl_start/1, - pcl_quickstart/1, pcl_pushmem/2, pcl_fetch/2, pcl_checksequencenumber/3, @@ -292,8 +291,6 @@ %%% API %%%============================================================================ -pcl_quickstart(RootPath) -> - pcl_start(#penciller_options{root_path=RootPath}). pcl_start(PCLopts) -> gen_server:start(?MODULE, [PCLopts], []). @@ -361,34 +358,139 @@ init([PCLopts]) -> end. -handle_call({push_mem, DumpList}, From, State) -> - if - State#state.is_snapshot == true -> - {reply, bad_request, State}; - true -> - writer_call({push_mem, DumpList}, From, State) +handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap}) + when Snap == false -> + % The process for pushing to memory is as follows + % - Check that the inbound list does not contain any Keys with a lower + % sequence number than any existing keys (assess_sqn/1) + % - Check that any file that had been sent to be written to L0 previously + % is now completed. If it is wipe out the in-memory view as this is now + % safely persisted. This will block waiting for this to complete if it + % hasn't (checkready_pushmem/1). + % - Quick check to see if there is a need to write a L0 file + % (quickcheck_pushmem/3). If there clearly isn't, then we can reply, and + % then add to memory in the background before updating the loop state + % - Push the update into memory (do_pushtomem/3) + % - If we haven't got through quickcheck now need to check if there is a + % definite need to write a new L0 file (roll_memory/2). If all clear this + % will write the file in the background and allow a response to the user. + % If not the change has still been made but the the L0 file will not have + % been prompted - so the reply does not indicate failure but returns the + % atom 'pause' to signal a loose desire for back-pressure to be applied. + % The only reason in this case why there should be a pause is if the + % manifest is locked pending completion of a manifest change - so reacting + % to the pause signal may not be sensible + StartWatch = os:timestamp(), + case assess_sqn(DumpList) of + {MinSQN, MaxSQN} when MaxSQN >= MinSQN, + MinSQN >= State#state.ledger_sqn -> + MaxTableSize = State#state.memtable_maxsize, + {TableSize0, State1} = checkready_pushtomem(State), + case quickcheck_pushtomem(DumpList, + TableSize0, + MaxTableSize) of + {twist, TableSize1} -> + gen_server:reply(From, ok), + io:format("Reply made on push in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + L0Snap = do_pushtomem(DumpList, + State1#state.memtable, + State1#state.memtable_copy, + MaxSQN), + io:format("Push completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + {noreply, + State1#state{memtable_copy=L0Snap, + table_size=TableSize1, + ledger_sqn=MaxSQN}}; + {maybe_roll, TableSize1} -> + L0Snap = do_pushtomem(DumpList, + State1#state.memtable, + State1#state.memtable_copy, + MaxSQN), + + case roll_memory(State1, MaxTableSize) of + {ok, L0Pend, ManSN, TableSize2} -> + io:format("Push completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + {reply, + ok, + State1#state{levelzero_pending=L0Pend, + table_size=TableSize2, + manifest_sqn=ManSN, + memtable_copy=L0Snap, + ledger_sqn=MaxSQN, + backlog=false}}; + {pause, Reason, Details} -> + io:format("Excess work due to - " ++ Reason, + Details), + {reply, + pause, + State1#state{backlog=true, + memtable_copy=L0Snap, + table_size=TableSize1, + ledger_sqn=MaxSQN}} + end + end; + {MinSQN, MaxSQN} -> + io:format("Mismatch of sequence number expectations with push " + ++ "having sequence numbers between ~w and ~w " + ++ "but current sequence number is ~w~n", + [MinSQN, MaxSQN, State#state.ledger_sqn]), + {reply, refused, State}; + empty -> + io:format("Empty request pushed to Penciller~n"), + {reply, ok, State} end; -handle_call({confirm_delete, FileName}, _From, State) -> - if - State#state.is_snapshot == true -> - {reply, bad_request, State}; +handle_call({confirm_delete, FileName}, _From, State=#state{is_snapshot=Snap}) + when Snap == false -> + Reply = confirm_delete(FileName, + State#state.unreferenced_files, + State#state.registered_snapshots), + case Reply of true -> - writer_call({confirm_delete, FileName}, _From, State) + UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), + {reply, true, State#state{unreferenced_files=UF1}}; + _ -> + {reply, Reply, State} end; -handle_call(prompt_compaction, _From, State) -> +handle_call(prompt_compaction, _From, State=#state{is_snapshot=Snap}) + when Snap == false -> + %% If there is a prompt immediately after a L0 async write event then + %% there exists the potential for the prompt to stall the database. + %% Should only accept prompts if there has been a safe wait from the + %% last L0 write event. + Proceed = case State#state.levelzero_pending of + {true, _Pid, TS} -> + TD = timer:now_diff(os:timestamp(),TS), + if + TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false; + true -> true + end; + ?L0PEND_RESET -> + true + end, if - State#state.is_snapshot == true -> - {reply, bad_request, State}; + Proceed -> + {_TableSize, State1} = checkready_pushtomem(State), + case roll_memory(State1, State1#state.memtable_maxsize) of + {ok, L0Pend, MSN, TableSize} -> + io:format("Prompted push completed~n"), + {reply, ok, State1#state{levelzero_pending=L0Pend, + table_size=TableSize, + manifest_sqn=MSN, + backlog=false}}; + {pause, Reason, Details} -> + io:format("Excess work due to - " ++ Reason, Details), + {reply, pause, State1#state{backlog=true}} + end; true -> - writer_call(prompt_compaction, _From, State) - end; -handle_call({manifest_change, WI}, _From, State) -> - if - State#state.is_snapshot == true -> - {reply, bad_request, State}; - true -> - writer_call({manifest_change, WI}, _From, State) + {reply, ok, State#state{backlog=false}} end; +handle_call({manifest_change, WI}, _From, State=#state{is_snapshot=Snap}) + when Snap == false -> + {ok, UpdState} = commit_manifest_change(WI, State), + {reply, ok, UpdState}; handle_call({check_sqn, Key, SQN}, _From, State) -> Obj = if State#state.is_snapshot == true -> @@ -529,138 +631,6 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -writer_call({push_mem, DumpList}, From, State0) -> - % The process for pushing to memory is as follows - % - Check that the inbound list does not contain any Keys with a lower - % sequence number than any existing keys (assess_sqn/1) - % - Check that any file that had been sent to be written to L0 previously - % is now completed. If it is wipe out the in-memory view as this is now - % safely persisted. This will block waiting for this to complete if it - % hasn't (checkready_pushmem/1). - % - Quick check to see if there is a need to write a L0 file - % (quickcheck_pushmem/3). If there clearly isn't, then we can reply, and - % then add to memory in the background before updating the loop state - % - Push the update into memory (do_pushtomem/3) - % - If we haven't got through quickcheck now need to check if there is a - % definite need to write a new L0 file (roll_memory/2). If all clear this - % will write the file in the background and allow a response to the user. - % If not the change has still been made but the the L0 file will not have - % been prompted - so the reply does not indicate failure but returns the - % atom 'pause' to signal a loose desire for back-pressure to be applied. - % The only reason in this case why there should be a pause is if the - % manifest is locked pending completion of a manifest change - so reacting - % to the pause signal may not be sensible - StartWatch = os:timestamp(), - case assess_sqn(DumpList) of - {MinSQN, MaxSQN} when MaxSQN >= MinSQN, - MinSQN >= State0#state.ledger_sqn -> - MaxTableSize = State0#state.memtable_maxsize, - {TableSize0, State1} = checkready_pushtomem(State0), - case quickcheck_pushtomem(DumpList, - TableSize0, - MaxTableSize) of - {twist, TableSize1} -> - gen_server:reply(From, ok), - io:format("Reply made on push in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - L0Snap = do_pushtomem(DumpList, - State1#state.memtable, - State1#state.memtable_copy, - MaxSQN), - io:format("Push completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - {noreply, - State1#state{memtable_copy=L0Snap, - table_size=TableSize1, - ledger_sqn=MaxSQN}}; - {maybe_roll, TableSize1} -> - L0Snap = do_pushtomem(DumpList, - State1#state.memtable, - State1#state.memtable_copy, - MaxSQN), - - case roll_memory(State1, MaxTableSize) of - {ok, L0Pend, ManSN, TableSize2} -> - io:format("Push completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - {reply, - ok, - State1#state{levelzero_pending=L0Pend, - table_size=TableSize2, - manifest_sqn=ManSN, - memtable_copy=L0Snap, - ledger_sqn=MaxSQN, - backlog=false}}; - {pause, Reason, Details} -> - io:format("Excess work due to - " ++ Reason, - Details), - {reply, - pause, - State1#state{backlog=true, - memtable_copy=L0Snap, - table_size=TableSize1, - ledger_sqn=MaxSQN}} - end - end; - {MinSQN, MaxSQN} -> - io:format("Mismatch of sequence number expectations with push " - ++ "having sequence numbers between ~w and ~w " - ++ "but current sequence number is ~w~n", - [MinSQN, MaxSQN, State0#state.ledger_sqn]), - {reply, refused, State0}; - empty -> - io:format("Empty request pushed to Penciller~n"), - {reply, ok, State0} - end; -writer_call({confirm_delete, FileName}, _From, State) -> - Reply = confirm_delete(FileName, - State#state.unreferenced_files, - State#state.registered_snapshots), - case Reply of - true -> - UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), - {reply, true, State#state{unreferenced_files=UF1}}; - _ -> - {reply, Reply, State} - end; -writer_call(prompt_compaction, _From, State) -> - %% If there is a prompt immediately after a L0 async write event then - %% there exists the potential for the prompt to stall the database. - %% Should only accept prompts if there has been a safe wait from the - %% last L0 write event. - Proceed = case State#state.levelzero_pending of - {true, _Pid, TS} -> - TD = timer:now_diff(os:timestamp(),TS), - if - TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false; - true -> true - end; - ?L0PEND_RESET -> - true - end, - if - Proceed -> - {_TableSize, State1} = checkready_pushtomem(State), - case roll_memory(State1, State1#state.memtable_maxsize) of - {ok, L0Pend, MSN, TableSize} -> - io:format("Prompted push completed~n"), - {reply, ok, State1#state{levelzero_pending=L0Pend, - table_size=TableSize, - manifest_sqn=MSN, - backlog=false}}; - {pause, Reason, Details} -> - io:format("Excess work due to - " ++ Reason, Details), - {reply, pause, State1#state{backlog=true}} - end; - true -> - {reply, ok, State#state{backlog=false}} - end; -writer_call({manifest_change, WI}, _From, State) -> - {ok, UpdState} = commit_manifest_change(WI, State), - {reply, ok, UpdState}. - - - %%%============================================================================ %%% Internal functions %%%============================================================================ @@ -1286,8 +1256,6 @@ simple_server_test() -> ok; _ -> io:format("Unexpected sequence number on restart ~w~n", [TopSQN]), - ok = pcl_close(PCLr), - clean_testdir(RootPath), error end, ?assertMatch(Check, ok),