diff --git a/src/leveled_manifest.erl b/src/leveled_manifest.erl index 16629cd..f27e421 100644 --- a/src/leveled_manifest.erl +++ b/src/leveled_manifest.erl @@ -14,10 +14,12 @@ open_manifest/1, copy_manifest/1, load_manifest/3, + close_manifest/2, save_manifest/2, get_manifest_sqn/1, key_lookup/3, range_lookup/4, + merge_lookup/4, insert_manifest_entry/4, remove_manifest_entry/4, switch_manifest_entry/4, @@ -87,8 +89,7 @@ open_manifest(RootPath) -> ValidManSQNs = lists:reverse(lists:sort(lists:foldl(ExtractSQNFun, [], Filenames))), - Manifest = open_manifestfile(RootPath, ValidManSQNs), - Manifest. + open_manifestfile(RootPath, ValidManSQNs). copy_manifest(Manifest) -> % Copy the manifest ensuring anything only the master process should care @@ -100,12 +101,20 @@ load_manifest(Manifest, PidFun, SQNFun) -> fun(LevelIdx, {AccMaxSQN, AccMan}) -> L0 = array:get(LevelIdx, AccMan#manifest.levels), {L1, SQN1} = load_level(LevelIdx, L0, PidFun, SQNFun), - {max(AccMaxSQN, SQN1), - AccMan#manifest{levels = array:set(LevelIdx, L1, AccMan)}} + UpdLevels = array:set(LevelIdx, L1, AccMan#manifest.levels), + {max(AccMaxSQN, SQN1), AccMan#manifest{levels = UpdLevels}} end, lists:foldl(UpdateLevelFun, {0, Manifest}, lists:seq(0, Manifest#manifest.basement)). - + +close_manifest(Manifest, CloseEntryFun) -> + CloseLevelFun = + fun(LevelIdx) -> + Level = array:get(LevelIdx, Manifest#manifest.levels), + close_level(LevelIdx, Level, CloseEntryFun) + end, + lists:foreach(CloseLevelFun, lists:seq(0, Manifest#manifest.basement)). + save_manifest(Manifest, RootPath) -> FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest), ManBin = term_to_binary(Manifest), @@ -176,24 +185,21 @@ key_lookup(Manifest, LevelIdx, Key) -> array:get(LevelIdx, Manifest#manifest.levels), Key) end. - + range_lookup(Manifest, LevelIdx, StartKey, EndKey) -> - Range = - case LevelIdx > Manifest#manifest.basement of - true -> - []; - false -> - range_lookup_level(LevelIdx, - array:get(LevelIdx, - Manifest#manifest.levels), - StartKey, - EndKey) - end, MakePointerFun = fun(M) -> {next, M, StartKey} end, - lists:map(MakePointerFun, Range). + range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). + +merge_lookup(Manifest, LevelIdx, StartKey, EndKey) -> + MakePointerFun = + fun(M) -> + {next, M, all} + end, + range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). + %% An algorithm for discovering which files to merge .... @@ -208,7 +214,7 @@ range_lookup(Manifest, LevelIdx, StartKey, EndKey) -> %% Hence, the initial implementation is to select files to merge at random mergefile_selector(Manifest, LevelIdx) -> Level = array:get(LevelIdx, Manifest#manifest.levels), - lists:nth(random:uniform(length(Level), Level)). + lists:nth(random:uniform(length(Level)), Level). add_snapshot(Manifest, Pid, Timeout) -> {MegaNow, SecNow, _} = os:timestamp(), @@ -299,12 +305,14 @@ load_level(_LevelIdx, Level, PidFun, SQNFun) -> LevelLoadFun = fun(ME, {L_Out, L_MaxSQN}) -> FN = ME#manifest_entry.filename, - {ok, P, _Keys} = PidFun(FN), + P = PidFun(FN), SQN = SQNFun(P), {[ME#manifest_entry{owner=P}|L_Out], max(SQN, L_MaxSQN)} end, lists:foldr(LevelLoadFun, {[], 0}, Level). +close_level(_LevelIdx, Level, CloseEntryFun) -> + lists:foreach(CloseEntryFun, Level). is_empty(_LevelIdx, []) -> true; @@ -337,7 +345,6 @@ remove_section(Level, StartKey, Length) -> Pre ++ Post. - key_lookup_level(_LevelIdx, [], _Key) -> false; key_lookup_level(LevelIdx, [Entry|Rest], Key) -> @@ -353,6 +360,20 @@ key_lookup_level(LevelIdx, [Entry|Rest], Key) -> key_lookup_level(LevelIdx, Rest, Key) end. +range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun) -> + Range = + case LevelIdx > Manifest#manifest.basement of + true -> + []; + false -> + range_lookup_level(LevelIdx, + array:get(LevelIdx, + Manifest#manifest.levels), + StartKey, + EndKey) + end, + lists:map(MakePointerFun, Range). + range_lookup_level(_LevelIdx, Level, QStartKey, QEndKey) -> BeforeFun = fun(M) -> @@ -392,13 +413,13 @@ filepath(RootPath, NewMSN, current_manifest) -> open_manifestfile(_RootPath, []) -> leveled_log:log("P0013", []), - {0, new_manifest()}; + new_manifest(); open_manifestfile(_RootPath, [0]) -> leveled_log:log("P0013", []), - {0, new_manifest()}; + new_manifest(); open_manifestfile(RootPath, [TopManSQN|Rest]) -> CurrManFile = filepath(RootPath, TopManSQN, current_manifest), - FileBin = file:read_file(CurrManFile), + {ok, FileBin} = file:read_file(CurrManFile), <> = FileBin, case erlang:crc32(BinaryOfTerm) of CRC -> diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 31d9861..ddf4935 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -38,7 +38,8 @@ clerk_new/2, clerk_prompt/1, clerk_push/2, - clerk_close/1 + clerk_close/1, + clerk_promptdeletions/2 ]). -include_lib("eunit/include/eunit.hrl"). @@ -47,7 +48,8 @@ -define(MIN_TIMEOUT, 200). -record(state, {owner :: pid(), - root_path :: string()}). + root_path :: string(), + pending_deletions = dict:new() :: dict:dict()}). %%%============================================================================ %%% API @@ -62,6 +64,9 @@ clerk_new(Owner, Manifest) -> clerk_prompt(Pid) -> gen_server:cast(Pid, prompt). +clerk_promptdeletions(Pid, ManifestSQN) -> + gen_server:cast(Pid, {prompt_deletions, ManifestSQN}). + clerk_push(Pid, Work) -> gen_server:cast(Pid, {push_work, Work}). @@ -83,8 +88,14 @@ handle_call(close, _From, State) -> handle_cast(prompt, State) -> handle_info(timeout, State); handle_cast({push_work, Work}, State) -> - handle_work(Work, State), - {noreply, State, ?MIN_TIMEOUT}. + {ManifestSQN, Deletions} = handle_work(Work, State), + PDs = dict:store(ManifestSQN, Deletions, State#state.pending_deletions), + {noreply, State#state{pending_deletions = PDs}, ?MAX_TIMEOUT}; +handle_cast({prompt_deletions, ManifestSQN}, State) -> + Deletions = dict:fetch(ManifestSQN, State#state.pending_deletions), + ok = notify_deletions(Deletions, State#state.owner), + UpdDeletions = dict:erase(ManifestSQN, State#state.pending_deletions), + {noreply, State#state{pending_deletions = UpdDeletions}, ?MIN_TIMEOUT}. handle_info(timeout, State) -> request_work(State), @@ -117,7 +128,7 @@ handle_work({SrcLevel, Manifest}, State) -> ok = leveled_manifest:save_manifest(UpdManifest, State#state.root_path), leveled_log:log_timer("PC018", [], SWSM), - ok = notify_deletions(EntriesToDelete, State#state.owner). + {leveled_manifest:get_manifest_sqn(UpdManifest), EntriesToDelete}. merge(SrcLevel, Manifest, RootPath) -> Src = leveled_manifest:mergefile_selector(Manifest, SrcLevel), @@ -130,21 +141,13 @@ merge(SrcLevel, Manifest, RootPath) -> leveled_log:log("PC008", [SrcLevel, Candidates]), case Candidates of 0 -> - %% If no overlapping candiates, manifest change only required - %% - %% TODO: need to think still about simply renaming when at - %% lower level leveled_log:log("PC009", [Src#manifest_entry.filename, SrcLevel + 1]), - Man0 = leveled_manifest:remove_manifest_entry(Manifest, + Man0 = leveled_manifest:switch_manifest_entry(Manifest, NewSQN, SrcLevel, Src), - Man1 = leveled_manifest:insert_manifest_entry(Man0, - NewSQN, - SrcLevel + 1, - Src), - {Man1, []}; + {Man0, []}; _ -> FilePath = leveled_penciller:filepath(RootPath, NewSQN, @@ -166,53 +169,59 @@ notify_deletions([Head|Tail], Penciller) -> perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) -> leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]), - SrcList = [{next, Src#manifest_entry.owner, all}], - SinkPointerList = leveled_manifest:pointer_convert(Manifest, SinkList), + SrcList = [{next, Src, all}], MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner), SinkLevel = SrcLevel + 1, SinkBasement = leveled_manifest:is_basement(Manifest, SinkLevel), - Man0 = do_merge(SrcList, SinkPointerList, - SinkLevel, SinkBasement, - RootPath, NewSQN, MaxSQN, - 0, Manifest), - RemoveFun = - fun(Entry, AccMan) -> - leveled_manifest:remove_manifest_entry(AccMan, + Additions = do_merge(SrcList, SinkList, + SinkLevel, SinkBasement, + RootPath, NewSQN, MaxSQN, + []), + RevertPointerFun = + fun({next, ME, _SK}) -> + ME + end, + Man0 = leveled_manifest:insert_manifest_entry(Manifest, NewSQN, SinkLevel, - Entry) - end, - Man1 = lists:foldl(RemoveFun, Man0, SinkList), - Man2 = leveled_manifest:remove_manifest_entry(Man1, NewSQN, SrcLevel, Src), + Additions), + Man1 = leveled_manifest:remove_manifest_entry(Man0, + NewSQN, + SinkLevel, + lists:map(RevertPointerFun, + SinkList)), + Man2 = leveled_manifest:remove_manifest_entry(Man1, + NewSQN, + SrcLevel, + Src), {Man2, [Src|SinkList]}. -do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Counter, Man0) -> - leveled_log:log("PC011", [NewSQN, SinkLevel, Counter]), - Man0; -do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Counter, Man0) -> +do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) -> + leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]), + Additions; +do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) -> FileName = lists:flatten(io_lib:format(RP ++ "_~w_~w.sst", - [SinkLevel, Counter])), + [SinkLevel, length(Additions)])), leveled_log:log("PC012", [NewSQN, FileName, SinkB]), TS1 = os:timestamp(), case leveled_sst:sst_new(FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN) of empty -> leveled_log:log("PC013", [FileName]), - Man0; + do_merge([], [], + SinkLevel, SinkB, + RP, NewSQN, MaxSQN, + Additions); {ok, Pid, Reply} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, Entry = #manifest_entry{start_key=SmallestKey, end_key=HighestKey, owner=Pid, filename=FileName}, - Man1 = leveled_manifest:insert_manifest_entry(Man0, - NewSQN, - SinkLevel, - Entry), leveled_log:log_timer("PC015", [], TS1), do_merge(KL1Rem, KL2Rem, SinkLevel, SinkB, RP, NewSQN, MaxSQN, - Counter + 1, Man1) + Additions ++ [Entry]) end. @@ -294,7 +303,9 @@ merge_file_test() -> Man4 = leveled_manifest:insert_manifest_entry(Man3, 1, 2, E5), Man5 = leveled_manifest:insert_manifest_entry(Man4, 2, 1, E1), - {Man6, _Dels} = perform_merge(Man5, E1, [E2, E3, E4, E5], 1, "../test", 3), + PointerList = lists:map(fun(ME) -> {next, ME, all} end, + [E2, E3, E4, E5]), + {Man6, _Dels} = perform_merge(Man5, E1, PointerList, 1, "../test", 3), ?assertMatch(3, leveled_manifest:get_manifest_sqn(Man6)). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index b340f7d..889bf50 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -461,6 +461,8 @@ handle_call(doom, _From, State) -> {stop, normal, {ok, [ManifestFP, FilesFP]}, State}. handle_cast({manifest_change, NewManifest}, State) -> + NewManSQN = leveled_manifest:get_manifest_sqn(NewManifest), + ok = leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN), {noreply, State#state{manifest = NewManifest, work_ongoing=false}}; handle_cast({release_snapshot, Snapshot}, State) -> Manifest0 = leveled_manifest:release_snapshot(State#state.manifest, @@ -569,10 +571,11 @@ terminate(Reason, State) -> end, % Tidy shutdown of individual files - lists:foreach(fun({_FN, {Pid, _DSQN}}) -> - ok = leveled_sst:sst_close(Pid) - end, - leveled_manifest:dump_pidmap(State#state.manifest)), + EntryCloseFun = + fun(ME) -> + ok = leveled_sst:sst_close(ME#manifest_entry.owner) + end, + leveled_manifest:close_manifest(State#state.manifest, EntryCloseFun), leveled_log:log("P0011", []), ok. diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index ec8f8b9..712d74f 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -1175,8 +1175,8 @@ maybe_expand_pointer([{pointer, SSTPid, Slot, StartKey, all}|Tail]) -> expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, all}, Tail, ?MERGE_SCANWIDTH); -maybe_expand_pointer([{next, SSTPid, StartKey}|Tail]) -> - expand_list_by_pointer({next, SSTPid, StartKey, all}, +maybe_expand_pointer([{next, ManEntry, StartKey}|Tail]) -> + expand_list_by_pointer({next, ManEntry, StartKey, all}, Tail, ?MERGE_SCANWIDTH); maybe_expand_pointer(List) -> @@ -1202,8 +1202,9 @@ expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) - {AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail), ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers), lists:append(ExpPointers, AccTail); -expand_list_by_pointer({next, SSTPid, StartKey, EndKey}, Tail, Width) -> - leveled_log:log("SST10", [SSTPid, is_pid(SSTPid)]), +expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, Tail, Width) -> + SSTPid = ManEntry#manifest_entry.owner, + leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]), ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width), ExpPointer ++ Tail.