diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 783f545..699893c 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -1,21 +1,19 @@ +%% -------- CDB File Clerk --------- %% %% This is a modified version of the cdb module provided by Tom Whitcomb. %% %% - https://github.com/thomaswhitcomb/erlang-cdb %% +%% The CDB module is an implementation of the constant database format +%% described by DJ Bernstein +%% +%% - https://cr.yp.to/cdb.html +%% %% The primary differences are: %% - Support for incrementally writing a CDB file while keeping the hash table %% in memory -%% - The ability to scan a database and accumulate all the Key, Values to -%% rebuild in-memory tables on startup %% - The ability to scan a database in blocks of sequence numbers -%% -%% This is to be used in eleveledb, and in this context: -%% - Keys will be a combinatio of the PrimaryKey and the Sequence Number -%% - Values will be a serialised version on the whole object, and the -%% IndexChanges associated with the transaction -%% Where the IndexChanges are all the Key changes required to be added to the -%% ledger to complete the changes (the addition of postings and tombstones). +%% - The applictaion of a CRC chekc by default to all values %% %% This module provides functions to create and query a CDB (constant database). %% A CDB implements a two-level hashtable which provides fast {key,value} diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 30f1e70..a42d303 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -142,16 +142,22 @@ -module(leveled_sft). --behaviour(gen_server). +-behaviour(gen_fsm). -include("include/leveled.hrl"). -export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3, - sft_new/4, + handle_sync_event/4, + handle_event/3, + handle_info/3, + terminate/3, + code_change/4, + starting/2, + starting/3, + reader/3, + delete_pending/3, + delete_pending/2]). + +-export([sft_new/4, sft_newfroml0cache/4, sft_open/1, sft_get/2, @@ -161,8 +167,9 @@ sft_checkready/1, sft_setfordelete/2, sft_deleteconfirmed/1, - sft_getmaxsequencenumber/1, - generate_randomkeys/1]). + sft_getmaxsequencenumber/1]). + +-export([generate_randomkeys/1]). -include_lib("eunit/include/eunit.hrl"). @@ -202,7 +209,6 @@ handle :: file:fd(), background_complete = false :: boolean(), oversized_file = false :: boolean(), - ready_for_delete = false ::boolean(), penciller :: pid()}). @@ -221,65 +227,68 @@ sft_new(Filename, KL1, KL2, LevelInfo) -> LevelInfo end end, - {ok, Pid} = gen_server:start(?MODULE, [], []), - Reply = gen_server:call(Pid, - {sft_new, Filename, KL1, KL2, LevelR}, - infinity), + {ok, Pid} = gen_fsm:start(?MODULE, [], []), + Reply = gen_fsm:sync_send_event(Pid, + {sft_new, Filename, KL1, KL2, LevelR}, + infinity), {ok, Pid, Reply}. sft_newfroml0cache(Filename, Slots, FetchFun, Options) -> - {ok, Pid} = gen_server:start(?MODULE, [], []), + {ok, Pid} = gen_fsm:start(?MODULE, [], []), case Options#sft_options.wait of true -> KL1 = leveled_pmem:to_list(Slots, FetchFun), - Reply = gen_server:call(Pid, - {sft_new, - Filename, - KL1, - [], - #level{level=0}}, - infinity), + Reply = gen_fsm:sync_send_event(Pid, + {sft_new, + Filename, + KL1, + [], + #level{level=0}}, + infinity), {ok, Pid, Reply}; false -> - gen_server:cast(Pid, - {sft_newfroml0cache, - Filename, - Slots, - FetchFun, - Options#sft_options.penciller}), + gen_fsm:send_event(Pid, + {sft_newfroml0cache, + Filename, + Slots, + FetchFun, + Options#sft_options.penciller}), {ok, Pid, noreply} end. sft_open(Filename) -> - {ok, Pid} = gen_server:start(?MODULE, [], []), - case gen_server:call(Pid, {sft_open, Filename}, infinity) of + {ok, Pid} = gen_fsm:start(?MODULE, [], []), + case gen_fsm:sync_send_event(Pid, {sft_open, Filename}, infinity) of {ok, {SK, EK}} -> {ok, Pid, {SK, EK}} end. sft_setfordelete(Pid, Penciller) -> - gen_server:call(Pid, {set_for_delete, Penciller}, infinity). + gen_fsm:sync_send_event(Pid, {set_for_delete, Penciller}, infinity). sft_get(Pid, Key) -> - gen_server:call(Pid, {get_kv, Key}, infinity). + gen_fsm:sync_send_event(Pid, {get_kv, Key}, infinity). sft_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> - gen_server:call(Pid, {get_kvrange, StartKey, EndKey, ScanWidth}, infinity). + gen_fsm:sync_send_event(Pid, + {get_kvrange, StartKey, EndKey, ScanWidth}, + infinity). sft_clear(Pid) -> - gen_server:call(Pid, clear, infinity). + gen_fsm:sync_send_event(Pid, {set_for_delete, false}, infinity), + gen_fsm:sync_send_event(Pid, close, 1000). sft_close(Pid) -> - gen_server:call(Pid, close, 1000). + gen_fsm:sync_send_event(Pid, close, 1000). sft_deleteconfirmed(Pid) -> - gen_server:cast(Pid, close). + gen_fsm:send_event(Pid, close). sft_checkready(Pid) -> - gen_server:call(Pid, background_complete, 20). + gen_fsm:sync_send_event(Pid, background_complete, 20). sft_getmaxsequencenumber(Pid) -> - gen_server:call(Pid, get_maxsqn, infinity). + gen_fsm:sync_send_event(Pid, get_maxsqn, infinity). @@ -288,52 +297,75 @@ sft_getmaxsequencenumber(Pid) -> %%%============================================================================ init([]) -> - {ok, #state{}}. + {ok, starting, #state{}}. -handle_call({sft_new, Filename, KL1, [], _LevelR=#level{level=L}}, - _From, - _State) when L == 0 -> +starting({sft_new, Filename, KL1, [], _LevelR=#level{level=L}}, _From, _State) + when L == 0 -> {ok, State} = create_levelzero(KL1, Filename), {reply, - {{[], []}, - State#state.smallest_key, - State#state.highest_key}, + {{[], []}, State#state.smallest_key, State#state.highest_key}, + reader, State}; -handle_call({sft_new, Filename, KL1, KL2, LevelR}, _From, _State) -> +starting({sft_new, Filename, KL1, KL2, LevelR}, _From, _State) -> case create_file(Filename) of {Handle, FileMD} -> {ReadHandle, UpdFileMD, KeyRemainders} = complete_file(Handle, FileMD, KL1, KL2, LevelR), - {reply, {KeyRemainders, - UpdFileMD#state.smallest_key, - UpdFileMD#state.highest_key}, - UpdFileMD#state{handle=ReadHandle, filename=Filename}} + {reply, + {KeyRemainders, + UpdFileMD#state.smallest_key, + UpdFileMD#state.highest_key}, + reader, + UpdFileMD#state{handle=ReadHandle, filename=Filename}} end; -handle_call({sft_open, Filename}, _From, _State) -> +starting({sft_open, Filename}, _From, _State) -> {_Handle, FileMD} = open_file(#state{filename=Filename}), leveled_log:log("SFT01", [Filename]), {reply, - {ok, - {FileMD#state.smallest_key, FileMD#state.highest_key}}, - FileMD}; -handle_call({get_kv, Key}, _From, State) -> + {ok, {FileMD#state.smallest_key, FileMD#state.highest_key}}, + reader, + FileMD}. + +starting({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) -> + SW = os:timestamp(), + Inp1 = leveled_pmem:to_list(Slots, FetchFun), + {ok, State} = create_levelzero(Inp1, Filename), + leveled_log:log_timer("SFT03", [Filename], SW), + case PCL of + undefined -> + {next_state, reader, State}; + _ -> + leveled_penciller:pcl_confirml0complete(PCL, + State#state.filename, + State#state.smallest_key, + State#state.highest_key), + {next_state, reader, State} + end. + + +reader({get_kv, Key}, _From, State) -> Reply = fetch_keyvalue(State#state.handle, State, Key), - statecheck_onreply(Reply, State); -handle_call({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> + {reply, Reply, reader, State}; +reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> Reply = pointer_append_queryresults(fetch_range_kv(State#state.handle, State, StartKey, EndKey, ScanWidth), self()), - statecheck_onreply(Reply, State); -handle_call(close, _From, State) -> - {stop, normal, ok, State}; -handle_call(clear, _From, State) -> - {stop, normal, ok, State#state{ready_for_delete=true}}; -handle_call(background_complete, _From, State) -> + {reply, Reply, reader, State}; +reader(get_maxsqn, _From, State) -> + {reply, State#state.highest_sqn, reader, State}; +reader({set_for_delete, Penciller}, _From, State) -> + leveled_log:log("SFT02", [State#state.filename]), + {reply, + ok, + delete_pending, + State#state{penciller=Penciller}, + ?DELETE_TIMEOUT}; +reader(background_complete, _From, State) -> if State#state.background_complete == true -> {reply, @@ -341,67 +373,60 @@ handle_call(background_complete, _From, State) -> State#state.filename, State#state.smallest_key, State#state.highest_key}, + reader, State} end; -handle_call({set_for_delete, Penciller}, _From, State) -> - leveled_log:log("SFT02", [State#state.filename]), - {reply, - ok, - State#state{ready_for_delete=true, - penciller=Penciller}, - ?DELETE_TIMEOUT}; -handle_call(get_maxsqn, _From, State) -> - statecheck_onreply(State#state.highest_sqn, State). +reader(close, _From, State) -> + ok = file:close(State#state.handle), + {stop, normal, ok, State}. -handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) -> - SW = os:timestamp(), - Inp1 = leveled_pmem:to_list(Slots, FetchFun), - {ok, State} = create_levelzero(Inp1, Filename), - leveled_log:log_timer("SFT03", [Filename], SW), - case PCL of - undefined -> - {noreply, State}; - _ -> - leveled_penciller:pcl_confirml0complete(PCL, - State#state.filename, - State#state.smallest_key, - State#state.highest_key), - {noreply, State} - end; -handle_cast(close, State) -> +delete_pending({get_kv, Key}, _From, State) -> + Reply = fetch_keyvalue(State#state.handle, State, Key), + {reply, Reply, delete_pending, State, ?DELETE_TIMEOUT}; +delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> + Reply = pointer_append_queryresults(fetch_range_kv(State#state.handle, + State, + StartKey, + EndKey, + ScanWidth), + self()), + {reply, Reply, delete_pending, State, ?DELETE_TIMEOUT}; +delete_pending(get_maxsqn, _From, State) -> + {reply, State#state.highest_sqn, delete_pending, State, ?DELETE_TIMEOUT}; +delete_pending(close, _From, State) -> + leveled_log:log("SFT06", [State#state.filename]), + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), + {stop, normal, ok, State}. + +delete_pending(timeout, State) -> + leveled_log:log("SFT05", [timeout, State#state.filename]), + ok = leveled_penciller:pcl_confirmdelete(State#state.penciller, + State#state.filename), + {next_state, delete_pending, State, ?DELETE_TIMEOUT}; +delete_pending(close, State) -> + leveled_log:log("SFT06", [State#state.filename]), + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), {stop, normal, State}. -handle_info(timeout, State) -> - if - State#state.ready_for_delete == true -> - leveled_log:log("SFT05", [timeout, State#state.filename]), - ok = leveled_penciller:pcl_confirmdelete(State#state.penciller, - State#state.filename), - {noreply, State, ?DELETE_TIMEOUT} - end. +handle_sync_event(_Msg, _From, StateName, State) -> + {reply, undefined, StateName, State}. -terminate(Reason, State) -> - leveled_log:log("SFT05", [Reason, State#state.filename]), - case State#state.ready_for_delete of - true -> - leveled_log:log("SFT06", [State#state.filename]), - ok = file:close(State#state.handle), - ok = file:delete(State#state.filename); - _ -> - ok = file:close(State#state.handle) - end. +handle_event(_Msg, StateName, State) -> + {next_state, StateName, State}. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +handle_info(_Msg, StateName, State) -> + {next_state, StateName, State}. + + +terminate(Reason, _StateName, State) -> + leveled_log:log("SFT05", [Reason, State#state.filename]). + +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. -statecheck_onreply(Reply, State) -> - case State#state.ready_for_delete of - true -> - {reply, Reply, State, ?DELETE_TIMEOUT}; - false -> - {reply, Reply, State} - end. %%%============================================================================ %%% Internal functions