diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 9c8e507..c01281a 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -146,6 +146,7 @@ book_snapshotstore/3, book_snapshotledger/3, book_compactjournal/2, + book_islastcompactionpending/1, book_close/1]). -include_lib("eunit/include/eunit.hrl"). @@ -234,6 +235,9 @@ book_snapshotledger(Pid, Requestor, Timeout) -> book_compactjournal(Pid, Timeout) -> gen_server:call(Pid, {compact_journal, Timeout}, infinity). +book_islastcompactionpending(Pid) -> + gen_server:call(Pid, confirm_compact, infinity). + book_close(Pid) -> gen_server:call(Pid, close, infinity). @@ -389,6 +393,8 @@ handle_call({compact_journal, Timeout}, _From, State) -> self(), Timeout), {reply, ok, State}; +handle_call(confirm_compact, _From, State) -> + {reply, leveled_inker:ink_compactionpending(State#state.inker), State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 8d6c6b3..63e48d3 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -40,38 +40,50 @@ %% The first word is the corresponding hash value and the second word is a %% file pointer to the actual {key,value} tuple higher in the file. %% +%% + -module(leveled_cdb). --behaviour(gen_server). +-behaviour(gen_fsm). -include("include/leveled.hrl"). -export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3, - cdb_open_writer/1, - cdb_open_writer/2, - cdb_open_reader/1, - cdb_get/2, - cdb_put/3, - cdb_mput/2, - cdb_getpositions/2, - cdb_directfetch/3, - cdb_lastkey/1, - cdb_firstkey/1, - cdb_filename/1, - cdb_keycheck/2, - cdb_scan/4, - cdb_close/1, - cdb_complete/1, - cdb_roll/1, - cdb_returnhashtable/3, - cdb_destroy/1, - cdb_deletepending/1, - hashtable_calc/2]). 
+ handle_sync_event/4, + handle_event/3, + handle_info/3, + terminate/3, + code_change/4, + starting/3, + writer/3, + writer/2, + rolling/3, + reader/3, + reader/2, + delete_pending/3, + delete_pending/2]). + +-export([cdb_open_writer/1, + cdb_open_writer/2, + cdb_open_reader/1, + cdb_get/2, + cdb_put/3, + cdb_mput/2, + cdb_getpositions/2, + cdb_directfetch/3, + cdb_lastkey/1, + cdb_firstkey/1, + cdb_filename/1, + cdb_keycheck/2, + cdb_scan/4, + cdb_close/1, + cdb_complete/1, + cdb_roll/1, + cdb_returnhashtable/3, + cdb_destroy/1, + cdb_deletepending/1, + cdb_deletepending/3, + hashtable_calc/2]). -include_lib("eunit/include/eunit.hrl"). @@ -83,6 +95,7 @@ -define(BASE_POSITION, 2048). -define(WRITE_OPS, [binary, raw, read, write]). -define(PENDING_ROLL_WAIT, 30). +-define(DELETE_TIMEOUT, 10000). -record(state, {hashtree, last_position :: integer(), @@ -90,11 +103,10 @@ hash_index = [] :: list(), filename :: string(), handle :: file:fd(), - writer :: boolean(), max_size :: integer(), - pending_roll = false :: boolean(), - pending_delete = false :: boolean(), - binary_mode = false :: boolean()}). + binary_mode = false :: boolean(), + delete_point = 0 :: integer(), + inker :: pid()}). %%%============================================================================ @@ -106,8 +118,8 @@ cdb_open_writer(Filename) -> cdb_open_writer(Filename, #cdb_options{}). cdb_open_writer(Filename, Opts) -> - {ok, Pid} = gen_server:start(?MODULE, [Opts], []), - case gen_server:call(Pid, {open_writer, Filename}, infinity) of + {ok, Pid} = gen_fsm:start(?MODULE, [Opts], []), + case gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity) of ok -> {ok, Pid}; Error -> @@ -115,8 +127,8 @@ cdb_open_writer(Filename, Opts) -> end. 
cdb_open_reader(Filename) -> - {ok, Pid} = gen_server:start(?MODULE, [#cdb_options{}], []), - case gen_server:call(Pid, {open_reader, Filename}, infinity) of + {ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{}], []), + case gen_fsm:sync_send_event(Pid, {open_reader, Filename}, infinity) of ok -> {ok, Pid}; Error -> @@ -124,23 +136,23 @@ cdb_open_reader(Filename) -> end. cdb_get(Pid, Key) -> - gen_server:call(Pid, {get_kv, Key}, infinity). + gen_fsm:sync_send_event(Pid, {get_kv, Key}, infinity). cdb_put(Pid, Key, Value) -> - gen_server:call(Pid, {put_kv, Key, Value}, infinity). + gen_fsm:sync_send_event(Pid, {put_kv, Key, Value}, infinity). cdb_mput(Pid, KVList) -> - gen_server:call(Pid, {mput_kv, KVList}, infinity). + gen_fsm:sync_send_event(Pid, {mput_kv, KVList}, infinity). %% SampleSize can be an integer or the atom all cdb_getpositions(Pid, SampleSize) -> - gen_server:call(Pid, {get_positions, SampleSize}, infinity). + gen_fsm:sync_send_event(Pid, {get_positions, SampleSize}, infinity). %% Info can be key_only, key_size (size being the size of the value) or %% key_value_check (with the check part indicating if the CRC is correct for %% the value) cdb_directfetch(Pid, PositionList, Info) -> - gen_server:call(Pid, {direct_fetch, PositionList, Info}, infinity). + gen_fsm:sync_send_event(Pid, {direct_fetch, PositionList, Info}, infinity). cdb_close(Pid) -> cdb_close(Pid, ?PENDING_ROLL_WAIT). @@ -148,7 +160,7 @@ cdb_close(Pid) -> cdb_close(Pid, WaitsLeft) -> if WaitsLeft > 0 -> - case gen_server:call(Pid, cdb_close, infinity) of + case gen_fsm:sync_send_all_state_event(Pid, cdb_close, infinity) of pending_roll -> timer:sleep(1), cdb_close(Pid, WaitsLeft - 1); @@ -156,23 +168,26 @@ cdb_close(Pid, WaitsLeft) -> R end; true -> - gen_server:call(Pid, cdb_kill, infinity) + gen_fsm:sync_send_event(Pid, cdb_kill, infinity) end. cdb_complete(Pid) -> - gen_server:call(Pid, cdb_complete, infinity). + gen_fsm:sync_send_event(Pid, cdb_complete, infinity). 
cdb_roll(Pid) -> - gen_server:cast(Pid, cdb_roll). + gen_fsm:send_event(Pid, cdb_roll). cdb_returnhashtable(Pid, IndexList, HashTreeBin) -> - gen_server:call(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity). + gen_fsm:sync_send_event(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity). cdb_destroy(Pid) -> - gen_server:cast(Pid, destroy). + gen_fsm:send_event(Pid, destroy). cdb_deletepending(Pid) -> - gen_server:cast(Pid, delete_pending). + cdb_deletepending(Pid, 0, no_poll). + +cdb_deletepending(Pid, ManSQN, Inker) -> + gen_fsm:send_event(Pid, {delete_pending, ManSQN, Inker}). %% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosiiton to %% continue from that point (calling function has to protect against) double @@ -182,26 +197,29 @@ cdb_deletepending(Pid) -> %% the end of the file. last_key must be defined in LoopState. cdb_scan(Pid, FilterFun, InitAcc, StartPosition) -> - gen_server:call(Pid, - {cdb_scan, FilterFun, InitAcc, StartPosition}, - infinity). + gen_fsm:sync_send_all_state_event(Pid, + {cdb_scan, + FilterFun, + InitAcc, + StartPosition}, + infinity). %% Get the last key to be added to the file (which will have the highest %% sequence number) cdb_lastkey(Pid) -> - gen_server:call(Pid, cdb_lastkey, infinity). + gen_fsm:sync_send_all_state_event(Pid, cdb_lastkey, infinity). cdb_firstkey(Pid) -> - gen_server:call(Pid, cdb_firstkey, infinity). + gen_fsm:sync_send_all_state_event(Pid, cdb_firstkey, infinity). %% Get the filename of the database cdb_filename(Pid) -> - gen_server:call(Pid, cdb_filename, infinity). + gen_fsm:sync_send_all_state_event(Pid, cdb_filename, infinity). %% Check to see if the key is probably present, will return either %% probably or missing. Does not do a definitive check cdb_keycheck(Pid, Key) -> - gen_server:call(Pid, {key_check, Key}, infinity). + gen_fsm:sync_send_event(Pid, {key_check, Key}, infinity). 
%%%============================================================================ %%% gen_server callbacks @@ -214,120 +232,136 @@ init([Opts]) -> M -> M end, - {ok, #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode}}. + {ok, + starting, + #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode}}. -handle_call({open_writer, Filename}, _From, State) -> +starting({open_writer, Filename}, _From, State) -> io:format("Opening file for writing with filename ~s~n", [Filename]), {LastPosition, HashTree, LastKey} = open_active_file(Filename), {ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]), - {reply, ok, State#state{handle=Handle, - last_position=LastPosition, - last_key=LastKey, - filename=Filename, - hashtree=HashTree, - writer=true}}; -handle_call({open_reader, Filename}, _From, State) -> + {reply, ok, writer, State#state{handle=Handle, + last_position=LastPosition, + last_key=LastKey, + filename=Filename, + hashtree=HashTree}}; +starting({open_reader, Filename}, _From, State) -> io:format("Opening file for reading with filename ~s~n", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename), - {reply, ok, State#state{handle=Handle, - last_key=LastKey, - filename=Filename, - writer=false, - hash_index=Index}}; -handle_call({get_kv, Key}, _From, State) -> - case State#state.writer of - true -> - {reply, - get_mem(Key, State#state.handle, State#state.hashtree), - State}; - false -> - {reply, - get_withcache(State#state.handle, Key, State#state.hash_index), - State} - end; -handle_call({key_check, Key}, _From, State) -> - case State#state.writer of - true -> - {reply, - get_mem(Key, - State#state.handle, - State#state.hashtree, - loose_presence), - State}; - false -> - {reply, - get(State#state.handle, + {reply, ok, reader, State#state{handle=Handle, + last_key=LastKey, + filename=Filename, + hash_index=Index}}. 
+ +writer({get_kv, Key}, _From, State) -> + {reply, + get_mem(Key, State#state.handle, State#state.hashtree), + writer, + State}; +writer({key_check, Key}, _From, State) -> + {reply, + get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), + writer, + State}; +writer({put_kv, Key, Value}, _From, State) -> + Result = put(State#state.handle, Key, - loose_presence, - State#state.hash_index), - State} - end; -handle_call({put_kv, Key, Value}, _From, State) -> - case {State#state.writer, State#state.pending_roll} of - {true, false} -> - Result = put(State#state.handle, - Key, Value, - {State#state.last_position, State#state.hashtree}, - State#state.binary_mode, - State#state.max_size), - case Result of - roll -> - %% Key and value could not be written - {reply, roll, State}; - {UpdHandle, NewPosition, HashTree} -> - {reply, ok, State#state{handle=UpdHandle, + Value, + {State#state.last_position, State#state.hashtree}, + State#state.binary_mode, + State#state.max_size), + case Result of + roll -> + %% Key and value could not be written + {reply, roll, writer, State}; + {UpdHandle, NewPosition, HashTree} -> + {reply, ok, writer, State#state{handle=UpdHandle, last_position=NewPosition, last_key=Key, hashtree=HashTree}} - end; - _ -> - {reply, - {error, read_only}, - State} end; -handle_call({mput_kv, KVList}, _From, State) -> - case {State#state.writer, State#state.pending_roll} of - {true, false} -> - Result = mput(State#state.handle, - KVList, - {State#state.last_position, State#state.hashtree}, - State#state.binary_mode, - State#state.max_size), - case Result of - roll -> - %% Keys and values could not be written - {reply, roll, State}; - {UpdHandle, NewPosition, HashTree, LastKey} -> - {reply, ok, State#state{handle=UpdHandle, +writer({mput_kv, KVList}, _From, State) -> + Result = mput(State#state.handle, + KVList, + {State#state.last_position, State#state.hashtree}, + State#state.binary_mode, + State#state.max_size), + case Result of + roll -> + %% 
Keys and values could not be written + {reply, roll, writer, State}; + {UpdHandle, NewPosition, HashTree, LastKey} -> + {reply, ok, writer, State#state{handle=UpdHandle, last_position=NewPosition, last_key=LastKey, hashtree=HashTree}} - end; - _ -> - {reply, - {error, read_only}, - State} end; -handle_call(cdb_lastkey, _From, State) -> - {reply, State#state.last_key, State}; -handle_call(cdb_firstkey, _From, State) -> - {ok, EOFPos} = file:position(State#state.handle, eof), - FirstKey = case EOFPos of - ?BASE_POSITION -> - empty; - _ -> - extract_key(State#state.handle, ?BASE_POSITION) - end, - {reply, FirstKey, State}; -handle_call(cdb_filename, _From, State) -> - {reply, State#state.filename, State}; -handle_call({get_positions, SampleSize}, _From, State) -> +writer(cdb_complete, _From, State) -> + NewName = determine_new_filename(State#state.filename), + ok = close_file(State#state.handle, + State#state.hashtree, + State#state.last_position), + ok = rename_for_read(State#state.filename, NewName), + {stop, normal, {ok, NewName}, State}. + +writer(cdb_roll, State) -> + ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree, + State#state.last_position, + self()), + {next_state, rolling, State}. 
+ + +rolling({get_kv, Key}, _From, State) -> + {reply, + get_mem(Key, State#state.handle, State#state.hashtree), + rolling, + State}; +rolling({key_check, Key}, _From, State) -> + {reply, + get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), + rolling, + State}; +rolling(cdb_filename, _From, State) -> + {reply, State#state.filename, rolling, State}; +rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) -> + Handle = State#state.handle, + {ok, BasePos} = file:position(Handle, State#state.last_position), + NewName = determine_new_filename(State#state.filename), + ok = perform_write_hash_tables(Handle, HashTreeBin, BasePos), + ok = write_top_index_table(Handle, BasePos, IndexList), + file:close(Handle), + ok = rename_for_read(State#state.filename, NewName), + io:format("Opening file for reading with filename ~s~n", [NewName]), + {NewHandle, Index, LastKey} = open_for_readonly(NewName), + {reply, ok, reader, State#state{handle=NewHandle, + last_key=LastKey, + filename=NewName, + hash_index=Index}}; +rolling(cdb_kill, _From, State) -> + {stop, killed, ok, State}. 
+ +reader({get_kv, Key}, _From, State) -> + {reply, + get_withcache(State#state.handle, Key, State#state.hash_index), + reader, + State}; +reader({key_check, Key}, _From, State) -> + {reply, + get(State#state.handle, + Key, + loose_presence, + State#state.hash_index), + reader, + State}; +reader({get_positions, SampleSize}, _From, State) -> case SampleSize of all -> - {reply, scan_index(State#state.handle, - State#state.hash_index, - {fun scan_index_returnpositions/4, []}), - State}; + {reply, + scan_index(State#state.handle, + State#state.hash_index, + {fun scan_index_returnpositions/4, []}), + reader, + State}; _ -> SeededL = lists:map(fun(X) -> {random:uniform(), X} end, State#state.hash_index), @@ -339,28 +373,94 @@ handle_call({get_positions, SampleSize}, _From, State) -> fun scan_index_returnpositions/4, [], SampleSize), + reader, State} end; -handle_call({direct_fetch, PositionList, Info}, _From, State) -> +reader({direct_fetch, PositionList, Info}, _From, State) -> H = State#state.handle, case Info of key_only -> KeyList = lists:map(fun(P) -> extract_key(H, P) end, PositionList), - {reply, KeyList, State}; + {reply, KeyList, reader, State}; key_size -> KeySizeList = lists:map(fun(P) -> extract_key_size(H, P) end, PositionList), - {reply, KeySizeList, State}; + {reply, KeySizeList, reader, State}; key_value_check -> KVCList = lists:map(fun(P) -> extract_key_value_check(H, P) end, PositionList), - {reply, KVCList, State} + {reply, KVCList, reader, State} end; -handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> +reader(cdb_complete, _From, State) -> + ok = file:close(State#state.handle), + {stop, normal, {ok, State#state.filename}, State#state{handle=undefined}}. + + +reader({delete_pending, 0, no_poll}, State) -> + {next_state, + delete_pending, + State#state{delete_point=0}}; +reader({delete_pending, ManSQN, Inker}, State) -> + {next_state, + delete_pending, + State#state{delete_point=ManSQN, inker=Inker}, + ?DELETE_TIMEOUT}. 
+ + +delete_pending({get_kv, Key}, _From, State) -> + {reply, + get_withcache(State#state.handle, Key, State#state.hash_index), + delete_pending, + State, + ?DELETE_TIMEOUT}; +delete_pending({key_check, Key}, _From, State) -> + {reply, + get(State#state.handle, + Key, + loose_presence, + State#state.hash_index), + delete_pending, + State, + ?DELETE_TIMEOUT}. + +delete_pending(timeout, State) -> + case State#state.delete_point of + 0 -> + {next_state, delete_pending, State}; + ManSQN -> + case is_process_alive(State#state.inker) of + true -> + case leveled_inker:ink_confirmdelete(State#state.inker, + ManSQN) of + true -> + io:format("Deletion confirmed for file ~s " + ++ "at ManifestSQN ~w~n", + [State#state.filename, ManSQN]), + {stop, normal, State}; + false -> + {next_state, + delete_pending, + State, + ?DELETE_TIMEOUT} + end; + false -> + {stop, normal, State} + end + end; +delete_pending(destroy, State) -> + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), + {stop, normal, State}. 
+ + +handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, + _From, + StateName, + State) -> {ok, StartPos0} = case StartPos of undefined -> file:position(State#state.handle, @@ -375,77 +475,50 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> FilterFun, Acc, State#state.last_key), - {reply, {LastPosition, Acc2}, State}; + {reply, {LastPosition, Acc2}, StateName, State}; empty -> - {reply, {eof, Acc}, State} + {reply, {eof, Acc}, StateName, State} end; -handle_call(cdb_close, _From, State=#state{pending_roll=RollPending}) - when RollPending == true -> - {reply, pending_roll, State}; -handle_call(cdb_close, _From, State) -> +handle_sync_event(cdb_lastkey, _From, StateName, State) -> + {reply, State#state.last_key, StateName, State}; +handle_sync_event(cdb_firstkey, _From, StateName, State) -> + {ok, EOFPos} = file:position(State#state.handle, eof), + FirstKey = case EOFPos of + ?BASE_POSITION -> + empty; + _ -> + extract_key(State#state.handle, ?BASE_POSITION) + end, + {reply, FirstKey, StateName, State}; +handle_sync_event(cdb_filename, _From, StateName, State) -> + {reply, State#state.filename, StateName, State}; +handle_sync_event(cdb_close, _From, rolling, State) -> + {reply, pending_roll, rolling, State}; +handle_sync_event(cdb_close, _From, _StateName, State) -> ok = file:close(State#state.handle), - {stop, normal, ok, State#state{handle=undefined}}; -handle_call(cdb_kill, _From, State) -> - {stop, killed, ok, State}; -handle_call(cdb_complete, _From, State=#state{writer=Writer}) - when Writer == true -> - NewName = determine_new_filename(State#state.filename), - ok = close_file(State#state.handle, - State#state.hashtree, - State#state.last_position), - ok = rename_for_read(State#state.filename, NewName), - {stop, normal, {ok, NewName}, State}; -handle_call(cdb_complete, _From, State) -> - ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State}; -handle_call({return_hashtable, IndexList, HashTreeBin}, - 
                    _From,
-                    State=#state{pending_roll=RollPending}) when RollPending == true ->
-    Handle = State#state.handle,
-    {ok, BasePos} = file:position(Handle, State#state.last_position),
-    NewName = determine_new_filename(State#state.filename),
-    ok = perform_write_hash_tables(Handle, HashTreeBin, BasePos),
-    ok = write_top_index_table(Handle, BasePos, IndexList),
-    file:close(Handle),
-    ok = rename_for_read(State#state.filename, NewName),
-    io:format("Opening file for reading with filename ~s~n", [NewName]),
-    {NewHandle, Index, LastKey} = open_for_readonly(NewName),
-    {reply, ok, State#state{handle=NewHandle,
-                            last_key=LastKey,
-                            filename=NewName,
-                            writer=false,
-                            pending_roll=false,
-                            hash_index=Index}}.
+    {stop, normal, ok, State#state{handle=undefined}}.
 
+handle_event(_Msg, StateName, State) ->
+    {next_state, StateName, State}.
 
-handle_cast(destroy, State) ->
-    ok = file:close(State#state.handle),
-    ok = file:delete(State#state.filename),
-    {noreply, State};
-handle_cast(delete_pending, State) ->
-    {noreply, State#state{pending_delete=true}};
-handle_cast(cdb_roll, State=#state{writer=Writer}) when Writer == true ->
-    ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree,
-                                                State#state.last_position,
-                                                self()),
-    {noreply, State#state{pending_roll=true}}.
+handle_info(_Msg, StateName, State) ->
+    {next_state, StateName, State}.
 
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, State) ->
-    case {State#state.handle, State#state.pending_delete} of
+terminate(Reason, StateName, State) ->
+    io:format("Closing of filename ~s for Reason ~w~n",
+                [State#state.filename, Reason]),
+    case {State#state.handle, StateName} of
         {undefined, _} ->
             ok;
-        {Handle, false} ->
-            file:close(Handle);
-        {Handle, true} ->
+        {Handle, delete_pending} ->
             file:close(Handle),
-            file:delete(State#state.filename)
+            file:delete(State#state.filename);
+        {Handle, _} ->
+            file:close(Handle)
     end.
 
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}
+code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. %%%============================================================================ %%% Internal functions diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 95cb69b..807eaf3 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -179,14 +179,20 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, C#candidate.journal} end, BestRun), - ok = leveled_inker:ink_updatemanifest(Inker, - ManifestSlice, - FilesToDelete), + io:format("Clerk updating Inker as compaction complete of " ++ + "~w files~n", [length(FilesToDelete)]), + {ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker, + ManifestSlice, + FilesToDelete), ok = leveled_inker:ink_compactioncomplete(Inker), + io:format("Clerk has completed compaction process~n"), case PromptDelete of true -> lists:foreach(fun({_SQN, _FN, J2D}) -> - leveled_cdb:cdb_deletepending(J2D) end, + leveled_cdb:cdb_deletepending(J2D, + ManSQN, + Inker) + end, FilesToDelete), {noreply, State}; false -> @@ -639,6 +645,7 @@ check_single_file_test() -> ?assertMatch(37.5, Score3), Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4), ?assertMatch(75.0, Score4), + ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -698,6 +705,7 @@ compact_single_file_recovr_test() -> stnd, test_ledgerkey("Key2")}), ?assertMatch({"Value2", []}, binary_to_term(RV1)), + ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -736,6 +744,7 @@ compact_single_file_retain_test() -> stnd, test_ledgerkey("Key2")}), ?assertMatch({"Value2", []}, binary_to_term(RV1)), + ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). 
compact_empty_file_test() -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 0fd512a..7ef5cf4 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -99,8 +99,10 @@ ink_fetch/3, ink_loadpcl/4, ink_registersnapshot/2, + ink_confirmdelete/2, ink_compactjournal/3, ink_compactioncomplete/1, + ink_compactionpending/1, ink_getmanifest/1, ink_updatemanifest/3, ink_print_manifest/1, @@ -159,6 +161,9 @@ ink_registersnapshot(Pid, Requestor) -> ink_releasesnapshot(Pid, Snapshot) -> gen_server:call(Pid, {release_snapshot, Snapshot}, infinity). +ink_confirmdelete(Pid, ManSQN) -> + gen_server:call(Pid, {confirm_delete, ManSQN}, 1000). + ink_close(Pid) -> gen_server:call(Pid, {close, false}, infinity). @@ -193,6 +198,9 @@ ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) -> ink_compactioncomplete(Pid) -> gen_server:call(Pid, compaction_complete, infinity). +ink_compactionpending(Pid) -> + gen_server:call(Pid, compaction_pending, infinity). + ink_getmanifest(Pid) -> gen_server:call(Pid, get_manifest, infinity). 
@@ -263,6 +271,17 @@ handle_call({release_snapshot, Snapshot}, _From , State) -> io:format("Ledger snapshot ~w released~n", [Snapshot]), io:format("Remaining ledger snapshots are ~w~n", [Rs]), {reply, ok, State#state{registered_snapshots=Rs}}; +handle_call({confirm_delete, ManSQN}, _From, State) -> + Reply = lists:foldl(fun({_R, SnapSQN}, Bool) -> + case SnapSQN < ManSQN of + true -> + Bool; + false -> + false + end end, + true, + State#state.registered_snapshots), + {reply, Reply, State}; handle_call(get_manifest, _From, State) -> {reply, State#state.manifest, State}; handle_call({update_manifest, @@ -280,10 +299,11 @@ handle_call({update_manifest, NewManifestSQN = State#state.manifest_sqn + 1, manifest_printer(Man1), ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path), - PendingRemovals = [{NewManifestSQN, DeletedFiles}], - {reply, ok, State#state{manifest=Man1, - manifest_sqn=NewManifestSQN, - pending_removals=PendingRemovals}}; + {reply, + {ok, NewManifestSQN}, + State#state{manifest=Man1, + manifest_sqn=NewManifestSQN, + pending_removals=DeletedFiles}}; handle_call(print_manifest, _From, State) -> manifest_printer(State#state.manifest), {reply, ok, State}; @@ -302,6 +322,8 @@ handle_call({compact, {reply, ok, State#state{compaction_pending=true}}; handle_call(compaction_complete, _From, State) -> {reply, ok, State#state{compaction_pending=false}}; +handle_call(compaction_pending, _From, State) -> + {reply, State#state.compaction_pending, State}; handle_call({close, Force}, _From, State) -> case {State#state.compaction_pending, Force} of {true, false} -> @@ -329,8 +351,7 @@ terminate(Reason, State) -> lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, State#state.registered_snapshots), manifest_printer(State#state.manifest), - ok = close_allmanifest(State#state.manifest), - ok = close_allremovals(State#state.pending_removals) + ok = close_allmanifest(State#state.manifest) end. 
code_change(_OldVsn, State, _Extra) -> @@ -552,25 +573,6 @@ find_in_manifest(SQN, [_Head|Tail]) -> find_in_manifest(SQN, Tail). -close_allremovals([]) -> - ok; -close_allremovals([{ManifestSQN, Removals}|Tail]) -> - io:format("Closing removals at ManifestSQN=~w~n", [ManifestSQN]), - lists:foreach(fun({LowSQN, FN, Handle}) -> - io:format("Closing removed file with LowSQN=~w" ++ - " and filename ~s~n", - [LowSQN, FN]), - if - is_pid(Handle) == true -> - ok = leveled_cdb:cdb_close(Handle); - true -> - io:format("Non pid in removal ~w - test~n", - [Handle]) - end - end, - Removals), - close_allremovals(Tail). - %% Scan between sequence numbers applying FilterFun to each entry where %% FilterFun{K, V, Acc} -> Penciller Key List diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index bb8978c..7ccab3f 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -319,17 +319,24 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> KL1, KL2, LevelR), - {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, - ExtMan = lists:append(OutList, - [#manifest_entry{start_key=SmallestKey, - end_key=HighestKey, - owner=Pid, - filename=FileName}]), - MTime = timer:now_diff(os:timestamp(), TS1), - io:format("File creation took ~w microseconds ~n", [MTime]), - do_merge(KL1Rem, KL2Rem, - {SrcLevel, IsB}, {Filepath, MSN}, - FileCounter + 1, ExtMan). + case Reply of + {{[], []}, null, _} -> + io:format("Merge resulted in empty file ~s~n", [FileName]), + io:format("Empty file ~s to be cleared~n", [FileName]), + ok = leveled_sft:sft_clear(Pid), + OutList; + {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} -> + ExtMan = lists:append(OutList, + [#manifest_entry{start_key=SmallestKey, + end_key=HighestKey, + owner=Pid, + filename=FileName}]), + MTime = timer:now_diff(os:timestamp(), TS1), + io:format("File creation took ~w microseconds ~n", [MTime]), + do_merge(KL1Rem, KL2Rem, + {SrcLevel, IsB}, {Filepath, MSN}, + FileCounter + 1, ExtMan) + end. 
get_item(Index, List, Default) -> diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 85392a7..4649f5d 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -467,7 +467,7 @@ create_file(FileName) when is_list(FileName) -> {error, Reason} -> io:format("Error opening filename ~s with reason ~w", [FileName, Reason]), - {error, Reason} + error end. @@ -1037,15 +1037,17 @@ create_slot(KL1, KL2, LevelR, BlockCount, SegLists, SerialisedSlot, LengthList, {BlockKeyList, Status, {LSNb, HSNb}, SegmentList, KL1b, KL2b} = create_block(KL1, KL2, LevelR), - TrackingMetadata = case LowKey of - null -> + TrackingMetadata = case {LowKey, BlockKeyList} of + {null, []} -> + {null, LSN, HSN, LastKey, Status}; + {null, _} -> [NewLowKeyV|_] = BlockKeyList, {leveled_codec:strip_to_keyonly(NewLowKeyV), min(LSN, LSNb), max(HSN, HSNb), leveled_codec:strip_to_keyonly(last(BlockKeyList, {last, LastKey})), Status}; - _ -> + {_, _} -> {LowKey, min(LSN, LSNb), max(HSN, HSNb), leveled_codec:strip_to_keyonly(last(BlockKeyList, diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 6887556..327d421 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -16,7 +16,7 @@ all() -> [ many_put_fetch_head, journal_compaction, fetchput_snapshot, - load_and_count, + load_and_count , load_and_count_withdelete, space_clear_ondelete_test ]. @@ -149,7 +149,7 @@ journal_compaction(_Config) -> testutil:check_forobject(Bookie2, TestObject), testutil:check_forlist(Bookie2, ChkList3), ok = leveled_bookie:book_close(Bookie2), - testutil:reset_filestructure(). + testutil:reset_filestructure(10000). 
fetchput_snapshot(_Config) -> @@ -435,12 +435,28 @@ space_clear_ondelete_test(_Config) -> io:format("Deletion took ~w microseconds for 80K keys~n", [timer:now_diff(os:timestamp(), SW2)]), ok = leveled_bookie:book_compactjournal(Book1, 30000), - timer:sleep(30000), % Allow for any L0 file to be rolled + F = fun leveled_bookie:book_islastcompactionpending/1, + lists:foldl(fun(X, Pending) -> + case Pending of + false -> + false; + true -> + io:format("Loop ~w waiting for journal " + ++ "compaction to complete~n", [X]), + timer:sleep(20000), + F(Book1) + end end, + true, + lists:seq(1, 15)), + io:format("Waiting for journal deletes~n"), + timer:sleep(20000), {ok, FNsB_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"), {ok, FNsB_J} = file:list_dir(RootPath ++ "/journal/journal_files"), + {ok, FNsB_PC} = file:list_dir(RootPath ++ "/journal/journal_files/post_compact"), + PointB_Journals = length(FNsB_J) + length(FNsB_PC), io:format("Bookie has ~w journal files and ~w ledger files " ++ "after deletes~n", - [length(FNsB_J), length(FNsB_L)]), + [PointB_Journals, length(FNsB_L)]), {async, F2} = leveled_bookie:book_returnfolder(Book1, {keylist, o_rkv}), SW3 = os:timestamp(), @@ -465,7 +481,20 @@ space_clear_ondelete_test(_Config) -> end, ok = leveled_bookie:book_close(Book2), {ok, FNsC_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"), - {ok, FNsC_J} = file:list_dir(RootPath ++ "/journal/journal_files"), - io:format("Bookie has ~w journal files and ~w ledger files " ++ - "after deletes~n", - [length(FNsC_J), length(FNsC_L)]). 
+ io:format("Bookie has ~w ledger files " ++ + "after close~n", [length(FNsC_L)]), + + {ok, Book3} = leveled_bookie:book_start(StartOpts1), + io:format("This should cause a final ledger merge event~n"), + io:format("Will require the penciller to resolve the issue of creating" ++ + " an empty file as all keys compact on merge~n"), + timer:sleep(5000), + ok = leveled_bookie:book_close(Book3), + {ok, FNsD_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"), + io:format("Bookie has ~w ledger files " ++ + "after second close~n", [length(FNsD_L)]), + true = PointB_Journals < length(FNsA_J), + true = length(FNsB_L) =< length(FNsA_L), + true = length(FNsC_L) =< length(FNsB_L), + true = length(FNsD_L) =< length(FNsB_L), + true = length(FNsD_L) == 0. \ No newline at end of file diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index b0a4707..ec002e7 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -3,6 +3,7 @@ -include("../include/leveled.hrl"). -export([reset_filestructure/0, + reset_filestructure/1, check_bucket_stats/2, check_forlist/2, check_forlist/3, @@ -27,9 +28,12 @@ reset_filestructure() -> - % io:format("Waiting ~w ms to give a chance for all file closes " ++ - % "to complete~n", [Wait]), - % timer:sleep(Wait), + reset_filestructure(0). + +reset_filestructure(Wait) -> + io:format("Waiting ~w ms to give a chance for all file closes " ++ + "to complete~n", [Wait]), + timer:sleep(Wait), RootPath = "test", filelib:ensure_dir(RootPath ++ "/journal/"), filelib:ensure_dir(RootPath ++ "/ledger/"),