diff --git a/include/leveled.hrl b/include/leveled.hrl
index 64f0dfe..dd0b90b 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -1,3 +1,6 @@
+% File paths
+-define(JOURNAL_FP, "journal").
+-define(LEDGER_FP, "ledger").
 %% Tag to be used on standard Riak KV objects
 -define(RIAK_TAG, o_rkv).
 
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 901f6b8..c612485 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -63,6 +63,7 @@
         book_compactjournal/2,
         book_islastcompactionpending/1,
         book_trimjournal/1,
+        book_hotbackup/1,
         book_close/1,
         book_destroy/1,
         book_isempty/2]).
@@ -76,8 +77,6 @@
 -include_lib("eunit/include/eunit.hrl").
 
 -define(CACHE_SIZE, 2500).
--define(JOURNAL_FP, "journal").
--define(LEDGER_FP, "ledger").
 -define(SNAPSHOT_TIMEOUT, 300000).
 -define(CACHE_SIZE_JITTER, 25).
 -define(JOURNAL_SIZE_JITTER, 20).
@@ -587,6 +586,19 @@ book_destroy(Pid) ->
     gen_server:call(Pid, destroy, infinity).
 
+-spec book_hotbackup(pid()) -> {async, fun()}.
+%% @doc Backup the Bookie
+%% Return a function that will take a backup of a snapshot of the Journal.
+%% The function will be 1-arity, and can be passed the absolute folder name
+%% to store the backup.
+%%
+%% Backup files are hard-linked. Does not work in head_only mode
+%%
+%% TODO: Can extend to head_only mode, and also support another parameter
+%% which would backup persisted part of ledger (to make restart faster)
+book_hotbackup(Pid) ->
+    gen_server:call(Pid, hot_backup, infinity).
+
 -spec book_isempty(pid(), leveled_codec:tag()) -> boolean().
 %% @doc
 %% Confirm if the store is empty, or if it contains a Key and Value for a
@@ -842,6 +854,19 @@ handle_call(confirm_compact, _From, State)
 handle_call(trim, _From, State) when State#state.head_only == true ->
     PSQN = leveled_penciller:pcl_persistedsqn(State#state.penciller),
     {reply, leveled_inker:ink_trim(State#state.inker, PSQN), State};
+handle_call(hot_backup, _From, State) when State#state.head_only == false ->
+    ok = leveled_inker:ink_roll(State#state.inker),
+    BackupFun =
+        fun(InkerSnapshot) ->
+            fun(BackupPath) ->
+                ok = leveled_inker:ink_backup(InkerSnapshot, BackupPath),
+                ok = leveled_inker:ink_close(InkerSnapshot)
+            end
+        end,
+    InkerOpts =
+        #inker_options{start_snapshot=true, source_inker=State#state.inker},
+    {ok, Snapshot} = leveled_inker:ink_snapstart(InkerOpts),
+    {reply, {async, BackupFun(Snapshot)}, State};
 handle_call(close, _From, State) ->
     leveled_inker:ink_close(State#state.inker),
     leveled_penciller:pcl_close(State#state.penciller),
@@ -1019,8 +1044,8 @@ set_options(Opts) ->
     PCLL0CacheSize = proplists:get_value(max_pencillercachesize, Opts),
     RootPath = proplists:get_value(root_path, Opts),
 
-    JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
-    LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
+    JournalFP = filename:join(RootPath, ?JOURNAL_FP),
+    LedgerFP = filename:join(RootPath, ?LEDGER_FP),
     ok = filelib:ensure_dir(JournalFP),
     ok = filelib:ensure_dir(LedgerFP),
 
diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index 86f85a6..acbac25 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -100,6 +100,7 @@
         cdb_destroy/1,
         cdb_deletepending/1,
         cdb_deletepending/3,
+        cdb_isrolling/1,
         hashtable_calc/2]).
 
 -include_lib("eunit/include/eunit.hrl").
@@ -380,6 +381,13 @@ cdb_filename(Pid) ->
 cdb_keycheck(Pid, Key) ->
     gen_fsm:sync_send_event(Pid, {key_check, Key}, infinity).
 
+-spec cdb_isrolling(pid()) -> boolean().
+%% @doc
+%% Check to see if a cdb file is still rolling
+cdb_isrolling(Pid) ->
+    gen_fsm:sync_send_all_state_event(Pid, cdb_isrolling, infinity).
+
+
 %%%============================================================================
 %%% gen_server callbacks
 %%%============================================================================
@@ -744,6 +752,8 @@ handle_sync_event(cdb_firstkey, _From, StateName, State) ->
     {reply, FirstKey, StateName, State};
 handle_sync_event(cdb_filename, _From, StateName, State) ->
     {reply, State#state.filename, StateName, State};
+handle_sync_event(cdb_isrolling, _From, StateName, State) ->
+    {reply, StateName == rolling, StateName, State};
 handle_sync_event(cdb_close, _From, delete_pending, State) ->
     leveled_log:log("CDB05", [State#state.filename, delete_pending, cdb_close]),
diff --git a/src/leveled_imanifest.erl b/src/leveled_imanifest.erl
index e619ab0..2be891f 100644
--- a/src/leveled_imanifest.erl
+++ b/src/leveled_imanifest.erl
@@ -161,6 +161,8 @@ reader(SQN, RootPath) ->
 %% disk
 writer(Manifest, ManSQN, RootPath) ->
     ManPath = leveled_inker:filepath(RootPath, manifest_dir),
+    ok = filelib:ensure_dir(ManPath),
+    % When writing during backups, may not have been generated
     NewFN = filename:join(ManPath,
                             integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX),
     TmpFN = filename:join(ManPath,
@@ -198,10 +200,10 @@ complete_filex() ->
     ?MANIFEST_FILEX.
 
 
-%%%============================================================================
-%%% Internal Functions
-%%%============================================================================
-
+-spec from_list(list()) -> manifest().
+%% @doc
+%% Convert from a flat list into a manifest with lookup jumps.
+%% The opposite of to_list/1
 from_list(Manifest) ->
     % Manifest should already be sorted with the highest SQN at the head
     % This will be maintained so that we can fold from the left, and find
@@ -209,6 +211,11 @@ from_list(Manifest) ->
     % reads are more common than stale reads
     lists:foldr(fun prepend_entry/2, [], Manifest).
 
+
+%%%============================================================================
+%%% Internal Functions
+%%%============================================================================
+
 prepend_entry(Entry, AccL) ->
     {SQN, _FN, _PidR, _LastKey} = Entry,
     case AccL of
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index a03494d..27794d5 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -113,6 +113,8 @@
         ink_printmanifest/1,
         ink_close/1,
         ink_doom/1,
+        ink_roll/1,
+        ink_backup/2,
         build_dummy_journal/0,
         clean_testdir/1,
         filepath/2,
@@ -235,10 +237,10 @@ ink_fetch(Pid, PrimaryKey, SQN) ->
 ink_keycheck(Pid, PrimaryKey, SQN) ->
     gen_server:call(Pid, {key_check, PrimaryKey, SQN}, infinity).
 
--spec ink_registersnapshot(pid(), pid()) -> {list(), pid()}.
+-spec ink_registersnapshot(pid(), pid()) -> {list(), pid(), integer()}.
 %% @doc
 %% Register a snapshot clone for the process, returning the Manifest and the
-%% pid of the active journal.
+%% pid of the active journal, as well as the JournalSQN.
 ink_registersnapshot(Pid, Requestor) ->
     gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
 
@@ -389,6 +391,18 @@ ink_compactionpending(Pid) ->
 ink_trim(Pid, PersistedSQN) ->
     gen_server:call(Pid, {trim, PersistedSQN}, infinity).
 
+-spec ink_roll(pid()) -> ok.
+%% @doc
+%% Roll the active journal
+ink_roll(Pid) ->
+    gen_server:call(Pid, roll, infinity).
+
+-spec ink_backup(pid(), string()) -> ok.
+%% @doc
+%% Backup the journal to the specified path
+ink_backup(Pid, BackupPath) ->
+    gen_server:call(Pid, {backup, BackupPath}).
+
 -spec ink_getmanifest(pid()) -> list().
 %% @doc
 %% Allows the clerk to fetch the manifest at the point it starts a compaction
@@ -428,11 +442,13 @@ init([InkerOpts]) ->
         {undefined, true} ->
             SrcInker = InkerOpts#inker_options.source_inker,
             {Manifest,
-                ActiveJournalDB} = ink_registersnapshot(SrcInker, self()),
-            {ok, #state{manifest=Manifest,
-                        active_journaldb=ActiveJournalDB,
-                        source_inker=SrcInker,
-                        is_snapshot=true}};
+                ActiveJournalDB,
+                JournalSQN} = ink_registersnapshot(SrcInker, self()),
+            {ok, #state{manifest = Manifest,
+                        active_journaldb = ActiveJournalDB,
+                        source_inker = SrcInker,
+                        journal_sqn = JournalSQN,
+                        is_snapshot = true}};
             %% Need to do something about timeout
         {_RootPath, false} ->
             start_from_file(InkerOpts)
@@ -477,7 +493,8 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
                 State#state.manifest_sqn}|State#state.registered_snapshots],
     leveled_log:log("I0002", [Requestor, State#state.manifest_sqn]),
     {reply, {State#state.manifest,
-                State#state.active_journaldb},
+                State#state.active_journaldb,
+                State#state.journal_sqn},
         State#state{registered_snapshots=Rs}};
 handle_call({confirm_delete, ManSQN}, _From, State) ->
     CheckSQNFun =
@@ -535,6 +552,56 @@ handle_call(compaction_pending, _From, State) ->
 handle_call({trim, PersistedSQN}, _From, State) ->
     ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
     {reply, ok, State};
+handle_call(roll, _From, State) ->
+    NewSQN = State#state.journal_sqn + 1,
+    {NewJournalP, Manifest1, NewManSQN} =
+        roll_active(State#state.active_journaldb,
+                    State#state.manifest,
+                    NewSQN,
+                    State#state.cdb_options,
+                    State#state.root_path,
+                    State#state.manifest_sqn),
+    {reply, ok, State#state{journal_sqn = NewSQN,
+                            manifest = Manifest1,
+                            manifest_sqn = NewManSQN,
+                            active_journaldb = NewJournalP}};
+handle_call({backup, BackupPath}, _from, State)
+                                    when State#state.is_snapshot == true ->
+    SW = os:timestamp(),
+    BackupJFP = filepath(filename:join(BackupPath, ?JOURNAL_FP), journal_dir),
+    ok = filelib:ensure_dir(BackupJFP),
+    BackupFun =
+        fun({SQN, FN, PidR, LastKey}, Acc) ->
+            case SQN < State#state.journal_sqn of
+                true ->
+                    BaseFN = filename:basename(FN),
+                    BackupName = filename:join(BackupJFP, BaseFN),
+                    false = when_not_rolling(PidR),
+                    case file:make_link(FN ++ "." ++ ?JOURNAL_FILEX,
+                                        BackupName ++ "."
+                                            ++ ?JOURNAL_FILEX) of
+                        ok ->
+                            ok;
+                        {error, eexist} ->
+                            ok
+                    end,
+                    [{SQN, BackupName, PidR, LastKey}|Acc];
+                false ->
+                    leveled_log:log("I0021", [FN, SQN, State#state.journal_sqn]),
+                    Acc
+            end
+        end,
+    BackupManifest =
+        lists:foldr(BackupFun,
+                    [],
+                    leveled_imanifest:to_list(State#state.manifest)),
+    leveled_imanifest:writer(leveled_imanifest:from_list(BackupManifest),
+                                State#state.manifest_sqn,
+                                filename:join(BackupPath, ?JOURNAL_FP)),
+    leveled_log:log_timer("I0020",
+                            [filename:join(BackupPath, ?JOURNAL_FP),
+                                length(BackupManifest)],
+                            SW),
+    {reply, ok, State};
 handle_call(close, _From, State) ->
     case State#state.is_snapshot of
         true ->
@@ -596,9 +663,9 @@ start_from_file(InkOpts) ->
     % Determine filepaths
     RootPath = InkOpts#inker_options.root_path,
     JournalFP = filepath(RootPath, journal_dir),
-    filelib:ensure_dir(JournalFP),
+    ok = filelib:ensure_dir(JournalFP),
     CompactFP = filepath(RootPath, journal_compact_dir),
-    filelib:ensure_dir(CompactFP),
+    ok = filelib:ensure_dir(CompactFP),
     ManifestFP = filepath(RootPath, manifest_dir),
     ok = filelib:ensure_dir(ManifestFP),
     % The IClerk must start files with the compaction file path so that they
@@ -645,6 +712,19 @@ start_from_file(InkOpts) ->
                 clerk = Clerk}}.
 
 
+when_not_rolling(CDB) ->
+    RollerFun =
+        fun(Sleep, WasRolling) ->
+            case WasRolling of
+                false ->
+                    false;
+                true ->
+                    timer:sleep(Sleep),
+                    leveled_cdb:cdb_isrolling(CDB)
+            end
+        end,
+    lists:foldl(RollerFun, true, [0, 1000, 10000, 100000]).
+
 -spec shutdown_snapshots(list(tuple())) -> ok.
 %% @doc
 %% Shutdown any snapshots before closing the store
@@ -710,29 +790,20 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
                     State#state{journal_sqn=NewSQN},
                     byte_size(JournalBin)};
         roll ->
-            SWroll = os:timestamp(),
-            LastKey = leveled_cdb:cdb_lastkey(ActiveJournal),
-            ok = leveled_cdb:cdb_roll(ActiveJournal),
-            Manifest0 = leveled_imanifest:append_lastkey(State#state.manifest,
-                                                            ActiveJournal,
-                                                            LastKey),
-            CDBopts = State#state.cdb_options,
-            ManEntry = start_new_activejournal(NewSQN,
-                                                State#state.root_path,
-                                                CDBopts),
-            {_, _, NewJournalP, _} = ManEntry,
-            Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry, true),
-            ok = leveled_imanifest:writer(Manifest1,
-                                            State#state.manifest_sqn + 1,
-                                            State#state.root_path),
+            {NewJournalP, Manifest1, NewManSQN} =
+                roll_active(ActiveJournal,
+                            State#state.manifest,
+                            NewSQN,
+                            State#state.cdb_options,
+                            State#state.root_path,
+                            State#state.manifest_sqn),
             ok = leveled_cdb:cdb_put(NewJournalP, JournalKey, JournalBin),
-            leveled_log:log_timer("I0008", [], SWroll),
             {rolling,
                 State#state{journal_sqn=NewSQN,
                             manifest=Manifest1,
-                            manifest_sqn = State#state.manifest_sqn + 1,
+                            manifest_sqn = NewManSQN,
                             active_journaldb=NewJournalP},
                 byte_size(JournalBin)}
     end.
@@ -756,6 +827,26 @@ get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) ->
     leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges).
 
+-spec roll_active(pid(), leveled_imanifest:manifest(),
+                    integer(), #cdb_options{}, string(), integer()) ->
+                        {pid(), leveled_imanifest:manifest(), integer()}.
+%% @doc
+%% Roll the active journal, and start a new active journal, updating the
+%% manifest
+roll_active(ActiveJournal, Manifest, NewSQN, CDBopts, RootPath, ManifestSQN) ->
+    SWroll = os:timestamp(),
+    LastKey = leveled_cdb:cdb_lastkey(ActiveJournal),
+    ok = leveled_cdb:cdb_roll(ActiveJournal),
+    Manifest0 =
+        leveled_imanifest:append_lastkey(Manifest, ActiveJournal, LastKey),
+    ManEntry =
+        start_new_activejournal(NewSQN, RootPath, CDBopts),
+    {_, _, NewJournalP, _} = ManEntry,
+    Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry, true),
+    ok = leveled_imanifest:writer(Manifest1, ManifestSQN + 1, RootPath),
+    leveled_log:log_timer("I0008", [], SWroll),
+    {NewJournalP, Manifest1, ManifestSQN + 1}.
+
 -spec key_check(leveled_codec:ledger_key(),
                 integer(),
                 leveled_imanifest:manifest()) -> missing|probably.
@@ -1014,7 +1105,6 @@ sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
                 [],
                 Filenames).
 
-
 filepath(RootPath, journal_dir) ->
     RootPath ++ "/" ++ ?FILES_FP ++ "/";
 filepath(RootPath, manifest_dir) ->
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index b52f1cc..bd690f7 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -280,6 +280,10 @@
     {"I0019",
         {info, "After ~w PUTs total prepare time is ~w total cdb time is ~w "
             ++ "and max prepare time is ~w and max cdb time is ~w"}},
+    {"I0020",
+        {info, "Journal backup completed to path=~s with file_count=~w"}},
+    {"I0021",
+        {info, "Ignoring filename=~s with SQN=~w and JournalSQN=~w"}},
 
     {"IC001",
         {info, "Closed for reason ~w so maybe leaving garbage"}},
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index 8c63b04..a8b063a 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -2,7 +2,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include("include/leveled.hrl").
 -export([all/0]).
--export([retain_strategy/1,
+-export([hot_backup_simple/1,
+            retain_strategy/1,
             recovr_strategy/1,
             aae_missingjournal/1,
             aae_bustedjournal/1,
@@ -10,6 +11,7 @@
             ]).
 
 all() -> [
+            hot_backup_simple,
             retain_strategy,
             recovr_strategy,
             aae_missingjournal,
@@ -17,6 +19,39 @@ all() -> [
             journal_compaction_bustedjournal
             ].
 
+
+hot_backup_simple(_Config) ->
+    % The journal may have a hot backup. This allows for an online Bookie
+    % to be sent a message to prepare a backup function, which an asynchronous
+    % worker can then call to generate a backup taken at the point in time
+    % the original message was processed.
+    %
+    % The basic test is to:
+    % 1 - load a Bookie, take a backup, delete the original path, restore from
+    % that path
+    RootPath = testutil:reset_filestructure(),
+    BookOpts = [{root_path, RootPath},
+                {cache_size, 1000},
+                {max_journalsize, 10000000},
+                {sync_strategy, testutil:sync_strategy()}],
+    {ok, Spcl1, LastV1} = rotating_object_check(BookOpts, "Bucket1", 10000),
+    {ok, Book1} = leveled_bookie:book_start(BookOpts),
+    {async, BackupFun} = leveled_bookie:book_hotbackup(Book1),
+    BackupPath = testutil:reset_filestructure("backup0"),
+    ok = BackupFun(BackupPath),
+    ok = leveled_bookie:book_close(Book1),
+    RootPath = testutil:reset_filestructure(),
+    BookOptsBackup = [{root_path, BackupPath},
+                        {cache_size, 2000},
+                        {max_journalsize, 20000000},
+                        {sync_strategy, testutil:sync_strategy()}],
+    {ok, BookBackup} = leveled_bookie:book_start(BookOptsBackup),
+    ok = testutil:check_indexed_objects(BookBackup, "Bucket1", Spcl1, LastV1),
+    ok = leveled_bookie:book_close(BookBackup),
+    BackupPath = testutil:reset_filestructure("backup0").
+
+
+
 retain_strategy(_Config) ->
     RootPath = testutil:reset_filestructure(),
     BookOpts = [{root_path, RootPath},
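
A minimal usage sketch of the hot backup API added above, mirroring the flow of the hot_backup_simple test. The store and backup paths and the option list are illustrative assumptions; book_start/1, book_hotbackup/1, the returned 1-arity fun and book_close/1 are the calls introduced or already exported by leveled_bookie.

    %% Sketch only - paths are illustrative, not part of the change
    {ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/leveled_store"}]),
    %% The Journal is rolled and a snapshot prepared when this call is handled ...
    {async, BackupFun} = leveled_bookie:book_hotbackup(Bookie),
    %% ... but the hard-linked copy is only made when the fun is run, so it can
    %% be handed to an asynchronous worker
    ok = BackupFun("/tmp/leveled_backup"),
    ok = leveled_bookie:book_close(Bookie),
    %% Restoring is simply starting a Bookie with root_path set to the backup
    {ok, Restored} = leveled_bookie:book_start([{root_path, "/tmp/leveled_backup"}]),
    ok = leveled_bookie:book_close(Restored).

As the @doc on book_hotbackup notes, backup files are hard-linked from the live Journal, so the backup folder needs to sit on the same filesystem as the store, and head_only mode is not supported.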