Deferred Deletion of Journals

This allows deleted journals to be retained for a period (the
waste_retention_period).  The idea is that a backup strategy can then
ensure that all journals are backed up, even those created and removed
within a backup period - so that a restore to any point is possible.

This is also a precursor to removing some of the PromptDelete
complexity from the Inker Clerk - all compactions can now prompt
deletion, as deletion is deferred.
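
A minimal sketch of setting the retention period at startup, using the
book_start/1 options exercised in the test changes below (the path and
values are illustrative, not prescriptive):

    StartOpts = [{root_path, "/tmp/leveled_data"},
                    {max_journalsize, 10000000},
                    {max_run_length, 1},
                    {waste_retention_period, 86400}],   % seconds; matches the default
    {ok, Bookie} = leveled_bookie:book_start(StartOpts),
    %% Journals removed by compaction are now moved to the waste folder, and
    %% waste files older than the retention period are cleared before each
    %% compaction run.
    ok = leveled_bookie:book_compactjournal(Bookie, 30000),
    ok = leveled_bookie:book_close(Bookie).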
martinsumner 2016-11-14 11:17:14 +00:00
parent dbb840d75e
commit 44738f7c75
8 changed files with 147 additions and 44 deletions

@@ -46,6 +46,7 @@
 -record(cdb_options,
                    {max_size :: integer(),
                    file_path :: string(),
+                   waste_path :: string(),
                    binary_mode = false :: boolean()}).
 
 -record(inker_options,
@@ -55,6 +56,7 @@
                    start_snapshot = false :: boolean(),
                    source_inker :: pid(),
                    reload_strategy = [] :: list(),
+                   waste_retention_period :: integer(),
                    max_run_length}).
 
 -record(penciller_options,
@@ -66,7 +68,8 @@
 -record(iclerk_options,
                    {inker :: pid(),
                    max_run_length :: integer(),
-                   cdb_options :: #cdb_options{},
+                   cdb_options = #cdb_options{} :: #cdb_options{},
+                   waste_retention_period :: integer(),
                    reload_strategy = [] :: list()}).
 
 -record(r_content, {

@@ -576,11 +576,14 @@ snapshot_store(State, SnapType) ->
 set_options(Opts) ->
     MaxJournalSize = get_opt(max_journalsize, Opts, 10000000000),
+    WRP = get_opt(waste_retention_period, Opts),
     AltStrategy = get_opt(reload_strategy, Opts, []),
     ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy),
     PCLL0CacheSize = get_opt(max_pencillercachesize, Opts),
     RootPath = get_opt(root_path, Opts),
     JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
     LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
     ok = filelib:ensure_dir(JournalFP),
@@ -589,6 +592,7 @@ set_options(Opts) ->
     {#inker_options{root_path = JournalFP,
                     reload_strategy = ReloadStrategy,
                     max_run_length = get_opt(max_run_length, Opts),
+                    waste_retention_period = WRP,
                     cdb_options = #cdb_options{max_size=MaxJournalSize,
                                                 binary_mode=true}},
     #penciller_options{root_path = LedgerFP,

@@ -106,7 +106,8 @@
                 binary_mode = false :: boolean(),
                 delete_point = 0 :: integer(),
                 inker :: pid(),
-                deferred_delete = false :: boolean()}).
+                deferred_delete = false :: boolean(),
+                waste_path :: string()}).
 
 %%%============================================================================
@@ -219,7 +220,9 @@ init([Opts]) ->
             end,
     {ok,
         starting,
-        #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode}}.
+        #state{max_size=MaxSize,
+                binary_mode=Opts#cdb_options.binary_mode,
+                waste_path=Opts#cdb_options.waste_path}}.
 
 starting({open_writer, Filename}, _From, State) ->
     leveled_log:log("CDB01", [Filename]),
@@ -495,13 +498,18 @@ handle_info(_Msg, StateName, State) ->
 terminate(Reason, StateName, State) ->
     leveled_log:log("CDB05", [State#state.filename, Reason]),
-    case {State#state.handle, StateName} of
-        {undefined, _} ->
+    case {State#state.handle, StateName, State#state.waste_path} of
+        {undefined, _, _} ->
             ok;
-        {Handle, delete_pending} ->
+        {Handle, delete_pending, undefined} ->
             file:close(Handle),
-            file:delete(State#state.filename);
-        {Handle, _} ->
+            file:delete(Handle);
+        {Handle, delete_pending, WasteFP} ->
+            file:close(Handle),
+            Components = filename:split(State#state.filename),
+            NewName = WasteFP ++ lists:last(Components),
+            file:rename(State#state.filename, NewName);
+        {Handle, _, _} ->
             file:close(Handle)
     end.
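
For illustration, when a file is in delete_pending and a waste_path is set,
terminate/3 renames the file into the waste folder rather than deleting it.
A sketch of the name derivation (paths and filename are hypothetical):

    1> Components = filename:split("/data/journal/journal_files/nursery_1.cdb").
    ["/","data","journal","journal_files","nursery_1.cdb"]
    2> WasteFP = "/data/journal/journal_files/waste/".
    "/data/journal/journal_files/waste/"
    3> WasteFP ++ lists:last(Components).
    "/data/journal/journal_files/waste/nursery_1.cdb"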

@@ -41,6 +41,17 @@
 %% as a way of directly representing a change, and where anti-entropy can
 %% recover from a loss.
 %%
+%% -------- Removing Compacted Files ---------
+%%
+%% Once a compaction job is complete, and the manifest change has been
+%% committed, the individual journal files will get a deletion prompt.  The
+%% Journal processes should copy the file to the waste folder, before erasing
+%% themselves.
+%%
+%% The Inker will have a waste duration setting, and before running compaction
+%% should delete all over-age items (using the file modified date) from the
+%% waste.
+%%
 %% -------- Tombstone Reaping ---------
 %%
 %% Value compaction does not remove tombstones from the database, and so a
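
A worked example of the backup guarantee this aims at, assuming the default
retention of 86400 seconds (defined in a later hunk) and a nightly backup -
timings here are illustrative only:

    %% 02:00 day 1 - backup N taken
    %% 09:00 day 1 - journal J written
    %% 15:00 day 1 - compaction replaces J; J is renamed into waste/
    %% 02:00 day 2 - backup N+1 taken, capturing waste/J
    %% 15:00 day 2 - the next compaction run finds J over 86400s old and
    %%               deletes it from waste/
    %% A restore to any point between backups N and N+1 can therefore still
    %% recover J.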
@@ -54,7 +65,7 @@
 %% before the tombstone. If no ushc objects exist for that tombstone, it can
 %% now be reaped as part of the compaction job.
 %%
-%% Other tombstones cannot be reaped, as otherwis eon laoding a ledger an old
+%% Other tombstones cannot be reaped, as otherwise on laoding a ledger an old
 %% version of the object may re-emerge.
 
 -module(leveled_iclerk).
@@ -88,10 +99,13 @@
 -define(MAXRUN_COMPACTION_TARGET, 80.0).
 -define(CRC_SIZE, 4).
 -define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])).
+-define(DEFAULT_WASTE_RETENTION_PERIOD, 86400).
 
 -record(state, {inker :: pid(),
                 max_run_length :: integer(),
                 cdb_options,
+                waste_retention_period :: integer(),
+                waste_path :: string(),
                 reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list()}).
 
 -record(candidate, {low_sqn :: integer(),
@@ -129,32 +143,41 @@ clerk_stop(Pid) ->
 init([IClerkOpts]) ->
     ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy,
-    case IClerkOpts#iclerk_options.max_run_length of
-        undefined ->
-            {ok, #state{max_run_length = ?MAX_COMPACTION_RUN,
-                        inker = IClerkOpts#iclerk_options.inker,
-                        cdb_options = IClerkOpts#iclerk_options.cdb_options,
-                        reload_strategy = ReloadStrategy}};
-        MRL ->
-            {ok, #state{max_run_length = MRL,
-                        inker = IClerkOpts#iclerk_options.inker,
-                        cdb_options = IClerkOpts#iclerk_options.cdb_options,
-                        reload_strategy = ReloadStrategy}}
-    end.
+    CDBopts = IClerkOpts#iclerk_options.cdb_options,
+    WP = CDBopts#cdb_options.waste_path,
+    WRP = case IClerkOpts#iclerk_options.waste_retention_period of
+                undefined ->
+                    ?DEFAULT_WASTE_RETENTION_PERIOD;
+                WRP0 ->
+                    WRP0
+            end,
+    MRL = case IClerkOpts#iclerk_options.max_run_length of
+                undefined ->
+                    ?MAX_COMPACTION_RUN;
+                MRL0 ->
+                    MRL0
+            end,
+    {ok, #state{max_run_length = MRL,
+                inker = IClerkOpts#iclerk_options.inker,
+                cdb_options = CDBopts,
+                reload_strategy = ReloadStrategy,
+                waste_path = WP,
+                waste_retention_period = WRP}}.
 
 handle_call(_Msg, _From, State) ->
     {reply, not_supported, State}.
 
 handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
                 State) ->
+    % Empty the waste folder
+    clear_waste(State),
     % Need to fetch manifest at start rather than have it be passed in
     % Don't want to process a queued call waiting on an old manifest
     [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
     MaxRunLength = State#state.max_run_length,
     {FilterServer, MaxSQN} = InitiateFun(Checker),
     CDBopts = State#state.cdb_options,
+    FP = CDBopts#cdb_options.file_path,
+    ok = filelib:ensure_dir(FP),
     Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN),
     BestRun0 = assess_candidates(Candidates, MaxRunLength),
@@ -511,10 +534,26 @@ generate_manifest_entry(ActiveJournal) ->
     [{StartSQN, NewFN, PidR}].
 
+clear_waste(State) ->
+    WP = State#state.waste_path,
+    WRP = State#state.waste_retention_period,
+    {ok, ClearedJournals} = file:list_dir(WP),
+    N = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
+    lists:foreach(fun(DelJ) ->
+                        LMD = filelib:last_modified(WP ++ DelJ),
+                        case N - calendar:datetime_to_gregorian_seconds(LMD) of
+                            LMD_Delta when LMD_Delta >= WRP ->
+                                ok = file:delete(WP ++ DelJ),
+                                leveled_log:log("IC010", [WP ++ DelJ]);
+                            LMD_Delta ->
+                                leveled_log:log("IC011", [WP ++ DelJ,
+                                                            LMD_Delta]),
+                                ok
+                        end
+                    end,
+                    ClearedJournals).
 
 %%%============================================================================
 %%% Test
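
The waste age check compares gregorian seconds derived from a file's
last-modified time.  A minimal standalone sketch of the same test for one
file (the filename and period are assumed values):

    WasteRetentionPeriod = 86400,
    WasteFile = "waste/1.cdb",
    Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
    LMD = filelib:last_modified(WasteFile),   % {{Y,M,D},{H,Min,S}} if the file exists
    Age = Now - calendar:datetime_to_gregorian_seconds(LMD),
    case Age >= WasteRetentionPeriod of
        true -> ok = file:delete(WasteFile);   % over-age, remove from waste
        false -> ok                            % keep until the period has passed
    end.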
@@ -545,6 +584,21 @@ score_compare_test() ->
     ?assertMatch(Run1, choose_best_assessment(Run1, Run2, 4)),
     ?assertMatch(Run2, choose_best_assessment(Run1 ++ Run2, Run2, 4)).
 
+file_gc_test() ->
+    State = #state{waste_path="test/waste/",
+                    waste_retention_period=1},
+    ok = filelib:ensure_dir(State#state.waste_path),
+    file:write_file(State#state.waste_path ++ "1.cdb", term_to_binary("Hello")),
+    timer:sleep(1100),
+    file:write_file(State#state.waste_path ++ "2.cdb", term_to_binary("Hello")),
+    clear_waste(State),
+    {ok, ClearedJournals} = file:list_dir(State#state.waste_path),
+    ?assertMatch(["2.cdb"], ClearedJournals),
+    timer:sleep(1100),
+    clear_waste(State),
+    {ok, ClearedJournals2} = file:list_dir(State#state.waste_path),
+    ?assertMatch([], ClearedJournals2).
+
 find_bestrun_test() ->
     %% Tests dependent on these defaults
     %% -define(MAX_COMPACTION_RUN, 4).

@@ -119,6 +119,7 @@
 -define(MANIFEST_FP, "journal_manifest").
 -define(FILES_FP, "journal_files").
 -define(COMPACT_FP, "post_compact").
+-define(WASTE_FP, "waste").
 -define(JOURNAL_FILEX, "cdb").
 -define(MANIFEST_FILEX, "man").
 -define(PENDING_FILEX, "pnd").
@@ -360,20 +361,26 @@ code_change(_OldVsn, State, _Extra) ->
 start_from_file(InkOpts) ->
     RootPath = InkOpts#inker_options.root_path,
     CDBopts = InkOpts#inker_options.cdb_options,
     JournalFP = filepath(RootPath, journal_dir),
     filelib:ensure_dir(JournalFP),
     CompactFP = filepath(RootPath, journal_compact_dir),
     filelib:ensure_dir(CompactFP),
+    WasteFP = filepath(RootPath, journal_waste_dir),
+    filelib:ensure_dir(WasteFP),
     ManifestFP = filepath(RootPath, manifest_dir),
     ok = filelib:ensure_dir(ManifestFP),
     {ok, ManifestFilenames} = file:list_dir(ManifestFP),
-    IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP},
+    IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP,
+                                        waste_path = WasteFP},
     ReloadStrategy = InkOpts#inker_options.reload_strategy,
     MRL = InkOpts#inker_options.max_run_length,
+    WRP = InkOpts#inker_options.waste_retention_period,
     IClerkOpts = #iclerk_options{inker = self(),
                                     cdb_options=IClerkCDBOpts,
+                                    waste_retention_period = WRP,
                                     reload_strategy = ReloadStrategy,
                                     max_run_length = MRL},
     {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
@@ -389,7 +396,7 @@ start_from_file(InkOpts) ->
                     journal_sqn = JournalSQN,
                     active_journaldb = ActiveJournal,
                     root_path = RootPath,
-                    cdb_options = CDBopts,
+                    cdb_options = CDBopts#cdb_options{waste_path=WasteFP},
                     clerk = Clerk}}.
@@ -670,7 +677,9 @@ filepath(RootPath, journal_dir) ->
 filepath(RootPath, manifest_dir) ->
     RootPath ++ "/" ++ ?MANIFEST_FP ++ "/";
 filepath(RootPath, journal_compact_dir) ->
-    filepath(RootPath, journal_dir) ++ "/" ++ ?COMPACT_FP ++ "/".
+    filepath(RootPath, journal_dir) ++ "/" ++ ?COMPACT_FP ++ "/";
+filepath(RootPath, journal_waste_dir) ->
+    filepath(RootPath, journal_dir) ++ "/" ++ ?WASTE_FP ++ "/".
 
 filepath(RootPath, NewSQN, new_journal) ->
     filename:join(filepath(RootPath, journal_dir),
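
Taken together with the Bookie's ?JOURNAL_FP and the defines above, the
resulting on-disk layout under a hypothetical root of /data/leveled would be
roughly:

    /data/leveled/journal/journal_files/                active journal (.cdb) files
    /data/leveled/journal/journal_files/post_compact/   journals written by compaction
    /data/leveled/journal/journal_files/waste/          journals held for deferred deletion
    /data/leveled/journal/journal_manifest/             journal manifest files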

@@ -181,6 +181,10 @@
         {info, "Compaction source ~s has yielded ~w positions"}},
     {"IC009",
         {info, "Generate journal for compaction with filename ~s"}},
+    {"IC010",
+        {info, "Clearing journal with filename ~s"}},
+    {"IC011",
+        {info, "Not clearing filename ~s as modified delta is only ~w seconds"}},
     {"PM001",
         {info, "Indexed new cache entry with total L0 cache size now ~w"}},

@@ -144,21 +144,9 @@ journal_compaction(_Config) ->
     %% Now replace all the other objects
     ObjList2 = testutil:generate_objects(40000, 10002),
     testutil:riakload(Bookie1, ObjList2),
-    ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
-    F = fun leveled_bookie:book_islastcompactionpending/1,
-    lists:foldl(fun(X, Pending) ->
-                        case Pending of
-                            false ->
-                                false;
-                            true ->
-                                io:format("Loop ~w waiting for journal "
-                                    ++ "compaction to complete~n", [X]),
-                                timer:sleep(20000),
-                                F(Bookie1)
-                        end end,
-                    true,
-                    lists:seq(1, 15)),
+    ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
+    testutil:wait_for_compaction(Bookie1),
 
     ChkList3 = lists:sublist(lists:sort(ObjList2), 500),
     testutil:check_forlist(Bookie1, ChkList3),
@@ -168,6 +156,25 @@ journal_compaction(_Config) ->
     testutil:check_forobject(Bookie2, TestObject),
     testutil:check_forlist(Bookie2, ChkList3),
     ok = leveled_bookie:book_close(Bookie2),
+
+    WasteFP = RootPath ++ "/journal/journal_files/waste",
+    {ok, ClearedJournals} = file:list_dir(WasteFP),
+    io:format("~w ClearedJournals found~n", [length(ClearedJournals)]),
+    true = length(ClearedJournals) > 0,
+
+    StartOpts2 = [{root_path, RootPath},
+                    {max_journalsize, 10000000},
+                    {max_run_length, 1},
+                    {waste_retention_period, 1}],
+    {ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
+    ok = leveled_bookie:book_compactjournal(Bookie3, 30000),
+    testutil:wait_for_compaction(Bookie3),
+    ok = leveled_bookie:book_close(Bookie3),
+
+    {ok, ClearedJournalsPC} = file:list_dir(WasteFP),
+    io:format("~w ClearedJournals found~n", [length(ClearedJournalsPC)]),
+    true = length(ClearedJournalsPC) == 0,
+
     testutil:reset_filestructure(10000).

@@ -39,7 +39,8 @@
             restore_file/2,
             restore_topending/2,
             find_journals/1,
-            riak_hash/1]).
+            riak_hash/1,
+            wait_for_compaction/1]).
 
 -define(RETURN_TERMS, {true, undefined}).
 -define(SLOWOFFER_DELAY, 5).
@@ -85,7 +86,20 @@ reset_filestructure(Wait) ->
     leveled_penciller:clean_testdir(RootPath ++ "/ledger"),
     RootPath.
 
+wait_for_compaction(Bookie) ->
+    F = fun leveled_bookie:book_islastcompactionpending/1,
+    lists:foldl(fun(X, Pending) ->
+                        case Pending of
+                            false ->
+                                false;
+                            true ->
+                                io:format("Loop ~w waiting for journal "
+                                    ++ "compaction to complete~n", [X]),
+                                timer:sleep(20000),
+                                F(Bookie)
+                        end end,
+                    true,
+                    lists:seq(1, 15)).
+
 check_bucket_stats(Bookie, Bucket) ->
     FoldSW1 = os:timestamp(),