From 44738f7c755665123a6b90f35ea8c53d810c2af0 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Mon, 14 Nov 2016 11:17:14 +0000 Subject: [PATCH] Deferred Deletion of Journals This allows for deleted journals to be retained for a period (the waste_retnetion_period). The idea being that a backup strategy can ensure that all journals are backed up, even ones created and removed from within a backup period - so that any restore pont is possible. This is also a pre-cursor to removing some of the PromptDelete complexity from the Inker Clerk - all compactions can prompt deletion as deletion is now deferred. --- include/leveled.hrl | 5 +- src/leveled_bookie.erl | 4 ++ src/leveled_cdb.erl | 22 ++++++--- src/leveled_iclerk.erl | 86 +++++++++++++++++++++++++++------ src/leveled_inker.erl | 17 +++++-- src/leveled_log.erl | 4 ++ test/end_to_end/basic_SUITE.erl | 35 ++++++++------ test/end_to_end/testutil.erl | 18 ++++++- 8 files changed, 147 insertions(+), 44 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index e685a39..8c5d6f8 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -46,6 +46,7 @@ -record(cdb_options, {max_size :: integer(), file_path :: string(), + waste_path :: string(), binary_mode = false :: boolean()}). -record(inker_options, @@ -55,6 +56,7 @@ start_snapshot = false :: boolean(), source_inker :: pid(), reload_strategy = [] :: list(), + waste_retention_period :: integer(), max_run_length}). -record(penciller_options, @@ -66,7 +68,8 @@ -record(iclerk_options, {inker :: pid(), max_run_length :: integer(), - cdb_options :: #cdb_options{}, + cdb_options = #cdb_options{} :: #cdb_options{}, + waste_retention_period :: integer(), reload_strategy = [] :: list()}). -record(r_content, { diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 1963c94..881b790 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -576,11 +576,14 @@ snapshot_store(State, SnapType) -> set_options(Opts) -> MaxJournalSize = get_opt(max_journalsize, Opts, 10000000000), + WRP = get_opt(waste_retention_period, Opts), + AltStrategy = get_opt(reload_strategy, Opts, []), ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy), PCLL0CacheSize = get_opt(max_pencillercachesize, Opts), RootPath = get_opt(root_path, Opts), + JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP, LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP, ok =filelib:ensure_dir(JournalFP), @@ -589,6 +592,7 @@ set_options(Opts) -> {#inker_options{root_path = JournalFP, reload_strategy = ReloadStrategy, max_run_length = get_opt(max_run_length, Opts), + waste_retention_period = WRP, cdb_options = #cdb_options{max_size=MaxJournalSize, binary_mode=true}}, #penciller_options{root_path = LedgerFP, diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index f7d3056..4f4462d 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -106,7 +106,8 @@ binary_mode = false :: boolean(), delete_point = 0 :: integer(), inker :: pid(), - deferred_delete = false :: boolean()}). + deferred_delete = false :: boolean(), + waste_path :: string()}). %%%============================================================================ @@ -219,7 +220,9 @@ init([Opts]) -> end, {ok, starting, - #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode}}. + #state{max_size=MaxSize, + binary_mode=Opts#cdb_options.binary_mode, + waste_path=Opts#cdb_options.waste_path}}. starting({open_writer, Filename}, _From, State) -> leveled_log:log("CDB01", [Filename]), @@ -495,13 +498,18 @@ handle_info(_Msg, StateName, State) -> terminate(Reason, StateName, State) -> leveled_log:log("CDB05", [State#state.filename, Reason]), - case {State#state.handle, StateName} of - {undefined, _} -> + case {State#state.handle, StateName, State#state.waste_path} of + {undefined, _, _} -> ok; - {Handle, delete_pending} -> + {Handle, delete_pending, undefined} -> file:close(Handle), - file:delete(State#state.filename); - {Handle, _} -> + file:delete(Handle); + {Handle, delete_pending, WasteFP} -> + file:close(Handle), + Components = filename:split(State#state.filename), + NewName = WasteFP ++ lists:last(Components), + file:rename(State#state.filename, NewName); + {Handle, _, _} -> file:close(Handle) end. diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 5c69362..f1e62c4 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -41,6 +41,17 @@ %% as a way of directly representing a change, and where anti-entropy can %% recover from a loss. %% +%% -------- Removing Compacted Files --------- +%% +%% Once a compaction job is complete, and the manifest change has been +%% committed, the individual journal files will get a deletion prompt. The +%% Journal processes should copy the file to the waste folder, before erasing +%% themselves. +%% +%% The Inker will have a waste duration setting, and before running compaction +%% should delete all over-age items (using the file modified date) from the +%% waste. +%% %% -------- Tombstone Reaping --------- %% %% Value compaction does not remove tombstones from the database, and so a @@ -54,7 +65,7 @@ %% before the tombstone. If no ushc objects exist for that tombstone, it can %% now be reaped as part of the compaction job. %% -%% Other tombstones cannot be reaped, as otherwis eon laoding a ledger an old +%% Other tombstones cannot be reaped, as otherwise on laoding a ledger an old %% version of the object may re-emerge. -module(leveled_iclerk). @@ -88,10 +99,13 @@ -define(MAXRUN_COMPACTION_TARGET, 80.0). -define(CRC_SIZE, 4). -define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])). +-define(DEFAULT_WASTE_RETENTION_PERIOD, 86400). -record(state, {inker :: pid(), max_run_length :: integer(), cdb_options, + waste_retention_period :: integer(), + waste_path :: string(), reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list()}). -record(candidate, {low_sqn :: integer(), @@ -129,32 +143,41 @@ clerk_stop(Pid) -> init([IClerkOpts]) -> ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy, - case IClerkOpts#iclerk_options.max_run_length of - undefined -> - {ok, #state{max_run_length = ?MAX_COMPACTION_RUN, + CDBopts = IClerkOpts#iclerk_options.cdb_options, + WP = CDBopts#cdb_options.waste_path, + WRP = case IClerkOpts#iclerk_options.waste_retention_period of + undefined -> + ?DEFAULT_WASTE_RETENTION_PERIOD; + WRP0 -> + WRP0 + end, + MRL = case IClerkOpts#iclerk_options.max_run_length of + undefined -> + ?MAX_COMPACTION_RUN; + MRL0 -> + MRL0 + end, + + {ok, #state{max_run_length = MRL, inker = IClerkOpts#iclerk_options.inker, - cdb_options = IClerkOpts#iclerk_options.cdb_options, - reload_strategy = ReloadStrategy}}; - MRL -> - {ok, #state{max_run_length = MRL, - inker = IClerkOpts#iclerk_options.inker, - cdb_options = IClerkOpts#iclerk_options.cdb_options, - reload_strategy = ReloadStrategy}} - end. + cdb_options = CDBopts, + reload_strategy = ReloadStrategy, + waste_path = WP, + waste_retention_period = WRP}}. handle_call(_Msg, _From, State) -> {reply, not_supported, State}. handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, State) -> + % Empty the waste folder + clear_waste(State), % Need to fetch manifest at start rather than have it be passed in % Don't want to process a queued call waiting on an old manifest [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker), MaxRunLength = State#state.max_run_length, {FilterServer, MaxSQN} = InitiateFun(Checker), CDBopts = State#state.cdb_options, - FP = CDBopts#cdb_options.file_path, - ok = filelib:ensure_dir(FP), Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN), BestRun0 = assess_candidates(Candidates, MaxRunLength), @@ -511,10 +534,26 @@ generate_manifest_entry(ActiveJournal) -> [{StartSQN, NewFN, PidR}]. - +clear_waste(State) -> + WP = State#state.waste_path, + WRP = State#state.waste_retention_period, + {ok, ClearedJournals} = file:list_dir(WP), + N = calendar:datetime_to_gregorian_seconds(calendar:local_time()), + lists:foreach(fun(DelJ) -> + LMD = filelib:last_modified(WP ++ DelJ), + case N - calendar:datetime_to_gregorian_seconds(LMD) of + LMD_Delta when LMD_Delta >= WRP -> + ok = file:delete(WP ++ DelJ), + leveled_log:log("IC010", [WP ++ DelJ]); + LMD_Delta -> + leveled_log:log("IC011", [WP ++ DelJ, + LMD_Delta]), + ok + end + end, + ClearedJournals). - %%%============================================================================ %%% Test @@ -545,6 +584,21 @@ score_compare_test() -> ?assertMatch(Run1, choose_best_assessment(Run1, Run2, 4)), ?assertMatch(Run2, choose_best_assessment(Run1 ++ Run2, Run2, 4)). +file_gc_test() -> + State = #state{waste_path="test/waste/", + waste_retention_period=1}, + ok = filelib:ensure_dir(State#state.waste_path), + file:write_file(State#state.waste_path ++ "1.cdb", term_to_binary("Hello")), + timer:sleep(1100), + file:write_file(State#state.waste_path ++ "2.cdb", term_to_binary("Hello")), + clear_waste(State), + {ok, ClearedJournals} = file:list_dir(State#state.waste_path), + ?assertMatch(["2.cdb"], ClearedJournals), + timer:sleep(1100), + clear_waste(State), + {ok, ClearedJournals2} = file:list_dir(State#state.waste_path), + ?assertMatch([], ClearedJournals2). + find_bestrun_test() -> %% Tests dependent on these defaults %% -define(MAX_COMPACTION_RUN, 4). diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 5cc1f86..81cc154 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -119,6 +119,7 @@ -define(MANIFEST_FP, "journal_manifest"). -define(FILES_FP, "journal_files"). -define(COMPACT_FP, "post_compact"). +-define(WASTE_FP, "waste"). -define(JOURNAL_FILEX, "cdb"). -define(MANIFEST_FILEX, "man"). -define(PENDING_FILEX, "pnd"). @@ -360,20 +361,26 @@ code_change(_OldVsn, State, _Extra) -> start_from_file(InkOpts) -> RootPath = InkOpts#inker_options.root_path, CDBopts = InkOpts#inker_options.cdb_options, + JournalFP = filepath(RootPath, journal_dir), filelib:ensure_dir(JournalFP), CompactFP = filepath(RootPath, journal_compact_dir), filelib:ensure_dir(CompactFP), - + WasteFP = filepath(RootPath, journal_waste_dir), + filelib:ensure_dir(WasteFP), ManifestFP = filepath(RootPath, manifest_dir), ok = filelib:ensure_dir(ManifestFP), + {ok, ManifestFilenames} = file:list_dir(ManifestFP), - IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP}, + IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP, + waste_path = WasteFP}, ReloadStrategy = InkOpts#inker_options.reload_strategy, MRL = InkOpts#inker_options.max_run_length, + WRP = InkOpts#inker_options.waste_retention_period, IClerkOpts = #iclerk_options{inker = self(), cdb_options=IClerkCDBOpts, + waste_retention_period = WRP, reload_strategy = ReloadStrategy, max_run_length = MRL}, {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), @@ -389,7 +396,7 @@ start_from_file(InkOpts) -> journal_sqn = JournalSQN, active_journaldb = ActiveJournal, root_path = RootPath, - cdb_options = CDBopts, + cdb_options = CDBopts#cdb_options{waste_path=WasteFP}, clerk = Clerk}}. @@ -670,7 +677,9 @@ filepath(RootPath, journal_dir) -> filepath(RootPath, manifest_dir) -> RootPath ++ "/" ++ ?MANIFEST_FP ++ "/"; filepath(RootPath, journal_compact_dir) -> - filepath(RootPath, journal_dir) ++ "/" ++ ?COMPACT_FP ++ "/". + filepath(RootPath, journal_dir) ++ "/" ++ ?COMPACT_FP ++ "/"; +filepath(RootPath, journal_waste_dir) -> + filepath(RootPath, journal_dir) ++ "/" ++ ?WASTE_FP ++ "/". filepath(RootPath, NewSQN, new_journal) -> filename:join(filepath(RootPath, journal_dir), diff --git a/src/leveled_log.erl b/src/leveled_log.erl index f1779fc..17ec59b 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -181,6 +181,10 @@ {info, "Compaction source ~s has yielded ~w positions"}}, {"IC009", {info, "Generate journal for compaction with filename ~s"}}, + {"IC010", + {info, "Clearing journal with filename ~s"}}, + {"IC011", + {info, "Not clearing filename ~s as modified delta is only ~w seconds"}}, {"PM001", {info, "Indexed new cache entry with total L0 cache size now ~w"}}, diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 0c1deae..e760962 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -144,21 +144,9 @@ journal_compaction(_Config) -> %% Now replace all the other objects ObjList2 = testutil:generate_objects(40000, 10002), testutil:riakload(Bookie1, ObjList2), - ok = leveled_bookie:book_compactjournal(Bookie1, 30000), - F = fun leveled_bookie:book_islastcompactionpending/1, - lists:foldl(fun(X, Pending) -> - case Pending of - false -> - false; - true -> - io:format("Loop ~w waiting for journal " - ++ "compaction to complete~n", [X]), - timer:sleep(20000), - F(Bookie1) - end end, - true, - lists:seq(1, 15)), + ok = leveled_bookie:book_compactjournal(Bookie1, 30000), + testutil:wait_for_compaction(Bookie1), ChkList3 = lists:sublist(lists:sort(ObjList2), 500), testutil:check_forlist(Bookie1, ChkList3), @@ -168,6 +156,25 @@ journal_compaction(_Config) -> testutil:check_forobject(Bookie2, TestObject), testutil:check_forlist(Bookie2, ChkList3), ok = leveled_bookie:book_close(Bookie2), + + WasteFP = RootPath ++ "/journal/journal_files/waste", + {ok, ClearedJournals} = file:list_dir(WasteFP), + io:format("~w ClearedJournals found~n", [length(ClearedJournals)]), + true = length(ClearedJournals) > 0, + + StartOpts2 = [{root_path, RootPath}, + {max_journalsize, 10000000}, + {max_run_length, 1}, + {waste_retention_period, 1}], + {ok, Bookie3} = leveled_bookie:book_start(StartOpts2), + ok = leveled_bookie:book_compactjournal(Bookie3, 30000), + testutil:wait_for_compaction(Bookie3), + ok = leveled_bookie:book_close(Bookie3), + + {ok, ClearedJournalsPC} = file:list_dir(WasteFP), + io:format("~w ClearedJournals found~n", [length(ClearedJournalsPC)]), + true = length(ClearedJournalsPC) == 0, + testutil:reset_filestructure(10000). diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index e596993..3232735 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -39,7 +39,8 @@ restore_file/2, restore_topending/2, find_journals/1, - riak_hash/1]). + riak_hash/1, + wait_for_compaction/1]). -define(RETURN_TERMS, {true, undefined}). -define(SLOWOFFER_DELAY, 5). @@ -85,7 +86,20 @@ reset_filestructure(Wait) -> leveled_penciller:clean_testdir(RootPath ++ "/ledger"), RootPath. - +wait_for_compaction(Bookie) -> + F = fun leveled_bookie:book_islastcompactionpending/1, + lists:foldl(fun(X, Pending) -> + case Pending of + false -> + false; + true -> + io:format("Loop ~w waiting for journal " + ++ "compaction to complete~n", [X]), + timer:sleep(20000), + F(Bookie) + end end, + true, + lists:seq(1, 15)). check_bucket_stats(Bookie, Bucket) -> FoldSW1 = os:timestamp(),