Merge pull request #129 from martinsumner/mas-i128-determinate

Mas i128 determinate
This commit is contained in:
Martin Sumner 2018-04-10 15:18:35 +01:00 committed by GitHub
commit 12dae9209a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 136 additions and 91 deletions

View file

@ -742,6 +742,8 @@ handle_call(trim, _From, State) when State#state.head_only == true ->
PSQN = leveled_penciller:pcl_persistedsqn(State#state.penciller),
{reply, leveled_inker:ink_trim(State#state.inker, PSQN), State};
handle_call(close, _From, State) ->
leveled_inker:ink_close(State#state.inker),
leveled_penciller:pcl_close(State#state.penciller),
{stop, normal, ok, State};
handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
leveled_log:log("B0011", []),
@ -759,22 +761,8 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(Reason, State) ->
leveled_log:log("B0003", [Reason]),
ok =
case is_process_alive(State#state.inker) of
true ->
leveled_inker:ink_close(State#state.inker);
false ->
ok
end,
ok =
case is_process_alive(State#state.penciller) of
true ->
leveled_penciller:pcl_close(State#state.penciller);
false ->
ok
end.
terminate(Reason, _State) ->
leveled_log:log("B0003", [Reason]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View file

@ -595,6 +595,7 @@ reader({direct_fetch, PositionList, Info}, _From, State) ->
end,
{reply, Reply, reader, State};
reader(cdb_complete, _From, State) ->
leveled_log:log("CDB05", [State#state.filename, reader, cdb_ccomplete]),
ok = file:close(State#state.handle),
{stop, normal, {ok, State#state.filename}, State#state{handle=undefined}};
reader(check_hashtable, _From, State) ->
@ -642,6 +643,9 @@ delete_pending(timeout, State=#state{delete_point=ManSQN}) when ManSQN > 0 ->
case leveled_inker:ink_confirmdelete(State#state.inker, ManSQN) of
true ->
leveled_log:log("CDB04", [State#state.filename, ManSQN]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal, State};
false ->
{next_state,
@ -650,9 +654,17 @@ delete_pending(timeout, State=#state{delete_point=ManSQN}) when ManSQN > 0 ->
?DELETE_TIMEOUT}
end;
false ->
leveled_log:log("CDB04", [State#state.filename, ManSQN]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal, State}
end;
delete_pending(destroy, State) ->
leveled_log:log("CDB05", [State#state.filename, delete_pending, destroy]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal, State}.
@ -713,7 +725,16 @@ handle_sync_event(cdb_firstkey, _From, StateName, State) ->
{reply, FirstKey, StateName, State};
handle_sync_event(cdb_filename, _From, StateName, State) ->
{reply, State#state.filename, StateName, State};
handle_sync_event(cdb_close, _From, _StateName, State) ->
handle_sync_event(cdb_close, _From, delete_pending, State) ->
leveled_log:log("CDB05",
[State#state.filename, delete_pending, cdb_close]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal, ok, State};
handle_sync_event(cdb_close, _From, StateName, State) ->
leveled_log:log("CDB05", [State#state.filename, StateName, cdb_close]),
file:close(State#state.handle),
{stop, normal, ok, State}.
handle_event(_Msg, StateName, State) ->
@ -722,22 +743,10 @@ handle_event(_Msg, StateName, State) ->
handle_info(_Msg, StateName, State) ->
{next_state, StateName, State}.
terminate(Reason, StateName, State) ->
leveled_log:log("CDB05", [State#state.filename, StateName, Reason]),
case {State#state.handle, StateName, State#state.waste_path} of
{undefined, _, _} ->
ok;
{Handle, delete_pending, undefined} ->
ok = file:close(Handle),
ok = file:delete(State#state.filename);
{Handle, delete_pending, WasteFP} ->
file:close(Handle),
Components = filename:split(State#state.filename),
NewName = WasteFP ++ lists:last(Components),
file:rename(State#state.filename, NewName);
{Handle, _, _} ->
file:close(Handle)
end.
terminate(_Reason, _StateName, _State) ->
ok.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
@ -747,6 +756,22 @@ code_change(_OldVsn, StateName, State, _Extra) ->
%%%============================================================================
-spec close_pendingdelete(file:io_device(), list(), list()|undefined) -> ok.
%% @doc
%% If delete is pending - thent he close behaviour needs to actuallly delete
%% the file
close_pendingdelete(Handle, Filename, WasteFP) ->
case WasteFP of
undefined ->
ok = file:close(Handle),
ok = file:delete(Filename);
WasteFP ->
file:close(Handle),
Components = filename:split(Filename),
NewName = WasteFP ++ lists:last(Components),
file:rename(Filename, NewName)
end.
-spec set_writeops(sync|riak_sync|none) -> {list(), sync|riak_sync|none}.
%% Assumption is that sync should be used - it is a transaction log.
%%

View file

@ -531,6 +531,17 @@ handle_call({trim, PersistedSQN}, _From, State) ->
ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
{reply, ok, State};
handle_call(close, _From, State) ->
case State#state.is_snapshot of
true ->
ok = ink_releasesnapshot(State#state.source_inker, self());
false ->
leveled_log:log("I0005", [close]),
leveled_log:log("I0006", [State#state.journal_sqn,
State#state.manifest_sqn]),
leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest)
end,
{stop, normal, ok, State};
handle_call(doom, _From, State) ->
FPs = [filepath(State#state.root_path, journal_dir),
@ -538,6 +549,14 @@ handle_call(doom, _From, State) ->
filepath(State#state.root_path, journal_compact_dir),
filepath(State#state.root_path, journal_waste_dir)],
leveled_log:log("I0018", []),
leveled_log:log("I0005", [doom]),
leveled_log:log("I0006", [State#state.journal_sqn,
State#state.manifest_sqn]),
leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest),
{stop, normal, {ok, FPs}, State}.
handle_cast({release_snapshot, Snapshot}, State) ->
@ -549,22 +568,8 @@ handle_cast({release_snapshot, Snapshot}, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(Reason, State) ->
case State#state.is_snapshot of
true ->
ok = ink_releasesnapshot(State#state.source_inker, self());
false ->
leveled_log:log("I0005", [Reason]),
leveled_log:log("I0006", [State#state.journal_sqn,
State#state.manifest_sqn]),
leveled_iclerk:clerk_stop(State#state.clerk),
lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end,
State#state.registered_snapshots),
leveled_log:log("I0007", []),
leveled_imanifest:printer(State#state.manifest),
ManAsList = leveled_imanifest:to_list(State#state.manifest),
ok = close_allmanifest(ManAsList)
end.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -625,6 +630,22 @@ start_from_file(InkOpts) ->
compress_on_receipt = PressOnReceipt,
clerk = Clerk}}.
-spec shutdown_snapshots(list(tuple())) -> ok.
%% @doc
%% Shutdown any snapshots before closing the store
shutdown_snapshots(Snapshots) ->
lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, Snapshots).
-spec shutdown_manifest(leveled_imanifest:manifest()) -> ok.
%% @doc
%% Shutdown all files in the manifest
shutdown_manifest(Manifest) ->
leveled_log:log("I0007", []),
leveled_imanifest:printer(Manifest),
ManAsList = leveled_imanifest:to_list(Manifest),
close_allmanifest(ManAsList).
get_cdbopts(InkOpts)->
CDBopts = InkOpts#inker_options.cdb_options,
WasteFP =

View file

@ -79,7 +79,7 @@
{"P0010",
{info, "No level zero action on close of Penciller ~w"}},
{"P0011",
{info, "Shutdown complete for Penciller"}},
{info, "Shutdown complete for Penciller for reason ~w"}},
{"P0012",
{info, "Store to be started based on manifest sequence number of ~w"}},
{"P0013",

View file

@ -763,10 +763,40 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State)
State#state{manifest = Manifest0}};
handle_call({fetch_levelzero, Slot}, _From, State) ->
{reply, lists:nth(Slot, State#state.levelzero_cache), State};
handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true ->
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
{stop, normal, ok, State};
handle_call(close, _From, State) ->
% Level 0 files lie outside of the manifest, and so if there is no L0
% file present it is safe to write the current contents of memory. If
% there is a L0 file present - then the memory can be dropped (it is
% recoverable from the ledger, and there should not be a lot to recover
% as presumably the ETS file has been recently flushed, hence the presence
% of a L0 file).
%
% The penciller should close each file in the manifest, and call a close
% on the clerk.
ok = leveled_pclerk:clerk_close(State#state.clerk),
leveled_log:log("P0008", [close]),
L0_Present = leveled_pmanifest:key_lookup(State#state.manifest, 0, all),
L0_Left = State#state.levelzero_size > 0,
case {State#state.levelzero_pending, L0_Present, L0_Left} of
{false, false, true} ->
{L0Pid, _L0Bloom} = roll_memory(State, true),
ok = leveled_sst:sst_close(L0Pid);
StatusTuple ->
leveled_log:log("P0010", [StatusTuple])
end,
shutdown_manifest(State#state.manifest),
{stop, normal, ok, State};
handle_call(doom, _From, State) ->
leveled_log:log("P0030", []),
ok = leveled_pclerk:clerk_close(State#state.clerk),
shutdown_manifest(State#state.manifest),
ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
{stop, normal, {ok, [ManifestFP, FilesFP]}, State};
@ -885,47 +915,10 @@ handle_cast(work_for_clerk, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true ->
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
leveled_log:log("P0007", [Reason]),
ok;
terminate(Reason, State) ->
%% Level 0 files lie outside of the manifest, and so if there is no L0
%% file present it is safe to write the current contents of memory. If
%% there is a L0 file present - then the memory can be dropped (it is
%% recoverable from the ledger, and there should not be a lot to recover
%% as presumably the ETS file has been recently flushed, hence the presence
%% of a L0 file).
%%
%% The penciller should close each file in the manifest, and cast a close
%% on the clerk.
ok = leveled_pclerk:clerk_close(State#state.clerk),
leveled_log:log("P0008", [Reason]),
L0_Present = leveled_pmanifest:key_lookup(State#state.manifest, 0, all),
L0_Left = State#state.levelzero_size > 0,
case {State#state.levelzero_pending, L0_Present, L0_Left} of
{false, false, true} ->
{L0Pid, _L0Bloom} = roll_memory(State, true),
ok = leveled_sst:sst_close(L0Pid);
StatusTuple ->
leveled_log:log("P0010", [StatusTuple])
end,
% Tidy shutdown of individual files
EntryCloseFun =
fun(ME) ->
case is_record(ME, manifest_entry) of
true ->
ok = leveled_sst:sst_close(ME#manifest_entry.owner);
false ->
{_SK, ME0} = ME,
ok = leveled_sst:sst_close(ME0#manifest_entry.owner)
end
end,
leveled_pmanifest:close_manifest(State#state.manifest, EntryCloseFun),
leveled_log:log("P0011", []),
ok.
terminate(Reason, _State=#state{is_snapshot=Snap}) when Snap == true ->
leveled_log:log("P0007", [Reason]);
terminate(Reason, _State) ->
leveled_log:log("P0011", [Reason]).
code_change(_OldVsn, State, _Extra) ->
@ -1024,6 +1017,24 @@ start_from_file(PCLopts) ->
ok = archive_files(RootPath, FileList0),
{ok, State0}.
-spec shutdown_manifest(leveled_pmanifest:manifest()) -> ok.
%% @doc
%% Shutdown all the SST files within the manifest
shutdown_manifest(Manifest)->
EntryCloseFun =
fun(ME) ->
case is_record(ME, manifest_entry) of
true ->
ok = leveled_sst:sst_close(ME#manifest_entry.owner);
false ->
{_SK, ME0} = ME,
ok = leveled_sst:sst_close(ME0#manifest_entry.owner)
end
end,
leveled_pmanifest:close_manifest(Manifest, EntryCloseFun).
archive_files(RootPath, UsedFileList) ->
{ok, AllFiles} = file:list_dir(sst_rootpath(RootPath)),
FileCheckFun =