diff --git a/src/leveled_clerk.erl b/src/leveled_clerk.erl index 1a181b2..107f12b 100644 --- a/src/leveled_clerk.erl +++ b/src/leveled_clerk.erl @@ -15,6 +15,7 @@ terminate/2, clerk_new/1, clerk_prompt/2, + clerk_stop/1, code_change/3, perform_merge/4]). @@ -23,8 +24,7 @@ -define(INACTIVITY_TIMEOUT, 2000). -define(HAPPYTIME_MULTIPLIER, 5). --record(state, {owner :: pid(), - in_backlog = false :: boolean()}). +-record(state, {owner :: pid()}). %%%============================================================================ %%% API @@ -35,11 +35,13 @@ clerk_new(Owner) -> ok = gen_server:call(Pid, {register, Owner}, infinity), {ok, Pid}. - clerk_prompt(Pid, penciller) -> gen_server:cast(Pid, penciller_prompt), ok. +clerk_stop(Pid) -> + gen_server:cast(Pid, stop). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -48,11 +50,13 @@ init([]) -> {ok, #state{}}. handle_call({register, Owner}, _From, State) -> - {reply, ok, State#state{owner=Owner}}. + {reply, ok, State#state{owner=Owner}, ?INACTIVITY_TIMEOUT}. handle_cast(penciller_prompt, State) -> Timeout = requestandhandle_work(State), - {noreply, State, Timeout}. + {noreply, State, Timeout}; +handle_cast(stop, State) -> + {stop, normal, State}. handle_info(timeout, State) -> %% The pcl prompt will cause a penciller_prompt, to re-trigger timeout @@ -90,10 +94,21 @@ requestandhandle_work(State) -> {NewManifest, FilesToDelete} = merge(WI), UpdWI = WI#penciller_work{new_manifest=NewManifest, unreferenced_files=FilesToDelete}, - leveled_penciller:pcl_requestmanifestchange(State#state.owner, - UpdWI), - mark_for_delete(FilesToDelete, State#state.owner), - ?INACTIVITY_TIMEOUT + R = leveled_penciller:pcl_requestmanifestchange(State#state.owner, + UpdWI), + case R of + ok -> + %% Request for manifest change must be a synchronous call + %% Otherwise cannot mark files for deletion (may erase + %% without manifest change on close) + mark_for_delete(FilesToDelete, State#state.owner), + ?INACTIVITY_TIMEOUT; + _ -> + %% New files will forever remain in an undetermined state + %% The disconnected files should be logged at start-up for + %% Manual clear-up + ?INACTIVITY_TIMEOUT + end end. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 44e70c0..49de7c6 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -84,9 +84,8 @@ %% To initiate the Ledger the must consult the manifest, and then start a SFT %% management process for each file in the manifest. %% -%% The penciller should then try and read any persisted ETS table in the -%% on_shutdown folder. The Penciller must then discover the highest sequence -%% number in the ledger, and respond to the Bookie with that sequence number. +%% The penciller should then try and read any Level 0 file which has the +%% manifest sequence number one higher than the last store in the manifest. %% %% The Bookie will ask the Inker for any Keys seen beyond that sequence number %% before the startup of the overall store can be completed. @@ -153,14 +152,15 @@ handle_info/2, terminate/2, code_change/3, - pcl_new/0, pcl_start/1, pcl_pushmem/2, pcl_fetch/2, pcl_workforclerk/1, pcl_requestmanifestchange/2, pcl_confirmdelete/2, - pcl_prompt/1]). + pcl_prompt/1, + pcl_close/1, + pcl_getstartupsequencenumber/1]). -include_lib("eunit/include/eunit.hrl"). @@ -182,6 +182,7 @@ -record(state, {manifest = [] :: list(), ongoing_work = [] :: list(), manifest_sqn = 0 :: integer(), + ledger_sqn = 0 :: integer(), registered_iterators = [] :: list(), unreferenced_files = [] :: list(), root_path = "../test" :: string(), @@ -196,13 +197,8 @@ %%% API %%%============================================================================ -pcl_new() -> - gen_server:start(?MODULE, [], []). - pcl_start(RootDir) -> - {ok, Pid} = gen_server:start(?MODULE, [], []), - gen_server:call(Pid, {load, RootDir}, infinity), - ok. + gen_server:start(?MODULE, [RootDir], []). pcl_pushmem(Pid, DumpList) -> %% Bookie to dump memory onto penciller @@ -215,7 +211,7 @@ pcl_workforclerk(Pid) -> gen_server:call(Pid, work_for_clerk, infinity). pcl_requestmanifestchange(Pid, WorkItem) -> - gen_server:cast(Pid, {manifest_change, WorkItem}). + gen_server:call(Pid, {manifest_change, WorkItem}, infinity). pcl_confirmdelete(Pid, FileName) -> gen_server:call(Pid, {confirm_delete, FileName}). @@ -223,23 +219,116 @@ pcl_confirmdelete(Pid, FileName) -> pcl_prompt(Pid) -> gen_server:call(Pid, prompt_compaction). +pcl_getstartupsequencenumber(Pid) -> + gen_server:call(Pid, get_startup_sqn). + +pcl_close(Pid) -> + gen_server:call(Pid, close). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ -init([]) -> +init([RootPath]) -> TID = ets:new(?MEMTABLE, [ordered_set, private]), {ok, Clerk} = leveled_clerk:clerk_new(self()), - {ok, #state{memtable=TID, clerk=Clerk}}. + InitState = #state{memtable=TID, clerk=Clerk, root_path=RootPath}, + + %% Open manifest + ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/", + {ok, Filenames} = file:list_dir(ManifestPath), + CurrRegex = "nonzero_(?[0-9]+)\\." ++ ?CURRENT_FILEX, + ValidManSQNs = lists:foldl(fun(FN, Acc) -> + case re:run(FN, + CurrRegex, + [{capture, ['MSN'], list}]) of + nomatch -> + Acc; + {match, [Int]} when is_list(Int) -> + Acc ++ [list_to_integer(Int)]; + _ -> + Acc + end end, + [], + Filenames), + TopManSQN = lists:foldl(fun(X, MaxSQN) -> max(X, MaxSQN) end, + 0, + ValidManSQNs), + io:format("Store to be started based on " ++ + "manifest sequence number of ~w~n", [TopManSQN]), + case TopManSQN of + 0 -> + io:format("Seqence number of 0 indicates no valid manifest~n"), + {ok, InitState}; + _ -> + {ok, Bin} = file:read_file(filepath(InitState#state.root_path, + TopManSQN, + current_manifest)), + Manifest = binary_to_term(Bin), + {UpdManifest, MaxSQN} = open_all_filesinmanifest(Manifest), + io:format("Maximum sequence number of ~w " + ++ "found in nonzero levels~n", + [MaxSQN]), + + %% TODO + %% Find any L0 File left outstanding + L0FN = filepath(RootPath, + TopManSQN + 1, + new_merge_files) ++ "_0_0.sft", + case filelib:is_file(L0FN) of + true -> + io:format("L0 file found ~s~n", [L0FN]), + {ok, + L0Pid, + {L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN), + L0SQN = leveled_sft:sft_getmaxsequencenumber(L0Pid), + ManifestEntry = #manifest_entry{start_key=L0StartKey, + end_key=L0EndKey, + owner=L0Pid, + filename=L0FN}, + UpdManifest2 = lists:keystore(0, + 1, + UpdManifest, + {0, [ManifestEntry]}), + io:format("L0 file had maximum sequence number of ~w~n", + [L0SQN]), + {ok, + InitState#state{manifest=UpdManifest2, + manifest_sqn=TopManSQN, + ledger_sqn=max(MaxSQN, L0SQN)}}; + false -> + io:format("No L0 file found~n"), + {ok, + InitState#state{manifest=UpdManifest, + manifest_sqn=TopManSQN, + ledger_sqn=MaxSQN}} + end + end. + handle_call({push_mem, DumpList}, _From, State) -> - case push_to_memory(DumpList, State) of - {ok, UpdState} -> - {reply, ok, UpdState}; - {{pause, Reason, Details}, UpdState} -> - io:format("Excess work due to - " ++ Reason, Details), - {reply, pause, UpdState#state{backlog=true}} - end; + StartWatch = os:timestamp(), + Response = case assess_sqn(DumpList) of + {MinSQN, MaxSQN} when MaxSQN > MinSQN, + MinSQN >= State#state.ledger_sqn -> + case push_to_memory(DumpList, State) of + {ok, UpdState} -> + {reply, ok, UpdState}; + {{pause, Reason, Details}, UpdState} -> + io:format("Excess work due to - " ++ Reason, Details), + {reply, pause, UpdState#state{backlog=true, + ledger_sqn=MaxSQN}} + end; + {MinSQN, MaxSQN} -> + io:format("Mismatch of sequence number expectations with push " + ++ "having sequence numbers between ~w and ~w " + ++ "but current sequence number is ~w~n", + [MinSQN, MaxSQN, State#state.ledger_sqn]), + {reply, refused, State} + end, + io:format("Push completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(),StartWatch)]), + Response; handle_call({fetch, Key}, _From, State) -> {reply, fetch(Key, State#state.manifest, State#state.memtable), State}; handle_call(work_for_clerk, From, State) -> @@ -249,14 +338,13 @@ handle_call({confirm_delete, FileName}, _From, State) -> Reply = confirm_delete(FileName, State#state.unreferenced_files, State#state.registered_iterators), - {reply, Reply, State}; -handle_call({load, RootDir}, _From, State) -> - {Manifest, ManifestSQN} = load_manifest(RootDir), - {UpdManifest, MaxSQN} = load_allsft(RootDir, Manifest), - {UpdMaxSQN} = load_levelzero(RootDir, MaxSQN), - {reply, UpdMaxSQN, State#state{root_path=RootDir, - manifest=UpdManifest, - manifest_sqn=ManifestSQN}}; + case Reply of + true -> + UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), + {reply, true, State#state{unreferenced_files=UF1}}; + _ -> + {reply, Reply, State} + end; handle_call(prompt_compaction, _From, State) -> case push_to_memory([], State) of {ok, UpdState} -> @@ -264,17 +352,66 @@ handle_call(prompt_compaction, _From, State) -> {{pause, Reason, Details}, UpdState} -> io:format("Excess work due to - " ++ Reason, Details), {reply, pause, UpdState#state{backlog=true}} - end. - - -handle_cast({manifest_change, WI}, State) -> + end; +handle_call({manifest_change, WI}, _From, State) -> {ok, UpdState} = commit_manifest_change(WI, State), - {noreply, UpdState}. + {reply, ok, UpdState}; +handle_call(get_startup_sqn, _From, State) -> + {reply, State#state.ledger_sqn, State}; +handle_call(close, _From, State) -> + {stop, normal, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, State) -> + %% When a Penciller shuts down it isn't safe to try an manage the safe + %% finishing of any outstanding work. The last commmitted manifest will + %% be used. + %% + %% Level 0 files lie outside of the manifest, and so if there is no L0 + %% file present it is safe to write the current contents of memory. If + %% there is a L0 file present - then the memory can be dropped (it is + %% recoverable from the ledger, and there should not be a lot to recover + %% as presumably the ETS file has been recently flushed, hence the presence + %% of a L0 file). + %% + %% The penciller should close each file in the unreferenced files, and + %% then each file in the manifest, and cast a close on the clerk. + %% The cast may not succeed as the clerk could be synchronously calling + %% the penciller looking for a manifest commit + %% + leveled_clerk:clerk_stop(State#state.clerk), + Dump = ets:tab2list(State#state.memtable), + case {State#state.levelzero_pending, + get_item(0, State#state.manifest, []), length(Dump)} of + {{false, _, _}, [], L} when L > 0 -> + MSN = State#state.manifest_sqn + 1, + FileName = State#state.root_path + ++ "/" ++ ?FILES_FP ++ "/" + ++ integer_to_list(MSN) ++ "_0_0", + {ok, + L0Pid, + {{KR1, _}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd", + Dump, + [], + 0), + io:format("Dump of memory on close to filename ~s with" + ++ " remainder ~w~n", [FileName, length(KR1)]), + leveled_sft:sft_close(L0Pid), + file:rename(FileName ++ ".pnd", FileName ++ ".sft"); + {{false, _, _}, [], L} when L == 0 -> + io:format("No keys to dump from memory when closing~n"); + _ -> + io:format("No opportunity to persist memory before closing " + ++ "with ~w keys discarded~n", [length(Dump)]) + end, + ok = close_files(0, State#state.manifest), + lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end, + State#state.unreferenced_files), ok. code_change(_OldVsn, State, _Extra) -> @@ -288,7 +425,7 @@ code_change(_OldVsn, State, _Extra) -> push_to_memory(DumpList, State) -> {TableSize, UpdState} = case State#state.levelzero_pending of {true, Remainder, {StartKey, EndKey, Pid}} -> - %% Need to handle not error scenarios? + %% Need to handle error scenarios? %% N.B. Sync call - so will be ready {ok, SrcFN} = leveled_sft:sft_checkready(Pid), %% Reset ETS, but re-insert any remainder @@ -429,6 +566,7 @@ manifest_locked(State) -> end. + %% Work out what the current work queue should be %% %% The work queue should have a lower level work at the front, and no work @@ -485,8 +623,41 @@ return_work(State, From) -> end. +close_files(?MAX_LEVELS - 1, _Manifest) -> + ok; +close_files(Level, Manifest) -> + LevelList = get_item(Level, Manifest, []), + lists:foreach(fun(F) -> leveled_sft:sft_close(F#manifest_entry.owner) end, + LevelList), + close_files(Level + 1, Manifest). +open_all_filesinmanifest(Manifest) -> + open_all_filesinmanifest({Manifest, 0}, 0). + +open_all_filesinmanifest(Result, ?MAX_LEVELS - 1) -> + Result; +open_all_filesinmanifest({Manifest, TopSQN}, Level) -> + LevelList = get_item(Level, Manifest, []), + %% The Pids in the saved manifest related to now closed references + %% Need to roll over the manifest at this level starting new processes to + %5 replace them + LvlR = lists:foldl(fun(F, {FL, FL_SQN}) -> + FN = F#manifest_entry.filename, + {ok, P, _Keys} = leveled_sft:sft_open(FN), + F_SQN = leveled_sft:sft_getmaxsequencenumber(P), + {lists:append(FL, + [F#manifest_entry{owner = P}]), + max(FL_SQN, F_SQN)} + end, + {[], 0}, + LevelList), + %% Result is tuple of revised file list for this level in manifest, and + %% the maximum sequence number seen at this level + {LvlFL, LvlSQN} = LvlR, + UpdManifest = lists:keystore(Level, 1, Manifest, {Level, LvlFL}), + open_all_filesinmanifest({UpdManifest, max(TopSQN, LvlSQN)}, Level + 1). + assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest) -> WorkQ; assess_workqueue(WorkQ, LevelToAssess, Manifest)-> @@ -539,8 +710,8 @@ commit_manifest_change(ReturnedWorkItem, State) -> {NewMSN, _From} -> MTime = timer:now_diff(os:timestamp(), SentWorkItem#penciller_work.start_time), - io:format("Merge to sqn ~w completed in ~w microseconds - at Level ~w~n", + io:format("Merge to sqn ~w completed in ~w microseconds " ++ + "at Level ~w~n", [SentWorkItem#penciller_work.next_sqn, MTime, SentWorkItem#penciller_work.src_level]), @@ -558,8 +729,8 @@ commit_manifest_change(ReturnedWorkItem, State) -> manifest=NewManifest, unreferenced_files=UnreferencedFilesUpd}}; {MaybeWrongMSN, From} -> - io:format("Merge commit at sqn ~w not matched to expected - sqn ~w from Clerk ~w~n", + io:format("Merge commit at sqn ~w not matched to expected" ++ + " sqn ~w from Clerk ~w~n", [NewMSN, MaybeWrongMSN, From]), {error, State} end. @@ -586,43 +757,36 @@ update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> update_deletions(Tail, MSN, lists:append(UnreferencedFiles, - [{ClearedFile#manifest_entry.filename, MSN}])). + [{ClearedFile#manifest_entry.filename, + ClearedFile#manifest_entry.owner, + MSN}])). confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) -> case lists:keyfind(Filename, 1, UnreferencedFiles) of false -> false; - {Filename, MSN} -> + {Filename, _Pid, MSN} -> LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end, infinity, RegisteredIterators), if - MSN >= LowSQN -> false; - true -> true + MSN >= LowSQN -> + false; + true -> + true end end. -%% load_manifest(RootDir), -%% {UpdManifest, MaxSQN} = load_allsft(RootDir, Manifest), -%% Level0SQN, UpdMaxSQN} = load_levelzero(RootDir, MaxSQN) -load_manifest(_RootDir) -> - {{}, 0}. +assess_sqn(DumpList) -> + assess_sqn(DumpList, infinity, 0). -load_allsft(_RootDir, _Manifest) -> - %% Manifest has been persisted with PIDs that are no longer - %% valid, roll through each entry opening files and replacing Pids in - %% Manifest - {{}, 0}. - -load_levelzero(_RootDir, _MaxSQN) -> - %% When loading L0 manifest make sure that the lowest sequence number in - %% the L0 manifest is bigger than the highest in all levels below - %% - not True - %% ... what about key remainders? - %% ... need to rethink L0 - {0, 0}. +assess_sqn([], MinSQN, MaxSQN) -> + {MinSQN, MaxSQN}; +assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) -> + {_K, SQN} = leveled_sft:strip_to_key_seqn_only(HeadKey), + assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)). %%%============================================================================ @@ -651,7 +815,8 @@ compaction_work_assessment_test() -> confirm_delete_test() -> Filename = 'test.sft', - UnreferencedFiles = [{'other.sft', 15}, {Filename, 10}], + UnreferencedFiles = [{'other.sft', dummy_owner, 15}, + {Filename, dummy_owner, 10}], RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}], R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1), ?assertMatch(R1, true), diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 80d3fd4..fe4d331 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -163,6 +163,7 @@ sft_setfordelete/2, sft_getmaxsequencenumber/1, strip_to_keyonly/1, + strip_to_key_seqn_only/1, generate_randomkeys/1]). -include_lib("eunit/include/eunit.hrl"). @@ -231,29 +232,33 @@ sft_new(Filename, KL1, KL2, Level, Options) -> sft_open(Filename) -> {ok, Pid} = gen_server:start(?MODULE, [], []), case gen_server:call(Pid, {sft_open, Filename}, infinity) of - ok -> - {ok, Pid}; + {ok, {SK, EK}} -> + {ok, Pid, {SK, EK}}; Error -> Error end. sft_setfordelete(Pid, Penciller) -> - file_request(Pid, {set_for_delete, Penciller}). + gen_server:call(Pid, {set_for_delete, Penciller}, infinity). sft_get(Pid, Key) -> - file_request(Pid, {get_kv, Key}). + gen_server:call(Pid, {get_kv, Key}, infinity). sft_getkeyrange(Pid, StartKey, EndKey, ScanWidth) -> - file_request(Pid, {get_keyrange, StartKey, EndKey, ScanWidth}). + gen_server:call(Pid, + {get_keyrange, StartKey, EndKey, ScanWidth}, + infinity). sft_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> - file_request(Pid, {get_kvrange, StartKey, EndKey, ScanWidth}). - -sft_close(Pid) -> - file_request(Pid, close). + gen_server:call(Pid, + {get_kvrange, StartKey, EndKey, ScanWidth}, + infinity). sft_clear(Pid) -> - file_request(Pid, clear). + gen_server:call(Pid, clear, infinity). + +sft_close(Pid) -> + gen_server:call(Pid, close, infinity). sft_checkready(Pid) -> gen_server:call(Pid, background_complete, infinity). @@ -264,36 +269,7 @@ sft_getfilename(Pid) -> sft_getmaxsequencenumber(Pid) -> gen_server:call(Pid, get_maxsqn, infinity). -%%%============================================================================ -%%% API helper functions -%%%============================================================================ -%% This saftey measure of checking the Pid is alive before perfoming any ops -%% is copied from the bitcask source code. -%% -%% It is not clear at present if this is necessary. - -file_request(Pid, Request) -> - case check_pid(Pid) of - ok -> - gen_server:call(Pid, Request, infinity); - Error -> - Error - end. - -check_pid(Pid) -> - IsPid = is_pid(Pid), - IsAlive = IsPid andalso is_process_alive(Pid), - case {IsAlive, IsPid} of - {true, _} -> - ok; - {false, true} -> - %% Same result as `file' module when accessing closed FD - {error, einval}; - _ -> - %% Same result as `file' module when providing wrong arg - {error, badarg} - end. %%%============================================================================ %%% gen_server callbacks @@ -363,8 +339,12 @@ handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) -> UpdFileMD#state{handle=ReadHandle, filename=Filename}} end; handle_call({sft_open, Filename}, _From, _State) -> - {_Handle, FileMD} = open_file(Filename), - {reply, {FileMD#state.smallest_key, FileMD#state.highest_key}, FileMD}; + {_Handle, FileMD} = open_file(#state{filename=Filename}), + io:format("Opened filename with name ~s~n", [Filename]), + {reply, + {ok, + {FileMD#state.smallest_key, FileMD#state.highest_key}}, + FileMD}; handle_call({get_kv, Key}, _From, State) -> Reply = fetch_keyvalue(State#state.handle, State, Key), {reply, Reply, State}; @@ -379,11 +359,9 @@ handle_call({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> ScanWidth), {reply, Reply, State}; handle_call(close, _From, State) -> - {reply, true, State}; + {stop, normal, ok, State}; handle_call(clear, _From, State) -> - ok = file:close(State#state.handle), - ok = file:delete(State#state.filename), - {reply, true, State}; + {stop, normal, ok, State#state{ready_for_delete=true}}; handle_call(background_complete, _From, State) -> case State#state.background_complete of true -> @@ -401,7 +379,7 @@ handle_call({set_for_delete, Penciller}, _From, State) -> ?DELETE_TIMEOUT}; handle_call(get_maxsqn, _From, State) -> {reply, State#state.highest_sqn, State}. - + handle_cast(_Msg, State) -> {noreply, State}. @@ -412,10 +390,6 @@ handle_info(timeout, State) -> State#state.filename) of true -> - io:format("Polled for deletion and now clearing ~s~n", - [State#state.filename]), - ok = file:close(State#state.handle), - ok = file:delete(State#state.filename), {stop, shutdown, State}; false -> io:format("Polled for deletion but ~s not ready~n", @@ -428,8 +402,18 @@ handle_info(timeout, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(Reason, State) -> + io:format("Exit called for reason ~w on filename ~s~n", + [Reason, State#state.filename]), + case State#state.ready_for_delete of + true -> + io:format("Exit called and now clearing ~s~n", + [State#state.filename]), + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename); + _ -> + ok = file:close(State#state.handle) + end. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -453,7 +437,8 @@ create_file(FileName) when is_list(FileName) -> FileMD = #state{next_position=StartPos, filename=FileName}, {Handle, FileMD}; {error, Reason} -> - io:format("Error opening filename ~s with reason ~s", [FileName, Reason]), + io:format("Error opening filename ~s with reason ~s", + [FileName, Reason]), {error, Reason} end. @@ -484,7 +469,8 @@ open_file(FileMD) -> Ilen:32/integer, Flen:32/integer, Slen:32/integer>> = HeaderLengths, - {ok, SummaryBin} = file:pread(Handle, ?HEADER_LEN + Blen + Ilen + Flen, Slen), + {ok, SummaryBin} = file:pread(Handle, + ?HEADER_LEN + Blen + Ilen + Flen, Slen), {{LowSQN, HighSQN}, {LowKey, HighKey}} = binary_to_term(SummaryBin), {ok, SlotIndexBin} = file:pread(Handle, ?HEADER_LEN + Blen, Ilen), SlotIndex = binary_to_term(SlotIndexBin), @@ -552,14 +538,17 @@ fetch_keyvalue(Handle, FileMD, Key) -> %% Fetches a range of keys returning a list of {Key, SeqN} tuples fetch_range_keysonly(Handle, FileMD, StartKey, EndKey) -> - fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_keysonly/2). + fetch_range(Handle, FileMD, StartKey, EndKey, [], + fun acc_list_keysonly/2). fetch_range_keysonly(Handle, FileMD, StartKey, EndKey, ScanWidth) -> - fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_keysonly/2, ScanWidth). + fetch_range(Handle, FileMD, StartKey, EndKey, [], + fun acc_list_keysonly/2, ScanWidth). %% Fetches a range of keys returning the full tuple, including value fetch_range_kv(Handle, FileMD, StartKey, EndKey, ScanWidth) -> - fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_kv/2, ScanWidth). + fetch_range(Handle, FileMD, StartKey, EndKey, [], + fun acc_list_kv/2, ScanWidth). acc_list_keysonly(null, empty) -> []; @@ -594,39 +583,60 @@ acc_list_kv(R, RList) -> %% used - e.g. counters, hash-lists to build bloom filters etc fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun) -> - fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ?ITERATOR_SCANWIDTH). + fetch_range(Handle, FileMD, StartKey, EndKey, FunList, + AccFun, ?ITERATOR_SCANWIDTH). fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth) -> - fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, empty). + fetch_range(Handle, FileMD, StartKey, EndKey, FunList, + AccFun, ScanWidth, empty). -fetch_range(_Handle, _FileMD, StartKey, _EndKey, _FunList, _AccFun, 0, Acc) -> +fetch_range(_Handle, _FileMD, StartKey, _EndKey, _FunList, + _AccFun, 0, Acc) -> {partial, Acc, StartKey}; -fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, Acc) -> +fetch_range(Handle, FileMD, StartKey, EndKey, FunList, + AccFun, ScanWidth, Acc) -> %% get_nearestkey gets the last key in the index <= StartKey, or the next %% key along if {next, StartKey} is passed case get_nearestkey(FileMD#state.slot_index, StartKey) of {NearestKey, _Filter, {LengthList, PointerB}} -> - fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, - LengthList, 0, PointerB + FileMD#state.slots_pointer, + fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, + AccFun, ScanWidth, + LengthList, + 0, + PointerB + FileMD#state.slots_pointer, AccFun(null, Acc)); not_found -> {complete, AccFun(null, Acc)} end. -fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, - LengthList, BlockNumber, _Pointer, Acc) +fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, + AccFun, ScanWidth, + LengthList, + BlockNumber, + _Pointer, + Acc) when length(LengthList) == BlockNumber -> %% Reached the end of the slot. Move the start key on one to scan a new slot - fetch_range(Handle, FileMD, {next, NearestKey}, EndKey, FunList, AccFun, ScanWidth - 1, Acc); -fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, - LengthList, BlockNumber, Pointer, Acc) -> + fetch_range(Handle, FileMD, {next, NearestKey}, EndKey, FunList, + AccFun, ScanWidth - 1, + Acc); +fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, + AccFun, ScanWidth, + LengthList, + BlockNumber, + Pointer, + Acc) -> Block = fetch_block(Handle, LengthList, BlockNumber, Pointer), Results = scan_block(Block, StartKey, EndKey, FunList, AccFun, Acc), case Results of {partial, Acc1, StartKey} -> %% Move on to the next block - fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth, - LengthList, BlockNumber + 1, Pointer, Acc1); + fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, + AccFun, ScanWidth, + LengthList, + BlockNumber + 1, + Pointer, + Acc1); {complete, Acc1} -> {complete, Acc1} end. @@ -665,8 +675,8 @@ applyfuns([HeadFun|OtherFuns], KV) -> fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) -> not_present; -fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot) -> - BlockToCheck = fetch_block(Handle, LengthList, BlockNumber, StartOfSlot), +fetch_keyvalue_fromblock([BlockNmb|T], Key, LengthList, Handle, StartOfSlot) -> + BlockToCheck = fetch_block(Handle, LengthList, BlockNmb, StartOfSlot), Result = lists:keyfind(Key, 1, BlockToCheck), case Result of false -> @@ -675,9 +685,9 @@ fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot) KV end. -fetch_block(Handle, LengthList, BlockNumber, StartOfSlot) -> - Start = lists:sum(lists:sublist(LengthList, BlockNumber)), - Length = lists:nth(BlockNumber + 1, LengthList), +fetch_block(Handle, LengthList, BlockNmb, StartOfSlot) -> + Start = lists:sum(lists:sublist(LengthList, BlockNmb)), + Length = lists:nth(BlockNmb + 1, LengthList), {ok, BlockToCheckBin} = file:pread(Handle, Start + StartOfSlot, Length), binary_to_term(BlockToCheckBin). @@ -1384,18 +1394,20 @@ generate_randomsegfilter(BlockSize) -> Block4})). +generate_randomkeys({Count, StartSQN}) -> + generate_randomkeys(Count, StartSQN, []); generate_randomkeys(Count) -> - generate_randomkeys(Count, []). + generate_randomkeys(Count, 0, []). -generate_randomkeys(0, Acc) -> +generate_randomkeys(0, _SQN, Acc) -> Acc; -generate_randomkeys(Count, Acc) -> +generate_randomkeys(Count, SQN, Acc) -> RandKey = {{o, lists:concat(["Bucket", random:uniform(1024)]), lists:concat(["Key", random:uniform(1024)])}, - Count + 1, + SQN, {active, infinity}, null}, - generate_randomkeys(Count - 1, [RandKey|Acc]). + generate_randomkeys(Count - 1, SQN + 1, [RandKey|Acc]). generate_sequentialkeys(Count, Start) -> generate_sequentialkeys(Count + Start, Start, []).