Snapshot Work - Interim Commit

Some initial work to get snapshots going.

Changes required, as need to snapshot through the Bookie to ensure that
there is no race between extracting the Bookie's in-memory view and the
Penciller's view if a push_to_mem has occurred inbetween.

A lot still outstanding, especially around Inker snapshots, and handling
timeouts
This commit is contained in:
martinsumner 2016-09-23 18:50:29 +01:00
parent d3e985ed80
commit c64d67d9fb
5 changed files with 160 additions and 75 deletions

View file

@ -103,7 +103,7 @@
ink_get/3,
ink_fetch/3,
ink_loadpcl/4,
ink_snap/1,
ink_registersnapshot/2,
ink_close/1,
ink_print_manifest/1,
build_dummy_journal/0,
@ -146,8 +146,8 @@ ink_get(Pid, PrimaryKey, SQN) ->
ink_fetch(Pid, PrimaryKey, SQN) ->
gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity).
ink_snap(Pid) ->
gen_server:call(Pid, snapshot, infinity).
ink_registersnapshot(Pid, Requestor) ->
gen_server:call(Pid, {snapshot, Requestor}, infinity).
ink_close(Pid) ->
gen_server:call(Pid, close, infinity).
@ -163,39 +163,20 @@ ink_print_manifest(Pid) ->
%%%============================================================================
init([InkerOpts]) ->
RootPath = InkerOpts#inker_options.root_path,
CDBopts = InkerOpts#inker_options.cdb_options,
JournalFP = filepath(RootPath, journal_dir),
{ok, JournalFilenames} = case filelib:is_dir(JournalFP) of
true ->
file:list_dir(JournalFP);
false ->
filelib:ensure_dir(JournalFP),
{ok, []}
end,
ManifestFP = filepath(RootPath, manifest_dir),
{ok, ManifestFilenames} = case filelib:is_dir(ManifestFP) of
true ->
file:list_dir(ManifestFP);
false ->
filelib:ensure_dir(ManifestFP),
{ok, []}
end,
{Manifest,
{ActiveJournal, LowActiveSQN},
JournalSQN,
ManifestSQN} = build_manifest(ManifestFilenames,
JournalFilenames,
fun simple_manifest_reader/2,
RootPath,
CDBopts),
{ok, #state{manifest = lists:reverse(lists:keysort(1, Manifest)),
manifest_sqn = ManifestSQN,
journal_sqn = JournalSQN,
active_journaldb = ActiveJournal,
active_journaldb_sqn = LowActiveSQN,
root_path = RootPath,
cdb_options = CDBopts}}.
case {InkerOpts#inker_options.root_path,
InkerOpts#inker_options.start_snapshot} of
{undefined, true} ->
SrcInker = InkerOpts#inker_options.source_inker,
Requestor = InkerOpts#inker_options.requestor,
{ok,
{ActiveJournalDB,
Manifest}} = ink_registersnapshot(SrcInker, Requestor),
{ok, #state{manifest=Manifest,
active_journaldb=ActiveJournalDB}};
%% Need to do something about timeout
{_RootPath, false} ->
start_from_file(InkerOpts)
end.
handle_call({put, Key, Object, KeyChanges}, From, State) ->
@ -240,12 +221,11 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) ->
State#state.active_journaldb}],
Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest),
{reply, Reply, State};
handle_call(snapshot, _From , State) ->
handle_call({register_snapshot, _Requestor}, _From , State) ->
%% TODO: Not yet implemented registration of snapshot
%% Should return manifest and register the snapshot
{reply, {State#state.manifest,
State#state.active_journaldb,
State#state.active_journaldb_sqn},
State#state.active_journaldb},
State};
handle_call(print_manifest, _From, State) ->
manifest_printer(State#state.manifest),
@ -275,6 +255,42 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%============================================================================
start_from_file(InkerOpts) ->
RootPath = InkerOpts#inker_options.root_path,
CDBopts = InkerOpts#inker_options.cdb_options,
JournalFP = filepath(RootPath, journal_dir),
{ok, JournalFilenames} = case filelib:is_dir(JournalFP) of
true ->
file:list_dir(JournalFP);
false ->
filelib:ensure_dir(JournalFP),
{ok, []}
end,
ManifestFP = filepath(RootPath, manifest_dir),
{ok, ManifestFilenames} = case filelib:is_dir(ManifestFP) of
true ->
file:list_dir(ManifestFP);
false ->
filelib:ensure_dir(ManifestFP),
{ok, []}
end,
{Manifest,
{ActiveJournal, LowActiveSQN},
JournalSQN,
ManifestSQN} = build_manifest(ManifestFilenames,
JournalFilenames,
fun simple_manifest_reader/2,
RootPath,
CDBopts),
{ok, #state{manifest = lists:reverse(lists:keysort(1, Manifest)),
manifest_sqn = ManifestSQN,
journal_sqn = JournalSQN,
active_journaldb = ActiveJournal,
active_journaldb_sqn = LowActiveSQN,
root_path = RootPath,
cdb_options = CDBopts}}.
put_object(PrimaryKey, Object, KeyChanges, State) ->
NewSQN = State#state.journal_sqn + 1,
%% TODO: The term goes through a double binary_to_term conversion