diff --git a/include/leveled.hrl b/include/leveled.hrl index 55192ca..3b06a40 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -16,7 +16,8 @@ ledger_filepath :: string(), manifest_file :: string(), new_manifest :: list(), - unreferenced_files :: list()}). + unreferenced_files :: list(), + target_is_basement = false :: boolean()}). -record(manifest_entry, {start_key :: tuple(), diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 2a915ae..5a321fa 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -62,8 +62,7 @@ clerk_new/1, clerk_prompt/1, clerk_manifestchange/3, - code_change/3, - perform_merge/4]). + code_change/3]). -include_lib("eunit/include/eunit.hrl"). @@ -193,7 +192,7 @@ merge(WI) -> perform_merge({SrcF#manifest_entry.owner, SrcF#manifest_entry.filename}, Candidates, - SrcLevel, + {SrcLevel, WI#penciller_work.target_is_basement}, {WI#penciller_work.ledger_filepath, WI#penciller_work.next_sqn}) end, @@ -283,28 +282,32 @@ select_filetomerge(SrcLevel, Manifest) -> %% %% The level is the level which the new files should be created at. -perform_merge({UpperSFTPid, Filename}, CandidateList, Level, {Filepath, MSN}) -> +perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) -> io:format("Merge to be commenced for FileToMerge=~s with MSN=~w~n", - [Filename, MSN]), + [SrcFN, MSN]), PointerList = lists:map(fun(P) -> {next, P#manifest_entry.owner, all} end, CandidateList), - do_merge([{next, UpperSFTPid, all}], - PointerList, Level, {Filepath, MSN}, 0, []). + do_merge([{next, SrcPid, all}], + PointerList, + LevelInfo, + {Filepath, MSN}, + 0, + []). 
-do_merge([], [], Level, {_Filepath, MSN}, FileCounter, OutList) -> +do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, FileCounter, OutList) -> io:format("Merge completed with MSN=~w Level=~w and FileCounter=~w~n", - [MSN, Level, FileCounter]), + [MSN, SrcLevel, FileCounter]), OutList; -do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) -> +do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft", - [Level + 1, FileCounter])), + [SrcLevel + 1, FileCounter])), io:format("File to be created as part of MSN=~w Filename=~s~n", [MSN, FileName]), % Attempt to trace intermittent eaccess failures false = filelib:is_file(FileName), TS1 = os:timestamp(), - {ok, Pid, Reply} = leveled_sft:sft_new(FileName, KL1, KL2, Level + 1), + {ok, Pid, Reply} = leveled_sft:sft_new(FileName, KL1, KL2, SrcLevel + 1), {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, ExtMan = lists:append(OutList, [#manifest_entry{start_key=SmallestKey, @@ -313,7 +316,9 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) -> filename=FileName}]), MTime = timer:now_diff(os:timestamp(), TS1), io:format("File creation took ~w microseconds ~n", [MTime]), - do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN}, FileCounter + 1, ExtMan). + do_merge(KL1Rem, KL2Rem, + {SrcLevel, IsB}, {Filepath, MSN}, + FileCounter + 1, ExtMan). 
get_item(Index, List, Default) -> @@ -389,7 +394,7 @@ merge_file_test() -> #manifest_entry{owner=PidL2_2}, #manifest_entry{owner=PidL2_3}, #manifest_entry{owner=PidL2_4}], - 2, {"../test/", 99}), + {2, false}, {"../test/", 99}), lists:foreach(fun(ManEntry) -> {o, B1, K1} = ManEntry#manifest_entry.start_key, {o, B2, K2} = ManEntry#manifest_entry.end_key, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 39c2626..1c69eee 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -914,16 +914,20 @@ compare_to_sqn(Obj, SQN) -> %% The full queue is calculated for logging purposes only return_work(State, From) -> - WorkQueue = assess_workqueue([], - 0, - State#state.manifest), - case length(WorkQueue) of + {WorkQ, BasementL} = assess_workqueue([], 0, State#state.manifest, 0), + case length(WorkQ) of L when L > 0 -> - [{SrcLevel, Manifest}|OtherWork] = WorkQueue, + [{SrcLevel, Manifest}|OtherWork] = WorkQ, Backlog = length(OtherWork), io:format("Work at Level ~w to be scheduled for ~w with ~w " ++ "queue items outstanding~n", [SrcLevel, From, Backlog]), + IsBasement = if + SrcLevel + 1 == BasementL -> + true; + true -> + false + end, case element(1, State#state.levelzero_pending) of true -> % Once the L0 file is completed there will be more work @@ -946,7 +950,8 @@ return_work(State, From) -> manifest=Manifest, start_time = os:timestamp(), ledger_filepath = FP, - manifest_file = ManFile}, + manifest_file = ManFile, + target_is_basement = IsBasement}, {State#state{ongoing_work=[WI]}, WI} end; _ -> @@ -1161,7 +1166,7 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) -> QueryFunT); {{Key, Val}, null, null} -> % No best key set - so can assume that this key is the best key, - % and check the higher levels + % and check the lower levels find_nextkey(QueryArray, LCnt + 1, {LCnt, {Key, Val}}, @@ -1270,14 +1275,21 @@ keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) -> end. 
-assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest) -> - WorkQ; -assess_workqueue(WorkQ, LevelToAssess, Manifest)-> +assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Man, BasementLevel) -> + {WorkQ, BasementLevel}; +assess_workqueue(WorkQ, LevelToAssess, Man, BasementLevel) -> MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0), - FileCount = length(get_item(LevelToAssess, Manifest, [])), - NewWQ = maybe_append_work(WorkQ, LevelToAssess, Manifest, MaxFiles, - FileCount), - assess_workqueue(NewWQ, LevelToAssess + 1, Manifest). + case length(get_item(LevelToAssess, Man, [])) of + FileCount when FileCount > 0 -> + NewWQ = maybe_append_work(WorkQ, + LevelToAssess, + Man, + MaxFiles, + FileCount), + assess_workqueue(NewWQ, LevelToAssess + 1, Man, LevelToAssess); + 0 -> + assess_workqueue(WorkQ, LevelToAssess + 1, Man, BasementLevel) + end. maybe_append_work(WorkQ, Level, Manifest, @@ -1418,7 +1430,6 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) -> end. - assess_sqn([]) -> empty; assess_sqn(DumpList) -> @@ -1467,7 +1478,7 @@ compaction_work_assessment_test() -> L1 = [{{o, "B1", "K1", null}, {o, "B2", "K2", null}, dummy_pid}, {{o, "B2", "K3", null}, {o, "B4", "K4", null}, dummy_pid}], Manifest = [{0, L0}, {1, L1}], - WorkQ1 = assess_workqueue([], 0, Manifest), + {WorkQ1, 1} = assess_workqueue([], 0, Manifest, 0), ?assertMatch(WorkQ1, [{0, Manifest}]), L1Alt = lists:append(L1, [{{o, "B5", "K0001", null}, {o, "B5", "K9999", null}, @@ -1485,7 +1496,7 @@ compaction_work_assessment_test() -> {{o, "BB", "K0001", null}, {o, "BB", "K9999", null}, dummy_pid}]), Manifest3 = [{0, []}, {1, L1Alt}], - WorkQ3 = assess_workqueue([], 0, Manifest3), + {WorkQ3, 1} = assess_workqueue([], 0, Manifest3, 0), ?assertMatch(WorkQ3, [{1, Manifest3}]). 
confirm_delete_test() -> diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index f292339..5399e01 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -1744,6 +1744,45 @@ initial_iterator_test() -> ok = file:close(UpdHandle), ok = file:delete(Filename). +key_dominates_test() -> + KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, []}}, + KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, []}}, + KV3 = {{o, "Bucket", "Key2", null}, {3, {active, infinity}, []}}, + KV4 = {{o, "Bucket", "Key4", null}, {7, {active, infinity}, []}}, + KV5 = {{o, "Bucket", "Key1", null}, {4, {active, infinity}, []}}, + KV6 = {{o, "Bucket", "Key1", null}, {99, {tomb, 999}, []}}, + KL1 = [KV1, KV2], + KL2 = [KV3, KV4], + ?assertMatch({{next_key, KV1}, [KV2], KL2}, + key_dominates(KL1, KL2, 1)), + ?assertMatch({{next_key, KV1}, KL2, [KV2]}, + key_dominates(KL2, KL1, 1)), + ?assertMatch({skipped_key, KL2, KL1}, + key_dominates([KV5|KL2], KL1, 1)), + ?assertMatch({{next_key, KV1}, [KV2], []}, + key_dominates(KL1, [], 1)), + ?assertMatch({skipped_key, [KV6|KL2], [KV2]}, + key_dominates([KV6|KL2], KL1, 1)), + ?assertMatch({{next_key, KV6}, KL2, [KV2]}, + key_dominates([KV6|KL2], [KV2], 1)), + ?assertMatch({skipped_key, [KV6|KL2], [KV2]}, + key_dominates([KV6|KL2], KL1, {basement, 1})), + ?assertMatch({skipped_key, [KV6|KL2], [KV2]}, + key_dominates([KV6|KL2], KL1, {basement, 1000})), + ?assertMatch({{next_key, KV6}, KL2, [KV2]}, + key_dominates([KV6|KL2], [KV2], {basement, 1})), + ?assertMatch({skipped_key, KL2, [KV2]}, + key_dominates([KV6|KL2], [KV2], {basement, 1000})), + ?assertMatch({skipped_key, [], []}, + key_dominates([KV6], [], {basement, 1000})), + ?assertMatch({skipped_key, [], []}, + key_dominates([], [KV6], {basement, 1000})), + ?assertMatch({{next_key, KV6}, [], []}, + key_dominates([KV6], [], {basement, 1})), + ?assertMatch({{next_key, KV6}, [], []}, + key_dominates([], [KV6], {basement, 1})). 
+ + big_iterator_test() -> Filename = "../test/bigtest1.sft", {KL1, KL2} = {lists:sort(generate_randomkeys(10000)), []},