diff --git a/include/leveled.hrl b/include/leveled.hrl index 028eb95..e685a39 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -17,7 +17,8 @@ -record(sft_options, {wait = true :: boolean(), - expire_tombstones = false :: boolean()}). + expire_tombstones = false :: boolean(), + penciller :: pid()}). -record(penciller_work, {next_sqn :: integer(), diff --git a/src/leveled_log.erl b/src/leveled_log.erl index d0ab7f5..581e02c 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -93,6 +93,8 @@ {info, "Rename of manifest from ~s ~w to ~s ~w"}}, {"P0028", {info, "Adding cleared file ~s to deletion list"}}, + {"P0029", + {info, "L0 completion confirmed and will transition to not pending"}}, {"PC001", {info, "Penciller's clerk ~w started with owner ~w"}}, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 81216e4..5c1ebce 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -229,6 +229,7 @@ pcl_checksequencenumber/3, pcl_workforclerk/1, pcl_promptmanifestchange/2, + pcl_confirml0complete/4, pcl_confirmdelete/2, pcl_close/1, pcl_registersnapshot/2, @@ -314,6 +315,9 @@ pcl_workforclerk(Pid) -> pcl_promptmanifestchange(Pid, WI) -> gen_server:cast(Pid, {manifest_change, WI}). +pcl_confirml0complete(Pid, FN, StartKey, EndKey) -> + gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey}). + pcl_confirmdelete(Pid, FileName) -> gen_server:cast(Pid, {confirm_delete, FileName}). @@ -360,13 +364,10 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap}) % we mean value from the perspective of the Ledger, not the full value % stored in the Inker) % - % 2 - Check to see if there is a levelzero file pending. If so check if - % the levelzero file is complete. If it is complete, the levelzero tree - % can be flushed, the in-memory manifest updated, and the new tree can - % be accepted as the new levelzero cache. If not, the update must be - % returned. + % 2 - Check to see if there is a levelzero file pending. If so, the + % update must be returned. If not the update can be accepted % - % 3 - The Penciller can now reply to the Bookie to show that the push has + % 3 - The Penciller can now reply to the Bookie to show if the push has % been accepted % % 4 - Update the cache: @@ -375,47 +376,12 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap}) % % Check the approximate size of the cache. If it is over the maximum size, % trigger a backgroun L0 file write and update state of levelzero_pending. - - SW = os:timestamp(), + SW = os:timestamp(), S = case State#state.levelzero_pending of true -> - L0Pid = State#state.levelzero_constructor, - case checkready(L0Pid) of - timeout -> - log_pushmem_reply(From, - {returned, - "L-0 persist pending"}, - SW), - State; - {ok, SrcFN, StartKey, EndKey} -> - log_pushmem_reply(From, - {ok, - "L-0 persist completed"}, - SW), - ManEntry = #manifest_entry{start_key=StartKey, - end_key=EndKey, - owner=L0Pid, - filename=SrcFN}, - UpdMan = lists:keystore(0, - 1, - State#state.manifest, - {0, [ManEntry]}), - LedgerSQN = State#state.ledger_sqn, - UpdState = State#state{manifest=UpdMan, - levelzero_pending=false, - persisted_sqn=LedgerSQN}, - % Prompt clerk to ask about work - do this for - % every L0 roll - ok = leveled_pclerk:clerk_prompt(State#state.clerk), - NewL0Index = leveled_pmem:new_index(), - update_levelzero(NewL0Index, - 0, - PushedTree, - LedgerSQN, - [], - UpdState) - end; + log_pushmem_reply(From, {returned, "L-0 persist pending"}, SW), + State; false -> log_pushmem_reply(From, {ok, "L0 memory updated"}, SW), update_levelzero(State#state.levelzero_index, @@ -506,7 +472,22 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap}) {noreply, State#state{unreferenced_files=UF1}}; _ -> {noreply, State} - end. + end; +handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> + leveled_log:log("P0029", []), + ManEntry = #manifest_entry{start_key=StartKey, + end_key=EndKey, + owner=State#state.levelzero_constructor, + filename=FN}, + UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}), + % Prompt clerk to ask about work - do this for every L0 roll + ok = leveled_pclerk:clerk_prompt(State#state.clerk), + {noreply, State#state{levelzero_cache=[], + levelzero_pending=false, + levelzero_constructor=undefined, + levelzero_index=leveled_pmem:new_index(), + levelzero_size=0, + manifest=UpdMan}}. handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) -> @@ -724,7 +705,7 @@ checkready(Pid) -> roll_memory(State, false) -> FileName = levelzero_filename(State), leveled_log:log("P0019", [FileName]), - Opts = #sft_options{wait=false}, + Opts = #sft_options{wait=false, penciller=self()}, PCL = self(), FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, % FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 4833015..f6cdeb1 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -245,7 +245,8 @@ sft_newfroml0cache(Filename, Slots, FetchFun, Options) -> {sft_newfroml0cache, Filename, Slots, - FetchFun}), + FetchFun, + Options#sft_options.penciller}), {ok, Pid, noreply} end. @@ -352,12 +353,21 @@ handle_call({set_for_delete, Penciller}, _From, State) -> handle_call(get_maxsqn, _From, State) -> statecheck_onreply(State#state.highest_sqn, State). -handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun}, _State) -> +handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) -> SW = os:timestamp(), Inp1 = leveled_pmem:to_list(Slots, FetchFun), {ok, State} = create_levelzero(Inp1, Filename), leveled_log:log_timer("SFT03", [Filename], SW), - {noreply, State}; + case PCL of + undefined -> + {noreply, State}; + _ -> + ok = leveled_penciller:pcl_confirml0complete(PCL, + Filename, + State#state.smallest_key, + State#state.highest_key), + {noreply, State} + end; handle_cast(close, State) -> {stop, normal, State}.