diff --git a/include/leveled.hrl b/include/leveled.hrl new file mode 100644 index 0000000..fdf779c --- /dev/null +++ b/include/leveled.hrl @@ -0,0 +1,21 @@ + +-record(sft_options, + {wait = true :: boolean(), + expire_tombstones = false :: boolean()}). + +-record(penciller_work, + {next_sqn :: integer(), + clerk :: pid(), + src_level :: integer(), + manifest :: list(), + start_time :: tuple(), + ledger_filepath :: string(), + manifest_file :: string(), + new_manifest :: list(), + unreferenced_files :: list()}). + +-record(manifest_entry, + {start_key :: tuple(), + end_key :: tuple(), + owner :: pid(), + filename :: string()}). \ No newline at end of file diff --git a/src/leveled_clerk.erl b/src/leveled_clerk.erl index 807a254..3197eb4 100644 --- a/src/leveled_clerk.erl +++ b/src/leveled_clerk.erl @@ -1,19 +1,156 @@ -%% Controlling asynchronour work in leveleddb to manage compaction within a +%% Controlling asynchronous work in leveleddb to manage compaction within a %% level and cleaning out of old files across a level -module(leveled_clerk). --export([merge_file/3, perform_merge/3]). +-behaviour(gen_server). + +-include("../include/leveled.hrl"). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + clerk_new/0, + clerk_prompt/2, + code_change/3, + perform_merge/4]). -include_lib("eunit/include/eunit.hrl"). +-record(state, {owner :: pid()}). -merge_file(_FileToMerge, _ManifestMgr, _Level) -> - %% CandidateList = leveled_manifest:get_manifest_atlevel(ManifestMgr, Level), - %% [Adds, Removes] = perform_merge(FileToMerge, CandidateList, Level), - %%leveled_manifest:update_manifest_atlevel(ManifestMgr, Level, Adds, Removes), +%%%============================================================================ +%%% API +%%%============================================================================ + +clerk_new() -> + {ok, Pid} = gen_server:start(?MODULE, [], []), + ok = gen_server:call(Pid, register, infinity), + {ok, Pid}. + + +clerk_prompt(Pid, penciller) -> + gen_server:cast(Pid, penciller_prompt, infinity). + +%%%============================================================================ +%%% gen_server callbacks +%%%============================================================================ + +init([]) -> + {ok, #state{}}. + +handle_call(register, From, State) -> + {noreply, State#state{owner=From}}. + +handle_cast({penciller_prompt, From}, State) -> + case leveled_penciller:pcl_workforclerk(State#state.owner) of + none -> + io:format("Work prompted but none needed~n"), + {noreply, State}; + WI -> + {NewManifest, FilesToDelete} = merge(WI), + UpdWI = WI#penciller_work{new_manifest=NewManifest, + unreferenced_files=FilesToDelete}, + leveled_penciller:pcl_requestmanifestchange(From, UpdWI), + {noreply, State} + end. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%%============================================================================ +%%% Internal functions +%%%============================================================================ + +merge(WI) -> + SrcLevel = WI#penciller_work.src_level, + {Selection, UpdMFest1} = select_filetomerge(SrcLevel, + WI#penciller_work.manifest), + {{StartKey, EndKey}, SrcFile} = Selection, + SrcFilename = leveled_sft:sft_getfilename(SrcFile), + SinkFiles = get_item(SrcLevel + 1, UpdMFest1, []), + SplitLists = lists:splitwith(fun(Ref) -> + case {Ref#manifest_entry.start_key, + Ref#manifest_entry.end_key} of + {_, EK} when StartKey > EK -> + false; + {SK, _} when EndKey < SK -> + false; + _ -> + true + end end, + SinkFiles), + {Candidates, Others} = SplitLists, + + %% TODO: + %% Need to work out if this is the top level + %% And then tell merge process to create files at the top level + %% Which will include the reaping of expired tombstones + + io:format("Merge from level ~w to merge into ~w files below", + [SrcLevel, length(Candidates)]), + + MergedFiles = case length(Candidates) of + 0 -> + %% If no overlapping candiates, manifest change only required + %% + %% TODO: need to think still about simply renaming when at + %% lower level + [SrcFile]; + _ -> + perform_merge({SrcFile, SrcFilename}, + Candidates, + SrcLevel, + {WI#penciller_work.ledger_filepath, + WI#penciller_work.next_sqn}) + end, + + NewLevel = lists:sort(lists:append(MergedFiles, Others)), + UpdMFest2 = lists:keyreplace(SrcLevel + 1, + 1, + UpdMFest1, + {SrcLevel, NewLevel}), + + {ok, Handle} = file:open(WI#penciller_work.manifest_file, + [binary, raw, write]), + ok = file:write(Handle, term_to_binary(UpdMFest2)), + ok = file:close(Handle), + {UpdMFest2, Candidates}. + + +%% An algorithm for discovering which files to merge .... +%% We can find the most optimal file: +%% - The one with the most overlapping data below? +%% - The one that overlaps with the fewest files below? +%% - The smallest file? +%% We could try and be fair in some way (merge oldest first) +%% Ultimately, there is alack of certainty that being fair or optimal is +%% genuinely better - ultimately every file has to be compacted. +%% +%% Hence, the initial implementation is to select files to merge at random + +select_filetomerge(SrcLevel, Manifest) -> + {SrcLevel, LevelManifest} = lists:keyfind(SrcLevel, 1, Manifest), + Selected = lists:nth(random:uniform(length(LevelManifest)), + LevelManifest), + UpdManifest = lists:keyreplace(SrcLevel, + 1, + Manifest, + {SrcLevel, + lists:delete(Selected, + LevelManifest)}), + {Selected, UpdManifest}. + %% Assumption is that there is a single SFT from a higher level that needs @@ -36,41 +173,45 @@ merge_file(_FileToMerge, _ManifestMgr, _Level) -> %% %% The level is the level which the new files should be created at. -perform_merge(FileToMerge, CandidateList, Level) -> +perform_merge(FileToMerge, CandidateList, Level, {Filepath, MSN}) -> {Filename, UpperSFTPid} = FileToMerge, - MergeID = generate_merge_id(Filename, Level), - io:format("Merge to be commenced for FileToMerge=~s with MergeID=~s~n", - [Filename, MergeID]), + io:format("Merge to be commenced for FileToMerge=~s with MSN=~w~n", + [Filename, MSN]), PointerList = lists:map(fun(P) -> {next, P, all} end, CandidateList), do_merge([{next, UpperSFTPid, all}], - PointerList, Level, MergeID, 0, []). + PointerList, Level, {Filepath, MSN}, 0, []). -do_merge([], [], Level, MergeID, FileCounter, OutList) -> - io:format("Merge completed with MergeID=~s Level=~w and FileCounter=~w~n", - [MergeID, Level, FileCounter]), +do_merge([], [], Level, {_Filepath, MSN}, FileCounter, OutList) -> + io:format("Merge completed with MSN=~w Level=~w and FileCounter=~w~n", + [MSN, Level, FileCounter]), OutList; -do_merge(KL1, KL2, Level, MergeID, FileCounter, OutList) -> - FileName = lists:flatten(io_lib:format("../test/~s_~w.sft", [MergeID, FileCounter])), - io:format("File to be created as part of MergeID=~s Filename=~s~n", [MergeID, FileName]), +do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) -> + FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft", + [Level, FileCounter])), + io:format("File to be created as part of MSN=~w Filename=~s~n", + [MSN, FileName]), case leveled_sft:sft_new(FileName, KL1, KL2, Level) of {ok, _Pid, {error, Reason}} -> io:format("Exiting due to error~w~n", [Reason]); {ok, Pid, Reply} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, - do_merge(KL1Rem, KL2Rem, Level, MergeID, FileCounter + 1, - lists:append(OutList, [{SmallestKey, HighestKey, Pid}])) + ExtMan = lists:append(OutList, + [#manifest_entry{start_key=SmallestKey, + end_key=HighestKey, + owner=Pid, + filename=FileName}]), + do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN}, + FileCounter + 1, ExtMan) end. -generate_merge_id(Filename, Level) -> - <> = crypto:rand_bytes(14), - FileID = erlang:phash2(Filename, 256), - B = FileID * 256 + Level, - Str = io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", - [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]), - list_to_binary(Str). - - +get_item(Index, List, Default) -> + case lists:keysearch(Index, 1, List) of + {value, {Index, Value}} -> + Value; + false -> + Default + end. %%%============================================================================ @@ -94,9 +235,10 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) -> {active, infinity}, null}, generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange). -choose_pid_toquery([{StartKey, EndKey, Pid}|_T], Key) when Key >= StartKey, - EndKey >= Key -> - Pid; +choose_pid_toquery([ManEntry|_T], Key) when + Key >= ManEntry#manifest_entry.start_key, + ManEntry#manifest_entry.end_key >= Key -> + ManEntry#manifest_entry.owner; choose_pid_toquery([_H|T], Key) -> choose_pid_toquery(T, Key). @@ -138,10 +280,12 @@ merge_file_test() -> KL4_L2, [], 2), Result = perform_merge({"../test/KL1_L1.sft", PidL1_1}, [PidL2_1, PidL2_2, PidL2_3, PidL2_4], - 2), - lists:foreach(fun({{o, B1, K1}, {o, B2, K2}, R}) -> + 2, {"../test/", 99}), + lists:foreach(fun(ManEntry) -> + {o, B1, K1} = ManEntry#manifest_entry.start_key, + {o, B2, K2} = ManEntry#manifest_entry.end_key, io:format("Result of ~s ~s and ~s ~s with Pid ~w~n", - [B1, K1, B2, K2, R]) end, + [B1, K1, B2, K2, ManEntry#manifest_entry.owner]) end, Result), io:format("Finding keys in KL1_L1~n"), ok = find_randomkeys(Result, 50, KL1_L1), @@ -158,4 +302,16 @@ merge_file_test() -> leveled_sft:sft_clear(PidL2_2), leveled_sft:sft_clear(PidL2_3), leveled_sft:sft_clear(PidL2_4), - lists:foreach(fun({_StK, _EndK, Pid}) -> leveled_sft:sft_clear(Pid) end, Result). + lists:foreach(fun(ManEntry) -> + leveled_sft:sft_clear(ManEntry#manifest_entry.owner) end, + Result). + + +select_merge_file_test() -> + L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}], + L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid}, + {{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}], + Manifest = [{0, L0}, {1, L1}], + {FileRef, NewManifest} = select_filetomerge(0, Manifest), + ?assertMatch(FileRef, {{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}), + ?assertMatch(NewManifest, [{0, []}, {1, L1}]). \ No newline at end of file diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 0e1cfaa..5e75e4a 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -114,18 +114,14 @@ %% work. When the Clerk has requested and taken work, it should perform the %5 compaction work starting the new SFT process to manage the new Ledger state %% and then write a new manifest file that represents that state with using -%% The MergeID as the filename .pnd. -%% -%% Prior to completing the work the previous manifest file should be renamed -%% to the filename .bak, and any .bak files other than the -%% the most recent n files should be deleted. +%% the next Manifest sequence number as the filename: +%% - nonzero_.pnd %% -%% The Penciller on accepting the change should rename the manifest file to -%% '.crr'. +%% The Penciller on accepting the change should rename the manifest file to - +%% - nonzero_.crr %% -%% On startup, the Penciller should look first for a *.crr file, and if -%% one is not present it should promot the most recently modified *.bak - -%% checking first that all files referenced in it are still present. +%% On startup, the Penciller should look for the nonzero_*.crr file with the +%% highest such manifest sequence number. %% %% The pace at which the store can accept updates will be dependent on the %% speed at which the Penciller's Clerk can merge files at lower levels plus @@ -134,13 +130,36 @@ %% written the Penciller will need to wait for this compaction work to %% complete and the L0 file to be compacted before the ETS table can be %% allowed to again reach capacity +%% +%% The writing of L0 files do not require the involvement of the clerk. +%% The L0 files are prompted directly by the penciller when the in-memory ets +%% table has reached capacity. When there is a next push into memory the +%% penciller calls to check that the file is now active (which may pause if the +%% write is ongoing the acceptence of the push), and if so it can clear the ets +%% table and build a new table starting with the remainder, and the keys from +%% the latest push. +%% -module(leveled_penciller). -%% -behaviour(gen_server). +-behaviour(gen_server). --export([return_work/2, commit_manifest_change/5]). +-include("../include/leveled.hrl"). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + pcl_new/0, + pcl_start/1, + pcl_pushmem/2, + pcl_fetch/2, + pcl_workforclerk/1, + pcl_requestmanifestchange/2, + commit_manifest_change/3]). -include_lib("eunit/include/eunit.hrl"). @@ -155,14 +174,186 @@ -define(PENDING_FILEX, "pnd"). -define(BACKUP_FILEX, "bak"). -define(ARCHIVE_FILEX, "arc"). +-define(MEMTABLE, mem). +-define(MAX_TABLESIZE, 32000). +-define(L0PEND_RESET, {false, [], none}). --record(state, {manifest :: list(), - ongoing_work :: list(), - manifest_sqn :: integer(), - registered_iterators :: list(), - unreferenced_files :: list(), - root_path :: string(), - mem :: ets:tid()}). +-record(state, {manifest = [] :: list(), + ongoing_work = [] :: list(), + manifest_sqn = 0 :: integer(), + levelzero_sqn =0 :: integer(), + registered_iterators = [] :: list(), + unreferenced_files = [] :: list(), + root_path = "../test/" :: string(), + table_size = 0 :: integer(), + clerk :: pid(), + levelzero_pending = {false, [], none} :: tuple(), + memtable}). + + +%%%============================================================================ +%%% API +%%%============================================================================ + +pcl_new() -> + gen_server:start(?MODULE, [], []). + +pcl_start(_RootDir) -> + %% TODO + %% Need to call startup to rebuild from disk + ok. + +pcl_pushmem(Pid, DumpList) -> + %% Bookie to dump memory onto penciller + gen_server:call(Pid, {push_mem, DumpList}, infinity). + +pcl_fetch(Pid, Key) -> + gen_server:call(Pid, {fetch, Key}, infinity). + +pcl_workforclerk(Pid) -> + gen_server:call(Pid, work_for_clerk, infinity). + +pcl_requestmanifestchange(Pid, WorkItem) -> + gen_server:call(Pid, {manifest_change, WorkItem}, infinity). + +%%%============================================================================ +%%% gen_server callbacks +%%%============================================================================ + +init([]) -> + TID = ets:new(?MEMTABLE, [ordered_set, private]), + {ok, #state{memtable=TID}}. + +handle_call({push_mem, DumpList}, _From, State) -> + {TableSize, Manifest, L0Pend} = case State#state.levelzero_pending of + {true, Remainder, {StartKey, EndKey, Pid}} -> + %% Need to handle not error scenarios? + %% N.B. Sync call - so will be ready + ok = leveled_sft:sft_checkready(Pid), + %% Reset ETS, but re-insert any remainder + true = ets:delete_all_objects(State#state.memtable), + true = ets:insert(State#state.memtable, Remainder), + {length(Remainder), + lists:keystore(0, + 1, + State#state.manifest, + {0, [{StartKey, EndKey, Pid}]}), + ?L0PEND_RESET}; + {false, _, _} -> + {State#state.table_size, + State#state.manifest, + State#state.levelzero_pending}; + Unexpected -> + io:format("Unexpected value of ~w~n", [Unexpected]), + error + end, + case do_push_to_mem(DumpList, TableSize, State#state.memtable) of + {twist, ApproxTableSize} -> + {reply, ok, State#state{table_size=ApproxTableSize, + manifest=Manifest, + levelzero_pending=L0Pend}}; + {roll, ApproxTableSize} -> + case {get_item(0, Manifest, []), L0Pend} of + {[], ?L0PEND_RESET} -> + L0SN = State#state.levelzero_sqn + 1, + FileName = State#state.root_path + ++ ?FILES_FP ++ "/" + ++ integer_to_list(L0SN), + SFT = leveled_sft:sft_new(FileName, + ets:tab2list(State#state.memtable), + [], + 0, + #sft_options{wait=false}), + {ok, L0Pid, Reply} = SFT, + {{KL1Rem, []}, L0StartKey, L0EndKey} = Reply, + {reply, ok, State#state{levelzero_pending={true, + KL1Rem, + {L0StartKey, + L0EndKey, + L0Pid}}, + table_size=ApproxTableSize, + levelzero_sqn=L0SN}}; + _ -> + io:format("Memory has exceeded limit but L0 file is still + awaiting compaction ~n"), + {reply, pause, State#state{table_size=ApproxTableSize, + manifest=Manifest, + levelzero_pending=L0Pend}} + end + end; +handle_call({fetch, Key}, _From, State) -> + {reply, fetch(Key, State#state.manifest, State#state.memtable), State}; +handle_call(work_for_clerk, From, State) -> + {UpdState, Work} = return_work(State, From), + {reply, Work, UpdState}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%%============================================================================ +%%% Internal functions +%%%============================================================================ + + +fetch(Key, Manifest, TID) -> + case ets:lookup(TID, Key) of + [Object] -> + Object; + [] -> + fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2) + end. + +fetch(_Key, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> + not_present; +fetch(Key, Manifest, Level, FetchFun) -> + LevelManifest = get_item(Level, Manifest, []), + case lists:foldl(fun(File, Acc) -> + case Acc of + not_present when + Key >= File#manifest_entry.start_key, + File#manifest_entry.end_key >= Key -> + File#manifest_entry.owner; + PidFound -> + PidFound + end end, + not_present, + LevelManifest) of + not_present -> + fetch(Key, Manifest, Level + 1, FetchFun); + FileToCheck -> + case FetchFun(FileToCheck, Key) of + not_present -> + fetch(Key, Manifest, Level + 1, FetchFun); + ObjectFound -> + ObjectFound + end + end. + +do_push_to_mem(DumpList, TableSize, MemTable) -> + ets:insert(MemTable, DumpList), + case TableSize + length(DumpList) of + ApproxTableSize when ApproxTableSize > ?MAX_TABLESIZE -> + case ets:info(MemTable, size) of + ActTableSize when ActTableSize > ?MAX_TABLESIZE -> + {roll, ActTableSize}; + ActTableSize -> + io:format("Table size is actually ~w~n", [ActTableSize]), + {twist, ActTableSize} + end; + ApproxTableSize -> + io:format("Table size is approximately ~w~n", [ApproxTableSize]), + {twist, ApproxTableSize} + end. @@ -173,56 +364,70 @@ %% to look at work at that level return_work(State, From) -> - case State#state.ongoing_work of - [] -> - WorkQueue = assess_workqueue([], - 0, - State#state.manifest, - []), - case length(WorkQueue) of - L when L > 0 -> - [{SrcLevel, Manifest}|OtherWork] = WorkQueue, - io:format("Work at Level ~w to be scheduled for ~w - with ~w queue items outstanding~n", - [SrcLevel, From, length(OtherWork)]), - {State#state{ongoing_work={SrcLevel, From, os:timestamp()}}, - {SrcLevel, Manifest}}; - _ -> + WorkQueue = assess_workqueue([], + 0, + State#state.manifest), + case length(WorkQueue) of + L when L > 0 -> + [{SrcLevel, Manifest}|OtherWork] = WorkQueue, + io:format("Work at Level ~w to be scheduled for ~w + with ~w queue items outstanding~n", + [SrcLevel, From, length(OtherWork)]), + case State#state.ongoing_work of + [] -> + %% No work currently outstanding + %% Can allocate work + NextSQN = State#state.manifest_sqn + 1, + FP = filepath(State#state.root_path, + NextSQN, + new_merge_files), + ManFile = filepath(State#state.root_path, + NextSQN, + pending_manifest), + WI = #penciller_work{next_sqn=NextSQN, + clerk=From, + src_level=SrcLevel, + manifest=Manifest, + start_time = os:timestamp(), + ledger_filepath = FP, + manifest_file = ManFile}, + {State#state{ongoing_work=[WI]}, WI}; + [OutstandingWork] -> + %% Still awaiting a response + io:format("Ongoing work requested by ~w but work + outstanding from Level ~w and Clerk ~w + at sequence number ~w~n", + [From, + OutstandingWork#penciller_work.src_level, + OutstandingWork#penciller_work.clerk, + OutstandingWork#penciller_work.next_sqn]), {State, none} end; - [{SrcLevel, OtherFrom, _TS}|T] -> - io:format("Ongoing work requested by ~w but work - outstanding from Level ~w and Clerk ~w with - ~w other items outstanding~n", - [From, SrcLevel, OtherFrom, length(T)]), + _ -> {State, none} end. -assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest, _OngoingWork) -> + + + +assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest) -> WorkQ; -assess_workqueue(WorkQ, LevelToAssess, Manifest, OngoingWork)-> +assess_workqueue(WorkQ, LevelToAssess, Manifest)-> MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0), FileCount = length(get_item(LevelToAssess, Manifest, [])), NewWQ = maybe_append_work(WorkQ, LevelToAssess, Manifest, MaxFiles, - FileCount, OngoingWork), - assess_workqueue(NewWQ, LevelToAssess + 1, Manifest, OngoingWork). + FileCount), + assess_workqueue(NewWQ, LevelToAssess + 1, Manifest). maybe_append_work(WorkQ, Level, Manifest, - MaxFiles, FileCount, OngoingWork) + MaxFiles, FileCount) when FileCount > MaxFiles -> io:format("Outstanding compaction work items of ~w at level ~w~n", [FileCount - MaxFiles, Level]), - case lists:keyfind(Level, 1, OngoingWork) of - {Level, Pid, TS} -> - io:format("Work will not be added to queue due to - outstanding work with ~w assigned at ~w~n", [Pid, TS]), - WorkQ; - false -> - lists:append(WorkQ, [{Level, Manifest}]) - end; + lists:append(WorkQ, [{Level, Manifest}]); maybe_append_work(WorkQ, Level, _Manifest, - _MaxFiles, FileCount, _OngoingWork) -> + _MaxFiles, FileCount) -> io:format("No compaction work due to file count ~w at level ~w~n", [FileCount, Level]), WorkQ. @@ -238,54 +443,65 @@ get_item(Index, List, Default) -> %% Request a manifest change -%% Should be passed the -%% - {SrcLevel, NewManifest, ClearedFiles, MergeID, From, State} -%% To complete a manifest change need to: -%% - Update the Manifest Sequence Number (msn) -%% - Confirm this Pid has a current element of manifest work outstanding at -%% that level -%% - Rename the manifest file created under the MergeID (.manifest) -%% to the filename current.manifest -%% - Update the state of the LevelFileRef lists -%% - Add the ClearedFiles to the list of files to be cleared (as a tuple with -%% the new msn) +%% The clerk should have completed the work, and created a new manifest +%% and persisted the new view of the manifest +%% +%% To complete the change of manifest: +%% - the state of the manifest file needs to be changed from pending to current +%% - the list of unreferenced files needs to be updated on State +%% - the current manifest needs to be update don State +%% - the list of ongoing work needs to be cleared of this item -commit_manifest_change(NewManifest, ClearedFiles, MergeID, From, State) -> +commit_manifest_change(ReturnedWorkItem, From, State) -> NewMSN = State#state.manifest_sqn + 1, - OngoingWork = State#state.ongoing_work, + [SentWorkItem] = State#state.ongoing_work, RootPath = State#state.root_path, UnreferencedFiles = State#state.unreferenced_files, - case OngoingWork of - {SrcLevel, From, TS} -> - io:format("Merge ~s completed in ~w microseconds at Level ~w~n", - [MergeID, timer:diff_now(os:timestamp(), TS), SrcLevel]), - ok = rename_manifest_files(RootPath, MergeID), - UnreferencedFilesUpd = update_deletions(ClearedFiles, + + case {SentWorkItem#penciller_work.next_sqn, + SentWorkItem#penciller_work.clerk} of + {NewMSN, From} -> + MTime = timer:diff_now(os:timestamp(), + SentWorkItem#penciller_work.start_time), + io:format("Merge to sqn ~w completed in ~w microseconds + at Level ~w~n", + [SentWorkItem#penciller_work.next_sqn, + MTime, + SentWorkItem#penciller_work.src_level]), + ok = rename_manifest_files(RootPath, NewMSN), + FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files, + UnreferencedFilesUpd = update_deletions(FilesToDelete, NewMSN, UnreferencedFiles), - io:format("Merge ~s has been commmitted at sequence number ~w~n", - [MergeID, NewMSN]), + io:format("Merge has been commmitted at sequence number ~w~n", + [NewMSN]), + NewManifest = ReturnedWorkItem#penciller_work.new_manifest, {ok, State#state{ongoing_work=null, manifest_sqn=NewMSN, manifest=NewManifest, unreferenced_files=UnreferencedFilesUpd}}; - _ -> - io:format("Merge commit ~s not matched to known work~n", - [MergeID]), + {MaybeWrongMSN, MaybeWrongClerk} -> + io:format("Merge commit from ~w at sqn ~w not matched to expected + clerk ~w or sqn ~w~n", + [From, NewMSN, MaybeWrongClerk, MaybeWrongMSN]), {error, State} - end. - + end. -rename_manifest_files(RootPath, MergeID) -> - ManifestFP = RootPath ++ "/" ++ ?MANIFEST_FP ++ "/", - ok = file:rename(ManifestFP ++ MergeID - ++ "." ++ ?PENDING_FILEX, - ManifestFP ++ MergeID - ++ "." ++ ?CURRENT_FILEX), - ok. +rename_manifest_files(RootPath, NewMSN) -> + file:rename(filepath(RootPath, NewMSN, pending_manifest), + filepath(RootPath, NewMSN, current_manifest)). +filepath(RootPath, NewMSN, pending_manifest) -> + RootPath ++ "/" ++ ?MANIFEST_FP ++ "/" ++ "nonzero_" + ++ integer_to_list(NewMSN) ++ "." ++ ?PENDING_FILEX; +filepath(RootPath, NewMSN, current_manifest) -> + RootPath ++ "/" ++ ?MANIFEST_FP ++ "/" ++ "nonzero_" + ++ integer_to_list(NewMSN) ++ "." ++ ?CURRENT_FILEX; +filepath(RootPath, NewMSN, new_merge_files) -> + RootPath ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(NewMSN). + update_deletions([], _NewMSN, UnreferencedFiles) -> UnreferencedFiles; update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> @@ -303,12 +519,8 @@ compaction_work_assessment_test() -> L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid}, {{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}], Manifest = [{0, L0}, {1, L1}], - OngoingWork1 = [], - WorkQ1 = assess_workqueue([], 0, Manifest, OngoingWork1), + WorkQ1 = assess_workqueue([], 0, Manifest), ?assertMatch(WorkQ1, [{0, Manifest}]), - OngoingWork2 = [{0, dummy_pid, os:timestamp()}], - WorkQ2 = assess_workqueue([], 0, Manifest, OngoingWork2), - ?assertMatch(WorkQ2, []), L1Alt = lists:append(L1, [{{o, "B5", "K0001"}, {o, "B5", "K9999"}, dummy_pid}, {{o, "B6", "K0001"}, {o, "B6", "K9999"}, dummy_pid}, @@ -318,7 +530,5 @@ compaction_work_assessment_test() -> {{o, "BA", "K0001"}, {o, "BA", "K9999"}, dummy_pid}, {{o, "BB", "K0001"}, {o, "BB", "K9999"}, dummy_pid}]), Manifest3 = [{0, []}, {1, L1Alt}], - WorkQ3 = assess_workqueue([], 0, Manifest3, OngoingWork1), - ?assertMatch(WorkQ3, [{1, Manifest3}]), - WorkQ4 = assess_workqueue([], 0, Manifest3, OngoingWork2), - ?assertMatch(WorkQ4, [{1, Manifest3}]). + WorkQ3 = assess_workqueue([], 0, Manifest3), + ?assertMatch(WorkQ3, [{1, Manifest3}]). diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 04e08ba..2896796 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -143,6 +143,7 @@ -module(leveled_sft). -behaviour(gen_server). +-include("../include/leveled.hrl"). -export([init/1, handle_call/3, @@ -151,14 +152,20 @@ terminate/2, code_change/3, sft_new/4, + sft_new/5, sft_open/1, sft_get/2, sft_getkeyrange/4, sft_close/1, - sft_clear/1]). + sft_clear/1, + sft_checkready/1, + sft_getfilename/1, + strip_to_keyonly/1, + generate_randomkeys/1]). -include_lib("eunit/include/eunit.hrl"). + -define(WORD_SIZE, 4). -define(DWORD_SIZE, 8). -define(CURRENT_VERSION, {0,1}). @@ -174,6 +181,7 @@ -define(HEADER_LEN, 56). -define(ITERATOR_SCANWIDTH, 1). -define(MERGE_SCANWIDTH, 8). +-define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE). -record(state, {version = ?CURRENT_VERSION :: tuple(), @@ -189,7 +197,9 @@ summ_pointer :: integer(), summ_length :: integer(), filename :: string(), - handle :: file:fd()}). + handle :: file:fd(), + background_complete=false :: boolean(), + background_failure="Unknown" :: string()}). %%%============================================================================ @@ -197,10 +207,23 @@ %%%============================================================================ sft_new(Filename, KL1, KL2, Level) -> + sft_new(Filename, KL1, KL2, Level, #sft_options{}). + +sft_new(Filename, KL1, KL2, Level, Options) -> {ok, Pid} = gen_server:start(?MODULE, [], []), - Reply = gen_server:call(Pid, {sft_new, Filename, KL1, KL2, Level}, infinity), + Reply = case Options#sft_options.wait of + true -> + gen_server:call(Pid, + {sft_new, Filename, KL1, KL2, Level}, + infinity); + false -> + gen_server:call(Pid, + {sft_new, Filename, KL1, KL2, Level, background}, + infinity) + end, {ok, Pid, Reply}. + sft_open(Filename) -> {ok, Pid} = gen_server:start(?MODULE, [], []), case gen_server:call(Pid, {sft_open, Filename}, infinity) of @@ -225,6 +248,11 @@ sft_close(Pid) -> sft_clear(Pid) -> file_request(Pid, clear). +sft_checkready(Pid) -> + gen_server:call(Pid, background_complete, infinity). + +sft_getfilename(Pid) -> + gen_server:call(Pid, get_filename, infinty). %%%============================================================================ %%% API helper functions @@ -275,6 +303,47 @@ check_pid(Pid) -> init([]) -> {ok, #state{}}. +handle_call({sft_new, Filename, KL1, [], Level, background}, From, State) -> + {ListForFile, KL1Rem} = case length(KL1) of + L when L >= ?MAX_KEYS -> + lists:split(?MAX_KEYS, KL1); + _ -> + {KL1, []} + end, + StartKey = strip_to_keyonly(lists:nth(1, ListForFile)), + EndKey = strip_to_keyonly(lists:last(ListForFile)), + Ext = filename:extension(Filename), + Components = filename:split(Filename), + {TmpFilename, PrmFilename} = case Ext of + [] -> + {filename:join(Components) ++ ".pnd", filename:join(Components) ++ ".sft"}; + Ext -> + %% This seems unnecessarily hard + DN = filename:dirname(Filename), + FP = lists:last(Components), + FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)), + {DN ++ "/" ++ FP_NOEXT ++ ".pnd", DN ++ "/" ++ FP_NOEXT ++ ".sft"} + end, + gen_server:reply(From, {{KL1Rem, []}, StartKey, EndKey}), + case create_file(TmpFilename) of + {error, Reason} -> + {noreply, State#state{background_complete=false, + background_failure=Reason}}; + {Handle, FileMD} -> + io:format("Creating file in background with input of size ~w~n", + [length(ListForFile)]), + % Key remainders must match to empty + Rename = {true, TmpFilename, PrmFilename}, + {ReadHandle, UpdFileMD, {[], []}} = complete_file(Handle, + FileMD, + ListForFile, + [], + Level, + Rename), + {noreply, UpdFileMD#state{handle=ReadHandle, + filename=PrmFilename, + background_complete=true}} + end; handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) -> case create_file(Filename) of {error, Reason} -> @@ -292,7 +361,7 @@ handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) -> {reply, {KeyRemainders, UpdFileMD#state.smallest_key, UpdFileMD#state.highest_key}, - UpdFileMD#state{handle=ReadHandle}} + UpdFileMD#state{handle=ReadHandle, filename=Filename}} end; handle_call({sft_open, Filename}, _From, _State) -> {_Handle, FileMD} = open_file(Filename), @@ -315,8 +384,17 @@ handle_call(close, _From, State) -> handle_call(clear, _From, State) -> ok = file:close(State#state.handle), ok = file:delete(State#state.filename), - {reply, true, State}. - + {reply, true, State}; +handle_call(background_complete, _From, State) -> + case State#state.background_complete of + true -> + {reply, ok, State}; + false -> + {reply, {error, State#state.background_failure}, State} + end; +handle_call(get_filename, _from, State) -> + {reply, State#state.filename, State}. + handle_cast(_Msg, State) -> {noreply, State}. @@ -338,6 +416,7 @@ code_change(_OldVsn, State, _Extra) -> %% Return the {Handle, metadata record} create_file(FileName) when is_list(FileName) -> io:format("Opening file with filename ~s~n", [FileName]), + ok = filelib:ensure_dir(FileName), case file:open(FileName, [binary, raw, read, write]) of {ok, Handle} -> Header = create_header(initial), @@ -396,15 +475,23 @@ open_file(FileMD) -> %% Take a file handle with a previously created header and complete it based on %% the two key lists KL1 and KL2 - complete_file(Handle, FileMD, KL1, KL2, Level) -> + complete_file(Handle, FileMD, KL1, KL2, Level, false). + +complete_file(Handle, FileMD, KL1, KL2, Level, Rename) -> {ok, KeyRemainders} = write_keys(Handle, maybe_expand_pointer(KL1), maybe_expand_pointer(KL2), [], <<>>, Level, fun sftwrite_function/2), - {ReadHandle, UpdFileMD} = open_file(FileMD), + {ReadHandle, UpdFileMD} = case Rename of + false -> + open_file(FileMD); + {true, OldName, NewName} -> + ok = file:rename(OldName, NewName), + open_file(FileMD#state{filename=NewName}) + end, {ReadHandle, UpdFileMD, KeyRemainders}. %% Fetch a Key and Value from a file, returns