Tombstone preparation
Some initial code changes preparing for the testing and implementation of tombstones and tombstone reaping.
parent 0324edd6f6
commit 5c2029668d
4 changed files with 88 additions and 32 deletions
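The thread running through the hunks below: a merge needs to know when it is writing into the basement, the deepest level currently holding files, because that is the only point at which tombstones can safely be reaped. The penciller_work record gains a target_is_basement flag, return_work/2 works the flag out when scheduling a merge, the clerk threads it into perform_merge/4 and do_merge/6 as a {SrcLevel, IsBasement} pair, assess_workqueue/4 now also reports the basement level, and a new key_dominates_test pins down the intended {basement, TS} behaviour. The decision itself is small; as a stand-alone sketch (module and function names are mine, not the repo's):

    -module(basement_target_sketch).
    -export([is_basement_target/2]).

    %% A merge reads from SrcLevel and writes into SrcLevel + 1, so it targets
    %% the basement exactly when SrcLevel + 1 is the deepest level that
    %% currently holds files (BasementL, per the reworked assess_workqueue/4).
    is_basement_target(SrcLevel, BasementL) ->
        SrcLevel + 1 == BasementL.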
@@ -16,7 +16,8 @@
         ledger_filepath :: string(),
         manifest_file :: string(),
         new_manifest :: list(),
-        unreferenced_files :: list()}).
+        unreferenced_files :: list(),
+        target_is_basement = false ::boolean()}).
 
 -record(manifest_entry,
         {start_key :: tuple(),
@@ -62,8 +62,7 @@
         clerk_new/1,
         clerk_prompt/1,
         clerk_manifestchange/3,
-        code_change/3,
-        perform_merge/4]).
+        code_change/3]).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -193,7 +192,7 @@ merge(WI) ->
             perform_merge({SrcF#manifest_entry.owner,
                                 SrcF#manifest_entry.filename},
                             Candidates,
-                            SrcLevel,
+                            {SrcLevel, WI#penciller_work.target_is_basement},
                             {WI#penciller_work.ledger_filepath,
                                 WI#penciller_work.next_sqn})
     end,
@@ -283,28 +282,32 @@ select_filetomerge(SrcLevel, Manifest) ->
 %%
 %% The level is the level which the new files should be created at.
 
-perform_merge({UpperSFTPid, Filename}, CandidateList, Level, {Filepath, MSN}) ->
+perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) ->
     io:format("Merge to be commenced for FileToMerge=~s with MSN=~w~n",
-                [Filename, MSN]),
+                [SrcFN, MSN]),
     PointerList = lists:map(fun(P) ->
                                 {next, P#manifest_entry.owner, all} end,
                             CandidateList),
-    do_merge([{next, UpperSFTPid, all}],
-                PointerList, Level, {Filepath, MSN}, 0, []).
+    do_merge([{next, SrcPid, all}],
+                PointerList,
+                LevelInfo,
+                {Filepath, MSN},
+                0,
+                []).
 
-do_merge([], [], Level, {_Filepath, MSN}, FileCounter, OutList) ->
+do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, FileCounter, OutList) ->
     io:format("Merge completed with MSN=~w Level=~w and FileCounter=~w~n",
-                [MSN, Level, FileCounter]),
+                [MSN, SrcLevel, FileCounter]),
     OutList;
-do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) ->
+do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) ->
     FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft",
-                                            [Level + 1, FileCounter])),
+                                            [SrcLevel + 1, FileCounter])),
     io:format("File to be created as part of MSN=~w Filename=~s~n",
                 [MSN, FileName]),
     % Attempt to trace intermittent eaccess failures
     false = filelib:is_file(FileName),
     TS1 = os:timestamp(),
-    {ok, Pid, Reply} = leveled_sft:sft_new(FileName, KL1, KL2, Level + 1),
+    {ok, Pid, Reply} = leveled_sft:sft_new(FileName, KL1, KL2, SrcLevel + 1),
     {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
     ExtMan = lists:append(OutList,
                             [#manifest_entry{start_key=SmallestKey,
@@ -313,7 +316,9 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) ->
                                                 filename=FileName}]),
     MTime = timer:now_diff(os:timestamp(), TS1),
     io:format("File creation took ~w microseconds ~n", [MTime]),
-    do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN}, FileCounter + 1, ExtMan).
+    do_merge(KL1Rem, KL2Rem,
+                {SrcLevel, IsB}, {Filepath, MSN},
+                FileCounter + 1, ExtMan).
 
 
 get_item(Index, List, Default) ->
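At this stage the basement flag is only plumbed through: do_merge/6 accepts {SrcLevel, IsB} but still calls leveled_sft:sft_new/4 with just SrcLevel + 1, so nothing is reaped yet, consistent with this commit being preparation. A hypothetical wrapper (not repo code) showing the new calling shape:

    %% Hypothetical wrapper, not in the repo: the third argument to
    %% perform_merge/4 is now {SrcLevel, IsBasement} rather than the bare level.
    merge_example(SrcPid, SrcFN, Candidates, SrcLevel, IsBasement, FilePath, MSN) ->
        perform_merge({SrcPid, SrcFN},
                        Candidates,
                        {SrcLevel, IsBasement},
                        {FilePath, MSN}).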
@@ -389,7 +394,7 @@ merge_file_test() ->
                         #manifest_entry{owner=PidL2_2},
                         #manifest_entry{owner=PidL2_3},
                         #manifest_entry{owner=PidL2_4}],
-                    2, {"../test/", 99}),
+                    {2, false}, {"../test/", 99}),
     lists:foreach(fun(ManEntry) ->
                     {o, B1, K1} = ManEntry#manifest_entry.start_key,
                     {o, B2, K2} = ManEntry#manifest_entry.end_key,
@@ -914,16 +914,20 @@ compare_to_sqn(Obj, SQN) ->
 %% The full queue is calculated for logging purposes only
 
 return_work(State, From) ->
-    WorkQueue = assess_workqueue([],
-                                    0,
-                                    State#state.manifest),
-    case length(WorkQueue) of
+    {WorkQ, BasementL} = assess_workqueue([], 0, State#state.manifest, 0),
+    case length(WorkQ) of
         L when L > 0 ->
-            [{SrcLevel, Manifest}|OtherWork] = WorkQueue,
+            [{SrcLevel, Manifest}|OtherWork] = WorkQ,
             Backlog = length(OtherWork),
             io:format("Work at Level ~w to be scheduled for ~w with ~w " ++
                         "queue items outstanding~n",
                         [SrcLevel, From, Backlog]),
+            IsBasement = if
+                            SrcLevel + 1 == BasementL ->
+                                true;
+                            true ->
+                                false
+                        end,
             case element(1, State#state.levelzero_pending) of
                 true ->
                     % Once the L0 file is completed there will be more work
@@ -946,7 +950,8 @@ return_work(State, From) ->
                                         manifest=Manifest,
                                         start_time = os:timestamp(),
                                         ledger_filepath = FP,
-                                        manifest_file = ManFile},
+                                        manifest_file = ManFile,
+                                        target_is_basement = IsBasement},
                     {State#state{ongoing_work=[WI]}, WI}
             end;
         _ ->
@@ -1161,7 +1166,7 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
                             QueryFunT);
         {{Key, Val}, null, null} ->
             % No best key set - so can assume that this key is the best key,
-            % and check the higher levels
+            % and check the lower levels
             find_nextkey(QueryArray,
                             LCnt + 1,
                             {LCnt, {Key, Val}},
@@ -1270,14 +1275,21 @@ keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) ->
     end.
 
 
-assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest) ->
-    WorkQ;
-assess_workqueue(WorkQ, LevelToAssess, Manifest)->
+assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Man, BasementLevel) ->
+    {WorkQ, BasementLevel};
+assess_workqueue(WorkQ, LevelToAssess, Man, BasementLevel) ->
     MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0),
-    FileCount = length(get_item(LevelToAssess, Manifest, [])),
-    NewWQ = maybe_append_work(WorkQ, LevelToAssess, Manifest, MaxFiles,
-                                FileCount),
-    assess_workqueue(NewWQ, LevelToAssess + 1, Manifest).
+    case length(get_item(LevelToAssess, Man, [])) of
+        FileCount when FileCount > 0 ->
+            NewWQ = maybe_append_work(WorkQ,
+                                        LevelToAssess,
+                                        Man,
+                                        MaxFiles,
+                                        FileCount),
+            assess_workqueue(NewWQ, LevelToAssess + 1, Man, LevelToAssess);
+        0 ->
+            assess_workqueue(WorkQ, LevelToAssess + 1, Man, BasementLevel)
+    end.
 
 
 maybe_append_work(WorkQ, Level, Manifest,
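The reworked assess_workqueue/4 now returns {WorkQ, BasementLevel}: each scanned level that holds files overwrites the accumulator, so the second element ends up as the deepest populated level (or the initial 0 if nothing is populated). A stand-alone sketch of that calculation over the [{Level, Files}] manifest shape the tests below use (module and function names are mine, not the repo's):

    -module(basement_level_sketch).
    -export([basement_level/1]).

    %% Deepest level currently holding files; 0 if every level is empty.
    basement_level(Manifest) ->
        lists:foldl(fun({Level, Files}, _Acc) when length(Files) > 0 ->
                            Level;
                        ({_Level, _Empty}, Acc) ->
                            Acc
                    end,
                    0,
                    lists:keysort(1, Manifest)).

With the manifests in compaction_work_assessment_test, for example [{0, []}, {1, L1Alt}], this gives 1, matching the {WorkQ3, 1} the updated assertion expects.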
@@ -1418,7 +1430,6 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
     end.
 
 
-
 assess_sqn([]) ->
     empty;
 assess_sqn(DumpList) ->
@@ -1467,7 +1478,7 @@ compaction_work_assessment_test() ->
     L1 = [{{o, "B1", "K1", null}, {o, "B2", "K2", null}, dummy_pid},
             {{o, "B2", "K3", null}, {o, "B4", "K4", null}, dummy_pid}],
     Manifest = [{0, L0}, {1, L1}],
-    WorkQ1 = assess_workqueue([], 0, Manifest),
+    {WorkQ1, 1} = assess_workqueue([], 0, Manifest, 0),
     ?assertMatch(WorkQ1, [{0, Manifest}]),
     L1Alt = lists:append(L1,
                             [{{o, "B5", "K0001", null}, {o, "B5", "K9999", null},
@@ -1485,7 +1496,7 @@ compaction_work_assessment_test() ->
             {{o, "BB", "K0001", null}, {o, "BB", "K9999", null},
                 dummy_pid}]),
     Manifest3 = [{0, []}, {1, L1Alt}],
-    WorkQ3 = assess_workqueue([], 0, Manifest3),
+    {WorkQ3, 1} = assess_workqueue([], 0, Manifest3, 0),
     ?assertMatch(WorkQ3, [{1, Manifest3}]).
 
 confirm_delete_test() ->
@@ -1744,6 +1744,45 @@ initial_iterator_test() ->
     ok = file:close(UpdHandle),
     ok = file:delete(Filename).
 
+key_dominates_test() ->
+    KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, []}},
+    KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, []}},
+    KV3 = {{o, "Bucket", "Key2", null}, {3, {active, infinity}, []}},
+    KV4 = {{o, "Bucket", "Key4", null}, {7, {active, infinity}, []}},
+    KV5 = {{o, "Bucket", "Key1", null}, {4, {active, infinity}, []}},
+    KV6 = {{o, "Bucket", "Key1", null}, {99, {tomb, 999}, []}},
+    KL1 = [KV1, KV2],
+    KL2 = [KV3, KV4],
+    ?assertMatch({{next_key, KV1}, [KV2], KL2},
+                    key_dominates(KL1, KL2, 1)),
+    ?assertMatch({{next_key, KV1}, KL2, [KV2]},
+                    key_dominates(KL2, KL1, 1)),
+    ?assertMatch({skipped_key, KL2, KL1},
+                    key_dominates([KV5|KL2], KL1, 1)),
+    ?assertMatch({{next_key, KV1}, [KV2], []},
+                    key_dominates(KL1, [], 1)),
+    ?assertMatch({skipped_key, [KV6|KL2], [KV2]},
+                    key_dominates([KV6|KL2], KL1, 1)),
+    ?assertMatch({{next_key, KV6}, KL2, [KV2]},
+                    key_dominates([KV6|KL2], [KV2], 1)),
+    ?assertMatch({skipped_key, [KV6|KL2], [KV2]},
+                    key_dominates([KV6|KL2], KL1, {basement, 1})),
+    ?assertMatch({skipped_key, [KV6|KL2], [KV2]},
+                    key_dominates([KV6|KL2], KL1, {basement, 1000})),
+    ?assertMatch({{next_key, KV6}, KL2, [KV2]},
+                    key_dominates([KV6|KL2], [KV2], {basement, 1})),
+    ?assertMatch({skipped_key, KL2, [KV2]},
+                    key_dominates([KV6|KL2], [KV2], {basement, 1000})),
+    ?assertMatch({skipped_key, [], []},
+                    key_dominates([KV6], [], {basement, 1000})),
+    ?assertMatch({skipped_key, [], []},
+                    key_dominates([], [KV6], {basement, 1000})),
+    ?assertMatch({{next_key, KV6}, [], []},
+                    key_dominates([KV6], [], {basement, 1})),
+    ?assertMatch({{next_key, KV6}, [], []},
+                    key_dominates([], [KV6], {basement, 1})).
+
+
 big_iterator_test() ->
     Filename = "../test/bigtest1.sft",
     {KL1, KL2} = {lists:sort(generate_randomkeys(10000)), []},
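The new key_dominates_test documents the intended reaping rule: KV6, a tombstone whose value is {99, {tomb, 999}, []}, survives every ordinary level merge, and at the basement it is only dropped once the reap timestamp has passed its expiry ({basement, 1000} skips it with nothing emitted, while {basement, 1} still returns it as the next key). A minimal sketch of that rule as the assertions imply it (the function is hypothetical, not the module's API, and whether an exactly-equal timestamp reaps is an assumption):

    -module(tombstone_reap_sketch).
    -export([maybe_reap/2]).

    %% A tombstone carries {tomb, ExpiryTS}: it may only be dropped when the
    %% merge targets the basement and the reap timestamp is past the expiry.
    maybe_reap({_Key, {_SQN, {tomb, ExpiryTS}, _MD}}, {basement, ReapTS})
            when ReapTS > ExpiryTS ->
        reap;
    maybe_reap(_KV, _LevelInfo) ->
        keep.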