From 254183369e6644a5ba07a9b4056d3d525fefbd36 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Wed, 26 Oct 2016 20:39:16 +0100 Subject: [PATCH] CDB - switch to gen_fsm The CDB file management server has distinct states, and was growing case logic to prevent certain messages from being handled in ceratin states, and to handle different messages differently. So this has now been converted to a gen_fsm. As part of resolving this, the space_clear_ondelete test has been completed, and completing this revealed that the Penciller could not cope with a change which emptied the ledger. So a series of changes has been handled to allow it to smoothly progress to an empty manifest. --- src/leveled_bookie.erl | 6 + src/leveled_cdb.erl | 503 ++++++++++++++++++-------------- src/leveled_iclerk.erl | 17 +- src/leveled_inker.erl | 52 ++-- src/leveled_pclerk.erl | 29 +- src/leveled_sft.erl | 10 +- test/end_to_end/basic_SUITE.erl | 45 ++- test/end_to_end/testutil.erl | 10 +- 8 files changed, 402 insertions(+), 270 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 9c8e507..c01281a 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -146,6 +146,7 @@ book_snapshotstore/3, book_snapshotledger/3, book_compactjournal/2, + book_islastcompactionpending/1, book_close/1]). -include_lib("eunit/include/eunit.hrl"). @@ -234,6 +235,9 @@ book_snapshotledger(Pid, Requestor, Timeout) -> book_compactjournal(Pid, Timeout) -> gen_server:call(Pid, {compact_journal, Timeout}, infinity). +book_islastcompactionpending(Pid) -> + gen_server:call(Pid, confirm_compact, infinity). + book_close(Pid) -> gen_server:call(Pid, close, infinity). @@ -389,6 +393,8 @@ handle_call({compact_journal, Timeout}, _From, State) -> self(), Timeout), {reply, ok, State}; +handle_call(confirm_compact, _From, State) -> + {reply, leveled_inker:ink_compactionpending(State#state.inker), State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 8d6c6b3..63e48d3 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -40,38 +40,50 @@ %% The first word is the corresponding hash value and the second word is a %% file pointer to the actual {key,value} tuple higher in the file. %% +%% + -module(leveled_cdb). --behaviour(gen_server). +-behaviour(gen_fsm). -include("include/leveled.hrl"). -export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3, - cdb_open_writer/1, - cdb_open_writer/2, - cdb_open_reader/1, - cdb_get/2, - cdb_put/3, - cdb_mput/2, - cdb_getpositions/2, - cdb_directfetch/3, - cdb_lastkey/1, - cdb_firstkey/1, - cdb_filename/1, - cdb_keycheck/2, - cdb_scan/4, - cdb_close/1, - cdb_complete/1, - cdb_roll/1, - cdb_returnhashtable/3, - cdb_destroy/1, - cdb_deletepending/1, - hashtable_calc/2]). + handle_sync_event/4, + handle_event/3, + handle_info/3, + terminate/3, + code_change/4, + starting/3, + writer/3, + writer/2, + rolling/3, + reader/3, + reader/2, + delete_pending/3, + delete_pending/2]). + +-export([cdb_open_writer/1, + cdb_open_writer/2, + cdb_open_reader/1, + cdb_get/2, + cdb_put/3, + cdb_mput/2, + cdb_getpositions/2, + cdb_directfetch/3, + cdb_lastkey/1, + cdb_firstkey/1, + cdb_filename/1, + cdb_keycheck/2, + cdb_scan/4, + cdb_close/1, + cdb_complete/1, + cdb_roll/1, + cdb_returnhashtable/3, + cdb_destroy/1, + cdb_deletepending/1, + cdb_deletepending/3, + hashtable_calc/2]). -include_lib("eunit/include/eunit.hrl"). @@ -83,6 +95,7 @@ -define(BASE_POSITION, 2048). -define(WRITE_OPS, [binary, raw, read, write]). -define(PENDING_ROLL_WAIT, 30). +-define(DELETE_TIMEOUT, 10000). -record(state, {hashtree, last_position :: integer(), @@ -90,11 +103,10 @@ hash_index = [] :: list(), filename :: string(), handle :: file:fd(), - writer :: boolean(), max_size :: integer(), - pending_roll = false :: boolean(), - pending_delete = false :: boolean(), - binary_mode = false :: boolean()}). + binary_mode = false :: boolean(), + delete_point = 0 :: integer(), + inker :: pid()}). %%%============================================================================ @@ -106,8 +118,8 @@ cdb_open_writer(Filename) -> cdb_open_writer(Filename, #cdb_options{}). cdb_open_writer(Filename, Opts) -> - {ok, Pid} = gen_server:start(?MODULE, [Opts], []), - case gen_server:call(Pid, {open_writer, Filename}, infinity) of + {ok, Pid} = gen_fsm:start(?MODULE, [Opts], []), + case gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity) of ok -> {ok, Pid}; Error -> @@ -115,8 +127,8 @@ cdb_open_writer(Filename, Opts) -> end. cdb_open_reader(Filename) -> - {ok, Pid} = gen_server:start(?MODULE, [#cdb_options{}], []), - case gen_server:call(Pid, {open_reader, Filename}, infinity) of + {ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{}], []), + case gen_fsm:sync_send_event(Pid, {open_reader, Filename}, infinity) of ok -> {ok, Pid}; Error -> @@ -124,23 +136,23 @@ cdb_open_reader(Filename) -> end. cdb_get(Pid, Key) -> - gen_server:call(Pid, {get_kv, Key}, infinity). + gen_fsm:sync_send_event(Pid, {get_kv, Key}, infinity). cdb_put(Pid, Key, Value) -> - gen_server:call(Pid, {put_kv, Key, Value}, infinity). + gen_fsm:sync_send_event(Pid, {put_kv, Key, Value}, infinity). cdb_mput(Pid, KVList) -> - gen_server:call(Pid, {mput_kv, KVList}, infinity). + gen_fsm:sync_send_event(Pid, {mput_kv, KVList}, infinity). %% SampleSize can be an integer or the atom all cdb_getpositions(Pid, SampleSize) -> - gen_server:call(Pid, {get_positions, SampleSize}, infinity). + gen_fsm:sync_send_event(Pid, {get_positions, SampleSize}, infinity). %% Info can be key_only, key_size (size being the size of the value) or %% key_value_check (with the check part indicating if the CRC is correct for %% the value) cdb_directfetch(Pid, PositionList, Info) -> - gen_server:call(Pid, {direct_fetch, PositionList, Info}, infinity). + gen_fsm:sync_send_event(Pid, {direct_fetch, PositionList, Info}, infinity). cdb_close(Pid) -> cdb_close(Pid, ?PENDING_ROLL_WAIT). @@ -148,7 +160,7 @@ cdb_close(Pid) -> cdb_close(Pid, WaitsLeft) -> if WaitsLeft > 0 -> - case gen_server:call(Pid, cdb_close, infinity) of + case gen_fsm:sync_send_all_state_event(Pid, cdb_close, infinity) of pending_roll -> timer:sleep(1), cdb_close(Pid, WaitsLeft - 1); @@ -156,23 +168,26 @@ cdb_close(Pid, WaitsLeft) -> R end; true -> - gen_server:call(Pid, cdb_kill, infinity) + gen_fsm:sync_send_event(Pid, cdb_kill, infinity) end. cdb_complete(Pid) -> - gen_server:call(Pid, cdb_complete, infinity). + gen_fsm:sync_send_event(Pid, cdb_complete, infinity). cdb_roll(Pid) -> - gen_server:cast(Pid, cdb_roll). + gen_fsm:send_event(Pid, cdb_roll). cdb_returnhashtable(Pid, IndexList, HashTreeBin) -> - gen_server:call(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity). + gen_fsm:sync_send_event(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity). cdb_destroy(Pid) -> - gen_server:cast(Pid, destroy). + gen_fsm:send_event(Pid, destroy). cdb_deletepending(Pid) -> - gen_server:cast(Pid, delete_pending). + cdb_deletepending(Pid, 0, no_poll). + +cdb_deletepending(Pid, ManSQN, Inker) -> + gen_fsm:send_event(Pid, {delete_pending, ManSQN, Inker}). %% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosiiton to %% continue from that point (calling function has to protect against) double @@ -182,26 +197,29 @@ cdb_deletepending(Pid) -> %% the end of the file. last_key must be defined in LoopState. cdb_scan(Pid, FilterFun, InitAcc, StartPosition) -> - gen_server:call(Pid, - {cdb_scan, FilterFun, InitAcc, StartPosition}, - infinity). + gen_fsm:sync_send_all_state_event(Pid, + {cdb_scan, + FilterFun, + InitAcc, + StartPosition}, + infinity). %% Get the last key to be added to the file (which will have the highest %% sequence number) cdb_lastkey(Pid) -> - gen_server:call(Pid, cdb_lastkey, infinity). + gen_fsm:sync_send_all_state_event(Pid, cdb_lastkey, infinity). cdb_firstkey(Pid) -> - gen_server:call(Pid, cdb_firstkey, infinity). + gen_fsm:sync_send_all_state_event(Pid, cdb_firstkey, infinity). %% Get the filename of the database cdb_filename(Pid) -> - gen_server:call(Pid, cdb_filename, infinity). + gen_fsm:sync_send_all_state_event(Pid, cdb_filename, infinity). %% Check to see if the key is probably present, will return either %% probably or missing. Does not do a definitive check cdb_keycheck(Pid, Key) -> - gen_server:call(Pid, {key_check, Key}, infinity). + gen_fsm:sync_send_event(Pid, {key_check, Key}, infinity). %%%============================================================================ %%% gen_server callbacks @@ -214,120 +232,136 @@ init([Opts]) -> M -> M end, - {ok, #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode}}. + {ok, + starting, + #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode}}. -handle_call({open_writer, Filename}, _From, State) -> +starting({open_writer, Filename}, _From, State) -> io:format("Opening file for writing with filename ~s~n", [Filename]), {LastPosition, HashTree, LastKey} = open_active_file(Filename), {ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]), - {reply, ok, State#state{handle=Handle, - last_position=LastPosition, - last_key=LastKey, - filename=Filename, - hashtree=HashTree, - writer=true}}; -handle_call({open_reader, Filename}, _From, State) -> + {reply, ok, writer, State#state{handle=Handle, + last_position=LastPosition, + last_key=LastKey, + filename=Filename, + hashtree=HashTree}}; +starting({open_reader, Filename}, _From, State) -> io:format("Opening file for reading with filename ~s~n", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename), - {reply, ok, State#state{handle=Handle, - last_key=LastKey, - filename=Filename, - writer=false, - hash_index=Index}}; -handle_call({get_kv, Key}, _From, State) -> - case State#state.writer of - true -> - {reply, - get_mem(Key, State#state.handle, State#state.hashtree), - State}; - false -> - {reply, - get_withcache(State#state.handle, Key, State#state.hash_index), - State} - end; -handle_call({key_check, Key}, _From, State) -> - case State#state.writer of - true -> - {reply, - get_mem(Key, - State#state.handle, - State#state.hashtree, - loose_presence), - State}; - false -> - {reply, - get(State#state.handle, + {reply, ok, reader, State#state{handle=Handle, + last_key=LastKey, + filename=Filename, + hash_index=Index}}. + +writer({get_kv, Key}, _From, State) -> + {reply, + get_mem(Key, State#state.handle, State#state.hashtree), + writer, + State}; +writer({key_check, Key}, _From, State) -> + {reply, + get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), + writer, + State}; +writer({put_kv, Key, Value}, _From, State) -> + Result = put(State#state.handle, Key, - loose_presence, - State#state.hash_index), - State} - end; -handle_call({put_kv, Key, Value}, _From, State) -> - case {State#state.writer, State#state.pending_roll} of - {true, false} -> - Result = put(State#state.handle, - Key, Value, - {State#state.last_position, State#state.hashtree}, - State#state.binary_mode, - State#state.max_size), - case Result of - roll -> - %% Key and value could not be written - {reply, roll, State}; - {UpdHandle, NewPosition, HashTree} -> - {reply, ok, State#state{handle=UpdHandle, + Value, + {State#state.last_position, State#state.hashtree}, + State#state.binary_mode, + State#state.max_size), + case Result of + roll -> + %% Key and value could not be written + {reply, roll, writer, State}; + {UpdHandle, NewPosition, HashTree} -> + {reply, ok, writer, State#state{handle=UpdHandle, last_position=NewPosition, last_key=Key, hashtree=HashTree}} - end; - _ -> - {reply, - {error, read_only}, - State} end; -handle_call({mput_kv, KVList}, _From, State) -> - case {State#state.writer, State#state.pending_roll} of - {true, false} -> - Result = mput(State#state.handle, - KVList, - {State#state.last_position, State#state.hashtree}, - State#state.binary_mode, - State#state.max_size), - case Result of - roll -> - %% Keys and values could not be written - {reply, roll, State}; - {UpdHandle, NewPosition, HashTree, LastKey} -> - {reply, ok, State#state{handle=UpdHandle, +writer({mput_kv, KVList}, _From, State) -> + Result = mput(State#state.handle, + KVList, + {State#state.last_position, State#state.hashtree}, + State#state.binary_mode, + State#state.max_size), + case Result of + roll -> + %% Keys and values could not be written + {reply, roll, writer, State}; + {UpdHandle, NewPosition, HashTree, LastKey} -> + {reply, ok, writer, State#state{handle=UpdHandle, last_position=NewPosition, last_key=LastKey, hashtree=HashTree}} - end; - _ -> - {reply, - {error, read_only}, - State} end; -handle_call(cdb_lastkey, _From, State) -> - {reply, State#state.last_key, State}; -handle_call(cdb_firstkey, _From, State) -> - {ok, EOFPos} = file:position(State#state.handle, eof), - FirstKey = case EOFPos of - ?BASE_POSITION -> - empty; - _ -> - extract_key(State#state.handle, ?BASE_POSITION) - end, - {reply, FirstKey, State}; -handle_call(cdb_filename, _From, State) -> - {reply, State#state.filename, State}; -handle_call({get_positions, SampleSize}, _From, State) -> +writer(cdb_complete, _From, State) -> + NewName = determine_new_filename(State#state.filename), + ok = close_file(State#state.handle, + State#state.hashtree, + State#state.last_position), + ok = rename_for_read(State#state.filename, NewName), + {stop, normal, {ok, NewName}, State}. + +writer(cdb_roll, State) -> + ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree, + State#state.last_position, + self()), + {next_state, rolling, State}. + + +rolling({get_kv, Key}, _From, State) -> + {reply, + get_mem(Key, State#state.handle, State#state.hashtree), + rolling, + State}; +rolling({key_check, Key}, _From, State) -> + {reply, + get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), + rolling, + State}; +rolling(cdb_filename, _From, State) -> + {reply, State#state.filename, rolling, State}; +rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) -> + Handle = State#state.handle, + {ok, BasePos} = file:position(Handle, State#state.last_position), + NewName = determine_new_filename(State#state.filename), + ok = perform_write_hash_tables(Handle, HashTreeBin, BasePos), + ok = write_top_index_table(Handle, BasePos, IndexList), + file:close(Handle), + ok = rename_for_read(State#state.filename, NewName), + io:format("Opening file for reading with filename ~s~n", [NewName]), + {NewHandle, Index, LastKey} = open_for_readonly(NewName), + {reply, ok, reader, State#state{handle=NewHandle, + last_key=LastKey, + filename=NewName, + hash_index=Index}}; +rolling(cdb_kill, _From, State) -> + {stop, killed, ok, State}. + +reader({get_kv, Key}, _From, State) -> + {reply, + get_withcache(State#state.handle, Key, State#state.hash_index), + reader, + State}; +reader({key_check, Key}, _From, State) -> + {reply, + get(State#state.handle, + Key, + loose_presence, + State#state.hash_index), + reader, + State}; +reader({get_positions, SampleSize}, _From, State) -> case SampleSize of all -> - {reply, scan_index(State#state.handle, - State#state.hash_index, - {fun scan_index_returnpositions/4, []}), - State}; + {reply, + scan_index(State#state.handle, + State#state.hash_index, + {fun scan_index_returnpositions/4, []}), + reader, + State}; _ -> SeededL = lists:map(fun(X) -> {random:uniform(), X} end, State#state.hash_index), @@ -339,28 +373,94 @@ handle_call({get_positions, SampleSize}, _From, State) -> fun scan_index_returnpositions/4, [], SampleSize), + reader, State} end; -handle_call({direct_fetch, PositionList, Info}, _From, State) -> +reader({direct_fetch, PositionList, Info}, _From, State) -> H = State#state.handle, case Info of key_only -> KeyList = lists:map(fun(P) -> extract_key(H, P) end, PositionList), - {reply, KeyList, State}; + {reply, KeyList, reader, State}; key_size -> KeySizeList = lists:map(fun(P) -> extract_key_size(H, P) end, PositionList), - {reply, KeySizeList, State}; + {reply, KeySizeList, reader, State}; key_value_check -> KVCList = lists:map(fun(P) -> extract_key_value_check(H, P) end, PositionList), - {reply, KVCList, State} + {reply, KVCList, reader, State} end; -handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> +reader(cdb_complete, _From, State) -> + ok = file:close(State#state.handle), + {stop, normal, {ok, State#state.filename}, State#state{handle=undefined}}. + + +reader({delete_pending, 0, no_poll}, State) -> + {next_state, + delete_pending, + State#state{delete_point=0}}; +reader({delete_pending, ManSQN, Inker}, State) -> + {next_state, + delete_pending, + State#state{delete_point=ManSQN, inker=Inker}, + ?DELETE_TIMEOUT}. + + +delete_pending({get_kv, Key}, _From, State) -> + {reply, + get_withcache(State#state.handle, Key, State#state.hash_index), + delete_pending, + State, + ?DELETE_TIMEOUT}; +delete_pending({key_check, Key}, _From, State) -> + {reply, + get(State#state.handle, + Key, + loose_presence, + State#state.hash_index), + delete_pending, + State, + ?DELETE_TIMEOUT}. + +delete_pending(timeout, State) -> + case State#state.delete_point of + 0 -> + {next_state, delete_pending, State}; + ManSQN -> + case is_process_alive(State#state.inker) of + true -> + case leveled_inker:ink_confirmdelete(State#state.inker, + ManSQN) of + true -> + io:format("Deletion confirmed for file ~s " + ++ "at ManifestSQN ~w~n", + [State#state.filename, ManSQN]), + {stop, normal, State}; + false -> + {next_state, + delete_pending, + State, + ?DELETE_TIMEOUT} + end; + false -> + {stop, normal, State} + end + end; +delete_pending(destroy, State) -> + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), + {stop, normal, State}. + + +handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, + _From, + StateName, + State) -> {ok, StartPos0} = case StartPos of undefined -> file:position(State#state.handle, @@ -375,77 +475,50 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> FilterFun, Acc, State#state.last_key), - {reply, {LastPosition, Acc2}, State}; + {reply, {LastPosition, Acc2}, StateName, State}; empty -> - {reply, {eof, Acc}, State} + {reply, {eof, Acc}, StateName, State} end; -handle_call(cdb_close, _From, State=#state{pending_roll=RollPending}) - when RollPending == true -> - {reply, pending_roll, State}; -handle_call(cdb_close, _From, State) -> +handle_sync_event(cdb_lastkey, _From, StateName, State) -> + {reply, State#state.last_key, StateName, State}; +handle_sync_event(cdb_firstkey, _From, StateName, State) -> + {ok, EOFPos} = file:position(State#state.handle, eof), + FirstKey = case EOFPos of + ?BASE_POSITION -> + empty; + _ -> + extract_key(State#state.handle, ?BASE_POSITION) + end, + {reply, FirstKey, StateName, State}; +handle_sync_event(cdb_filename, _From, StateName, State) -> + {reply, State#state.filename, StateName, State}; +handle_sync_event(cdb_close, _From, rolling, State) -> + {reply, pending_roll, rolling, State}; +handle_sync_event(cdb_close, _From, _StateName, State) -> ok = file:close(State#state.handle), - {stop, normal, ok, State#state{handle=undefined}}; -handle_call(cdb_kill, _From, State) -> - {stop, killed, ok, State}; -handle_call(cdb_complete, _From, State=#state{writer=Writer}) - when Writer == true -> - NewName = determine_new_filename(State#state.filename), - ok = close_file(State#state.handle, - State#state.hashtree, - State#state.last_position), - ok = rename_for_read(State#state.filename, NewName), - {stop, normal, {ok, NewName}, State}; -handle_call(cdb_complete, _From, State) -> - ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State}; -handle_call({return_hashtable, IndexList, HashTreeBin}, - _From, - State=#state{pending_roll=RollPending}) when RollPending == true -> - Handle = State#state.handle, - {ok, BasePos} = file:position(Handle, State#state.last_position), - NewName = determine_new_filename(State#state.filename), - ok = perform_write_hash_tables(Handle, HashTreeBin, BasePos), - ok = write_top_index_table(Handle, BasePos, IndexList), - file:close(Handle), - ok = rename_for_read(State#state.filename, NewName), - io:format("Opening file for reading with filename ~s~n", [NewName]), - {NewHandle, Index, LastKey} = open_for_readonly(NewName), - {reply, ok, State#state{handle=NewHandle, - last_key=LastKey, - filename=NewName, - writer=false, - pending_roll=false, - hash_index=Index}}. + {stop, normal, ok, State#state{handle=undefined}}. +handle_event(_Msg, StateName, State) -> + {next_State, StateName, State}. -handle_cast(destroy, State) -> - ok = file:close(State#state.handle), - ok = file:delete(State#state.filename), - {noreply, State}; -handle_cast(delete_pending, State) -> - {noreply, State#state{pending_delete=true}}; -handle_cast(cdb_roll, State=#state{writer=Writer}) when Writer == true -> - ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree, - State#state.last_position, - self()), - {noreply, State#state{pending_roll=true}}. +handle_info(_Msg, StateName, State) -> + {next_State, StateName, State}. -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State) -> - case {State#state.handle, State#state.pending_delete} of +terminate(Reason, StateName, State) -> + io:format("Closing of filename ~s for Reason ~w~n", + [State#state.filename, Reason]), + case {State#state.handle, StateName} of {undefined, _} -> ok; - {Handle, false} -> - file:close(Handle); - {Handle, true} -> + {Handle, delete_pending} -> file:close(Handle), - file:delete(State#state.filename) + file:delete(State#state.filename); + {Handle, _} -> + file:close(Handle) end. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. %%%============================================================================ %%% Internal functions diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 95cb69b..807eaf3 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -179,14 +179,20 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, C#candidate.journal} end, BestRun), - ok = leveled_inker:ink_updatemanifest(Inker, - ManifestSlice, - FilesToDelete), + io:format("Clerk updating Inker as compaction complete of " ++ + "~w files~n", [length(FilesToDelete)]), + {ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker, + ManifestSlice, + FilesToDelete), ok = leveled_inker:ink_compactioncomplete(Inker), + io:format("Clerk has completed compaction process~n"), case PromptDelete of true -> lists:foreach(fun({_SQN, _FN, J2D}) -> - leveled_cdb:cdb_deletepending(J2D) end, + leveled_cdb:cdb_deletepending(J2D, + ManSQN, + Inker) + end, FilesToDelete), {noreply, State}; false -> @@ -639,6 +645,7 @@ check_single_file_test() -> ?assertMatch(37.5, Score3), Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4), ?assertMatch(75.0, Score4), + ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -698,6 +705,7 @@ compact_single_file_recovr_test() -> stnd, test_ledgerkey("Key2")}), ?assertMatch({"Value2", []}, binary_to_term(RV1)), + ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -736,6 +744,7 @@ compact_single_file_retain_test() -> stnd, test_ledgerkey("Key2")}), ?assertMatch({"Value2", []}, binary_to_term(RV1)), + ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). compact_empty_file_test() -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 0fd512a..7ef5cf4 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -99,8 +99,10 @@ ink_fetch/3, ink_loadpcl/4, ink_registersnapshot/2, + ink_confirmdelete/2, ink_compactjournal/3, ink_compactioncomplete/1, + ink_compactionpending/1, ink_getmanifest/1, ink_updatemanifest/3, ink_print_manifest/1, @@ -159,6 +161,9 @@ ink_registersnapshot(Pid, Requestor) -> ink_releasesnapshot(Pid, Snapshot) -> gen_server:call(Pid, {release_snapshot, Snapshot}, infinity). +ink_confirmdelete(Pid, ManSQN) -> + gen_server:call(Pid, {confirm_delete, ManSQN}, 1000). + ink_close(Pid) -> gen_server:call(Pid, {close, false}, infinity). @@ -193,6 +198,9 @@ ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) -> ink_compactioncomplete(Pid) -> gen_server:call(Pid, compaction_complete, infinity). +ink_compactionpending(Pid) -> + gen_server:call(Pid, compaction_pending, infinity). + ink_getmanifest(Pid) -> gen_server:call(Pid, get_manifest, infinity). @@ -263,6 +271,17 @@ handle_call({release_snapshot, Snapshot}, _From , State) -> io:format("Ledger snapshot ~w released~n", [Snapshot]), io:format("Remaining ledger snapshots are ~w~n", [Rs]), {reply, ok, State#state{registered_snapshots=Rs}}; +handle_call({confirm_delete, ManSQN}, _From, State) -> + Reply = lists:foldl(fun({_R, SnapSQN}, Bool) -> + case SnapSQN < ManSQN of + true -> + Bool; + false -> + false + end end, + true, + State#state.registered_snapshots), + {reply, Reply, State}; handle_call(get_manifest, _From, State) -> {reply, State#state.manifest, State}; handle_call({update_manifest, @@ -280,10 +299,11 @@ handle_call({update_manifest, NewManifestSQN = State#state.manifest_sqn + 1, manifest_printer(Man1), ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path), - PendingRemovals = [{NewManifestSQN, DeletedFiles}], - {reply, ok, State#state{manifest=Man1, - manifest_sqn=NewManifestSQN, - pending_removals=PendingRemovals}}; + {reply, + {ok, NewManifestSQN}, + State#state{manifest=Man1, + manifest_sqn=NewManifestSQN, + pending_removals=DeletedFiles}}; handle_call(print_manifest, _From, State) -> manifest_printer(State#state.manifest), {reply, ok, State}; @@ -302,6 +322,8 @@ handle_call({compact, {reply, ok, State#state{compaction_pending=true}}; handle_call(compaction_complete, _From, State) -> {reply, ok, State#state{compaction_pending=false}}; +handle_call(compaction_pending, _From, State) -> + {reply, State#state.compaction_pending, State}; handle_call({close, Force}, _From, State) -> case {State#state.compaction_pending, Force} of {true, false} -> @@ -329,8 +351,7 @@ terminate(Reason, State) -> lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, State#state.registered_snapshots), manifest_printer(State#state.manifest), - ok = close_allmanifest(State#state.manifest), - ok = close_allremovals(State#state.pending_removals) + ok = close_allmanifest(State#state.manifest) end. code_change(_OldVsn, State, _Extra) -> @@ -552,25 +573,6 @@ find_in_manifest(SQN, [_Head|Tail]) -> find_in_manifest(SQN, Tail). -close_allremovals([]) -> - ok; -close_allremovals([{ManifestSQN, Removals}|Tail]) -> - io:format("Closing removals at ManifestSQN=~w~n", [ManifestSQN]), - lists:foreach(fun({LowSQN, FN, Handle}) -> - io:format("Closing removed file with LowSQN=~w" ++ - " and filename ~s~n", - [LowSQN, FN]), - if - is_pid(Handle) == true -> - ok = leveled_cdb:cdb_close(Handle); - true -> - io:format("Non pid in removal ~w - test~n", - [Handle]) - end - end, - Removals), - close_allremovals(Tail). - %% Scan between sequence numbers applying FilterFun to each entry where %% FilterFun{K, V, Acc} -> Penciller Key List diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index bb8978c..7ccab3f 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -319,17 +319,24 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> KL1, KL2, LevelR), - {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, - ExtMan = lists:append(OutList, - [#manifest_entry{start_key=SmallestKey, - end_key=HighestKey, - owner=Pid, - filename=FileName}]), - MTime = timer:now_diff(os:timestamp(), TS1), - io:format("File creation took ~w microseconds ~n", [MTime]), - do_merge(KL1Rem, KL2Rem, - {SrcLevel, IsB}, {Filepath, MSN}, - FileCounter + 1, ExtMan). + case Reply of + {{[], []}, null, _} -> + io:format("Merge resulted in empty file ~s~n", [FileName]), + io:format("Empty file ~s to be cleared~n", [FileName]), + ok = leveled_sft:sft_clear(Pid), + OutList; + {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} -> + ExtMan = lists:append(OutList, + [#manifest_entry{start_key=SmallestKey, + end_key=HighestKey, + owner=Pid, + filename=FileName}]), + MTime = timer:now_diff(os:timestamp(), TS1), + io:format("File creation took ~w microseconds ~n", [MTime]), + do_merge(KL1Rem, KL2Rem, + {SrcLevel, IsB}, {Filepath, MSN}, + FileCounter + 1, ExtMan) + end. get_item(Index, List, Default) -> diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 85392a7..4649f5d 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -467,7 +467,7 @@ create_file(FileName) when is_list(FileName) -> {error, Reason} -> io:format("Error opening filename ~s with reason ~w", [FileName, Reason]), - {error, Reason} + error end. @@ -1037,15 +1037,17 @@ create_slot(KL1, KL2, LevelR, BlockCount, SegLists, SerialisedSlot, LengthList, {BlockKeyList, Status, {LSNb, HSNb}, SegmentList, KL1b, KL2b} = create_block(KL1, KL2, LevelR), - TrackingMetadata = case LowKey of - null -> + TrackingMetadata = case {LowKey, BlockKeyList} of + {null, []} -> + {null, LSN, HSN, LastKey, Status}; + {null, _} -> [NewLowKeyV|_] = BlockKeyList, {leveled_codec:strip_to_keyonly(NewLowKeyV), min(LSN, LSNb), max(HSN, HSNb), leveled_codec:strip_to_keyonly(last(BlockKeyList, {last, LastKey})), Status}; - _ -> + {_, _} -> {LowKey, min(LSN, LSNb), max(HSN, HSNb), leveled_codec:strip_to_keyonly(last(BlockKeyList, diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 6887556..327d421 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -16,7 +16,7 @@ all() -> [ many_put_fetch_head, journal_compaction, fetchput_snapshot, - load_and_count, + load_and_count , load_and_count_withdelete, space_clear_ondelete_test ]. @@ -149,7 +149,7 @@ journal_compaction(_Config) -> testutil:check_forobject(Bookie2, TestObject), testutil:check_forlist(Bookie2, ChkList3), ok = leveled_bookie:book_close(Bookie2), - testutil:reset_filestructure(). + testutil:reset_filestructure(10000). fetchput_snapshot(_Config) -> @@ -435,12 +435,28 @@ space_clear_ondelete_test(_Config) -> io:format("Deletion took ~w microseconds for 80K keys~n", [timer:now_diff(os:timestamp(), SW2)]), ok = leveled_bookie:book_compactjournal(Book1, 30000), - timer:sleep(30000), % Allow for any L0 file to be rolled + F = fun leveled_bookie:book_islastcompactionpending/1, + lists:foldl(fun(X, Pending) -> + case Pending of + false -> + false; + true -> + io:format("Loop ~w waiting for journal " + ++ "compaction to complete~n", [X]), + timer:sleep(20000), + F(Book1) + end end, + true, + lists:seq(1, 15)), + io:format("Waiting for journal deletes~n"), + timer:sleep(20000), {ok, FNsB_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"), {ok, FNsB_J} = file:list_dir(RootPath ++ "/journal/journal_files"), + {ok, FNsB_PC} = file:list_dir(RootPath ++ "/journal/journal_files/post_compact"), + PointB_Journals = length(FNsB_J) + length(FNsB_PC), io:format("Bookie has ~w journal files and ~w ledger files " ++ "after deletes~n", - [length(FNsB_J), length(FNsB_L)]), + [PointB_Journals, length(FNsB_L)]), {async, F2} = leveled_bookie:book_returnfolder(Book1, {keylist, o_rkv}), SW3 = os:timestamp(), @@ -465,7 +481,20 @@ space_clear_ondelete_test(_Config) -> end, ok = leveled_bookie:book_close(Book2), {ok, FNsC_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"), - {ok, FNsC_J} = file:list_dir(RootPath ++ "/journal/journal_files"), - io:format("Bookie has ~w journal files and ~w ledger files " ++ - "after deletes~n", - [length(FNsC_J), length(FNsC_L)]). + io:format("Bookie has ~w ledger files " ++ + "after close~n", [length(FNsC_L)]), + + {ok, Book3} = leveled_bookie:book_start(StartOpts1), + io:format("This should cause a final ledger merge event~n"), + io:format("Will require the penciller to resolve the issue of creating" ++ + " an empty file as all keys compact on merge~n"), + timer:sleep(5000), + ok = leveled_bookie:book_close(Book3), + {ok, FNsD_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"), + io:format("Bookie has ~w ledger files " ++ + "after second close~n", [length(FNsD_L)]), + true = PointB_Journals < length(FNsA_J), + true = length(FNsB_L) =< length(FNsA_L), + true = length(FNsC_L) =< length(FNsB_L), + true = length(FNsD_L) =< length(FNsB_L), + true = length(FNsD_L) == 0. \ No newline at end of file diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index b0a4707..ec002e7 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -3,6 +3,7 @@ -include("../include/leveled.hrl"). -export([reset_filestructure/0, + reset_filestructure/1, check_bucket_stats/2, check_forlist/2, check_forlist/3, @@ -27,9 +28,12 @@ reset_filestructure() -> - % io:format("Waiting ~w ms to give a chance for all file closes " ++ - % "to complete~n", [Wait]), - % timer:sleep(Wait), + reset_filestructure(0). + +reset_filestructure(Wait) -> + io:format("Waiting ~w ms to give a chance for all file closes " ++ + "to complete~n", [Wait]), + timer:sleep(Wait), RootPath = "test", filelib:ensure_dir(RootPath ++ "/journal/"), filelib:ensure_dir(RootPath ++ "/ledger/"),