diff --git a/src/leveled_clerk.erl b/src/leveled_clerk.erl index 3197eb4..bf5e252 100644 --- a/src/leveled_clerk.erl +++ b/src/leveled_clerk.erl @@ -13,7 +13,7 @@ handle_cast/2, handle_info/2, terminate/2, - clerk_new/0, + clerk_new/1, clerk_prompt/2, code_change/3, perform_merge/4]). @@ -26,14 +26,15 @@ %%% API %%%============================================================================ -clerk_new() -> +clerk_new(Owner) -> {ok, Pid} = gen_server:start(?MODULE, [], []), - ok = gen_server:call(Pid, register, infinity), + ok = gen_server:call(Pid, {register, Owner}, infinity), {ok, Pid}. clerk_prompt(Pid, penciller) -> - gen_server:cast(Pid, penciller_prompt, infinity). + gen_server:cast(Pid, penciller_prompt), + ok. %%%============================================================================ %%% gen_server callbacks @@ -42,10 +43,10 @@ clerk_prompt(Pid, penciller) -> init([]) -> {ok, #state{}}. -handle_call(register, From, State) -> - {noreply, State#state{owner=From}}. +handle_call({register, Owner}, _From, State) -> + {reply, ok, State#state{owner=Owner}}. -handle_cast({penciller_prompt, From}, State) -> +handle_cast(penciller_prompt, State) -> case leveled_penciller:pcl_workforclerk(State#state.owner) of none -> io:format("Work prompted but none needed~n"), @@ -54,7 +55,9 @@ handle_cast({penciller_prompt, From}, State) -> {NewManifest, FilesToDelete} = merge(WI), UpdWI = WI#penciller_work{new_manifest=NewManifest, unreferenced_files=FilesToDelete}, - leveled_penciller:pcl_requestmanifestchange(From, UpdWI), + leveled_penciller:pcl_requestmanifestchange(State#state.owner, + UpdWI), + mark_for_delete(FilesToDelete, State#state.owner), {noreply, State} end. @@ -74,58 +77,69 @@ code_change(_OldVsn, State, _Extra) -> merge(WI) -> SrcLevel = WI#penciller_work.src_level, - {Selection, UpdMFest1} = select_filetomerge(SrcLevel, + {SrcF, UpdMFest1} = select_filetomerge(SrcLevel, WI#penciller_work.manifest), - {{StartKey, EndKey}, SrcFile} = Selection, - SrcFilename = leveled_sft:sft_getfilename(SrcFile), SinkFiles = get_item(SrcLevel + 1, UpdMFest1, []), - SplitLists = lists:splitwith(fun(Ref) -> - case {Ref#manifest_entry.start_key, - Ref#manifest_entry.end_key} of - {_, EK} when StartKey > EK -> - false; - {SK, _} when EndKey < SK -> - false; - _ -> - true - end end, - SinkFiles), - {Candidates, Others} = SplitLists, + Splits = lists:splitwith(fun(Ref) -> + case {Ref#manifest_entry.start_key, + Ref#manifest_entry.end_key} of + {_, EK} when SrcF#manifest_entry.start_key > EK -> + false; + {SK, _} when SrcF#manifest_entry.end_key < SK -> + false; + _ -> + true + end end, + SinkFiles), + {Candidates, Others} = Splits, %% TODO: %% Need to work out if this is the top level %% And then tell merge process to create files at the top level %% Which will include the reaping of expired tombstones - - io:format("Merge from level ~w to merge into ~w files below", + io:format("Merge from level ~w to merge into ~w files below~n", [SrcLevel, length(Candidates)]), - + MergedFiles = case length(Candidates) of 0 -> %% If no overlapping candiates, manifest change only required %% %% TODO: need to think still about simply renaming when at %% lower level - [SrcFile]; + [SrcF]; _ -> - perform_merge({SrcFile, SrcFilename}, + perform_merge({SrcF#manifest_entry.owner, + SrcF#manifest_entry.filename}, Candidates, SrcLevel, {WI#penciller_work.ledger_filepath, WI#penciller_work.next_sqn}) - end, + end, + case MergedFiles of + error -> + merge_failure; + _ -> + NewLevel = lists:sort(lists:append(MergedFiles, Others)), + UpdMFest2 = lists:keystore(SrcLevel + 1, + 1, + UpdMFest1, + {SrcLevel + 1, NewLevel}), + + ok = filelib:ensure_dir(WI#penciller_work.manifest_file), + {ok, Handle} = file:open(WI#penciller_work.manifest_file, + [binary, raw, write]), + ok = file:write(Handle, term_to_binary(UpdMFest2)), + ok = file:close(Handle), + {UpdMFest2, Candidates} + end. - NewLevel = lists:sort(lists:append(MergedFiles, Others)), - UpdMFest2 = lists:keyreplace(SrcLevel + 1, - 1, - UpdMFest1, - {SrcLevel, NewLevel}), + +mark_for_delete([], _Penciller) -> + ok; +mark_for_delete([Head|Tail], Penciller) -> + leveled_sft:sft_setfordelete(Head#manifest_entry.owner, Penciller), + mark_for_delete(Tail, Penciller). - {ok, Handle} = file:open(WI#penciller_work.manifest_file, - [binary, raw, write]), - ok = file:write(Handle, term_to_binary(UpdMFest2)), - ok = file:close(Handle), - {UpdMFest2, Candidates}. %% An algorithm for discovering which files to merge .... @@ -173,11 +187,12 @@ select_filetomerge(SrcLevel, Manifest) -> %% %% The level is the level which the new files should be created at. -perform_merge(FileToMerge, CandidateList, Level, {Filepath, MSN}) -> - {Filename, UpperSFTPid} = FileToMerge, +perform_merge({UpperSFTPid, Filename}, CandidateList, Level, {Filepath, MSN}) -> io:format("Merge to be commenced for FileToMerge=~s with MSN=~w~n", [Filename, MSN]), - PointerList = lists:map(fun(P) -> {next, P, all} end, CandidateList), + PointerList = lists:map(fun(P) -> + {next, P#manifest_entry.owner, all} end, + CandidateList), do_merge([{next, UpperSFTPid, all}], PointerList, Level, {Filepath, MSN}, 0, []). @@ -187,12 +202,14 @@ do_merge([], [], Level, {_Filepath, MSN}, FileCounter, OutList) -> OutList; do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) -> FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft", - [Level, FileCounter])), + [Level + 1, FileCounter])), io:format("File to be created as part of MSN=~w Filename=~s~n", [MSN, FileName]), + TS1 = os:timestamp(), case leveled_sft:sft_new(FileName, KL1, KL2, Level) of {ok, _Pid, {error, Reason}} -> - io:format("Exiting due to error~w~n", [Reason]); + io:format("Exiting due to error~w~n", [Reason]), + error; {ok, Pid, Reply} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, ExtMan = lists:append(OutList, @@ -200,6 +217,8 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) -> end_key=HighestKey, owner=Pid, filename=FileName}]), + MTime = timer:now_diff(os:timestamp(), TS1), + io:format("file creation took ~w microseconds ~n", [MTime]), do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN}, FileCounter + 1, ExtMan) end. @@ -278,7 +297,7 @@ merge_file_test() -> KL4_L2 = lists:sort(generate_randomkeys(16000, 750, 250)), {ok, PidL2_4, _} = leveled_sft:sft_new("../test/KL4_L2.sft", KL4_L2, [], 2), - Result = perform_merge({"../test/KL1_L1.sft", PidL1_1}, + Result = perform_merge({PidL1_1, "../test/KL1_L1.sft"}, [PidL2_1, PidL2_2, PidL2_3, PidL2_4], 2, {"../test/", 99}), lists:foreach(fun(ManEntry) -> diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 5e75e4a..fb7432d 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -159,7 +159,7 @@ pcl_fetch/2, pcl_workforclerk/1, pcl_requestmanifestchange/2, - commit_manifest_change/3]). + pcl_confirmdelete/2]). -include_lib("eunit/include/eunit.hrl"). @@ -181,7 +181,7 @@ -record(state, {manifest = [] :: list(), ongoing_work = [] :: list(), manifest_sqn = 0 :: integer(), - levelzero_sqn =0 :: integer(), + levelzero_sqn = 0 :: integer(), registered_iterators = [] :: list(), unreferenced_files = [] :: list(), root_path = "../test/" :: string(), @@ -214,7 +214,10 @@ pcl_workforclerk(Pid) -> gen_server:call(Pid, work_for_clerk, infinity). pcl_requestmanifestchange(Pid, WorkItem) -> - gen_server:call(Pid, {manifest_change, WorkItem}, infinity). + gen_server:cast(Pid, {manifest_change, WorkItem}). + +pcl_confirmdelete(Pid, FileName) -> + gen_server:call(Pid, {confirm_delete, FileName}). %%%============================================================================ %%% gen_server callbacks @@ -222,22 +225,27 @@ pcl_requestmanifestchange(Pid, WorkItem) -> init([]) -> TID = ets:new(?MEMTABLE, [ordered_set, private]), - {ok, #state{memtable=TID}}. + {ok, Clerk} = leveled_clerk:clerk_new(self()), + {ok, #state{memtable=TID, clerk=Clerk}}. handle_call({push_mem, DumpList}, _From, State) -> {TableSize, Manifest, L0Pend} = case State#state.levelzero_pending of {true, Remainder, {StartKey, EndKey, Pid}} -> %% Need to handle not error scenarios? %% N.B. Sync call - so will be ready - ok = leveled_sft:sft_checkready(Pid), + {ok, SrcFN} = leveled_sft:sft_checkready(Pid), %% Reset ETS, but re-insert any remainder true = ets:delete_all_objects(State#state.memtable), true = ets:insert(State#state.memtable, Remainder), + ManifestEntry = #manifest_entry{start_key=StartKey, + end_key=EndKey, + owner=Pid, + filename=SrcFN}, {length(Remainder), lists:keystore(0, 1, State#state.manifest, - {0, [{StartKey, EndKey, Pid}]}), + {0, [ManifestEntry]}), ?L0PEND_RESET}; {false, _, _} -> {State#state.table_size, @@ -246,7 +254,9 @@ handle_call({push_mem, DumpList}, _From, State) -> Unexpected -> io:format("Unexpected value of ~w~n", [Unexpected]), error - end, + end, + %% Prompt clerk to ask about work - do this for every push_mem + ok = leveled_clerk:clerk_prompt(State#state.clerk, penciller), case do_push_to_mem(DumpList, TableSize, State#state.memtable) of {twist, ApproxTableSize} -> {reply, ok, State#state{table_size=ApproxTableSize, @@ -258,13 +268,14 @@ handle_call({push_mem, DumpList}, _From, State) -> L0SN = State#state.levelzero_sqn + 1, FileName = State#state.root_path ++ ?FILES_FP ++ "/" - ++ integer_to_list(L0SN), - SFT = leveled_sft:sft_new(FileName, - ets:tab2list(State#state.memtable), - [], - 0, - #sft_options{wait=false}), - {ok, L0Pid, Reply} = SFT, + ++ integer_to_list(L0SN) ++ "_0_0", + Dump = ets:tab2list(State#state.memtable), + L0_SFT = leveled_sft:sft_new(FileName, + Dump, + [], + 0, + #sft_options{wait=false}), + {ok, L0Pid, Reply} = L0_SFT, {{KL1Rem, []}, L0StartKey, L0EndKey} = Reply, {reply, ok, State#state{levelzero_pending={true, KL1Rem, @@ -285,10 +296,16 @@ handle_call({fetch, Key}, _From, State) -> {reply, fetch(Key, State#state.manifest, State#state.memtable), State}; handle_call(work_for_clerk, From, State) -> {UpdState, Work} = return_work(State, From), - {reply, Work, UpdState}. + {reply, Work, UpdState}; +handle_call({confirm_delete, FileName}, _From, State) -> + Reply = confirm_delete(FileName, + State#state.unreferenced_files, + State#state.registered_iterators), + {reply, Reply, State}. -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast({manifest_change, WI}, State) -> + {ok, UpdState} = commit_manifest_change(WI, State), + {noreply, UpdState}. handle_info(_Info, State) -> {noreply, State}. @@ -453,7 +470,7 @@ get_item(Index, List, Default) -> %% - the list of ongoing work needs to be cleared of this item -commit_manifest_change(ReturnedWorkItem, From, State) -> +commit_manifest_change(ReturnedWorkItem, State) -> NewMSN = State#state.manifest_sqn + 1, [SentWorkItem] = State#state.ongoing_work, RootPath = State#state.root_path, @@ -461,8 +478,8 @@ commit_manifest_change(ReturnedWorkItem, From, State) -> case {SentWorkItem#penciller_work.next_sqn, SentWorkItem#penciller_work.clerk} of - {NewMSN, From} -> - MTime = timer:diff_now(os:timestamp(), + {NewMSN, _From} -> + MTime = timer:now_diff(os:timestamp(), SentWorkItem#penciller_work.start_time), io:format("Merge to sqn ~w completed in ~w microseconds at Level ~w~n", @@ -477,14 +494,15 @@ commit_manifest_change(ReturnedWorkItem, From, State) -> io:format("Merge has been commmitted at sequence number ~w~n", [NewMSN]), NewManifest = ReturnedWorkItem#penciller_work.new_manifest, - {ok, State#state{ongoing_work=null, + %% io:format("Updated manifest is ~w~n", [NewManifest]), + {ok, State#state{ongoing_work=[], manifest_sqn=NewMSN, manifest=NewManifest, unreferenced_files=UnreferencedFilesUpd}}; - {MaybeWrongMSN, MaybeWrongClerk} -> - io:format("Merge commit from ~w at sqn ~w not matched to expected - clerk ~w or sqn ~w~n", - [From, NewMSN, MaybeWrongClerk, MaybeWrongMSN]), + {MaybeWrongMSN, From} -> + io:format("Merge commit at sqn ~w not matched to expected + sqn ~w from Clerk ~w~n", + [NewMSN, MaybeWrongMSN, From]), {error, State} end. @@ -505,9 +523,26 @@ filepath(RootPath, NewMSN, new_merge_files) -> update_deletions([], _NewMSN, UnreferencedFiles) -> UnreferencedFiles; update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> + io:format("Adding cleared file ~s to deletion list ~n", + [ClearedFile#manifest_entry.filename]), update_deletions(Tail, MSN, - lists:append(UnreferencedFiles, [{ClearedFile, MSN}])). + lists:append(UnreferencedFiles, + [{ClearedFile#manifest_entry.filename, MSN}])). + +confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) -> + case lists:keyfind(Filename, 1, UnreferencedFiles) of + false -> + false; + {Filename, MSN} -> + LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end, + infinity, + RegisteredIterators), + if + MSN >= LowSQN -> false; + true -> true + end + end. %%%============================================================================ %%% Test @@ -532,3 +567,16 @@ compaction_work_assessment_test() -> Manifest3 = [{0, []}, {1, L1Alt}], WorkQ3 = assess_workqueue([], 0, Manifest3), ?assertMatch(WorkQ3, [{1, Manifest3}]). + +confirm_delete_test() -> + Filename = 'test.sft', + UnreferencedFiles = [{'other.sft', 15}, {Filename, 10}], + RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}], + R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1), + ?assertMatch(R1, true), + RegisteredIterators2 = [{dummy_pid, 10}, {dummy_pid, 12}], + R2 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators2), + ?assertMatch(R2, false), + RegisteredIterators3 = [{dummy_pid, 9}, {dummy_pid, 12}], + R3 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators3), + ?assertMatch(R3, false). \ No newline at end of file diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 2896796..cde8313 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -160,6 +160,7 @@ sft_clear/1, sft_checkready/1, sft_getfilename/1, + sft_setfordelete/2, strip_to_keyonly/1, generate_randomkeys/1]). @@ -182,6 +183,7 @@ -define(ITERATOR_SCANWIDTH, 1). -define(MERGE_SCANWIDTH, 8). -define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE). +-define(DELETE_TIMEOUT, 60000). -record(state, {version = ?CURRENT_VERSION :: tuple(), @@ -199,7 +201,9 @@ filename :: string(), handle :: file:fd(), background_complete=false :: boolean(), - background_failure="Unknown" :: string()}). + background_failure="Unknown" :: string(), + ready_for_delete = false ::boolean(), + penciller :: pid()}). %%%============================================================================ @@ -223,7 +227,6 @@ sft_new(Filename, KL1, KL2, Level, Options) -> end, {ok, Pid, Reply}. - sft_open(Filename) -> {ok, Pid} = gen_server:start(?MODULE, [], []), case gen_server:call(Pid, {sft_open, Filename}, infinity) of @@ -233,6 +236,9 @@ sft_open(Filename) -> Error end. +sft_setfordelete(Pid, Penciller) -> + file_request(Pid, {set_for_delete, Penciller}). + sft_get(Pid, Key) -> file_request(Pid, {get_kv, Key}). @@ -266,18 +272,7 @@ sft_getfilename(Pid) -> file_request(Pid, Request) -> case check_pid(Pid) of ok -> - try - gen_server:call(Pid, Request, infinity) - catch - exit:{normal,_} when Request == file_close -> - %% Honest race condition in bitcask_eqc PULSE test. - ok; - exit:{noproc,_} when Request == file_close -> - %% Honest race condition in bitcask_eqc PULSE test. - ok; - X1:X2 -> - exit({file_request_error, self(), Request, X1, X2}) - end; + gen_server:call(Pid, Request, infinity); Error -> Error end. @@ -388,16 +383,42 @@ handle_call(clear, _From, State) -> handle_call(background_complete, _From, State) -> case State#state.background_complete of true -> - {reply, ok, State}; + {reply, {ok, State#state.filename}, State}; false -> {reply, {error, State#state.background_failure}, State} end; -handle_call(get_filename, _from, State) -> - {reply, State#state.filename, State}. +handle_call(get_filename, _From, State) -> + {reply, State#state.filename, State}; +handle_call({set_for_delete, Penciller}, _From, State) -> + {reply, + ok, + State#state{ready_for_delete=true, + penciller=Penciller}, + ?DELETE_TIMEOUT}. handle_cast(_Msg, State) -> {noreply, State}. +handle_info(timeout, State) -> + case State#state.ready_for_delete of + true -> + case leveled_penciller:pcl_confirmdelete(State#state.penciller, + State#state.filename) + of + true -> + io:format("Polled for deletion and now clearing ~s~n", + [State#state.filename]), + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), + {stop, shutdown, State}; + false -> + io:format("Polled for deletion but ~s not ready~n", + [State#state.filename]), + {noreply, State, ?DELETE_TIMEOUT} + end; + false -> + {noreply, State} + end; handle_info(_Info, State) -> {noreply, State}. @@ -583,7 +604,7 @@ fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, Acc) - LengthList, 0, PointerB + FileMD#state.slots_pointer, AccFun(null, Acc)); not_found -> - {complete, Acc} + {complete, AccFun(null, Acc)} end. fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, @@ -989,7 +1010,12 @@ serialise_block(BlockKeyList) -> %% any lower sequence numbers should be compacted out of existence -key_dominates([H1|T1], [], Level) -> +key_dominates(KL1, KL2, Level) -> + key_dominates_expanded(maybe_expand_pointer(KL1), + maybe_expand_pointer(KL2), + Level). + +key_dominates_expanded([H1|T1], [], Level) -> {_, _, St1, _} = H1, case maybe_reap_expiredkey(St1, Level) of true -> @@ -997,7 +1023,7 @@ key_dominates([H1|T1], [], Level) -> false -> {{next_key, H1}, maybe_expand_pointer(T1), []} end; -key_dominates([], [H2|T2], Level) -> +key_dominates_expanded([], [H2|T2], Level) -> {_, _, St2, _} = H2, case maybe_reap_expiredkey(St2, Level) of true -> @@ -1005,7 +1031,7 @@ key_dominates([], [H2|T2], Level) -> false -> {{next_key, H2}, [], maybe_expand_pointer(T2)} end; -key_dominates([H1|T1], [H2|T2], Level) -> +key_dominates_expanded([H1|T1], [H2|T2], Level) -> {K1, Sq1, St1, _} = H1, {K2, Sq2, St2, _} = H2, case K1 of @@ -1051,7 +1077,7 @@ maybe_expand_pointer([]) -> maybe_expand_pointer([H|Tail]) -> case H of {next, SFTPid, StartKey} -> - io:format("Scanning further on PID ~w ~w~n", [SFTPid, StartKey]), + %% io:format("Scanning further on PID ~w ~w~n", [SFTPid, StartKey]), QResult = sft_getkvrange(SFTPid, StartKey, all, ?MERGE_SCANWIDTH), Acc = pointer_append_queryresults(QResult, SFTPid), lists:append(Acc, Tail);