Switch SFT to fsm

To amke it easier to context switch between the two file formats, make
the SFT file clerk a FSM like the CDB one.
This commit is contained in:
martinsumner 2016-11-07 17:08:50 +00:00
parent aacf377f0a
commit 05338a1e85
2 changed files with 147 additions and 124 deletions

View file

@ -1,21 +1,19 @@
%% -------- CDB File Clerk ---------
%%
%% This is a modified version of the cdb module provided by Tom Whitcomb.
%%
%% - https://github.com/thomaswhitcomb/erlang-cdb
%%
%% The CDB module is an implementation of the constant database format
%% described by DJ Bernstein
%%
%% - https://cr.yp.to/cdb.html
%%
%% The primary differences are:
%% - Support for incrementally writing a CDB file while keeping the hash table
%% in memory
%% - The ability to scan a database and accumulate all the Key, Values to
%% rebuild in-memory tables on startup
%% - The ability to scan a database in blocks of sequence numbers
%%
%% This is to be used in eleveledb, and in this context:
%% - Keys will be a combinatio of the PrimaryKey and the Sequence Number
%% - Values will be a serialised version on the whole object, and the
%% IndexChanges associated with the transaction
%% Where the IndexChanges are all the Key changes required to be added to the
%% ledger to complete the changes (the addition of postings and tombstones).
%% - The applictaion of a CRC chekc by default to all values
%%
%% This module provides functions to create and query a CDB (constant database).
%% A CDB implements a two-level hashtable which provides fast {key,value}

View file

@ -142,16 +142,22 @@
-module(leveled_sft).
-behaviour(gen_server).
-behaviour(gen_fsm).
-include("include/leveled.hrl").
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
sft_new/4,
handle_sync_event/4,
handle_event/3,
handle_info/3,
terminate/3,
code_change/4,
starting/2,
starting/3,
reader/3,
delete_pending/3,
delete_pending/2]).
-export([sft_new/4,
sft_newfroml0cache/4,
sft_open/1,
sft_get/2,
@ -161,8 +167,9 @@
sft_checkready/1,
sft_setfordelete/2,
sft_deleteconfirmed/1,
sft_getmaxsequencenumber/1,
generate_randomkeys/1]).
sft_getmaxsequencenumber/1]).
-export([generate_randomkeys/1]).
-include_lib("eunit/include/eunit.hrl").
@ -202,7 +209,6 @@
handle :: file:fd(),
background_complete = false :: boolean(),
oversized_file = false :: boolean(),
ready_for_delete = false ::boolean(),
penciller :: pid()}).
@ -221,65 +227,68 @@ sft_new(Filename, KL1, KL2, LevelInfo) ->
LevelInfo
end
end,
{ok, Pid} = gen_server:start(?MODULE, [], []),
Reply = gen_server:call(Pid,
{sft_new, Filename, KL1, KL2, LevelR},
infinity),
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
Reply = gen_fsm:sync_send_event(Pid,
{sft_new, Filename, KL1, KL2, LevelR},
infinity),
{ok, Pid, Reply}.
sft_newfroml0cache(Filename, Slots, FetchFun, Options) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
case Options#sft_options.wait of
true ->
KL1 = leveled_pmem:to_list(Slots, FetchFun),
Reply = gen_server:call(Pid,
{sft_new,
Filename,
KL1,
[],
#level{level=0}},
infinity),
Reply = gen_fsm:sync_send_event(Pid,
{sft_new,
Filename,
KL1,
[],
#level{level=0}},
infinity),
{ok, Pid, Reply};
false ->
gen_server:cast(Pid,
{sft_newfroml0cache,
Filename,
Slots,
FetchFun,
Options#sft_options.penciller}),
gen_fsm:send_event(Pid,
{sft_newfroml0cache,
Filename,
Slots,
FetchFun,
Options#sft_options.penciller}),
{ok, Pid, noreply}
end.
sft_open(Filename) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
case gen_server:call(Pid, {sft_open, Filename}, infinity) of
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
case gen_fsm:sync_send_event(Pid, {sft_open, Filename}, infinity) of
{ok, {SK, EK}} ->
{ok, Pid, {SK, EK}}
end.
sft_setfordelete(Pid, Penciller) ->
gen_server:call(Pid, {set_for_delete, Penciller}, infinity).
gen_fsm:sync_send_event(Pid, {set_for_delete, Penciller}, infinity).
sft_get(Pid, Key) ->
gen_server:call(Pid, {get_kv, Key}, infinity).
gen_fsm:sync_send_event(Pid, {get_kv, Key}, infinity).
sft_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
gen_server:call(Pid, {get_kvrange, StartKey, EndKey, ScanWidth}, infinity).
gen_fsm:sync_send_event(Pid,
{get_kvrange, StartKey, EndKey, ScanWidth},
infinity).
sft_clear(Pid) ->
gen_server:call(Pid, clear, infinity).
gen_fsm:sync_send_event(Pid, {set_for_delete, false}, infinity),
gen_fsm:sync_send_event(Pid, close, 1000).
sft_close(Pid) ->
gen_server:call(Pid, close, 1000).
gen_fsm:sync_send_event(Pid, close, 1000).
sft_deleteconfirmed(Pid) ->
gen_server:cast(Pid, close).
gen_fsm:send_event(Pid, close).
sft_checkready(Pid) ->
gen_server:call(Pid, background_complete, 20).
gen_fsm:sync_send_event(Pid, background_complete, 20).
sft_getmaxsequencenumber(Pid) ->
gen_server:call(Pid, get_maxsqn, infinity).
gen_fsm:sync_send_event(Pid, get_maxsqn, infinity).
@ -288,52 +297,75 @@ sft_getmaxsequencenumber(Pid) ->
%%%============================================================================
init([]) ->
{ok, #state{}}.
{ok, starting, #state{}}.
handle_call({sft_new, Filename, KL1, [], _LevelR=#level{level=L}},
_From,
_State) when L == 0 ->
starting({sft_new, Filename, KL1, [], _LevelR=#level{level=L}}, _From, _State)
when L == 0 ->
{ok, State} = create_levelzero(KL1, Filename),
{reply,
{{[], []},
State#state.smallest_key,
State#state.highest_key},
{{[], []}, State#state.smallest_key, State#state.highest_key},
reader,
State};
handle_call({sft_new, Filename, KL1, KL2, LevelR}, _From, _State) ->
starting({sft_new, Filename, KL1, KL2, LevelR}, _From, _State) ->
case create_file(Filename) of
{Handle, FileMD} ->
{ReadHandle, UpdFileMD, KeyRemainders} = complete_file(Handle,
FileMD,
KL1, KL2,
LevelR),
{reply, {KeyRemainders,
UpdFileMD#state.smallest_key,
UpdFileMD#state.highest_key},
UpdFileMD#state{handle=ReadHandle, filename=Filename}}
{reply,
{KeyRemainders,
UpdFileMD#state.smallest_key,
UpdFileMD#state.highest_key},
reader,
UpdFileMD#state{handle=ReadHandle, filename=Filename}}
end;
handle_call({sft_open, Filename}, _From, _State) ->
starting({sft_open, Filename}, _From, _State) ->
{_Handle, FileMD} = open_file(#state{filename=Filename}),
leveled_log:log("SFT01", [Filename]),
{reply,
{ok,
{FileMD#state.smallest_key, FileMD#state.highest_key}},
FileMD};
handle_call({get_kv, Key}, _From, State) ->
{ok, {FileMD#state.smallest_key, FileMD#state.highest_key}},
reader,
FileMD}.
starting({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) ->
SW = os:timestamp(),
Inp1 = leveled_pmem:to_list(Slots, FetchFun),
{ok, State} = create_levelzero(Inp1, Filename),
leveled_log:log_timer("SFT03", [Filename], SW),
case PCL of
undefined ->
{next_state, reader, State};
_ ->
leveled_penciller:pcl_confirml0complete(PCL,
State#state.filename,
State#state.smallest_key,
State#state.highest_key),
{next_state, reader, State}
end.
reader({get_kv, Key}, _From, State) ->
Reply = fetch_keyvalue(State#state.handle, State, Key),
statecheck_onreply(Reply, State);
handle_call({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
{reply, Reply, reader, State};
reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
Reply = pointer_append_queryresults(fetch_range_kv(State#state.handle,
State,
StartKey,
EndKey,
ScanWidth),
self()),
statecheck_onreply(Reply, State);
handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call(clear, _From, State) ->
{stop, normal, ok, State#state{ready_for_delete=true}};
handle_call(background_complete, _From, State) ->
{reply, Reply, reader, State};
reader(get_maxsqn, _From, State) ->
{reply, State#state.highest_sqn, reader, State};
reader({set_for_delete, Penciller}, _From, State) ->
leveled_log:log("SFT02", [State#state.filename]),
{reply,
ok,
delete_pending,
State#state{penciller=Penciller},
?DELETE_TIMEOUT};
reader(background_complete, _From, State) ->
if
State#state.background_complete == true ->
{reply,
@ -341,67 +373,60 @@ handle_call(background_complete, _From, State) ->
State#state.filename,
State#state.smallest_key,
State#state.highest_key},
reader,
State}
end;
handle_call({set_for_delete, Penciller}, _From, State) ->
leveled_log:log("SFT02", [State#state.filename]),
{reply,
ok,
State#state{ready_for_delete=true,
penciller=Penciller},
?DELETE_TIMEOUT};
handle_call(get_maxsqn, _From, State) ->
statecheck_onreply(State#state.highest_sqn, State).
reader(close, _From, State) ->
ok = file:close(State#state.handle),
{stop, normal, ok, State}.
handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) ->
SW = os:timestamp(),
Inp1 = leveled_pmem:to_list(Slots, FetchFun),
{ok, State} = create_levelzero(Inp1, Filename),
leveled_log:log_timer("SFT03", [Filename], SW),
case PCL of
undefined ->
{noreply, State};
_ ->
leveled_penciller:pcl_confirml0complete(PCL,
State#state.filename,
State#state.smallest_key,
State#state.highest_key),
{noreply, State}
end;
handle_cast(close, State) ->
delete_pending({get_kv, Key}, _From, State) ->
Reply = fetch_keyvalue(State#state.handle, State, Key),
{reply, Reply, delete_pending, State, ?DELETE_TIMEOUT};
delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
Reply = pointer_append_queryresults(fetch_range_kv(State#state.handle,
State,
StartKey,
EndKey,
ScanWidth),
self()),
{reply, Reply, delete_pending, State, ?DELETE_TIMEOUT};
delete_pending(get_maxsqn, _From, State) ->
{reply, State#state.highest_sqn, delete_pending, State, ?DELETE_TIMEOUT};
delete_pending(close, _From, State) ->
leveled_log:log("SFT06", [State#state.filename]),
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename),
{stop, normal, ok, State}.
delete_pending(timeout, State) ->
leveled_log:log("SFT05", [timeout, State#state.filename]),
ok = leveled_penciller:pcl_confirmdelete(State#state.penciller,
State#state.filename),
{next_state, delete_pending, State, ?DELETE_TIMEOUT};
delete_pending(close, State) ->
leveled_log:log("SFT06", [State#state.filename]),
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename),
{stop, normal, State}.
handle_info(timeout, State) ->
if
State#state.ready_for_delete == true ->
leveled_log:log("SFT05", [timeout, State#state.filename]),
ok = leveled_penciller:pcl_confirmdelete(State#state.penciller,
State#state.filename),
{noreply, State, ?DELETE_TIMEOUT}
end.
handle_sync_event(_Msg, _From, StateName, State) ->
{reply, undefined, StateName, State}.
terminate(Reason, State) ->
leveled_log:log("SFT05", [Reason, State#state.filename]),
case State#state.ready_for_delete of
true ->
leveled_log:log("SFT06", [State#state.filename]),
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename);
_ ->
ok = file:close(State#state.handle)
end.
handle_event(_Msg, StateName, State) ->
{next_state, StateName, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handle_info(_Msg, StateName, State) ->
{next_state, StateName, State}.
terminate(Reason, _StateName, State) ->
leveled_log:log("SFT05", [Reason, State#state.filename]).
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
statecheck_onreply(Reply, State) ->
case State#state.ready_for_delete of
true ->
{reply, Reply, State, ?DELETE_TIMEOUT};
false ->
{reply, Reply, State}
end.
%%%============================================================================
%%% Internal functions