Merge pull request #168 from martinsumner/mas-i164-hotbackup
Mas i164 hotbackup
This commit is contained in:
commit
8f8e393227
9 changed files with 367 additions and 74 deletions
|
@ -120,6 +120,16 @@ Three potential recovery strategies are supported to provide some flexibility fo
|
|||
|
||||
- recalc (not yet implemented) - the compaction rules assume that on recovery the key changes will be recalculated by comparing the change with the current database state. In recovery the key changes will be recalculated by comparing the change with the current database state.
|
||||
|
||||
### Hot Backups
|
||||
|
||||
A request can be made to backup a leveled instance, where there is OS support for hard links of files in Erlang. The backup request will return an `{async, BackupFun}` response, and calling `BackupFun(BackupPath)` will cause a backup to be taken into the given path. If a backup already exists in that path, then the Backup will be updated.
|
||||
|
||||
Backups are taken of the Journal only, as the Ledger can be recreated on startup from empty using the KeyChanges in the Journal (backups are not currently an option in `head_only` mode).
|
||||
|
||||
The backup uses hard-links, so at the point the backup is taken, there will be a minimal change to the on-disk footprint of the store. However, as journal compaction is run, the hard-links will prevent space from getting released by the dropping of replaced journal files - so backups will cause the size of the store to grow faster than it would otherwise do. It is an operator responsibility to garbage collect old backups, to prevent this growth from being an issue.
|
||||
|
||||
As backups depend on hard-links, they cannot be taken with a `BackupPath` on a different file system to the standard data path. The move a backup across to a different file system, standard tools should be used such as rsync. The leveled backups should be relatively friendly for rsync-like delta-based backup approaches due to significantly lower write amplification when compared to other LSM stores (e.g. leveldb).
|
||||
|
||||
## Head only
|
||||
|
||||
Leveled can be started in `head_only` mode. This is a special mode which dispenses with the long-term role of the Journal in retaining data. This is a mode to be used in *special circumstances* when values are small, and Key/Value pairs are added in batches.
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
% File paths
|
||||
-define(JOURNAL_FP, "journal").
|
||||
-define(LEDGER_FP, "ledger").
|
||||
|
||||
%% Tag to be used on standard Riak KV objects
|
||||
-define(RIAK_TAG, o_rkv).
|
||||
|
|
|
@ -62,6 +62,7 @@
|
|||
book_compactjournal/2,
|
||||
book_islastcompactionpending/1,
|
||||
book_trimjournal/1,
|
||||
book_hotbackup/1,
|
||||
book_close/1,
|
||||
book_destroy/1,
|
||||
book_isempty/2]).
|
||||
|
@ -90,8 +91,6 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(CACHE_SIZE, 2500).
|
||||
-define(JOURNAL_FP, "journal").
|
||||
-define(LEDGER_FP, "ledger").
|
||||
-define(SNAPSHOT_TIMEOUT, 300000).
|
||||
-define(CACHE_SIZE_JITTER, 25).
|
||||
-define(JOURNAL_SIZE_JITTER, 20).
|
||||
|
@ -893,6 +892,19 @@ book_destroy(Pid) ->
|
|||
gen_server:call(Pid, destroy, infinity).
|
||||
|
||||
|
||||
-spec book_hotbackup(pid()) -> {async, fun()}.
|
||||
%% @doc Backup the Bookie
|
||||
%% Return a function that will take a backup of a snapshot of the Journal.
|
||||
%% The function will be 1-arity, and can be passed the absolute folder name
|
||||
%% to store the backup.
|
||||
%%
|
||||
%% Backup files are hard-linked. Does not work in head_only mode
|
||||
%%
|
||||
%% TODO: Can extend to head_only mode, and also support another parameter
|
||||
%% which would backup persisted part of ledger (to make restart faster)
|
||||
book_hotbackup(Pid) ->
|
||||
gen_server:call(Pid, hot_backup, infinity).
|
||||
|
||||
-spec book_isempty(pid(), leveled_codec:tag()) -> boolean().
|
||||
%% @doc
|
||||
%% Confirm if the store is empty, or if it contains a Key and Value for a
|
||||
|
@ -1147,6 +1159,21 @@ handle_call(confirm_compact, _From, State)
|
|||
handle_call(trim, _From, State) when State#state.head_only == true ->
|
||||
PSQN = leveled_penciller:pcl_persistedsqn(State#state.penciller),
|
||||
{reply, leveled_inker:ink_trim(State#state.inker, PSQN), State};
|
||||
handle_call(hot_backup, _From, State) when State#state.head_only == false ->
|
||||
ok = leveled_inker:ink_roll(State#state.inker),
|
||||
BackupFun =
|
||||
fun(InkerSnapshot) ->
|
||||
fun(BackupPath) ->
|
||||
ok = leveled_inker:ink_backup(InkerSnapshot, BackupPath),
|
||||
ok = leveled_inker:ink_close(InkerSnapshot)
|
||||
end
|
||||
end,
|
||||
InkerOpts =
|
||||
#inker_options{start_snapshot = true,
|
||||
source_inker = State#state.inker,
|
||||
bookies_pid = self()},
|
||||
{ok, Snapshot} = leveled_inker:ink_snapstart(InkerOpts),
|
||||
{reply, {async, BackupFun(Snapshot)}, State};
|
||||
handle_call(close, _From, State) ->
|
||||
leveled_inker:ink_close(State#state.inker),
|
||||
leveled_penciller:pcl_close(State#state.penciller),
|
||||
|
@ -1326,8 +1353,8 @@ set_options(Opts) ->
|
|||
PCLL0CacheSize = proplists:get_value(max_pencillercachesize, Opts),
|
||||
RootPath = proplists:get_value(root_path, Opts),
|
||||
|
||||
JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
|
||||
LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
|
||||
JournalFP = filename:join(RootPath, ?JOURNAL_FP),
|
||||
LedgerFP = filename:join(RootPath, ?LEDGER_FP),
|
||||
ok = filelib:ensure_dir(JournalFP),
|
||||
ok = filelib:ensure_dir(LedgerFP),
|
||||
|
||||
|
|
|
@ -100,6 +100,9 @@
|
|||
cdb_destroy/1,
|
||||
cdb_deletepending/1,
|
||||
cdb_deletepending/3,
|
||||
cdb_isrolling/1]).
|
||||
|
||||
-export([finished_rolling/1,
|
||||
hashtable_calc/2]).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
@ -380,6 +383,13 @@ cdb_filename(Pid) ->
|
|||
cdb_keycheck(Pid, Key) ->
|
||||
gen_fsm:sync_send_event(Pid, {key_check, Key}, infinity).
|
||||
|
||||
-spec cdb_isrolling(pid()) -> boolean().
|
||||
%% @doc
|
||||
%% Check to see if a cdb file is still rolling
|
||||
cdb_isrolling(Pid) ->
|
||||
gen_fsm:sync_send_all_state_event(Pid, cdb_isrolling, infinity).
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%============================================================================
|
||||
|
@ -744,6 +754,8 @@ handle_sync_event(cdb_firstkey, _From, StateName, State) ->
|
|||
{reply, FirstKey, StateName, State};
|
||||
handle_sync_event(cdb_filename, _From, StateName, State) ->
|
||||
{reply, State#state.filename, StateName, State};
|
||||
handle_sync_event(cdb_isrolling, _From, StateName, State) ->
|
||||
{reply, StateName == rolling, StateName, State};
|
||||
handle_sync_event(cdb_close, _From, delete_pending, State) ->
|
||||
leveled_log:log("CDB05",
|
||||
[State#state.filename, delete_pending, cdb_close]),
|
||||
|
@ -770,6 +782,25 @@ terminate(_Reason, _StateName, _State) ->
|
|||
code_change(_OldVsn, StateName, State, _Extra) ->
|
||||
{ok, StateName, State}.
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% External functions
|
||||
%%%============================================================================
|
||||
|
||||
|
||||
finished_rolling(CDB) ->
|
||||
RollerFun =
|
||||
fun(Sleep, FinishedRolling) ->
|
||||
case FinishedRolling of
|
||||
true ->
|
||||
true;
|
||||
false ->
|
||||
timer:sleep(Sleep),
|
||||
not leveled_cdb:cdb_isrolling(CDB)
|
||||
end
|
||||
end,
|
||||
lists:foldl(RollerFun, false, [0, 1000, 10000, 100000]).
|
||||
|
||||
%%%============================================================================
|
||||
%%% Internal functions
|
||||
%%%============================================================================
|
||||
|
@ -2117,6 +2148,19 @@ emptyvalue_fromdict_test() ->
|
|||
?assertMatch(KVP, D_Result),
|
||||
ok = file:delete("../test/from_dict_test_ev.cdb").
|
||||
|
||||
|
||||
empty_roll_test() ->
|
||||
file:delete("../test/empty_roll.cdb"),
|
||||
file:delete("../test/empty_roll.pnd"),
|
||||
{ok, P1} = cdb_open_writer("../test/empty_roll.pnd",
|
||||
#cdb_options{binary_mode=true}),
|
||||
ok = cdb_roll(P1),
|
||||
true = finished_rolling(P1),
|
||||
{ok, P2} = cdb_open_reader("../test/empty_roll.cdb",
|
||||
#cdb_options{binary_mode=true}),
|
||||
ok = cdb_close(P2),
|
||||
ok = file:delete("../test/empty_roll.cdb").
|
||||
|
||||
find_lastkey_test() ->
|
||||
file:delete("../test/lastkey.pnd"),
|
||||
{ok, P1} = cdb_open_writer("../test/lastkey.pnd",
|
||||
|
|
|
@ -161,17 +161,16 @@ reader(SQN, RootPath) ->
|
|||
%% disk
|
||||
writer(Manifest, ManSQN, RootPath) ->
|
||||
ManPath = leveled_inker:filepath(RootPath, manifest_dir),
|
||||
ok = filelib:ensure_dir(ManPath),
|
||||
% When writing during backups, may not have been generated
|
||||
NewFN = filename:join(ManPath,
|
||||
integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX),
|
||||
TmpFN = filename:join(ManPath,
|
||||
integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX),
|
||||
MBin = term_to_binary(to_list(Manifest), [compressed]),
|
||||
case filelib:is_file(NewFN) of
|
||||
false ->
|
||||
leveled_log:log("I0016", [ManSQN]),
|
||||
ok = file:write_file(TmpFN, MBin),
|
||||
ok = file:rename(TmpFN, NewFN)
|
||||
end,
|
||||
leveled_log:log("I0016", [ManSQN]),
|
||||
ok = file:write_file(TmpFN, MBin),
|
||||
ok = file:rename(TmpFN, NewFN),
|
||||
GC_SQN = ManSQN - ?MANIFESTS_TO_RETAIN,
|
||||
GC_Man = filename:join(ManPath,
|
||||
integer_to_list(GC_SQN) ++ "." ++ ?MANIFEST_FILEX),
|
||||
|
@ -198,10 +197,10 @@ complete_filex() ->
|
|||
?MANIFEST_FILEX.
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% Internal Functions
|
||||
%%%============================================================================
|
||||
|
||||
-spec from_list(list()) -> manifest().
|
||||
%% @doc
|
||||
%% Convert from a flat list into a manifest with lookup jumps.
|
||||
%% The opposite of to_list/1
|
||||
from_list(Manifest) ->
|
||||
% Manifest should already be sorted with the highest SQN at the head
|
||||
% This will be maintained so that we can fold from the left, and find
|
||||
|
@ -209,6 +208,11 @@ from_list(Manifest) ->
|
|||
% reads are more common than stale reads
|
||||
lists:foldr(fun prepend_entry/2, [], Manifest).
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% Internal Functions
|
||||
%%%============================================================================
|
||||
|
||||
prepend_entry(Entry, AccL) ->
|
||||
{SQN, _FN, _PidR, _LastKey} = Entry,
|
||||
case AccL of
|
||||
|
|
|
@ -113,6 +113,8 @@
|
|||
ink_printmanifest/1,
|
||||
ink_close/1,
|
||||
ink_doom/1,
|
||||
ink_roll/1,
|
||||
ink_backup/2,
|
||||
build_dummy_journal/0,
|
||||
clean_testdir/1,
|
||||
filepath/2,
|
||||
|
@ -236,10 +238,10 @@ ink_fetch(Pid, PrimaryKey, SQN) ->
|
|||
ink_keycheck(Pid, PrimaryKey, SQN) ->
|
||||
gen_server:call(Pid, {key_check, PrimaryKey, SQN}, infinity).
|
||||
|
||||
-spec ink_registersnapshot(pid(), pid()) -> {list(), pid()}.
|
||||
-spec ink_registersnapshot(pid(), pid()) -> {list(), pid(), integer()}.
|
||||
%% @doc
|
||||
%% Register a snapshot clone for the process, returning the Manifest and the
|
||||
%% pid of the active journal.
|
||||
%% pid of the active journal, as well as the JournalSQN.
|
||||
ink_registersnapshot(Pid, Requestor) ->
|
||||
gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
|
||||
|
||||
|
@ -299,7 +301,7 @@ ink_doom(Pid) ->
|
|||
%%
|
||||
%% The InitAccFun should return an initial batch accumulator for each subfold.
|
||||
%% It is a 2-arity function that takes a filename and a MinSQN as an input
|
||||
%% potentially to be use din logging
|
||||
%% potentially to be used in logging
|
||||
%%
|
||||
%% The BatchFun is a two arity function that should take as inputs:
|
||||
%% An overall accumulator
|
||||
|
@ -390,6 +392,18 @@ ink_compactionpending(Pid) ->
|
|||
ink_trim(Pid, PersistedSQN) ->
|
||||
gen_server:call(Pid, {trim, PersistedSQN}, infinity).
|
||||
|
||||
-spec ink_roll(pid()) -> ok.
|
||||
%% @doc
|
||||
%% Roll the active journal
|
||||
ink_roll(Pid) ->
|
||||
gen_server:call(Pid, roll, infinity).
|
||||
|
||||
-spec ink_backup(pid(), string()) -> ok.
|
||||
%% @doc
|
||||
%% Backup the journal to the specified path
|
||||
ink_backup(Pid, BackupPath) ->
|
||||
gen_server:call(Pid, {backup, BackupPath}).
|
||||
|
||||
-spec ink_getmanifest(pid()) -> list().
|
||||
%% @doc
|
||||
%% Allows the clerk to fetch the manifest at the point it starts a compaction
|
||||
|
@ -429,16 +443,17 @@ init([InkerOpts]) ->
|
|||
{undefined, true} ->
|
||||
%% monitor the bookie, and close the snapshot when bookie
|
||||
%% exits
|
||||
BookieMonitor = erlang:monitor(process, InkerOpts#inker_options.bookies_pid),
|
||||
|
||||
BookieMonitor = erlang:monitor(process, InkerOpts#inker_options.bookies_pid),
|
||||
SrcInker = InkerOpts#inker_options.source_inker,
|
||||
{Manifest,
|
||||
ActiveJournalDB} = ink_registersnapshot(SrcInker, self()),
|
||||
{ok, #state{manifest=Manifest,
|
||||
active_journaldb=ActiveJournalDB,
|
||||
source_inker=SrcInker,
|
||||
bookie_monref=BookieMonitor,
|
||||
is_snapshot=true}};
|
||||
ActiveJournalDB,
|
||||
JournalSQN} = ink_registersnapshot(SrcInker, self()),
|
||||
{ok, #state{manifest = Manifest,
|
||||
active_journaldb = ActiveJournalDB,
|
||||
source_inker = SrcInker,
|
||||
journal_sqn = JournalSQN,
|
||||
bookie_monref = BookieMonitor,
|
||||
is_snapshot = true}};
|
||||
%% Need to do something about timeout
|
||||
{_RootPath, false} ->
|
||||
start_from_file(InkerOpts)
|
||||
|
@ -483,7 +498,8 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
|
|||
State#state.manifest_sqn}|State#state.registered_snapshots],
|
||||
leveled_log:log("I0002", [Requestor, State#state.manifest_sqn]),
|
||||
{reply, {State#state.manifest,
|
||||
State#state.active_journaldb},
|
||||
State#state.active_journaldb,
|
||||
State#state.journal_sqn},
|
||||
State#state{registered_snapshots=Rs}};
|
||||
handle_call({confirm_delete, ManSQN}, _From, State) ->
|
||||
CheckSQNFun =
|
||||
|
@ -541,6 +557,82 @@ handle_call(compaction_pending, _From, State) ->
|
|||
handle_call({trim, PersistedSQN}, _From, State) ->
|
||||
ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
|
||||
{reply, ok, State};
|
||||
handle_call(roll, _From, State) ->
|
||||
case leveled_cdb:cdb_lastkey(State#state.active_journaldb) of
|
||||
empty ->
|
||||
{reply, ok, State};
|
||||
_ ->
|
||||
NewSQN = State#state.journal_sqn + 1,
|
||||
SWroll = os:timestamp(),
|
||||
{NewJournalP, Manifest1, NewManSQN} =
|
||||
roll_active(State#state.active_journaldb,
|
||||
State#state.manifest,
|
||||
NewSQN,
|
||||
State#state.cdb_options,
|
||||
State#state.root_path,
|
||||
State#state.manifest_sqn),
|
||||
leveled_log:log_timer("I0024", [NewSQN], SWroll),
|
||||
{reply, ok, State#state{journal_sqn = NewSQN,
|
||||
manifest = Manifest1,
|
||||
manifest_sqn = NewManSQN,
|
||||
active_journaldb = NewJournalP}}
|
||||
end;
|
||||
handle_call({backup, BackupPath}, _from, State)
|
||||
when State#state.is_snapshot == true ->
|
||||
SW = os:timestamp(),
|
||||
BackupJFP = filepath(filename:join(BackupPath, ?JOURNAL_FP), journal_dir),
|
||||
ok = filelib:ensure_dir(BackupJFP),
|
||||
{ok, CurrentFNs} = file:list_dir(BackupJFP),
|
||||
leveled_log:log("I0023", [length(CurrentFNs)]),
|
||||
BackupFun =
|
||||
fun({SQN, FN, PidR, LastKey}, {ManAcc, FTRAcc}) ->
|
||||
case SQN < State#state.journal_sqn of
|
||||
true ->
|
||||
BaseFN = filename:basename(FN),
|
||||
ExtendedBaseFN = BaseFN ++ "." ++ ?JOURNAL_FILEX,
|
||||
BackupName = filename:join(BackupJFP, BaseFN),
|
||||
true = leveled_cdb:finished_rolling(PidR),
|
||||
case file:make_link(FN ++ "." ++ ?JOURNAL_FILEX,
|
||||
BackupName ++ "." ++ ?JOURNAL_FILEX) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, eexist} ->
|
||||
ok
|
||||
end,
|
||||
{[{SQN, BackupName, PidR, LastKey}|ManAcc],
|
||||
[ExtendedBaseFN|FTRAcc]};
|
||||
false ->
|
||||
leveled_log:log("I0021", [FN, SQN, State#state.journal_sqn]),
|
||||
{ManAcc, FTRAcc}
|
||||
end
|
||||
end,
|
||||
{BackupManifest, FilesToRetain} =
|
||||
lists:foldr(BackupFun,
|
||||
{[], []},
|
||||
leveled_imanifest:to_list(State#state.manifest)),
|
||||
|
||||
FilesToRemove = lists:subtract(CurrentFNs, FilesToRetain),
|
||||
RemoveFun =
|
||||
fun(RFN) ->
|
||||
leveled_log:log("I0022", [RFN]),
|
||||
RemoveFile = filename:join(BackupJFP, RFN),
|
||||
case filelib:is_file(RemoveFile)
|
||||
and not filelib:is_dir(RemoveFile) of
|
||||
true ->
|
||||
ok = file:delete(RemoveFile);
|
||||
false ->
|
||||
ok
|
||||
end
|
||||
end,
|
||||
lists:foreach(RemoveFun, FilesToRemove),
|
||||
leveled_imanifest:writer(leveled_imanifest:from_list(BackupManifest),
|
||||
State#state.manifest_sqn,
|
||||
filename:join(BackupPath, ?JOURNAL_FP)),
|
||||
leveled_log:log_timer("I0020",
|
||||
[filename:join(BackupPath, ?JOURNAL_FP),
|
||||
length(BackupManifest)],
|
||||
SW),
|
||||
{reply, ok, State};
|
||||
handle_call(close, _From, State) ->
|
||||
case State#state.is_snapshot of
|
||||
true ->
|
||||
|
@ -608,9 +700,9 @@ start_from_file(InkOpts) ->
|
|||
% Determine filepaths
|
||||
RootPath = InkOpts#inker_options.root_path,
|
||||
JournalFP = filepath(RootPath, journal_dir),
|
||||
filelib:ensure_dir(JournalFP),
|
||||
ok = filelib:ensure_dir(JournalFP),
|
||||
CompactFP = filepath(RootPath, journal_compact_dir),
|
||||
filelib:ensure_dir(CompactFP),
|
||||
ok = filelib:ensure_dir(CompactFP),
|
||||
ManifestFP = filepath(RootPath, manifest_dir),
|
||||
ok = filelib:ensure_dir(ManifestFP),
|
||||
% The IClerk must start files with the compaction file path so that they
|
||||
|
@ -723,28 +815,21 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
|
|||
byte_size(JournalBin)};
|
||||
roll ->
|
||||
SWroll = os:timestamp(),
|
||||
LastKey = leveled_cdb:cdb_lastkey(ActiveJournal),
|
||||
ok = leveled_cdb:cdb_roll(ActiveJournal),
|
||||
Manifest0 = leveled_imanifest:append_lastkey(State#state.manifest,
|
||||
ActiveJournal,
|
||||
LastKey),
|
||||
CDBopts = State#state.cdb_options,
|
||||
ManEntry = start_new_activejournal(NewSQN,
|
||||
State#state.root_path,
|
||||
CDBopts),
|
||||
{_, _, NewJournalP, _} = ManEntry,
|
||||
Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry, true),
|
||||
ok = leveled_imanifest:writer(Manifest1,
|
||||
State#state.manifest_sqn + 1,
|
||||
State#state.root_path),
|
||||
{NewJournalP, Manifest1, NewManSQN} =
|
||||
roll_active(ActiveJournal,
|
||||
State#state.manifest,
|
||||
NewSQN,
|
||||
State#state.cdb_options,
|
||||
State#state.root_path,
|
||||
State#state.manifest_sqn),
|
||||
leveled_log:log_timer("I0008", [], SWroll),
|
||||
ok = leveled_cdb:cdb_put(NewJournalP,
|
||||
JournalKey,
|
||||
JournalBin),
|
||||
leveled_log:log_timer("I0008", [], SWroll),
|
||||
{rolling,
|
||||
State#state{journal_sqn=NewSQN,
|
||||
manifest=Manifest1,
|
||||
manifest_sqn = State#state.manifest_sqn + 1,
|
||||
manifest_sqn = NewManSQN,
|
||||
active_journaldb=NewJournalP},
|
||||
byte_size(JournalBin)}
|
||||
end.
|
||||
|
@ -768,6 +853,25 @@ get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) ->
|
|||
leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges).
|
||||
|
||||
|
||||
-spec roll_active(pid(), leveled_imanifest:manifest(),
|
||||
integer(), #cdb_options{}, string(), integer()) ->
|
||||
{pid(), leveled_imanifest:manifest(), integer()}.
|
||||
%% @doc
|
||||
%% Roll the active journal, and start a new active journal, updating the
|
||||
%% manifest
|
||||
roll_active(ActiveJournal, Manifest, NewSQN, CDBopts, RootPath, ManifestSQN) ->
|
||||
LastKey = leveled_cdb:cdb_lastkey(ActiveJournal),
|
||||
ok = leveled_cdb:cdb_roll(ActiveJournal),
|
||||
Manifest0 =
|
||||
leveled_imanifest:append_lastkey(Manifest, ActiveJournal, LastKey),
|
||||
ManEntry =
|
||||
start_new_activejournal(NewSQN, RootPath, CDBopts),
|
||||
{_, _, NewJournalP, _} = ManEntry,
|
||||
Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry, true),
|
||||
ok = leveled_imanifest:writer(Manifest1, ManifestSQN + 1, RootPath),
|
||||
|
||||
{NewJournalP, Manifest1, ManifestSQN + 1}.
|
||||
|
||||
-spec key_check(leveled_codec:ledger_key(),
|
||||
integer(),
|
||||
leveled_imanifest:manifest()) -> missing|probably.
|
||||
|
@ -904,9 +1008,7 @@ open_all_manifest(Man0, RootPath, CDBOpts) ->
|
|||
NewManEntry = start_new_activejournal(LastSQN + 1,
|
||||
RootPath,
|
||||
CDBOpts),
|
||||
leveled_imanifest:add_entry(ManToHead,
|
||||
NewManEntry,
|
||||
true);
|
||||
leveled_imanifest:add_entry(ManToHead, NewManEntry, true);
|
||||
false ->
|
||||
{ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN,
|
||||
CDBOpts),
|
||||
|
@ -1026,7 +1128,6 @@ sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
|
|||
[],
|
||||
Filenames).
|
||||
|
||||
|
||||
filepath(RootPath, journal_dir) ->
|
||||
RootPath ++ "/" ++ ?FILES_FP ++ "/";
|
||||
filepath(RootPath, manifest_dir) ->
|
||||
|
|
|
@ -280,6 +280,16 @@
|
|||
{"I0019",
|
||||
{info, "After ~w PUTs total prepare time is ~w total cdb time is ~w "
|
||||
++ "and max prepare time is ~w and max cdb time is ~w"}},
|
||||
{"I0020",
|
||||
{info, "Journal backup completed to path=~s with file_count=~w"}},
|
||||
{"I0021",
|
||||
{info, "Ingoring filename=~s with SQN=~w and JournalSQN=~w"}},
|
||||
{"I0022",
|
||||
{info, "Removing filename=~s from backup folder as not in backup"}},
|
||||
{"I0023",
|
||||
{info, "Backup commencing into folder with ~w existing files"}},
|
||||
{"I0024",
|
||||
{info, "Prompted roll at NewSQN=~w"}},
|
||||
|
||||
{"IC001",
|
||||
{info, "Closed for reason ~w so maybe leaving garbage"}},
|
||||
|
|
|
@ -2,7 +2,9 @@
|
|||
-include_lib("common_test/include/ct.hrl").
|
||||
-include("include/leveled.hrl").
|
||||
-export([all/0]).
|
||||
-export([retain_strategy/1,
|
||||
-export([hot_backup_simple/1,
|
||||
hot_backup_changes/1,
|
||||
retain_strategy/1,
|
||||
recovr_strategy/1,
|
||||
aae_missingjournal/1,
|
||||
aae_bustedjournal/1,
|
||||
|
@ -10,6 +12,8 @@
|
|||
]).
|
||||
|
||||
all() -> [
|
||||
hot_backup_simple,
|
||||
hot_backup_changes,
|
||||
retain_strategy,
|
||||
recovr_strategy,
|
||||
aae_missingjournal,
|
||||
|
@ -17,6 +21,94 @@ all() -> [
|
|||
journal_compaction_bustedjournal
|
||||
].
|
||||
|
||||
|
||||
hot_backup_simple(_Config) ->
|
||||
% The journal may have a hot backup. This allows for an online Bookie
|
||||
% to be sent a message to prepare a backup function, which an asynchronous
|
||||
% worker can then call to generate a backup taken at the point in time
|
||||
% the original message was processsed.
|
||||
%
|
||||
% The basic test is to:
|
||||
% 1 - load a Bookie, take a backup, delete the original path, restore from
|
||||
% that path
|
||||
RootPath = testutil:reset_filestructure(),
|
||||
BackupPath = testutil:reset_filestructure("backup0"),
|
||||
BookOpts = [{root_path, RootPath},
|
||||
{cache_size, 1000},
|
||||
{max_journalsize, 10000000},
|
||||
{sync_strategy, testutil:sync_strategy()}],
|
||||
{ok, Spcl1, LastV1} = rotating_object_check(BookOpts, "Bucket1", 3200),
|
||||
{ok, Book1} = leveled_bookie:book_start(BookOpts),
|
||||
{async, BackupFun} = leveled_bookie:book_hotbackup(Book1),
|
||||
ok = BackupFun(BackupPath),
|
||||
ok = leveled_bookie:book_close(Book1),
|
||||
RootPath = testutil:reset_filestructure(),
|
||||
BookOptsBackup = [{root_path, BackupPath},
|
||||
{cache_size, 2000},
|
||||
{max_journalsize, 20000000},
|
||||
{sync_strategy, testutil:sync_strategy()}],
|
||||
{ok, BookBackup} = leveled_bookie:book_start(BookOptsBackup),
|
||||
ok = testutil:check_indexed_objects(BookBackup, "Bucket1", Spcl1, LastV1),
|
||||
ok = leveled_bookie:book_close(BookBackup),
|
||||
BackupPath = testutil:reset_filestructure("backup0").
|
||||
|
||||
hot_backup_changes(_Config) ->
|
||||
RootPath = testutil:reset_filestructure(),
|
||||
BackupPath = testutil:reset_filestructure("backup0"),
|
||||
BookOpts = [{root_path, RootPath},
|
||||
{cache_size, 1000},
|
||||
{max_journalsize, 10000000},
|
||||
{sync_strategy, testutil:sync_strategy()}],
|
||||
B = "Bucket0",
|
||||
|
||||
{ok, Book1} = leveled_bookie:book_start(BookOpts),
|
||||
{KSpcL1, _V1} = testutil:put_indexed_objects(Book1, B, 20000),
|
||||
|
||||
{async, BackupFun1} = leveled_bookie:book_hotbackup(Book1),
|
||||
ok = BackupFun1(BackupPath),
|
||||
{ok, FileList1} =
|
||||
file:list_dir(filename:join(BackupPath, "journal/journal_files/")),
|
||||
|
||||
{KSpcL2, V2} = testutil:put_altered_indexed_objects(Book1, B, KSpcL1),
|
||||
|
||||
{async, BackupFun2} = leveled_bookie:book_hotbackup(Book1),
|
||||
ok = BackupFun2(BackupPath),
|
||||
{ok, FileList2} =
|
||||
file:list_dir(filename:join(BackupPath, "journal/journal_files/")),
|
||||
|
||||
ok = testutil:check_indexed_objects(Book1, B, KSpcL2, V2),
|
||||
compact_and_wait(Book1),
|
||||
|
||||
{async, BackupFun3} = leveled_bookie:book_hotbackup(Book1),
|
||||
ok = BackupFun3(BackupPath),
|
||||
{ok, FileList3} =
|
||||
file:list_dir(filename:join(BackupPath, "journal/journal_files/")),
|
||||
% Confirm null impact of backing up twice in a row
|
||||
{async, BackupFun4} = leveled_bookie:book_hotbackup(Book1),
|
||||
ok = BackupFun4(BackupPath),
|
||||
{ok, FileList4} =
|
||||
file:list_dir(filename:join(BackupPath, "journal/journal_files/")),
|
||||
|
||||
true = length(FileList2) > length(FileList1),
|
||||
true = length(FileList2) > length(FileList3),
|
||||
true = length(FileList3) == length(FileList4),
|
||||
|
||||
ok = leveled_bookie:book_close(Book1),
|
||||
|
||||
RootPath = testutil:reset_filestructure(),
|
||||
BookOptsBackup = [{root_path, BackupPath},
|
||||
{cache_size, 2000},
|
||||
{max_journalsize, 20000000},
|
||||
{sync_strategy, testutil:sync_strategy()}],
|
||||
{ok, BookBackup} = leveled_bookie:book_start(BookOptsBackup),
|
||||
|
||||
ok = testutil:check_indexed_objects(BookBackup, B, KSpcL2, V2),
|
||||
|
||||
testutil:reset_filestructure("backup0"),
|
||||
testutil:reset_filestructure().
|
||||
|
||||
|
||||
|
||||
retain_strategy(_Config) ->
|
||||
RootPath = testutil:reset_filestructure(),
|
||||
BookOpts = [{root_path, RootPath},
|
||||
|
@ -419,22 +511,7 @@ rotating_object_check(BookOpts, B, NumberOfObjects) ->
|
|||
KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
|
||||
V4),
|
||||
|
||||
ok = leveled_bookie:book_compactjournal(Book2, 30000),
|
||||
F = fun leveled_bookie:book_islastcompactionpending/1,
|
||||
lists:foldl(fun(X, Pending) ->
|
||||
case Pending of
|
||||
false ->
|
||||
false;
|
||||
true ->
|
||||
io:format("Loop ~w waiting for journal "
|
||||
++ "compaction to complete~n", [X]),
|
||||
timer:sleep(20000),
|
||||
F(Book2)
|
||||
end end,
|
||||
true,
|
||||
lists:seq(1, 15)),
|
||||
io:format("Waiting for journal deletes~n"),
|
||||
timer:sleep(20000),
|
||||
compact_and_wait(Book2),
|
||||
|
||||
io:format("Checking index following compaction~n"),
|
||||
ok = testutil:check_indexed_objects(Book2,
|
||||
|
@ -445,6 +522,23 @@ rotating_object_check(BookOpts, B, NumberOfObjects) ->
|
|||
ok = leveled_bookie:book_close(Book2),
|
||||
{ok, KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, V4}.
|
||||
|
||||
compact_and_wait(Book) ->
|
||||
ok = leveled_bookie:book_compactjournal(Book, 30000),
|
||||
F = fun leveled_bookie:book_islastcompactionpending/1,
|
||||
lists:foldl(fun(X, Pending) ->
|
||||
case Pending of
|
||||
false ->
|
||||
false;
|
||||
true ->
|
||||
io:format("Loop ~w waiting for journal "
|
||||
++ "compaction to complete~n", [X]),
|
||||
timer:sleep(20000),
|
||||
F(Book)
|
||||
end end,
|
||||
true,
|
||||
lists:seq(1, 15)),
|
||||
io:format("Waiting for journal deletes~n"),
|
||||
timer:sleep(20000).
|
||||
|
||||
restart_from_blankledger(BookOpts, B_SpcL) ->
|
||||
leveled_penciller:clean_testdir(proplists:get_value(root_path, BookOpts) ++
|
||||
|
|
|
@ -374,7 +374,7 @@ generate_objects(Count, KeyNumber, ObjL, Value, IndexGen) ->
|
|||
generate_objects(Count, KeyNumber, ObjL, Value, IndexGen, "Bucket").
|
||||
|
||||
generate_objects(0, _KeyNumber, ObjL, _Value, _IndexGen, _Bucket) ->
|
||||
ObjL;
|
||||
lists:reverse(ObjL);
|
||||
generate_objects(Count, binary_uuid, ObjL, Value, IndexGen, Bucket) ->
|
||||
{Obj1, Spec1} = set_object(list_to_binary(Bucket),
|
||||
list_to_binary(leveled_util:generate_uuid()),
|
||||
|
@ -382,7 +382,7 @@ generate_objects(Count, binary_uuid, ObjL, Value, IndexGen, Bucket) ->
|
|||
IndexGen),
|
||||
generate_objects(Count - 1,
|
||||
binary_uuid,
|
||||
ObjL ++ [{leveled_rand:uniform(), Obj1, Spec1}],
|
||||
[{leveled_rand:uniform(), Obj1, Spec1}|ObjL],
|
||||
Value,
|
||||
IndexGen,
|
||||
Bucket);
|
||||
|
@ -393,7 +393,7 @@ generate_objects(Count, uuid, ObjL, Value, IndexGen, Bucket) ->
|
|||
IndexGen),
|
||||
generate_objects(Count - 1,
|
||||
uuid,
|
||||
ObjL ++ [{leveled_rand:uniform(), Obj1, Spec1}],
|
||||
[{leveled_rand:uniform(), Obj1, Spec1}|ObjL],
|
||||
Value,
|
||||
IndexGen,
|
||||
Bucket);
|
||||
|
@ -405,7 +405,7 @@ generate_objects(Count, {binary, KeyNumber}, ObjL, Value, IndexGen, Bucket) ->
|
|||
IndexGen),
|
||||
generate_objects(Count - 1,
|
||||
{binary, KeyNumber + 1},
|
||||
ObjL ++ [{leveled_rand:uniform(), Obj1, Spec1}],
|
||||
[{leveled_rand:uniform(), Obj1, Spec1}|ObjL],
|
||||
Value,
|
||||
IndexGen,
|
||||
Bucket);
|
||||
|
@ -417,7 +417,7 @@ generate_objects(Count, {fixed_binary, KeyNumber}, ObjL, Value, IndexGen, Bucket
|
|||
IndexGen),
|
||||
generate_objects(Count - 1,
|
||||
{fixed_binary, KeyNumber + 1},
|
||||
ObjL ++ [{leveled_rand:uniform(), Obj1, Spec1}],
|
||||
[{leveled_rand:uniform(), Obj1, Spec1}|ObjL],
|
||||
Value,
|
||||
IndexGen,
|
||||
Bucket);
|
||||
|
@ -428,7 +428,7 @@ generate_objects(Count, KeyNumber, ObjL, Value, IndexGen, Bucket) ->
|
|||
IndexGen),
|
||||
generate_objects(Count - 1,
|
||||
KeyNumber + 1,
|
||||
ObjL ++ [{leveled_rand:uniform(), Obj1, Spec1}],
|
||||
[{leveled_rand:uniform(), Obj1, Spec1}|ObjL],
|
||||
Value,
|
||||
IndexGen,
|
||||
Bucket).
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue