Initial level merging

Some basic merging in the lsm tree.
This commit is contained in:
martinsumner 2016-08-10 13:02:08 +01:00
parent 718425633a
commit c269eb3c52
3 changed files with 186 additions and 93 deletions

View file

@ -13,7 +13,7 @@
handle_cast/2, handle_cast/2,
handle_info/2, handle_info/2,
terminate/2, terminate/2,
clerk_new/0, clerk_new/1,
clerk_prompt/2, clerk_prompt/2,
code_change/3, code_change/3,
perform_merge/4]). perform_merge/4]).
@ -26,14 +26,15 @@
%%% API %%% API
%%%============================================================================ %%%============================================================================
clerk_new() -> clerk_new(Owner) ->
{ok, Pid} = gen_server:start(?MODULE, [], []), {ok, Pid} = gen_server:start(?MODULE, [], []),
ok = gen_server:call(Pid, register, infinity), ok = gen_server:call(Pid, {register, Owner}, infinity),
{ok, Pid}. {ok, Pid}.
clerk_prompt(Pid, penciller) -> clerk_prompt(Pid, penciller) ->
gen_server:cast(Pid, penciller_prompt, infinity). gen_server:cast(Pid, penciller_prompt),
ok.
%%%============================================================================ %%%============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -42,10 +43,10 @@ clerk_prompt(Pid, penciller) ->
init([]) -> init([]) ->
{ok, #state{}}. {ok, #state{}}.
handle_call(register, From, State) -> handle_call({register, Owner}, _From, State) ->
{noreply, State#state{owner=From}}. {reply, ok, State#state{owner=Owner}}.
handle_cast({penciller_prompt, From}, State) -> handle_cast(penciller_prompt, State) ->
case leveled_penciller:pcl_workforclerk(State#state.owner) of case leveled_penciller:pcl_workforclerk(State#state.owner) of
none -> none ->
io:format("Work prompted but none needed~n"), io:format("Work prompted but none needed~n"),
@ -54,7 +55,9 @@ handle_cast({penciller_prompt, From}, State) ->
{NewManifest, FilesToDelete} = merge(WI), {NewManifest, FilesToDelete} = merge(WI),
UpdWI = WI#penciller_work{new_manifest=NewManifest, UpdWI = WI#penciller_work{new_manifest=NewManifest,
unreferenced_files=FilesToDelete}, unreferenced_files=FilesToDelete},
leveled_penciller:pcl_requestmanifestchange(From, UpdWI), leveled_penciller:pcl_requestmanifestchange(State#state.owner,
UpdWI),
mark_for_delete(FilesToDelete, State#state.owner),
{noreply, State} {noreply, State}
end. end.
@ -74,58 +77,69 @@ code_change(_OldVsn, State, _Extra) ->
merge(WI) -> merge(WI) ->
SrcLevel = WI#penciller_work.src_level, SrcLevel = WI#penciller_work.src_level,
{Selection, UpdMFest1} = select_filetomerge(SrcLevel, {SrcF, UpdMFest1} = select_filetomerge(SrcLevel,
WI#penciller_work.manifest), WI#penciller_work.manifest),
{{StartKey, EndKey}, SrcFile} = Selection,
SrcFilename = leveled_sft:sft_getfilename(SrcFile),
SinkFiles = get_item(SrcLevel + 1, UpdMFest1, []), SinkFiles = get_item(SrcLevel + 1, UpdMFest1, []),
SplitLists = lists:splitwith(fun(Ref) -> Splits = lists:splitwith(fun(Ref) ->
case {Ref#manifest_entry.start_key, case {Ref#manifest_entry.start_key,
Ref#manifest_entry.end_key} of Ref#manifest_entry.end_key} of
{_, EK} when StartKey > EK -> {_, EK} when SrcF#manifest_entry.start_key > EK ->
false; false;
{SK, _} when EndKey < SK -> {SK, _} when SrcF#manifest_entry.end_key < SK ->
false; false;
_ -> _ ->
true true
end end, end end,
SinkFiles), SinkFiles),
{Candidates, Others} = SplitLists, {Candidates, Others} = Splits,
%% TODO: %% TODO:
%% Need to work out if this is the top level %% Need to work out if this is the top level
%% And then tell merge process to create files at the top level %% And then tell merge process to create files at the top level
%% Which will include the reaping of expired tombstones %% Which will include the reaping of expired tombstones
io:format("Merge from level ~w to merge into ~w files below~n",
io:format("Merge from level ~w to merge into ~w files below",
[SrcLevel, length(Candidates)]), [SrcLevel, length(Candidates)]),
MergedFiles = case length(Candidates) of MergedFiles = case length(Candidates) of
0 -> 0 ->
%% If no overlapping candiates, manifest change only required %% If no overlapping candiates, manifest change only required
%% %%
%% TODO: need to think still about simply renaming when at %% TODO: need to think still about simply renaming when at
%% lower level %% lower level
[SrcFile]; [SrcF];
_ -> _ ->
perform_merge({SrcFile, SrcFilename}, perform_merge({SrcF#manifest_entry.owner,
SrcF#manifest_entry.filename},
Candidates, Candidates,
SrcLevel, SrcLevel,
{WI#penciller_work.ledger_filepath, {WI#penciller_work.ledger_filepath,
WI#penciller_work.next_sqn}) WI#penciller_work.next_sqn})
end, end,
case MergedFiles of
error ->
merge_failure;
_ ->
NewLevel = lists:sort(lists:append(MergedFiles, Others)),
UpdMFest2 = lists:keystore(SrcLevel + 1,
1,
UpdMFest1,
{SrcLevel + 1, NewLevel}),
ok = filelib:ensure_dir(WI#penciller_work.manifest_file),
{ok, Handle} = file:open(WI#penciller_work.manifest_file,
[binary, raw, write]),
ok = file:write(Handle, term_to_binary(UpdMFest2)),
ok = file:close(Handle),
{UpdMFest2, Candidates}
end.
NewLevel = lists:sort(lists:append(MergedFiles, Others)),
UpdMFest2 = lists:keyreplace(SrcLevel + 1, mark_for_delete([], _Penciller) ->
1, ok;
UpdMFest1, mark_for_delete([Head|Tail], Penciller) ->
{SrcLevel, NewLevel}), leveled_sft:sft_setfordelete(Head#manifest_entry.owner, Penciller),
mark_for_delete(Tail, Penciller).
{ok, Handle} = file:open(WI#penciller_work.manifest_file,
[binary, raw, write]),
ok = file:write(Handle, term_to_binary(UpdMFest2)),
ok = file:close(Handle),
{UpdMFest2, Candidates}.
%% An algorithm for discovering which files to merge .... %% An algorithm for discovering which files to merge ....
@ -173,11 +187,12 @@ select_filetomerge(SrcLevel, Manifest) ->
%% %%
%% The level is the level which the new files should be created at. %% The level is the level which the new files should be created at.
perform_merge(FileToMerge, CandidateList, Level, {Filepath, MSN}) -> perform_merge({UpperSFTPid, Filename}, CandidateList, Level, {Filepath, MSN}) ->
{Filename, UpperSFTPid} = FileToMerge,
io:format("Merge to be commenced for FileToMerge=~s with MSN=~w~n", io:format("Merge to be commenced for FileToMerge=~s with MSN=~w~n",
[Filename, MSN]), [Filename, MSN]),
PointerList = lists:map(fun(P) -> {next, P, all} end, CandidateList), PointerList = lists:map(fun(P) ->
{next, P#manifest_entry.owner, all} end,
CandidateList),
do_merge([{next, UpperSFTPid, all}], do_merge([{next, UpperSFTPid, all}],
PointerList, Level, {Filepath, MSN}, 0, []). PointerList, Level, {Filepath, MSN}, 0, []).
@ -187,12 +202,14 @@ do_merge([], [], Level, {_Filepath, MSN}, FileCounter, OutList) ->
OutList; OutList;
do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) -> do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) ->
FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft", FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft",
[Level, FileCounter])), [Level + 1, FileCounter])),
io:format("File to be created as part of MSN=~w Filename=~s~n", io:format("File to be created as part of MSN=~w Filename=~s~n",
[MSN, FileName]), [MSN, FileName]),
TS1 = os:timestamp(),
case leveled_sft:sft_new(FileName, KL1, KL2, Level) of case leveled_sft:sft_new(FileName, KL1, KL2, Level) of
{ok, _Pid, {error, Reason}} -> {ok, _Pid, {error, Reason}} ->
io:format("Exiting due to error~w~n", [Reason]); io:format("Exiting due to error~w~n", [Reason]),
error;
{ok, Pid, Reply} -> {ok, Pid, Reply} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
ExtMan = lists:append(OutList, ExtMan = lists:append(OutList,
@ -200,6 +217,8 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) ->
end_key=HighestKey, end_key=HighestKey,
owner=Pid, owner=Pid,
filename=FileName}]), filename=FileName}]),
MTime = timer:now_diff(os:timestamp(), TS1),
io:format("file creation took ~w microseconds ~n", [MTime]),
do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN}, do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN},
FileCounter + 1, ExtMan) FileCounter + 1, ExtMan)
end. end.
@ -278,7 +297,7 @@ merge_file_test() ->
KL4_L2 = lists:sort(generate_randomkeys(16000, 750, 250)), KL4_L2 = lists:sort(generate_randomkeys(16000, 750, 250)),
{ok, PidL2_4, _} = leveled_sft:sft_new("../test/KL4_L2.sft", {ok, PidL2_4, _} = leveled_sft:sft_new("../test/KL4_L2.sft",
KL4_L2, [], 2), KL4_L2, [], 2),
Result = perform_merge({"../test/KL1_L1.sft", PidL1_1}, Result = perform_merge({PidL1_1, "../test/KL1_L1.sft"},
[PidL2_1, PidL2_2, PidL2_3, PidL2_4], [PidL2_1, PidL2_2, PidL2_3, PidL2_4],
2, {"../test/", 99}), 2, {"../test/", 99}),
lists:foreach(fun(ManEntry) -> lists:foreach(fun(ManEntry) ->

View file

@ -159,7 +159,7 @@
pcl_fetch/2, pcl_fetch/2,
pcl_workforclerk/1, pcl_workforclerk/1,
pcl_requestmanifestchange/2, pcl_requestmanifestchange/2,
commit_manifest_change/3]). pcl_confirmdelete/2]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -181,7 +181,7 @@
-record(state, {manifest = [] :: list(), -record(state, {manifest = [] :: list(),
ongoing_work = [] :: list(), ongoing_work = [] :: list(),
manifest_sqn = 0 :: integer(), manifest_sqn = 0 :: integer(),
levelzero_sqn =0 :: integer(), levelzero_sqn = 0 :: integer(),
registered_iterators = [] :: list(), registered_iterators = [] :: list(),
unreferenced_files = [] :: list(), unreferenced_files = [] :: list(),
root_path = "../test/" :: string(), root_path = "../test/" :: string(),
@ -214,7 +214,10 @@ pcl_workforclerk(Pid) ->
gen_server:call(Pid, work_for_clerk, infinity). gen_server:call(Pid, work_for_clerk, infinity).
pcl_requestmanifestchange(Pid, WorkItem) -> pcl_requestmanifestchange(Pid, WorkItem) ->
gen_server:call(Pid, {manifest_change, WorkItem}, infinity). gen_server:cast(Pid, {manifest_change, WorkItem}).
pcl_confirmdelete(Pid, FileName) ->
gen_server:call(Pid, {confirm_delete, FileName}).
%%%============================================================================ %%%============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -222,22 +225,27 @@ pcl_requestmanifestchange(Pid, WorkItem) ->
init([]) -> init([]) ->
TID = ets:new(?MEMTABLE, [ordered_set, private]), TID = ets:new(?MEMTABLE, [ordered_set, private]),
{ok, #state{memtable=TID}}. {ok, Clerk} = leveled_clerk:clerk_new(self()),
{ok, #state{memtable=TID, clerk=Clerk}}.
handle_call({push_mem, DumpList}, _From, State) -> handle_call({push_mem, DumpList}, _From, State) ->
{TableSize, Manifest, L0Pend} = case State#state.levelzero_pending of {TableSize, Manifest, L0Pend} = case State#state.levelzero_pending of
{true, Remainder, {StartKey, EndKey, Pid}} -> {true, Remainder, {StartKey, EndKey, Pid}} ->
%% Need to handle not error scenarios? %% Need to handle not error scenarios?
%% N.B. Sync call - so will be ready %% N.B. Sync call - so will be ready
ok = leveled_sft:sft_checkready(Pid), {ok, SrcFN} = leveled_sft:sft_checkready(Pid),
%% Reset ETS, but re-insert any remainder %% Reset ETS, but re-insert any remainder
true = ets:delete_all_objects(State#state.memtable), true = ets:delete_all_objects(State#state.memtable),
true = ets:insert(State#state.memtable, Remainder), true = ets:insert(State#state.memtable, Remainder),
ManifestEntry = #manifest_entry{start_key=StartKey,
end_key=EndKey,
owner=Pid,
filename=SrcFN},
{length(Remainder), {length(Remainder),
lists:keystore(0, lists:keystore(0,
1, 1,
State#state.manifest, State#state.manifest,
{0, [{StartKey, EndKey, Pid}]}), {0, [ManifestEntry]}),
?L0PEND_RESET}; ?L0PEND_RESET};
{false, _, _} -> {false, _, _} ->
{State#state.table_size, {State#state.table_size,
@ -246,7 +254,9 @@ handle_call({push_mem, DumpList}, _From, State) ->
Unexpected -> Unexpected ->
io:format("Unexpected value of ~w~n", [Unexpected]), io:format("Unexpected value of ~w~n", [Unexpected]),
error error
end, end,
%% Prompt clerk to ask about work - do this for every push_mem
ok = leveled_clerk:clerk_prompt(State#state.clerk, penciller),
case do_push_to_mem(DumpList, TableSize, State#state.memtable) of case do_push_to_mem(DumpList, TableSize, State#state.memtable) of
{twist, ApproxTableSize} -> {twist, ApproxTableSize} ->
{reply, ok, State#state{table_size=ApproxTableSize, {reply, ok, State#state{table_size=ApproxTableSize,
@ -258,13 +268,14 @@ handle_call({push_mem, DumpList}, _From, State) ->
L0SN = State#state.levelzero_sqn + 1, L0SN = State#state.levelzero_sqn + 1,
FileName = State#state.root_path FileName = State#state.root_path
++ ?FILES_FP ++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(L0SN), ++ integer_to_list(L0SN) ++ "_0_0",
SFT = leveled_sft:sft_new(FileName, Dump = ets:tab2list(State#state.memtable),
ets:tab2list(State#state.memtable), L0_SFT = leveled_sft:sft_new(FileName,
[], Dump,
0, [],
#sft_options{wait=false}), 0,
{ok, L0Pid, Reply} = SFT, #sft_options{wait=false}),
{ok, L0Pid, Reply} = L0_SFT,
{{KL1Rem, []}, L0StartKey, L0EndKey} = Reply, {{KL1Rem, []}, L0StartKey, L0EndKey} = Reply,
{reply, ok, State#state{levelzero_pending={true, {reply, ok, State#state{levelzero_pending={true,
KL1Rem, KL1Rem,
@ -285,10 +296,16 @@ handle_call({fetch, Key}, _From, State) ->
{reply, fetch(Key, State#state.manifest, State#state.memtable), State}; {reply, fetch(Key, State#state.manifest, State#state.memtable), State};
handle_call(work_for_clerk, From, State) -> handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From), {UpdState, Work} = return_work(State, From),
{reply, Work, UpdState}. {reply, Work, UpdState};
handle_call({confirm_delete, FileName}, _From, State) ->
Reply = confirm_delete(FileName,
State#state.unreferenced_files,
State#state.registered_iterators),
{reply, Reply, State}.
handle_cast(_Msg, State) -> handle_cast({manifest_change, WI}, State) ->
{noreply, State}. {ok, UpdState} = commit_manifest_change(WI, State),
{noreply, UpdState}.
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
@ -453,7 +470,7 @@ get_item(Index, List, Default) ->
%% - the list of ongoing work needs to be cleared of this item %% - the list of ongoing work needs to be cleared of this item
commit_manifest_change(ReturnedWorkItem, From, State) -> commit_manifest_change(ReturnedWorkItem, State) ->
NewMSN = State#state.manifest_sqn + 1, NewMSN = State#state.manifest_sqn + 1,
[SentWorkItem] = State#state.ongoing_work, [SentWorkItem] = State#state.ongoing_work,
RootPath = State#state.root_path, RootPath = State#state.root_path,
@ -461,8 +478,8 @@ commit_manifest_change(ReturnedWorkItem, From, State) ->
case {SentWorkItem#penciller_work.next_sqn, case {SentWorkItem#penciller_work.next_sqn,
SentWorkItem#penciller_work.clerk} of SentWorkItem#penciller_work.clerk} of
{NewMSN, From} -> {NewMSN, _From} ->
MTime = timer:diff_now(os:timestamp(), MTime = timer:now_diff(os:timestamp(),
SentWorkItem#penciller_work.start_time), SentWorkItem#penciller_work.start_time),
io:format("Merge to sqn ~w completed in ~w microseconds io:format("Merge to sqn ~w completed in ~w microseconds
at Level ~w~n", at Level ~w~n",
@ -477,14 +494,15 @@ commit_manifest_change(ReturnedWorkItem, From, State) ->
io:format("Merge has been commmitted at sequence number ~w~n", io:format("Merge has been commmitted at sequence number ~w~n",
[NewMSN]), [NewMSN]),
NewManifest = ReturnedWorkItem#penciller_work.new_manifest, NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
{ok, State#state{ongoing_work=null, %% io:format("Updated manifest is ~w~n", [NewManifest]),
{ok, State#state{ongoing_work=[],
manifest_sqn=NewMSN, manifest_sqn=NewMSN,
manifest=NewManifest, manifest=NewManifest,
unreferenced_files=UnreferencedFilesUpd}}; unreferenced_files=UnreferencedFilesUpd}};
{MaybeWrongMSN, MaybeWrongClerk} -> {MaybeWrongMSN, From} ->
io:format("Merge commit from ~w at sqn ~w not matched to expected io:format("Merge commit at sqn ~w not matched to expected
clerk ~w or sqn ~w~n", sqn ~w from Clerk ~w~n",
[From, NewMSN, MaybeWrongClerk, MaybeWrongMSN]), [NewMSN, MaybeWrongMSN, From]),
{error, State} {error, State}
end. end.
@ -505,9 +523,26 @@ filepath(RootPath, NewMSN, new_merge_files) ->
update_deletions([], _NewMSN, UnreferencedFiles) -> update_deletions([], _NewMSN, UnreferencedFiles) ->
UnreferencedFiles; UnreferencedFiles;
update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) ->
io:format("Adding cleared file ~s to deletion list ~n",
[ClearedFile#manifest_entry.filename]),
update_deletions(Tail, update_deletions(Tail,
MSN, MSN,
lists:append(UnreferencedFiles, [{ClearedFile, MSN}])). lists:append(UnreferencedFiles,
[{ClearedFile#manifest_entry.filename, MSN}])).
confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) ->
case lists:keyfind(Filename, 1, UnreferencedFiles) of
false ->
false;
{Filename, MSN} ->
LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end,
infinity,
RegisteredIterators),
if
MSN >= LowSQN -> false;
true -> true
end
end.
%%%============================================================================ %%%============================================================================
%%% Test %%% Test
@ -532,3 +567,16 @@ compaction_work_assessment_test() ->
Manifest3 = [{0, []}, {1, L1Alt}], Manifest3 = [{0, []}, {1, L1Alt}],
WorkQ3 = assess_workqueue([], 0, Manifest3), WorkQ3 = assess_workqueue([], 0, Manifest3),
?assertMatch(WorkQ3, [{1, Manifest3}]). ?assertMatch(WorkQ3, [{1, Manifest3}]).
confirm_delete_test() ->
Filename = 'test.sft',
UnreferencedFiles = [{'other.sft', 15}, {Filename, 10}],
RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}],
R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1),
?assertMatch(R1, true),
RegisteredIterators2 = [{dummy_pid, 10}, {dummy_pid, 12}],
R2 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators2),
?assertMatch(R2, false),
RegisteredIterators3 = [{dummy_pid, 9}, {dummy_pid, 12}],
R3 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators3),
?assertMatch(R3, false).

View file

@ -160,6 +160,7 @@
sft_clear/1, sft_clear/1,
sft_checkready/1, sft_checkready/1,
sft_getfilename/1, sft_getfilename/1,
sft_setfordelete/2,
strip_to_keyonly/1, strip_to_keyonly/1,
generate_randomkeys/1]). generate_randomkeys/1]).
@ -182,6 +183,7 @@
-define(ITERATOR_SCANWIDTH, 1). -define(ITERATOR_SCANWIDTH, 1).
-define(MERGE_SCANWIDTH, 8). -define(MERGE_SCANWIDTH, 8).
-define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE). -define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE).
-define(DELETE_TIMEOUT, 60000).
-record(state, {version = ?CURRENT_VERSION :: tuple(), -record(state, {version = ?CURRENT_VERSION :: tuple(),
@ -199,7 +201,9 @@
filename :: string(), filename :: string(),
handle :: file:fd(), handle :: file:fd(),
background_complete=false :: boolean(), background_complete=false :: boolean(),
background_failure="Unknown" :: string()}). background_failure="Unknown" :: string(),
ready_for_delete = false ::boolean(),
penciller :: pid()}).
%%%============================================================================ %%%============================================================================
@ -223,7 +227,6 @@ sft_new(Filename, KL1, KL2, Level, Options) ->
end, end,
{ok, Pid, Reply}. {ok, Pid, Reply}.
sft_open(Filename) -> sft_open(Filename) ->
{ok, Pid} = gen_server:start(?MODULE, [], []), {ok, Pid} = gen_server:start(?MODULE, [], []),
case gen_server:call(Pid, {sft_open, Filename}, infinity) of case gen_server:call(Pid, {sft_open, Filename}, infinity) of
@ -233,6 +236,9 @@ sft_open(Filename) ->
Error Error
end. end.
sft_setfordelete(Pid, Penciller) ->
file_request(Pid, {set_for_delete, Penciller}).
sft_get(Pid, Key) -> sft_get(Pid, Key) ->
file_request(Pid, {get_kv, Key}). file_request(Pid, {get_kv, Key}).
@ -266,18 +272,7 @@ sft_getfilename(Pid) ->
file_request(Pid, Request) -> file_request(Pid, Request) ->
case check_pid(Pid) of case check_pid(Pid) of
ok -> ok ->
try gen_server:call(Pid, Request, infinity);
gen_server:call(Pid, Request, infinity)
catch
exit:{normal,_} when Request == file_close ->
%% Honest race condition in bitcask_eqc PULSE test.
ok;
exit:{noproc,_} when Request == file_close ->
%% Honest race condition in bitcask_eqc PULSE test.
ok;
X1:X2 ->
exit({file_request_error, self(), Request, X1, X2})
end;
Error -> Error ->
Error Error
end. end.
@ -388,16 +383,42 @@ handle_call(clear, _From, State) ->
handle_call(background_complete, _From, State) -> handle_call(background_complete, _From, State) ->
case State#state.background_complete of case State#state.background_complete of
true -> true ->
{reply, ok, State}; {reply, {ok, State#state.filename}, State};
false -> false ->
{reply, {error, State#state.background_failure}, State} {reply, {error, State#state.background_failure}, State}
end; end;
handle_call(get_filename, _from, State) -> handle_call(get_filename, _From, State) ->
{reply, State#state.filename, State}. {reply, State#state.filename, State};
handle_call({set_for_delete, Penciller}, _From, State) ->
{reply,
ok,
State#state{ready_for_delete=true,
penciller=Penciller},
?DELETE_TIMEOUT}.
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info(timeout, State) ->
case State#state.ready_for_delete of
true ->
case leveled_penciller:pcl_confirmdelete(State#state.penciller,
State#state.filename)
of
true ->
io:format("Polled for deletion and now clearing ~s~n",
[State#state.filename]),
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename),
{stop, shutdown, State};
false ->
io:format("Polled for deletion but ~s not ready~n",
[State#state.filename]),
{noreply, State, ?DELETE_TIMEOUT}
end;
false ->
{noreply, State}
end;
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
@ -583,7 +604,7 @@ fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, Acc) -
LengthList, 0, PointerB + FileMD#state.slots_pointer, LengthList, 0, PointerB + FileMD#state.slots_pointer,
AccFun(null, Acc)); AccFun(null, Acc));
not_found -> not_found ->
{complete, Acc} {complete, AccFun(null, Acc)}
end. end.
fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth,
@ -989,7 +1010,12 @@ serialise_block(BlockKeyList) ->
%% any lower sequence numbers should be compacted out of existence %% any lower sequence numbers should be compacted out of existence
key_dominates([H1|T1], [], Level) -> key_dominates(KL1, KL2, Level) ->
key_dominates_expanded(maybe_expand_pointer(KL1),
maybe_expand_pointer(KL2),
Level).
key_dominates_expanded([H1|T1], [], Level) ->
{_, _, St1, _} = H1, {_, _, St1, _} = H1,
case maybe_reap_expiredkey(St1, Level) of case maybe_reap_expiredkey(St1, Level) of
true -> true ->
@ -997,7 +1023,7 @@ key_dominates([H1|T1], [], Level) ->
false -> false ->
{{next_key, H1}, maybe_expand_pointer(T1), []} {{next_key, H1}, maybe_expand_pointer(T1), []}
end; end;
key_dominates([], [H2|T2], Level) -> key_dominates_expanded([], [H2|T2], Level) ->
{_, _, St2, _} = H2, {_, _, St2, _} = H2,
case maybe_reap_expiredkey(St2, Level) of case maybe_reap_expiredkey(St2, Level) of
true -> true ->
@ -1005,7 +1031,7 @@ key_dominates([], [H2|T2], Level) ->
false -> false ->
{{next_key, H2}, [], maybe_expand_pointer(T2)} {{next_key, H2}, [], maybe_expand_pointer(T2)}
end; end;
key_dominates([H1|T1], [H2|T2], Level) -> key_dominates_expanded([H1|T1], [H2|T2], Level) ->
{K1, Sq1, St1, _} = H1, {K1, Sq1, St1, _} = H1,
{K2, Sq2, St2, _} = H2, {K2, Sq2, St2, _} = H2,
case K1 of case K1 of
@ -1051,7 +1077,7 @@ maybe_expand_pointer([]) ->
maybe_expand_pointer([H|Tail]) -> maybe_expand_pointer([H|Tail]) ->
case H of case H of
{next, SFTPid, StartKey} -> {next, SFTPid, StartKey} ->
io:format("Scanning further on PID ~w ~w~n", [SFTPid, StartKey]), %% io:format("Scanning further on PID ~w ~w~n", [SFTPid, StartKey]),
QResult = sft_getkvrange(SFTPid, StartKey, all, ?MERGE_SCANWIDTH), QResult = sft_getkvrange(SFTPid, StartKey, all, ?MERGE_SCANWIDTH),
Acc = pointer_append_queryresults(QResult, SFTPid), Acc = pointer_append_queryresults(QResult, SFTPid),
lists:append(Acc, Tail); lists:append(Acc, Tail);