CDB - switch to gen_fsm

The CDB file management server has distinct states, and was growing case
logic to prevent certain messages from being handled in certain states,
and to handle different messages differently.  So this has now been
converted to a gen_fsm.

As part of resolving this, the space_clear_ondelete test has been
completed, and completing this revealed that the Penciller could not
cope with a change which emptied the ledger.  So a series of changes has
been made to allow it to progress smoothly to an empty manifest.
This commit is contained in:
martinsumner 2016-10-26 20:39:16 +01:00
parent 6f40869070
commit 254183369e
8 changed files with 402 additions and 270 deletions

View file

@ -146,6 +146,7 @@
book_snapshotstore/3,
book_snapshotledger/3,
book_compactjournal/2,
book_islastcompactionpending/1,
book_close/1]).
-include_lib("eunit/include/eunit.hrl").
@ -234,6 +235,9 @@ book_snapshotledger(Pid, Requestor, Timeout) ->
book_compactjournal(Pid, Timeout) ->
gen_server:call(Pid, {compact_journal, Timeout}, infinity).
%% Poll whether the last requested journal compaction is still pending.
%% Returns the boolean compaction_pending flag held by the Inker (the
%% confirm_compact call is answered via ink_compactionpending/1).
book_islastcompactionpending(Pid) ->
gen_server:call(Pid, confirm_compact, infinity).
book_close(Pid) ->
gen_server:call(Pid, close, infinity).
@ -389,6 +393,8 @@ handle_call({compact_journal, Timeout}, _From, State) ->
self(),
Timeout),
{reply, ok, State};
handle_call(confirm_compact, _From, State) ->
{reply, leveled_inker:ink_compactionpending(State#state.inker), State};
handle_call(close, _From, State) ->
{stop, normal, ok, State}.

View file

@ -40,19 +40,30 @@
%% The first word is the corresponding hash value and the second word is a
%% file pointer to the actual {key,value} tuple higher in the file.
%%
%%
-module(leveled_cdb).
-behaviour(gen_server).
-behaviour(gen_fsm).
-include("include/leveled.hrl").
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
cdb_open_writer/1,
handle_sync_event/4,
handle_event/3,
handle_info/3,
terminate/3,
code_change/4,
starting/3,
writer/3,
writer/2,
rolling/3,
reader/3,
reader/2,
delete_pending/3,
delete_pending/2]).
-export([cdb_open_writer/1,
cdb_open_writer/2,
cdb_open_reader/1,
cdb_get/2,
@ -71,6 +82,7 @@
cdb_returnhashtable/3,
cdb_destroy/1,
cdb_deletepending/1,
cdb_deletepending/3,
hashtable_calc/2]).
-include_lib("eunit/include/eunit.hrl").
@ -83,6 +95,7 @@
-define(BASE_POSITION, 2048).
-define(WRITE_OPS, [binary, raw, read, write]).
-define(PENDING_ROLL_WAIT, 30).
-define(DELETE_TIMEOUT, 10000).
-record(state, {hashtree,
last_position :: integer(),
@ -90,11 +103,10 @@
hash_index = [] :: list(),
filename :: string(),
handle :: file:fd(),
writer :: boolean(),
max_size :: integer(),
pending_roll = false :: boolean(),
pending_delete = false :: boolean(),
binary_mode = false :: boolean()}).
binary_mode = false :: boolean(),
delete_point = 0 :: integer(),
inker :: pid()}).
%%%============================================================================
@ -106,8 +118,8 @@ cdb_open_writer(Filename) ->
cdb_open_writer(Filename, #cdb_options{}).
cdb_open_writer(Filename, Opts) ->
{ok, Pid} = gen_server:start(?MODULE, [Opts], []),
case gen_server:call(Pid, {open_writer, Filename}, infinity) of
{ok, Pid} = gen_fsm:start(?MODULE, [Opts], []),
case gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity) of
ok ->
{ok, Pid};
Error ->
@ -115,8 +127,8 @@ cdb_open_writer(Filename, Opts) ->
end.
cdb_open_reader(Filename) ->
{ok, Pid} = gen_server:start(?MODULE, [#cdb_options{}], []),
case gen_server:call(Pid, {open_reader, Filename}, infinity) of
{ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{}], []),
case gen_fsm:sync_send_event(Pid, {open_reader, Filename}, infinity) of
ok ->
{ok, Pid};
Error ->
@ -124,23 +136,23 @@ cdb_open_reader(Filename) ->
end.
cdb_get(Pid, Key) ->
gen_server:call(Pid, {get_kv, Key}, infinity).
gen_fsm:sync_send_event(Pid, {get_kv, Key}, infinity).
cdb_put(Pid, Key, Value) ->
gen_server:call(Pid, {put_kv, Key, Value}, infinity).
gen_fsm:sync_send_event(Pid, {put_kv, Key, Value}, infinity).
cdb_mput(Pid, KVList) ->
gen_server:call(Pid, {mput_kv, KVList}, infinity).
gen_fsm:sync_send_event(Pid, {mput_kv, KVList}, infinity).
%% SampleSize can be an integer or the atom all
cdb_getpositions(Pid, SampleSize) ->
gen_server:call(Pid, {get_positions, SampleSize}, infinity).
gen_fsm:sync_send_event(Pid, {get_positions, SampleSize}, infinity).
%% Info can be key_only, key_size (size being the size of the value) or
%% key_value_check (with the check part indicating if the CRC is correct for
%% the value)
cdb_directfetch(Pid, PositionList, Info) ->
gen_server:call(Pid, {direct_fetch, PositionList, Info}, infinity).
gen_fsm:sync_send_event(Pid, {direct_fetch, PositionList, Info}, infinity).
cdb_close(Pid) ->
cdb_close(Pid, ?PENDING_ROLL_WAIT).
@ -148,7 +160,7 @@ cdb_close(Pid) ->
cdb_close(Pid, WaitsLeft) ->
if
WaitsLeft > 0 ->
case gen_server:call(Pid, cdb_close, infinity) of
case gen_fsm:sync_send_all_state_event(Pid, cdb_close, infinity) of
pending_roll ->
timer:sleep(1),
cdb_close(Pid, WaitsLeft - 1);
@ -156,23 +168,26 @@ cdb_close(Pid, WaitsLeft) ->
R
end;
true ->
gen_server:call(Pid, cdb_kill, infinity)
gen_fsm:sync_send_event(Pid, cdb_kill, infinity)
end.
cdb_complete(Pid) ->
gen_server:call(Pid, cdb_complete, infinity).
gen_fsm:sync_send_event(Pid, cdb_complete, infinity).
cdb_roll(Pid) ->
gen_server:cast(Pid, cdb_roll).
gen_fsm:send_event(Pid, cdb_roll).
cdb_returnhashtable(Pid, IndexList, HashTreeBin) ->
gen_server:call(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity).
gen_fsm:sync_send_event(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity).
cdb_destroy(Pid) ->
gen_server:cast(Pid, destroy).
gen_fsm:send_event(Pid, destroy).
cdb_deletepending(Pid) ->
gen_server:cast(Pid, delete_pending).
cdb_deletepending(Pid, 0, no_poll).
cdb_deletepending(Pid, ManSQN, Inker) ->
gen_fsm:send_event(Pid, {delete_pending, ManSQN, Inker}).
%% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosiiton to
%% continue from that point (calling function has to protect against) double
@ -182,26 +197,29 @@ cdb_deletepending(Pid) ->
%% the end of the file. last_key must be defined in LoopState.
cdb_scan(Pid, FilterFun, InitAcc, StartPosition) ->
gen_server:call(Pid,
{cdb_scan, FilterFun, InitAcc, StartPosition},
gen_fsm:sync_send_all_state_event(Pid,
{cdb_scan,
FilterFun,
InitAcc,
StartPosition},
infinity).
%% Get the last key to be added to the file (which will have the highest
%% sequence number)
cdb_lastkey(Pid) ->
gen_server:call(Pid, cdb_lastkey, infinity).
gen_fsm:sync_send_all_state_event(Pid, cdb_lastkey, infinity).
cdb_firstkey(Pid) ->
gen_server:call(Pid, cdb_firstkey, infinity).
gen_fsm:sync_send_all_state_event(Pid, cdb_firstkey, infinity).
%% Get the filename of the database
cdb_filename(Pid) ->
gen_server:call(Pid, cdb_filename, infinity).
gen_fsm:sync_send_all_state_event(Pid, cdb_filename, infinity).
%% Check to see if the key is probably present, will return either
%% probably or missing. Does not do a definitive check
cdb_keycheck(Pid, Key) ->
gen_server:call(Pid, {key_check, Key}, infinity).
gen_fsm:sync_send_event(Pid, {key_check, Key}, infinity).
%%%============================================================================
%%% gen_server callbacks
@ -214,80 +232,55 @@ init([Opts]) ->
M ->
M
end,
{ok, #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode}}.
{ok,
starting,
#state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode}}.
handle_call({open_writer, Filename}, _From, State) ->
starting({open_writer, Filename}, _From, State) ->
io:format("Opening file for writing with filename ~s~n", [Filename]),
{LastPosition, HashTree, LastKey} = open_active_file(Filename),
{ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]),
{reply, ok, State#state{handle=Handle,
{reply, ok, writer, State#state{handle=Handle,
last_position=LastPosition,
last_key=LastKey,
filename=Filename,
hashtree=HashTree,
writer=true}};
handle_call({open_reader, Filename}, _From, State) ->
hashtree=HashTree}};
starting({open_reader, Filename}, _From, State) ->
io:format("Opening file for reading with filename ~s~n", [Filename]),
{Handle, Index, LastKey} = open_for_readonly(Filename),
{reply, ok, State#state{handle=Handle,
{reply, ok, reader, State#state{handle=Handle,
last_key=LastKey,
filename=Filename,
writer=false,
hash_index=Index}};
handle_call({get_kv, Key}, _From, State) ->
case State#state.writer of
true ->
hash_index=Index}}.
writer({get_kv, Key}, _From, State) ->
{reply,
get_mem(Key, State#state.handle, State#state.hashtree),
writer,
State};
false ->
writer({key_check, Key}, _From, State) ->
{reply,
get_withcache(State#state.handle, Key, State#state.hash_index),
State}
end;
handle_call({key_check, Key}, _From, State) ->
case State#state.writer of
true ->
{reply,
get_mem(Key,
State#state.handle,
State#state.hashtree,
loose_presence),
get_mem(Key, State#state.handle, State#state.hashtree, loose_presence),
writer,
State};
false ->
{reply,
get(State#state.handle,
Key,
loose_presence,
State#state.hash_index),
State}
end;
handle_call({put_kv, Key, Value}, _From, State) ->
case {State#state.writer, State#state.pending_roll} of
{true, false} ->
writer({put_kv, Key, Value}, _From, State) ->
Result = put(State#state.handle,
Key, Value,
Key,
Value,
{State#state.last_position, State#state.hashtree},
State#state.binary_mode,
State#state.max_size),
case Result of
roll ->
%% Key and value could not be written
{reply, roll, State};
{reply, roll, writer, State};
{UpdHandle, NewPosition, HashTree} ->
{reply, ok, State#state{handle=UpdHandle,
{reply, ok, writer, State#state{handle=UpdHandle,
last_position=NewPosition,
last_key=Key,
hashtree=HashTree}}
end;
_ ->
{reply,
{error, read_only},
State}
end;
handle_call({mput_kv, KVList}, _From, State) ->
case {State#state.writer, State#state.pending_roll} of
{true, false} ->
writer({mput_kv, KVList}, _From, State) ->
Result = mput(State#state.handle,
KVList,
{State#state.last_position, State#state.hashtree},
@ -296,37 +289,78 @@ handle_call({mput_kv, KVList}, _From, State) ->
case Result of
roll ->
%% Keys and values could not be written
{reply, roll, State};
{reply, roll, writer, State};
{UpdHandle, NewPosition, HashTree, LastKey} ->
{reply, ok, State#state{handle=UpdHandle,
{reply, ok, writer, State#state{handle=UpdHandle,
last_position=NewPosition,
last_key=LastKey,
hashtree=HashTree}}
end;
_ ->
writer(cdb_complete, _From, State) ->
NewName = determine_new_filename(State#state.filename),
ok = close_file(State#state.handle,
State#state.hashtree,
State#state.last_position),
ok = rename_for_read(State#state.filename, NewName),
{stop, normal, {ok, NewName}, State}.
writer(cdb_roll, State) ->
ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree,
State#state.last_position,
self()),
{next_state, rolling, State}.
rolling({get_kv, Key}, _From, State) ->
{reply,
{error, read_only},
State}
end;
handle_call(cdb_lastkey, _From, State) ->
{reply, State#state.last_key, State};
handle_call(cdb_firstkey, _From, State) ->
{ok, EOFPos} = file:position(State#state.handle, eof),
FirstKey = case EOFPos of
?BASE_POSITION ->
empty;
_ ->
extract_key(State#state.handle, ?BASE_POSITION)
end,
{reply, FirstKey, State};
handle_call(cdb_filename, _From, State) ->
{reply, State#state.filename, State};
handle_call({get_positions, SampleSize}, _From, State) ->
get_mem(Key, State#state.handle, State#state.hashtree),
rolling,
State};
rolling({key_check, Key}, _From, State) ->
{reply,
get_mem(Key, State#state.handle, State#state.hashtree, loose_presence),
rolling,
State};
rolling(cdb_filename, _From, State) ->
{reply, State#state.filename, rolling, State};
rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) ->
Handle = State#state.handle,
{ok, BasePos} = file:position(Handle, State#state.last_position),
NewName = determine_new_filename(State#state.filename),
ok = perform_write_hash_tables(Handle, HashTreeBin, BasePos),
ok = write_top_index_table(Handle, BasePos, IndexList),
file:close(Handle),
ok = rename_for_read(State#state.filename, NewName),
io:format("Opening file for reading with filename ~s~n", [NewName]),
{NewHandle, Index, LastKey} = open_for_readonly(NewName),
{reply, ok, reader, State#state{handle=NewHandle,
last_key=LastKey,
filename=NewName,
hash_index=Index}};
rolling(cdb_kill, _From, State) ->
{stop, killed, ok, State}.
reader({get_kv, Key}, _From, State) ->
{reply,
get_withcache(State#state.handle, Key, State#state.hash_index),
reader,
State};
reader({key_check, Key}, _From, State) ->
{reply,
get(State#state.handle,
Key,
loose_presence,
State#state.hash_index),
reader,
State};
reader({get_positions, SampleSize}, _From, State) ->
case SampleSize of
all ->
{reply, scan_index(State#state.handle,
{reply,
scan_index(State#state.handle,
State#state.hash_index,
{fun scan_index_returnpositions/4, []}),
reader,
State};
_ ->
SeededL = lists:map(fun(X) -> {random:uniform(), X} end,
@ -339,28 +373,94 @@ handle_call({get_positions, SampleSize}, _From, State) ->
fun scan_index_returnpositions/4,
[],
SampleSize),
reader,
State}
end;
handle_call({direct_fetch, PositionList, Info}, _From, State) ->
reader({direct_fetch, PositionList, Info}, _From, State) ->
H = State#state.handle,
case Info of
key_only ->
KeyList = lists:map(fun(P) ->
extract_key(H, P) end,
PositionList),
{reply, KeyList, State};
{reply, KeyList, reader, State};
key_size ->
KeySizeList = lists:map(fun(P) ->
extract_key_size(H, P) end,
PositionList),
{reply, KeySizeList, State};
{reply, KeySizeList, reader, State};
key_value_check ->
KVCList = lists:map(fun(P) ->
extract_key_value_check(H, P) end,
PositionList),
{reply, KVCList, State}
{reply, KVCList, reader, State}
end;
handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) ->
reader(cdb_complete, _From, State) ->
ok = file:close(State#state.handle),
{stop, normal, {ok, State#state.filename}, State#state{handle=undefined}}.
%% Async events in the reader state: a delete_pending instruction moves
%% the file process into the delete_pending state.
%%
%% no_poll variant (delete_point 0, no Inker): transition without a
%% timeout, so the file will only be removed by an explicit destroy
%% event - there is no Inker to poll for confirmation.
reader({delete_pending, 0, no_poll}, State) ->
{next_state,
delete_pending,
State#state{delete_point=0}};
%% Polling variant: remember the manifest SQN at which the file was
%% removed and the Inker pid, and arm ?DELETE_TIMEOUT so the
%% delete_pending(timeout, ...) clause can ask the Inker whether
%% deletion is now safe.
reader({delete_pending, ManSQN, Inker}, State) ->
{next_state,
delete_pending,
State#state{delete_point=ManSQN, inker=Inker},
?DELETE_TIMEOUT}.
%% Sync events while deletion is pending: reads are still served (a
%% snapshot may yet reference this file), and each reply re-arms
%% ?DELETE_TIMEOUT so the deletion poll in delete_pending(timeout, ...)
%% is deferred rather than lost.
delete_pending({get_kv, Key}, _From, State) ->
{reply,
get_withcache(State#state.handle, Key, State#state.hash_index),
delete_pending,
State,
?DELETE_TIMEOUT};
%% key_check is a loose presence test (probably/missing), cheaper than
%% a full fetch as it does not verify the value's CRC.
delete_pending({key_check, Key}, _From, State) ->
{reply,
get(State#state.handle,
Key,
loose_presence,
State#state.hash_index),
delete_pending,
State,
?DELETE_TIMEOUT}.
%% Timeout poll while deletion is pending.  With a delete_point of 0
%% (the no_poll case) there is nothing to confirm, so just remain in
%% delete_pending.  Otherwise ask the Inker whether any registered
%% snapshot still blocks deletion at this manifest SQN; if confirmed
%% (or if the Inker is dead) stop normally - the actual file close and
%% delete is performed by terminate/3 on seeing the delete_pending
%% state name.
delete_pending(timeout, State) ->
case State#state.delete_point of
0 ->
{next_state, delete_pending, State};
ManSQN ->
case is_process_alive(State#state.inker) of
true ->
case leveled_inker:ink_confirmdelete(State#state.inker,
ManSQN) of
true ->
io:format("Deletion confirmed for file ~s "
++ "at ManifestSQN ~w~n",
[State#state.filename, ManSQN]),
{stop, normal, State};
false ->
%% Still blocked - re-arm the timer and poll again.
{next_state,
delete_pending,
State,
?DELETE_TIMEOUT}
end;
false ->
{stop, normal, State}
end
end;
%% Explicit destroy: close and delete the file immediately, without
%% waiting for confirmation from the Inker.
delete_pending(destroy, State) ->
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename),
{stop, normal, State}.
handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
_From,
StateName,
State) ->
{ok, StartPos0} = case StartPos of
undefined ->
file:position(State#state.handle,
@ -375,77 +475,50 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) ->
FilterFun,
Acc,
State#state.last_key),
{reply, {LastPosition, Acc2}, State};
{reply, {LastPosition, Acc2}, StateName, State};
empty ->
{reply, {eof, Acc}, State}
{reply, {eof, Acc}, StateName, State}
end;
handle_call(cdb_close, _From, State=#state{pending_roll=RollPending})
when RollPending == true ->
{reply, pending_roll, State};
handle_call(cdb_close, _From, State) ->
%% Last key written to the file (valid in any state).
handle_sync_event(cdb_lastkey, _From, StateName, State) ->
{reply, State#state.last_key, StateName, State};
%% First key in the file: the first record sits immediately after the
%% ?BASE_POSITION hash-table header, so an EOF at ?BASE_POSITION means
%% the file holds no records and the reply is the atom empty.
handle_sync_event(cdb_firstkey, _From, StateName, State) ->
{ok, EOFPos} = file:position(State#state.handle, eof),
FirstKey = case EOFPos of
?BASE_POSITION ->
empty;
_ ->
extract_key(State#state.handle, ?BASE_POSITION)
end,
{reply, FirstKey, StateName, State};
%% Current filename (valid in any state).
handle_sync_event(cdb_filename, _From, StateName, State) ->
{reply, State#state.filename, StateName, State};
handle_sync_event(cdb_close, _From, rolling, State) ->
{reply, pending_roll, rolling, State};
handle_sync_event(cdb_close, _From, _StateName, State) ->
ok = file:close(State#state.handle),
{stop, normal, ok, State#state{handle=undefined}};
handle_call(cdb_kill, _From, State) ->
{stop, killed, ok, State};
handle_call(cdb_complete, _From, State=#state{writer=Writer})
when Writer == true ->
NewName = determine_new_filename(State#state.filename),
ok = close_file(State#state.handle,
State#state.hashtree,
State#state.last_position),
ok = rename_for_read(State#state.filename, NewName),
{stop, normal, {ok, NewName}, State};
handle_call(cdb_complete, _From, State) ->
ok = file:close(State#state.handle),
{stop, normal, {ok, State#state.filename}, State};
handle_call({return_hashtable, IndexList, HashTreeBin},
_From,
State=#state{pending_roll=RollPending}) when RollPending == true ->
Handle = State#state.handle,
{ok, BasePos} = file:position(Handle, State#state.last_position),
NewName = determine_new_filename(State#state.filename),
ok = perform_write_hash_tables(Handle, HashTreeBin, BasePos),
ok = write_top_index_table(Handle, BasePos, IndexList),
file:close(Handle),
ok = rename_for_read(State#state.filename, NewName),
io:format("Opening file for reading with filename ~s~n", [NewName]),
{NewHandle, Index, LastKey} = open_for_readonly(NewName),
{reply, ok, State#state{handle=NewHandle,
last_key=LastKey,
filename=NewName,
writer=false,
pending_roll=false,
hash_index=Index}}.
{stop, normal, ok, State#state{handle=undefined}}.
%% Catch-all for asynchronous all-state events: ignore the event and
%% remain in the current state.
%% Bug fix: the return tuple must be tagged with the atom next_state.
%% The original returned 'next_State' (capital S), a different atom
%% that gen_fsm does not recognise, so any stray all-state event would
%% have crashed the process with a bad-return-value exit.
handle_event(_Msg, StateName, State) ->
    {next_state, StateName, State}.
handle_cast(destroy, State) ->
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename),
{noreply, State};
handle_cast(delete_pending, State) ->
{noreply, State#state{pending_delete=true}};
handle_cast(cdb_roll, State=#state{writer=Writer}) when Writer == true ->
ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree,
State#state.last_position,
self()),
{noreply, State#state{pending_roll=true}}.
%% Catch-all for raw messages sent directly to the process: ignore the
%% message and remain in the current state.
%% Bug fix: the return tuple must be tagged with the atom next_state.
%% The original returned 'next_State' (capital S), a different atom
%% that gen_fsm does not recognise, so any unexpected message would
%% have crashed the process with a bad-return-value exit.
handle_info(_Msg, StateName, State) ->
    {next_state, StateName, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State) ->
case {State#state.handle, State#state.pending_delete} of
terminate(Reason, StateName, State) ->
io:format("Closing of filename ~s for Reason ~w~n",
[State#state.filename, Reason]),
case {State#state.handle, StateName} of
{undefined, _} ->
ok;
{Handle, false} ->
file:close(Handle);
{Handle, true} ->
{Handle, delete_pending} ->
file:close(Handle),
file:delete(State#state.filename)
file:delete(State#state.filename);
{Handle, _} ->
file:close(Handle)
end.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Standard gen_fsm code_change callback: no state transformation is
%% required on upgrade, so the current state name and loop data are
%% carried over unchanged.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
%%%============================================================================
%%% Internal functions

View file

@ -179,14 +179,20 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
C#candidate.journal}
end,
BestRun),
ok = leveled_inker:ink_updatemanifest(Inker,
io:format("Clerk updating Inker as compaction complete of " ++
"~w files~n", [length(FilesToDelete)]),
{ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker,
ManifestSlice,
FilesToDelete),
ok = leveled_inker:ink_compactioncomplete(Inker),
io:format("Clerk has completed compaction process~n"),
case PromptDelete of
true ->
lists:foreach(fun({_SQN, _FN, J2D}) ->
leveled_cdb:cdb_deletepending(J2D) end,
leveled_cdb:cdb_deletepending(J2D,
ManSQN,
Inker)
end,
FilesToDelete),
{noreply, State};
false ->
@ -639,6 +645,7 @@ check_single_file_test() ->
?assertMatch(37.5, Score3),
Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4),
?assertMatch(75.0, Score4),
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB).
@ -698,6 +705,7 @@ compact_single_file_recovr_test() ->
stnd,
test_ledgerkey("Key2")}),
?assertMatch({"Value2", []}, binary_to_term(RV1)),
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB).
@ -736,6 +744,7 @@ compact_single_file_retain_test() ->
stnd,
test_ledgerkey("Key2")}),
?assertMatch({"Value2", []}, binary_to_term(RV1)),
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB).
compact_empty_file_test() ->

View file

@ -99,8 +99,10 @@
ink_fetch/3,
ink_loadpcl/4,
ink_registersnapshot/2,
ink_confirmdelete/2,
ink_compactjournal/3,
ink_compactioncomplete/1,
ink_compactionpending/1,
ink_getmanifest/1,
ink_updatemanifest/3,
ink_print_manifest/1,
@ -159,6 +161,9 @@ ink_registersnapshot(Pid, Requestor) ->
ink_releasesnapshot(Pid, Snapshot) ->
gen_server:call(Pid, {release_snapshot, Snapshot}, infinity).
ink_confirmdelete(Pid, ManSQN) ->
gen_server:call(Pid, {confirm_delete, ManSQN}, 1000).
ink_close(Pid) ->
gen_server:call(Pid, {close, false}, infinity).
@ -193,6 +198,9 @@ ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) ->
ink_compactioncomplete(Pid) ->
gen_server:call(Pid, compaction_complete, infinity).
ink_compactionpending(Pid) ->
gen_server:call(Pid, compaction_pending, infinity).
ink_getmanifest(Pid) ->
gen_server:call(Pid, get_manifest, infinity).
@ -263,6 +271,17 @@ handle_call({release_snapshot, Snapshot}, _From , State) ->
io:format("Ledger snapshot ~w released~n", [Snapshot]),
io:format("Remaining ledger snapshots are ~w~n", [Rs]),
{reply, ok, State#state{registered_snapshots=Rs}};
%% Confirm whether a journal file removed from the manifest at sequence
%% number ManSQN can now be deleted.  Reply true only when every
%% registered snapshot ({Requestor, SnapSQN}) was registered at a
%% manifest SQN below ManSQN - i.e. no snapshot may still require the
%% file.  lists:all/2 states the invariant directly and short-circuits
%% on the first blocking snapshot, unlike the previous foldl with a
%% boolean accumulator which always traversed the full snapshot list.
handle_call({confirm_delete, ManSQN}, _From, State) ->
    Reply = lists:all(fun({_Requestor, SnapSQN}) -> SnapSQN < ManSQN end,
                        State#state.registered_snapshots),
    {reply, Reply, State};
handle_call(get_manifest, _From, State) ->
{reply, State#state.manifest, State};
handle_call({update_manifest,
@ -280,10 +299,11 @@ handle_call({update_manifest,
NewManifestSQN = State#state.manifest_sqn + 1,
manifest_printer(Man1),
ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path),
PendingRemovals = [{NewManifestSQN, DeletedFiles}],
{reply, ok, State#state{manifest=Man1,
{reply,
{ok, NewManifestSQN},
State#state{manifest=Man1,
manifest_sqn=NewManifestSQN,
pending_removals=PendingRemovals}};
pending_removals=DeletedFiles}};
handle_call(print_manifest, _From, State) ->
manifest_printer(State#state.manifest),
{reply, ok, State};
@ -302,6 +322,8 @@ handle_call({compact,
{reply, ok, State#state{compaction_pending=true}};
handle_call(compaction_complete, _From, State) ->
{reply, ok, State#state{compaction_pending=false}};
handle_call(compaction_pending, _From, State) ->
{reply, State#state.compaction_pending, State};
handle_call({close, Force}, _From, State) ->
case {State#state.compaction_pending, Force} of
{true, false} ->
@ -329,8 +351,7 @@ terminate(Reason, State) ->
lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end,
State#state.registered_snapshots),
manifest_printer(State#state.manifest),
ok = close_allmanifest(State#state.manifest),
ok = close_allremovals(State#state.pending_removals)
ok = close_allmanifest(State#state.manifest)
end.
code_change(_OldVsn, State, _Extra) ->
@ -552,25 +573,6 @@ find_in_manifest(SQN, [_Head|Tail]) ->
find_in_manifest(SQN, Tail).
close_allremovals([]) ->
ok;
close_allremovals([{ManifestSQN, Removals}|Tail]) ->
io:format("Closing removals at ManifestSQN=~w~n", [ManifestSQN]),
lists:foreach(fun({LowSQN, FN, Handle}) ->
io:format("Closing removed file with LowSQN=~w" ++
" and filename ~s~n",
[LowSQN, FN]),
if
is_pid(Handle) == true ->
ok = leveled_cdb:cdb_close(Handle);
true ->
io:format("Non pid in removal ~w - test~n",
[Handle])
end
end,
Removals),
close_allremovals(Tail).
%% Scan between sequence numbers applying FilterFun to each entry where
%% FilterFun{K, V, Acc} -> Penciller Key List

View file

@ -319,7 +319,13 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) ->
KL1,
KL2,
LevelR),
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
case Reply of
{{[], []}, null, _} ->
io:format("Merge resulted in empty file ~s~n", [FileName]),
io:format("Empty file ~s to be cleared~n", [FileName]),
ok = leveled_sft:sft_clear(Pid),
OutList;
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} ->
ExtMan = lists:append(OutList,
[#manifest_entry{start_key=SmallestKey,
end_key=HighestKey,
@ -329,7 +335,8 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) ->
io:format("File creation took ~w microseconds ~n", [MTime]),
do_merge(KL1Rem, KL2Rem,
{SrcLevel, IsB}, {Filepath, MSN},
FileCounter + 1, ExtMan).
FileCounter + 1, ExtMan)
end.
get_item(Index, List, Default) ->

View file

@ -467,7 +467,7 @@ create_file(FileName) when is_list(FileName) ->
{error, Reason} ->
io:format("Error opening filename ~s with reason ~w",
[FileName, Reason]),
{error, Reason}
error
end.
@ -1037,15 +1037,17 @@ create_slot(KL1, KL2, LevelR, BlockCount, SegLists, SerialisedSlot, LengthList,
{BlockKeyList, Status,
{LSNb, HSNb},
SegmentList, KL1b, KL2b} = create_block(KL1, KL2, LevelR),
TrackingMetadata = case LowKey of
null ->
TrackingMetadata = case {LowKey, BlockKeyList} of
{null, []} ->
{null, LSN, HSN, LastKey, Status};
{null, _} ->
[NewLowKeyV|_] = BlockKeyList,
{leveled_codec:strip_to_keyonly(NewLowKeyV),
min(LSN, LSNb), max(HSN, HSNb),
leveled_codec:strip_to_keyonly(last(BlockKeyList,
{last, LastKey})),
Status};
_ ->
{_, _} ->
{LowKey,
min(LSN, LSNb), max(HSN, HSNb),
leveled_codec:strip_to_keyonly(last(BlockKeyList,

View file

@ -149,7 +149,7 @@ journal_compaction(_Config) ->
testutil:check_forobject(Bookie2, TestObject),
testutil:check_forlist(Bookie2, ChkList3),
ok = leveled_bookie:book_close(Bookie2),
testutil:reset_filestructure().
testutil:reset_filestructure(10000).
fetchput_snapshot(_Config) ->
@ -435,12 +435,28 @@ space_clear_ondelete_test(_Config) ->
io:format("Deletion took ~w microseconds for 80K keys~n",
[timer:now_diff(os:timestamp(), SW2)]),
ok = leveled_bookie:book_compactjournal(Book1, 30000),
timer:sleep(30000), % Allow for any L0 file to be rolled
F = fun leveled_bookie:book_islastcompactionpending/1,
lists:foldl(fun(X, Pending) ->
case Pending of
false ->
false;
true ->
io:format("Loop ~w waiting for journal "
++ "compaction to complete~n", [X]),
timer:sleep(20000),
F(Book1)
end end,
true,
lists:seq(1, 15)),
io:format("Waiting for journal deletes~n"),
timer:sleep(20000),
{ok, FNsB_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
{ok, FNsB_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
{ok, FNsB_PC} = file:list_dir(RootPath ++ "/journal/journal_files/post_compact"),
PointB_Journals = length(FNsB_J) + length(FNsB_PC),
io:format("Bookie has ~w journal files and ~w ledger files " ++
"after deletes~n",
[length(FNsB_J), length(FNsB_L)]),
[PointB_Journals, length(FNsB_L)]),
{async, F2} = leveled_bookie:book_returnfolder(Book1, {keylist, o_rkv}),
SW3 = os:timestamp(),
@ -465,7 +481,20 @@ space_clear_ondelete_test(_Config) ->
end,
ok = leveled_bookie:book_close(Book2),
{ok, FNsC_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
{ok, FNsC_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
io:format("Bookie has ~w journal files and ~w ledger files " ++
"after deletes~n",
[length(FNsC_J), length(FNsC_L)]).
io:format("Bookie has ~w ledger files " ++
"after close~n", [length(FNsC_L)]),
{ok, Book3} = leveled_bookie:book_start(StartOpts1),
io:format("This should cause a final ledger merge event~n"),
io:format("Will require the penciller to resolve the issue of creating" ++
" an empty file as all keys compact on merge~n"),
timer:sleep(5000),
ok = leveled_bookie:book_close(Book3),
{ok, FNsD_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
io:format("Bookie has ~w ledger files " ++
"after second close~n", [length(FNsD_L)]),
true = PointB_Journals < length(FNsA_J),
true = length(FNsB_L) =< length(FNsA_L),
true = length(FNsC_L) =< length(FNsB_L),
true = length(FNsD_L) =< length(FNsB_L),
true = length(FNsD_L) == 0.

View file

@ -3,6 +3,7 @@
-include("../include/leveled.hrl").
-export([reset_filestructure/0,
reset_filestructure/1,
check_bucket_stats/2,
check_forlist/2,
check_forlist/3,
@ -27,9 +28,12 @@
reset_filestructure() ->
% io:format("Waiting ~w ms to give a chance for all file closes " ++
% "to complete~n", [Wait]),
% timer:sleep(Wait),
reset_filestructure(0).
reset_filestructure(Wait) ->
io:format("Waiting ~w ms to give a chance for all file closes " ++
"to complete~n", [Wait]),
timer:sleep(Wait),
RootPath = "test",
filelib:ensure_dir(RootPath ++ "/journal/"),
filelib:ensure_dir(RootPath ++ "/ledger/"),