Startup and Shutdown Support

Added support for startup and shutdown of a Ledger.  As aprt of this
will now start tracking the highest sequence number.  This also adds a
safety check on pcl_pushmem to make sure that only keys with a higher
sequenc enumber are being pushed in - and hence we can happily insert
into the in-memory view without checking the sequence number.
This commit is contained in:
martinsumner 2016-08-15 16:43:39 +01:00
parent 6e56b569b8
commit 4586e2340a
3 changed files with 344 additions and 152 deletions

View file

@ -15,6 +15,7 @@
terminate/2,
clerk_new/1,
clerk_prompt/2,
clerk_stop/1,
code_change/3,
perform_merge/4]).
@ -23,8 +24,7 @@
-define(INACTIVITY_TIMEOUT, 2000).
-define(HAPPYTIME_MULTIPLIER, 5).
-record(state, {owner :: pid(),
in_backlog = false :: boolean()}).
-record(state, {owner :: pid()}).
%%%============================================================================
%%% API
@ -35,11 +35,13 @@ clerk_new(Owner) ->
ok = gen_server:call(Pid, {register, Owner}, infinity),
{ok, Pid}.
clerk_prompt(Pid, penciller) ->
gen_server:cast(Pid, penciller_prompt),
ok.
clerk_stop(Pid) ->
gen_server:cast(Pid, stop).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
@ -48,11 +50,13 @@ init([]) ->
{ok, #state{}}.
handle_call({register, Owner}, _From, State) ->
{reply, ok, State#state{owner=Owner}}.
{reply, ok, State#state{owner=Owner}, ?INACTIVITY_TIMEOUT}.
handle_cast(penciller_prompt, State) ->
Timeout = requestandhandle_work(State),
{noreply, State, Timeout}.
{noreply, State, Timeout};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_info(timeout, State) ->
%% The pcl prompt will cause a penciller_prompt, to re-trigger timeout
@ -90,10 +94,21 @@ requestandhandle_work(State) ->
{NewManifest, FilesToDelete} = merge(WI),
UpdWI = WI#penciller_work{new_manifest=NewManifest,
unreferenced_files=FilesToDelete},
leveled_penciller:pcl_requestmanifestchange(State#state.owner,
UpdWI),
mark_for_delete(FilesToDelete, State#state.owner),
?INACTIVITY_TIMEOUT
R = leveled_penciller:pcl_requestmanifestchange(State#state.owner,
UpdWI),
case R of
ok ->
%% Request for manifest change must be a synchronous call
%% Otherwise cannot mark files for deletion (may erase
%% without manifest change on close)
mark_for_delete(FilesToDelete, State#state.owner),
?INACTIVITY_TIMEOUT;
_ ->
%% New files will forever remain in an undetermined state
%% The disconnected files should be logged at start-up for
%% Manual clear-up
?INACTIVITY_TIMEOUT
end
end.

View file

@ -84,9 +84,8 @@
%% To initiate the Ledger the must consult the manifest, and then start a SFT
%% management process for each file in the manifest.
%%
%% The penciller should then try and read any persisted ETS table in the
%% on_shutdown folder. The Penciller must then discover the highest sequence
%% number in the ledger, and respond to the Bookie with that sequence number.
%% The penciller should then try and read any Level 0 file which has the
%% manifest sequence number one higher than the last store in the manifest.
%%
%% The Bookie will ask the Inker for any Keys seen beyond that sequence number
%% before the startup of the overall store can be completed.
@ -153,14 +152,15 @@
handle_info/2,
terminate/2,
code_change/3,
pcl_new/0,
pcl_start/1,
pcl_pushmem/2,
pcl_fetch/2,
pcl_workforclerk/1,
pcl_requestmanifestchange/2,
pcl_confirmdelete/2,
pcl_prompt/1]).
pcl_prompt/1,
pcl_close/1,
pcl_getstartupsequencenumber/1]).
-include_lib("eunit/include/eunit.hrl").
@ -182,6 +182,7 @@
-record(state, {manifest = [] :: list(),
ongoing_work = [] :: list(),
manifest_sqn = 0 :: integer(),
ledger_sqn = 0 :: integer(),
registered_iterators = [] :: list(),
unreferenced_files = [] :: list(),
root_path = "../test" :: string(),
@ -196,13 +197,8 @@
%%% API
%%%============================================================================
pcl_new() ->
gen_server:start(?MODULE, [], []).
pcl_start(RootDir) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
gen_server:call(Pid, {load, RootDir}, infinity),
ok.
gen_server:start(?MODULE, [RootDir], []).
pcl_pushmem(Pid, DumpList) ->
%% Bookie to dump memory onto penciller
@ -215,7 +211,7 @@ pcl_workforclerk(Pid) ->
gen_server:call(Pid, work_for_clerk, infinity).
pcl_requestmanifestchange(Pid, WorkItem) ->
gen_server:cast(Pid, {manifest_change, WorkItem}).
gen_server:call(Pid, {manifest_change, WorkItem}, infinity).
pcl_confirmdelete(Pid, FileName) ->
gen_server:call(Pid, {confirm_delete, FileName}).
@ -223,23 +219,116 @@ pcl_confirmdelete(Pid, FileName) ->
pcl_prompt(Pid) ->
gen_server:call(Pid, prompt_compaction).
pcl_getstartupsequencenumber(Pid) ->
gen_server:call(Pid, get_startup_sqn).
pcl_close(Pid) ->
gen_server:call(Pid, close).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
init([]) ->
init([RootPath]) ->
TID = ets:new(?MEMTABLE, [ordered_set, private]),
{ok, Clerk} = leveled_clerk:clerk_new(self()),
{ok, #state{memtable=TID, clerk=Clerk}}.
InitState = #state{memtable=TID, clerk=Clerk, root_path=RootPath},
%% Open manifest
ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
{ok, Filenames} = file:list_dir(ManifestPath),
CurrRegex = "nonzero_(?<MSN>[0-9]+)\\." ++ ?CURRENT_FILEX,
ValidManSQNs = lists:foldl(fun(FN, Acc) ->
case re:run(FN,
CurrRegex,
[{capture, ['MSN'], list}]) of
nomatch ->
Acc;
{match, [Int]} when is_list(Int) ->
Acc ++ [list_to_integer(Int)];
_ ->
Acc
end end,
[],
Filenames),
TopManSQN = lists:foldl(fun(X, MaxSQN) -> max(X, MaxSQN) end,
0,
ValidManSQNs),
io:format("Store to be started based on " ++
"manifest sequence number of ~w~n", [TopManSQN]),
case TopManSQN of
0 ->
io:format("Seqence number of 0 indicates no valid manifest~n"),
{ok, InitState};
_ ->
{ok, Bin} = file:read_file(filepath(InitState#state.root_path,
TopManSQN,
current_manifest)),
Manifest = binary_to_term(Bin),
{UpdManifest, MaxSQN} = open_all_filesinmanifest(Manifest),
io:format("Maximum sequence number of ~w "
++ "found in nonzero levels~n",
[MaxSQN]),
%% TODO
%% Find any L0 File left outstanding
L0FN = filepath(RootPath,
TopManSQN + 1,
new_merge_files) ++ "_0_0.sft",
case filelib:is_file(L0FN) of
true ->
io:format("L0 file found ~s~n", [L0FN]),
{ok,
L0Pid,
{L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN),
L0SQN = leveled_sft:sft_getmaxsequencenumber(L0Pid),
ManifestEntry = #manifest_entry{start_key=L0StartKey,
end_key=L0EndKey,
owner=L0Pid,
filename=L0FN},
UpdManifest2 = lists:keystore(0,
1,
UpdManifest,
{0, [ManifestEntry]}),
io:format("L0 file had maximum sequence number of ~w~n",
[L0SQN]),
{ok,
InitState#state{manifest=UpdManifest2,
manifest_sqn=TopManSQN,
ledger_sqn=max(MaxSQN, L0SQN)}};
false ->
io:format("No L0 file found~n"),
{ok,
InitState#state{manifest=UpdManifest,
manifest_sqn=TopManSQN,
ledger_sqn=MaxSQN}}
end
end.
handle_call({push_mem, DumpList}, _From, State) ->
case push_to_memory(DumpList, State) of
{ok, UpdState} ->
{reply, ok, UpdState};
{{pause, Reason, Details}, UpdState} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, UpdState#state{backlog=true}}
end;
StartWatch = os:timestamp(),
Response = case assess_sqn(DumpList) of
{MinSQN, MaxSQN} when MaxSQN > MinSQN,
MinSQN >= State#state.ledger_sqn ->
case push_to_memory(DumpList, State) of
{ok, UpdState} ->
{reply, ok, UpdState};
{{pause, Reason, Details}, UpdState} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, UpdState#state{backlog=true,
ledger_sqn=MaxSQN}}
end;
{MinSQN, MaxSQN} ->
io:format("Mismatch of sequence number expectations with push "
++ "having sequence numbers between ~w and ~w "
++ "but current sequence number is ~w~n",
[MinSQN, MaxSQN, State#state.ledger_sqn]),
{reply, refused, State}
end,
io:format("Push completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(),StartWatch)]),
Response;
handle_call({fetch, Key}, _From, State) ->
{reply, fetch(Key, State#state.manifest, State#state.memtable), State};
handle_call(work_for_clerk, From, State) ->
@ -249,14 +338,13 @@ handle_call({confirm_delete, FileName}, _From, State) ->
Reply = confirm_delete(FileName,
State#state.unreferenced_files,
State#state.registered_iterators),
{reply, Reply, State};
handle_call({load, RootDir}, _From, State) ->
{Manifest, ManifestSQN} = load_manifest(RootDir),
{UpdManifest, MaxSQN} = load_allsft(RootDir, Manifest),
{UpdMaxSQN} = load_levelzero(RootDir, MaxSQN),
{reply, UpdMaxSQN, State#state{root_path=RootDir,
manifest=UpdManifest,
manifest_sqn=ManifestSQN}};
case Reply of
true ->
UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
{reply, true, State#state{unreferenced_files=UF1}};
_ ->
{reply, Reply, State}
end;
handle_call(prompt_compaction, _From, State) ->
case push_to_memory([], State) of
{ok, UpdState} ->
@ -264,17 +352,66 @@ handle_call(prompt_compaction, _From, State) ->
{{pause, Reason, Details}, UpdState} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, UpdState#state{backlog=true}}
end.
handle_cast({manifest_change, WI}, State) ->
end;
handle_call({manifest_change, WI}, _From, State) ->
{ok, UpdState} = commit_manifest_change(WI, State),
{noreply, UpdState}.
{reply, ok, UpdState};
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.ledger_sqn, State};
handle_call(close, _From, State) ->
{stop, normal, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
terminate(_Reason, State) ->
%% When a Penciller shuts down it isn't safe to try an manage the safe
%% finishing of any outstanding work. The last commmitted manifest will
%% be used.
%%
%% Level 0 files lie outside of the manifest, and so if there is no L0
%% file present it is safe to write the current contents of memory. If
%% there is a L0 file present - then the memory can be dropped (it is
%% recoverable from the ledger, and there should not be a lot to recover
%% as presumably the ETS file has been recently flushed, hence the presence
%% of a L0 file).
%%
%% The penciller should close each file in the unreferenced files, and
%% then each file in the manifest, and cast a close on the clerk.
%% The cast may not succeed as the clerk could be synchronously calling
%% the penciller looking for a manifest commit
%%
leveled_clerk:clerk_stop(State#state.clerk),
Dump = ets:tab2list(State#state.memtable),
case {State#state.levelzero_pending,
get_item(0, State#state.manifest, []), length(Dump)} of
{{false, _, _}, [], L} when L > 0 ->
MSN = State#state.manifest_sqn + 1,
FileName = State#state.root_path
++ "/" ++ ?FILES_FP ++ "/"
++ integer_to_list(MSN) ++ "_0_0",
{ok,
L0Pid,
{{KR1, _}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd",
Dump,
[],
0),
io:format("Dump of memory on close to filename ~s with"
++ " remainder ~w~n", [FileName, length(KR1)]),
leveled_sft:sft_close(L0Pid),
file:rename(FileName ++ ".pnd", FileName ++ ".sft");
{{false, _, _}, [], L} when L == 0 ->
io:format("No keys to dump from memory when closing~n");
_ ->
io:format("No opportunity to persist memory before closing "
++ "with ~w keys discarded~n", [length(Dump)])
end,
ok = close_files(0, State#state.manifest),
lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end,
State#state.unreferenced_files),
ok.
code_change(_OldVsn, State, _Extra) ->
@ -288,7 +425,7 @@ code_change(_OldVsn, State, _Extra) ->
push_to_memory(DumpList, State) ->
{TableSize, UpdState} = case State#state.levelzero_pending of
{true, Remainder, {StartKey, EndKey, Pid}} ->
%% Need to handle not error scenarios?
%% Need to handle error scenarios?
%% N.B. Sync call - so will be ready
{ok, SrcFN} = leveled_sft:sft_checkready(Pid),
%% Reset ETS, but re-insert any remainder
@ -429,6 +566,7 @@ manifest_locked(State) ->
end.
%% Work out what the current work queue should be
%%
%% The work queue should have a lower level work at the front, and no work
@ -485,8 +623,41 @@ return_work(State, From) ->
end.
close_files(?MAX_LEVELS - 1, _Manifest) ->
ok;
close_files(Level, Manifest) ->
LevelList = get_item(Level, Manifest, []),
lists:foreach(fun(F) -> leveled_sft:sft_close(F#manifest_entry.owner) end,
LevelList),
close_files(Level + 1, Manifest).
open_all_filesinmanifest(Manifest) ->
open_all_filesinmanifest({Manifest, 0}, 0).
open_all_filesinmanifest(Result, ?MAX_LEVELS - 1) ->
Result;
open_all_filesinmanifest({Manifest, TopSQN}, Level) ->
LevelList = get_item(Level, Manifest, []),
%% The Pids in the saved manifest related to now closed references
%% Need to roll over the manifest at this level starting new processes to
%5 replace them
LvlR = lists:foldl(fun(F, {FL, FL_SQN}) ->
FN = F#manifest_entry.filename,
{ok, P, _Keys} = leveled_sft:sft_open(FN),
F_SQN = leveled_sft:sft_getmaxsequencenumber(P),
{lists:append(FL,
[F#manifest_entry{owner = P}]),
max(FL_SQN, F_SQN)}
end,
{[], 0},
LevelList),
%% Result is tuple of revised file list for this level in manifest, and
%% the maximum sequence number seen at this level
{LvlFL, LvlSQN} = LvlR,
UpdManifest = lists:keystore(Level, 1, Manifest, {Level, LvlFL}),
open_all_filesinmanifest({UpdManifest, max(TopSQN, LvlSQN)}, Level + 1).
assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest) ->
WorkQ;
assess_workqueue(WorkQ, LevelToAssess, Manifest)->
@ -539,8 +710,8 @@ commit_manifest_change(ReturnedWorkItem, State) ->
{NewMSN, _From} ->
MTime = timer:now_diff(os:timestamp(),
SentWorkItem#penciller_work.start_time),
io:format("Merge to sqn ~w completed in ~w microseconds
at Level ~w~n",
io:format("Merge to sqn ~w completed in ~w microseconds " ++
"at Level ~w~n",
[SentWorkItem#penciller_work.next_sqn,
MTime,
SentWorkItem#penciller_work.src_level]),
@ -558,8 +729,8 @@ commit_manifest_change(ReturnedWorkItem, State) ->
manifest=NewManifest,
unreferenced_files=UnreferencedFilesUpd}};
{MaybeWrongMSN, From} ->
io:format("Merge commit at sqn ~w not matched to expected
sqn ~w from Clerk ~w~n",
io:format("Merge commit at sqn ~w not matched to expected" ++
" sqn ~w from Clerk ~w~n",
[NewMSN, MaybeWrongMSN, From]),
{error, State}
end.
@ -586,43 +757,36 @@ update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) ->
update_deletions(Tail,
MSN,
lists:append(UnreferencedFiles,
[{ClearedFile#manifest_entry.filename, MSN}])).
[{ClearedFile#manifest_entry.filename,
ClearedFile#manifest_entry.owner,
MSN}])).
confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) ->
case lists:keyfind(Filename, 1, UnreferencedFiles) of
false ->
false;
{Filename, MSN} ->
{Filename, _Pid, MSN} ->
LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end,
infinity,
RegisteredIterators),
if
MSN >= LowSQN -> false;
true -> true
MSN >= LowSQN ->
false;
true ->
true
end
end.
%% load_manifest(RootDir),
%% {UpdManifest, MaxSQN} = load_allsft(RootDir, Manifest),
%% Level0SQN, UpdMaxSQN} = load_levelzero(RootDir, MaxSQN)
load_manifest(_RootDir) ->
{{}, 0}.
assess_sqn(DumpList) ->
assess_sqn(DumpList, infinity, 0).
load_allsft(_RootDir, _Manifest) ->
%% Manifest has been persisted with PIDs that are no longer
%% valid, roll through each entry opening files and replacing Pids in
%% Manifest
{{}, 0}.
load_levelzero(_RootDir, _MaxSQN) ->
%% When loading L0 manifest make sure that the lowest sequence number in
%% the L0 manifest is bigger than the highest in all levels below
%% - not True
%% ... what about key remainders?
%% ... need to rethink L0
{0, 0}.
assess_sqn([], MinSQN, MaxSQN) ->
{MinSQN, MaxSQN};
assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) ->
{_K, SQN} = leveled_sft:strip_to_key_seqn_only(HeadKey),
assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)).
%%%============================================================================
@ -651,7 +815,8 @@ compaction_work_assessment_test() ->
confirm_delete_test() ->
Filename = 'test.sft',
UnreferencedFiles = [{'other.sft', 15}, {Filename, 10}],
UnreferencedFiles = [{'other.sft', dummy_owner, 15},
{Filename, dummy_owner, 10}],
RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}],
R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1),
?assertMatch(R1, true),

View file

@ -163,6 +163,7 @@
sft_setfordelete/2,
sft_getmaxsequencenumber/1,
strip_to_keyonly/1,
strip_to_key_seqn_only/1,
generate_randomkeys/1]).
-include_lib("eunit/include/eunit.hrl").
@ -231,29 +232,33 @@ sft_new(Filename, KL1, KL2, Level, Options) ->
sft_open(Filename) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
case gen_server:call(Pid, {sft_open, Filename}, infinity) of
ok ->
{ok, Pid};
{ok, {SK, EK}} ->
{ok, Pid, {SK, EK}};
Error ->
Error
end.
sft_setfordelete(Pid, Penciller) ->
file_request(Pid, {set_for_delete, Penciller}).
gen_server:call(Pid, {set_for_delete, Penciller}, infinity).
sft_get(Pid, Key) ->
file_request(Pid, {get_kv, Key}).
gen_server:call(Pid, {get_kv, Key}, infinity).
sft_getkeyrange(Pid, StartKey, EndKey, ScanWidth) ->
file_request(Pid, {get_keyrange, StartKey, EndKey, ScanWidth}).
gen_server:call(Pid,
{get_keyrange, StartKey, EndKey, ScanWidth},
infinity).
sft_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
file_request(Pid, {get_kvrange, StartKey, EndKey, ScanWidth}).
sft_close(Pid) ->
file_request(Pid, close).
gen_server:call(Pid,
{get_kvrange, StartKey, EndKey, ScanWidth},
infinity).
sft_clear(Pid) ->
file_request(Pid, clear).
gen_server:call(Pid, clear, infinity).
sft_close(Pid) ->
gen_server:call(Pid, close, infinity).
sft_checkready(Pid) ->
gen_server:call(Pid, background_complete, infinity).
@ -264,36 +269,7 @@ sft_getfilename(Pid) ->
sft_getmaxsequencenumber(Pid) ->
gen_server:call(Pid, get_maxsqn, infinity).
%%%============================================================================
%%% API helper functions
%%%============================================================================
%% This saftey measure of checking the Pid is alive before perfoming any ops
%% is copied from the bitcask source code.
%%
%% It is not clear at present if this is necessary.
file_request(Pid, Request) ->
case check_pid(Pid) of
ok ->
gen_server:call(Pid, Request, infinity);
Error ->
Error
end.
check_pid(Pid) ->
IsPid = is_pid(Pid),
IsAlive = IsPid andalso is_process_alive(Pid),
case {IsAlive, IsPid} of
{true, _} ->
ok;
{false, true} ->
%% Same result as `file' module when accessing closed FD
{error, einval};
_ ->
%% Same result as `file' module when providing wrong arg
{error, badarg}
end.
%%%============================================================================
%%% gen_server callbacks
@ -363,8 +339,12 @@ handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) ->
UpdFileMD#state{handle=ReadHandle, filename=Filename}}
end;
handle_call({sft_open, Filename}, _From, _State) ->
{_Handle, FileMD} = open_file(Filename),
{reply, {FileMD#state.smallest_key, FileMD#state.highest_key}, FileMD};
{_Handle, FileMD} = open_file(#state{filename=Filename}),
io:format("Opened filename with name ~s~n", [Filename]),
{reply,
{ok,
{FileMD#state.smallest_key, FileMD#state.highest_key}},
FileMD};
handle_call({get_kv, Key}, _From, State) ->
Reply = fetch_keyvalue(State#state.handle, State, Key),
{reply, Reply, State};
@ -379,11 +359,9 @@ handle_call({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
ScanWidth),
{reply, Reply, State};
handle_call(close, _From, State) ->
{reply, true, State};
{stop, normal, ok, State};
handle_call(clear, _From, State) ->
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename),
{reply, true, State};
{stop, normal, ok, State#state{ready_for_delete=true}};
handle_call(background_complete, _From, State) ->
case State#state.background_complete of
true ->
@ -401,7 +379,7 @@ handle_call({set_for_delete, Penciller}, _From, State) ->
?DELETE_TIMEOUT};
handle_call(get_maxsqn, _From, State) ->
{reply, State#state.highest_sqn, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
@ -412,10 +390,6 @@ handle_info(timeout, State) ->
State#state.filename)
of
true ->
io:format("Polled for deletion and now clearing ~s~n",
[State#state.filename]),
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename),
{stop, shutdown, State};
false ->
io:format("Polled for deletion but ~s not ready~n",
@ -428,8 +402,18 @@ handle_info(timeout, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
terminate(Reason, State) ->
io:format("Exit called for reason ~w on filename ~s~n",
[Reason, State#state.filename]),
case State#state.ready_for_delete of
true ->
io:format("Exit called and now clearing ~s~n",
[State#state.filename]),
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename);
_ ->
ok = file:close(State#state.handle)
end.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -453,7 +437,8 @@ create_file(FileName) when is_list(FileName) ->
FileMD = #state{next_position=StartPos, filename=FileName},
{Handle, FileMD};
{error, Reason} ->
io:format("Error opening filename ~s with reason ~s", [FileName, Reason]),
io:format("Error opening filename ~s with reason ~s",
[FileName, Reason]),
{error, Reason}
end.
@ -484,7 +469,8 @@ open_file(FileMD) ->
Ilen:32/integer,
Flen:32/integer,
Slen:32/integer>> = HeaderLengths,
{ok, SummaryBin} = file:pread(Handle, ?HEADER_LEN + Blen + Ilen + Flen, Slen),
{ok, SummaryBin} = file:pread(Handle,
?HEADER_LEN + Blen + Ilen + Flen, Slen),
{{LowSQN, HighSQN}, {LowKey, HighKey}} = binary_to_term(SummaryBin),
{ok, SlotIndexBin} = file:pread(Handle, ?HEADER_LEN + Blen, Ilen),
SlotIndex = binary_to_term(SlotIndexBin),
@ -552,14 +538,17 @@ fetch_keyvalue(Handle, FileMD, Key) ->
%% Fetches a range of keys returning a list of {Key, SeqN} tuples
fetch_range_keysonly(Handle, FileMD, StartKey, EndKey) ->
fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_keysonly/2).
fetch_range(Handle, FileMD, StartKey, EndKey, [],
fun acc_list_keysonly/2).
fetch_range_keysonly(Handle, FileMD, StartKey, EndKey, ScanWidth) ->
fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_keysonly/2, ScanWidth).
fetch_range(Handle, FileMD, StartKey, EndKey, [],
fun acc_list_keysonly/2, ScanWidth).
%% Fetches a range of keys returning the full tuple, including value
fetch_range_kv(Handle, FileMD, StartKey, EndKey, ScanWidth) ->
fetch_range(Handle, FileMD, StartKey, EndKey, [], fun acc_list_kv/2, ScanWidth).
fetch_range(Handle, FileMD, StartKey, EndKey, [],
fun acc_list_kv/2, ScanWidth).
acc_list_keysonly(null, empty) ->
[];
@ -594,39 +583,60 @@ acc_list_kv(R, RList) ->
%% used - e.g. counters, hash-lists to build bloom filters etc
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun) ->
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ?ITERATOR_SCANWIDTH).
fetch_range(Handle, FileMD, StartKey, EndKey, FunList,
AccFun, ?ITERATOR_SCANWIDTH).
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth) ->
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, empty).
fetch_range(Handle, FileMD, StartKey, EndKey, FunList,
AccFun, ScanWidth, empty).
fetch_range(_Handle, _FileMD, StartKey, _EndKey, _FunList, _AccFun, 0, Acc) ->
fetch_range(_Handle, _FileMD, StartKey, _EndKey, _FunList,
_AccFun, 0, Acc) ->
{partial, Acc, StartKey};
fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth, Acc) ->
fetch_range(Handle, FileMD, StartKey, EndKey, FunList,
AccFun, ScanWidth, Acc) ->
%% get_nearestkey gets the last key in the index <= StartKey, or the next
%% key along if {next, StartKey} is passed
case get_nearestkey(FileMD#state.slot_index, StartKey) of
{NearestKey, _Filter, {LengthList, PointerB}} ->
fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth,
LengthList, 0, PointerB + FileMD#state.slots_pointer,
fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList,
AccFun, ScanWidth,
LengthList,
0,
PointerB + FileMD#state.slots_pointer,
AccFun(null, Acc));
not_found ->
{complete, AccFun(null, Acc)}
end.
fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth,
LengthList, BlockNumber, _Pointer, Acc)
fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList,
AccFun, ScanWidth,
LengthList,
BlockNumber,
_Pointer,
Acc)
when length(LengthList) == BlockNumber ->
%% Reached the end of the slot. Move the start key on one to scan a new slot
fetch_range(Handle, FileMD, {next, NearestKey}, EndKey, FunList, AccFun, ScanWidth - 1, Acc);
fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth,
LengthList, BlockNumber, Pointer, Acc) ->
fetch_range(Handle, FileMD, {next, NearestKey}, EndKey, FunList,
AccFun, ScanWidth - 1,
Acc);
fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList,
AccFun, ScanWidth,
LengthList,
BlockNumber,
Pointer,
Acc) ->
Block = fetch_block(Handle, LengthList, BlockNumber, Pointer),
Results = scan_block(Block, StartKey, EndKey, FunList, AccFun, Acc),
case Results of
{partial, Acc1, StartKey} ->
%% Move on to the next block
fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, AccFun, ScanWidth,
LengthList, BlockNumber + 1, Pointer, Acc1);
fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList,
AccFun, ScanWidth,
LengthList,
BlockNumber + 1,
Pointer,
Acc1);
{complete, Acc1} ->
{complete, Acc1}
end.
@ -665,8 +675,8 @@ applyfuns([HeadFun|OtherFuns], KV) ->
fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) ->
not_present;
fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot) ->
BlockToCheck = fetch_block(Handle, LengthList, BlockNumber, StartOfSlot),
fetch_keyvalue_fromblock([BlockNmb|T], Key, LengthList, Handle, StartOfSlot) ->
BlockToCheck = fetch_block(Handle, LengthList, BlockNmb, StartOfSlot),
Result = lists:keyfind(Key, 1, BlockToCheck),
case Result of
false ->
@ -675,9 +685,9 @@ fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot)
KV
end.
fetch_block(Handle, LengthList, BlockNumber, StartOfSlot) ->
Start = lists:sum(lists:sublist(LengthList, BlockNumber)),
Length = lists:nth(BlockNumber + 1, LengthList),
fetch_block(Handle, LengthList, BlockNmb, StartOfSlot) ->
Start = lists:sum(lists:sublist(LengthList, BlockNmb)),
Length = lists:nth(BlockNmb + 1, LengthList),
{ok, BlockToCheckBin} = file:pread(Handle, Start + StartOfSlot, Length),
binary_to_term(BlockToCheckBin).
@ -1384,18 +1394,20 @@ generate_randomsegfilter(BlockSize) ->
Block4})).
generate_randomkeys({Count, StartSQN}) ->
generate_randomkeys(Count, StartSQN, []);
generate_randomkeys(Count) ->
generate_randomkeys(Count, []).
generate_randomkeys(Count, 0, []).
generate_randomkeys(0, Acc) ->
generate_randomkeys(0, _SQN, Acc) ->
Acc;
generate_randomkeys(Count, Acc) ->
generate_randomkeys(Count, SQN, Acc) ->
RandKey = {{o,
lists:concat(["Bucket", random:uniform(1024)]),
lists:concat(["Key", random:uniform(1024)])},
Count + 1,
SQN,
{active, infinity}, null},
generate_randomkeys(Count - 1, [RandKey|Acc]).
generate_randomkeys(Count - 1, SQN + 1, [RandKey|Acc]).
generate_sequentialkeys(Count, Start) ->
generate_sequentialkeys(Count + Start, Start, []).