Initial hot_backup
The idea being that a consistent inker manifest and set of journal files is guaranteed - with hard links to the actual manifest files.
This commit is contained in:
parent
a1269e5274
commit
0838ff34e5
7 changed files with 211 additions and 37 deletions
|
@ -113,6 +113,8 @@
|
|||
ink_printmanifest/1,
|
||||
ink_close/1,
|
||||
ink_doom/1,
|
||||
ink_roll/1,
|
||||
ink_backup/2,
|
||||
build_dummy_journal/0,
|
||||
clean_testdir/1,
|
||||
filepath/2,
|
||||
|
@ -235,10 +237,10 @@ ink_fetch(Pid, PrimaryKey, SQN) ->
|
|||
ink_keycheck(Pid, PrimaryKey, SQN) ->
|
||||
gen_server:call(Pid, {key_check, PrimaryKey, SQN}, infinity).
|
||||
|
||||
-spec ink_registersnapshot(pid(), pid()) -> {list(), pid()}.
|
||||
-spec ink_registersnapshot(pid(), pid()) -> {list(), pid(), integer()}.
|
||||
%% @doc
|
||||
%% Register a snapshot clone for the process, returning the Manifest and the
|
||||
%% pid of the active journal.
|
||||
%% pid of the active journal, as well as the JournalSQN.
|
||||
ink_registersnapshot(Pid, Requestor) ->
|
||||
gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
|
||||
|
||||
|
@ -389,6 +391,18 @@ ink_compactionpending(Pid) ->
|
|||
ink_trim(Pid, PersistedSQN) ->
|
||||
gen_server:call(Pid, {trim, PersistedSQN}, infinity).
|
||||
|
||||
-spec ink_roll(pid()) -> ok.
|
||||
%% @doc
|
||||
%% Roll the active journal
|
||||
ink_roll(Pid) ->
|
||||
gen_server:call(Pid, roll, infinity).
|
||||
|
||||
-spec ink_backup(pid(), string()) -> ok.
|
||||
%% @doc
|
||||
%% Backup the journal to the specified path
|
||||
ink_backup(Pid, BackupPath) ->
|
||||
gen_server:call(Pid, {backup, BackupPath}).
|
||||
|
||||
-spec ink_getmanifest(pid()) -> list().
|
||||
%% @doc
|
||||
%% Allows the clerk to fetch the manifest at the point it starts a compaction
|
||||
|
@ -428,11 +442,13 @@ init([InkerOpts]) ->
|
|||
{undefined, true} ->
|
||||
SrcInker = InkerOpts#inker_options.source_inker,
|
||||
{Manifest,
|
||||
ActiveJournalDB} = ink_registersnapshot(SrcInker, self()),
|
||||
{ok, #state{manifest=Manifest,
|
||||
active_journaldb=ActiveJournalDB,
|
||||
source_inker=SrcInker,
|
||||
is_snapshot=true}};
|
||||
ActiveJournalDB,
|
||||
JournalSQN} = ink_registersnapshot(SrcInker, self()),
|
||||
{ok, #state{manifest = Manifest,
|
||||
active_journaldb = ActiveJournalDB,
|
||||
source_inker = SrcInker,
|
||||
journal_sqn = JournalSQN,
|
||||
is_snapshot = true}};
|
||||
%% Need to do something about timeout
|
||||
{_RootPath, false} ->
|
||||
start_from_file(InkerOpts)
|
||||
|
@ -477,7 +493,8 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
|
|||
State#state.manifest_sqn}|State#state.registered_snapshots],
|
||||
leveled_log:log("I0002", [Requestor, State#state.manifest_sqn]),
|
||||
{reply, {State#state.manifest,
|
||||
State#state.active_journaldb},
|
||||
State#state.active_journaldb,
|
||||
State#state.journal_sqn},
|
||||
State#state{registered_snapshots=Rs}};
|
||||
handle_call({confirm_delete, ManSQN}, _From, State) ->
|
||||
CheckSQNFun =
|
||||
|
@ -535,6 +552,56 @@ handle_call(compaction_pending, _From, State) ->
|
|||
handle_call({trim, PersistedSQN}, _From, State) ->
|
||||
ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
|
||||
{reply, ok, State};
|
||||
handle_call(roll, _From, State) ->
|
||||
NewSQN = State#state.journal_sqn + 1,
|
||||
{NewJournalP, Manifest1, NewManSQN} =
|
||||
roll_active(State#state.active_journaldb,
|
||||
State#state.manifest,
|
||||
NewSQN,
|
||||
State#state.cdb_options,
|
||||
State#state.root_path,
|
||||
State#state.manifest_sqn),
|
||||
{reply, ok, State#state{journal_sqn = NewSQN,
|
||||
manifest = Manifest1,
|
||||
manifest_sqn = NewManSQN,
|
||||
active_journaldb = NewJournalP}};
|
||||
handle_call({backup, BackupPath}, _from, State)
|
||||
when State#state.is_snapshot == true ->
|
||||
SW = os:timestamp(),
|
||||
BackupJFP = filepath(filename:join(BackupPath, ?JOURNAL_FP), journal_dir),
|
||||
ok = filelib:ensure_dir(BackupJFP),
|
||||
BackupFun =
|
||||
fun({SQN, FN, PidR, LastKey}, Acc) ->
|
||||
case SQN < State#state.journal_sqn of
|
||||
true ->
|
||||
BaseFN = filename:basename(FN),
|
||||
BackupName = filename:join(BackupJFP, BaseFN),
|
||||
false = when_not_rolling(PidR),
|
||||
case file:make_link(FN ++ "." ++ ?JOURNAL_FILEX,
|
||||
BackupName ++ "." ++ ?JOURNAL_FILEX) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, eexist} ->
|
||||
ok
|
||||
end,
|
||||
[{SQN, BackupName, PidR, LastKey}|Acc];
|
||||
false ->
|
||||
leveled_log:log("I0021", [FN, SQN, State#state.journal_sqn]),
|
||||
Acc
|
||||
end
|
||||
end,
|
||||
BackupManifest =
|
||||
lists:foldr(BackupFun,
|
||||
[],
|
||||
leveled_imanifest:to_list(State#state.manifest)),
|
||||
leveled_imanifest:writer(leveled_imanifest:from_list(BackupManifest),
|
||||
State#state.manifest_sqn,
|
||||
filename:join(BackupPath, ?JOURNAL_FP)),
|
||||
leveled_log:log_timer("I0020",
|
||||
[filename:join(BackupPath, ?JOURNAL_FP),
|
||||
length(BackupManifest)],
|
||||
SW),
|
||||
{reply, ok, State};
|
||||
handle_call(close, _From, State) ->
|
||||
case State#state.is_snapshot of
|
||||
true ->
|
||||
|
@ -596,9 +663,9 @@ start_from_file(InkOpts) ->
|
|||
% Determine filepaths
|
||||
RootPath = InkOpts#inker_options.root_path,
|
||||
JournalFP = filepath(RootPath, journal_dir),
|
||||
filelib:ensure_dir(JournalFP),
|
||||
ok = filelib:ensure_dir(JournalFP),
|
||||
CompactFP = filepath(RootPath, journal_compact_dir),
|
||||
filelib:ensure_dir(CompactFP),
|
||||
ok = filelib:ensure_dir(CompactFP),
|
||||
ManifestFP = filepath(RootPath, manifest_dir),
|
||||
ok = filelib:ensure_dir(ManifestFP),
|
||||
% The IClerk must start files with the compaction file path so that they
|
||||
|
@ -645,6 +712,19 @@ start_from_file(InkOpts) ->
|
|||
clerk = Clerk}}.
|
||||
|
||||
|
||||
when_not_rolling(CDB) ->
|
||||
RollerFun =
|
||||
fun(Sleep, WasRolling) ->
|
||||
case WasRolling of
|
||||
false ->
|
||||
false;
|
||||
true ->
|
||||
timer:sleep(Sleep),
|
||||
leveled_cdb:cdb_isrolling(CDB)
|
||||
end
|
||||
end,
|
||||
lists:foldl(RollerFun, true, [0, 1000, 10000, 100000]).
|
||||
|
||||
-spec shutdown_snapshots(list(tuple())) -> ok.
|
||||
%% @doc
|
||||
%% Shutdown any snapshots before closing the store
|
||||
|
@ -710,29 +790,20 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
|
|||
State#state{journal_sqn=NewSQN},
|
||||
byte_size(JournalBin)};
|
||||
roll ->
|
||||
SWroll = os:timestamp(),
|
||||
LastKey = leveled_cdb:cdb_lastkey(ActiveJournal),
|
||||
ok = leveled_cdb:cdb_roll(ActiveJournal),
|
||||
Manifest0 = leveled_imanifest:append_lastkey(State#state.manifest,
|
||||
ActiveJournal,
|
||||
LastKey),
|
||||
CDBopts = State#state.cdb_options,
|
||||
ManEntry = start_new_activejournal(NewSQN,
|
||||
State#state.root_path,
|
||||
CDBopts),
|
||||
{_, _, NewJournalP, _} = ManEntry,
|
||||
Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry, true),
|
||||
ok = leveled_imanifest:writer(Manifest1,
|
||||
State#state.manifest_sqn + 1,
|
||||
State#state.root_path),
|
||||
{NewJournalP, Manifest1, NewManSQN} =
|
||||
roll_active(ActiveJournal,
|
||||
State#state.manifest,
|
||||
NewSQN,
|
||||
State#state.cdb_options,
|
||||
State#state.root_path,
|
||||
State#state.manifest_sqn),
|
||||
ok = leveled_cdb:cdb_put(NewJournalP,
|
||||
JournalKey,
|
||||
JournalBin),
|
||||
leveled_log:log_timer("I0008", [], SWroll),
|
||||
{rolling,
|
||||
State#state{journal_sqn=NewSQN,
|
||||
manifest=Manifest1,
|
||||
manifest_sqn = State#state.manifest_sqn + 1,
|
||||
manifest_sqn = NewManSQN,
|
||||
active_journaldb=NewJournalP},
|
||||
byte_size(JournalBin)}
|
||||
end.
|
||||
|
@ -756,6 +827,26 @@ get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) ->
|
|||
leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges).
|
||||
|
||||
|
||||
-spec roll_active(pid(), leveled_imanifest:manifest(),
|
||||
integer(), #cdb_options{}, string(), integer()) ->
|
||||
{pid(), leveled_imanifest:manifest(), integer()}.
|
||||
%% @doc
|
||||
%% Roll the active journal, and start a new active journal, updating the
|
||||
%% manifest
|
||||
roll_active(ActiveJournal, Manifest, NewSQN, CDBopts, RootPath, ManifestSQN) ->
|
||||
SWroll = os:timestamp(),
|
||||
LastKey = leveled_cdb:cdb_lastkey(ActiveJournal),
|
||||
ok = leveled_cdb:cdb_roll(ActiveJournal),
|
||||
Manifest0 =
|
||||
leveled_imanifest:append_lastkey(Manifest, ActiveJournal, LastKey),
|
||||
ManEntry =
|
||||
start_new_activejournal(NewSQN, RootPath, CDBopts),
|
||||
{_, _, NewJournalP, _} = ManEntry,
|
||||
Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry, true),
|
||||
ok = leveled_imanifest:writer(Manifest1, ManifestSQN + 1, RootPath),
|
||||
leveled_log:log_timer("I0008", [], SWroll),
|
||||
{NewJournalP, Manifest1, ManifestSQN + 1}.
|
||||
|
||||
-spec key_check(leveled_codec:ledger_key(),
|
||||
integer(),
|
||||
leveled_imanifest:manifest()) -> missing|probably.
|
||||
|
@ -1014,7 +1105,6 @@ sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
|
|||
[],
|
||||
Filenames).
|
||||
|
||||
|
||||
filepath(RootPath, journal_dir) ->
|
||||
RootPath ++ "/" ++ ?FILES_FP ++ "/";
|
||||
filepath(RootPath, manifest_dir) ->
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue