From 22e894c928fb9ef5156c741af6164c8614d643f1 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 8 Nov 2017 12:58:09 +0000 Subject: [PATCH] Allow waste retention to be ignored If the waste retention period is undefined, then it should be ignored - and no waste retained (rather than retaining waste for 24 hours as at present). This wasn't working anyway - as reopen reader didn't get the cdb options (which didn't have the waste path on anyway) - so waste would not be retained if the file had been opened after a stop/start. --- src/leveled_bookie.erl | 6 ++-- src/leveled_cdb.erl | 15 +++++---- src/leveled_iclerk.erl | 60 ++++++++++++++++++------------------ src/leveled_inker.erl | 70 +++++++++++++++++++++++++++++------------- src/leveled_log.erl | 8 +++-- 5 files changed, 97 insertions(+), 62 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 642676a..43205c0 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -157,7 +157,7 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) -> %% - compression_point %% %% Both of the first two options relate to compaction in the Journal. The -%% retain_strategydetermines if a skinny record of the object should be +%% retain_strategy determines if a skinny record of the object should be %% retained following compaction, and how that should be used when recovering %% lost state in the Ledger. %% @@ -170,7 +170,9 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) -> %% Currently compacted records no longer in use are not removed but moved to %% a journal_waste folder, and the waste_retention_period determines how long %% this history should be kept for (for example to allow for it to be backed -%% up before deletion). +%% up before deletion). If the waste_retention_period (in seconds) is +%% undefined, then there will be no holding of this waste - unused files will +%% be immediately deleted. 
%% %% Compression method and point allow Leveled to be switched from using bif %% based compression (zlib) to suing nif based compression (lz4). The diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 73959b1..b92efd1 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -66,7 +66,7 @@ cdb_open_writer/2, cdb_open_reader/1, cdb_open_reader/2, - cdb_reopen_reader/2, + cdb_reopen_reader/3, cdb_get/2, cdb_put/3, cdb_mput/2, @@ -138,7 +138,7 @@ cdb_open_writer(Filename, Opts) -> ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity), {ok, Pid}. --spec cdb_reopen_reader(string(), binary()) -> {ok, pid()}. +-spec cdb_reopen_reader(string(), binary(), cdb_options()) -> {ok, pid()}. %% @doc %% Open an existing file that has already been moved into read-only mode. The %% LastKey should be known, as it has been stored in the manifest. Knowing the @@ -147,8 +147,9 @@ cdb_open_writer(Filename, Opts) -> %% %% The LastKey is the Key of the last object added to the file - and is used to %% determine when scans over a file have completed. -cdb_reopen_reader(Filename, LastKey) -> - {ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{binary_mode=true}], []), +cdb_reopen_reader(Filename, LastKey, CDBopts) -> + {ok, Pid} = + gen_fsm:start(?MODULE, [CDBopts#cdb_options{binary_mode=true}], []), ok = gen_fsm:sync_send_event(Pid, {open_reader, Filename, LastKey}, infinity), @@ -692,17 +693,19 @@ handle_info(_Msg, StateName, State) -> {next_state, StateName, State}. 
terminate(Reason, StateName, State) -> - leveled_log:log("CDB05", [State#state.filename, Reason]), + leveled_log:log("CDB05", [State#state.filename, StateName, Reason]), case {State#state.handle, StateName, State#state.waste_path} of {undefined, _, _} -> ok; {Handle, delete_pending, undefined} -> ok = file:close(Handle), - ok = file:delete(State#state.filename); + ok = file:delete(State#state.filename), + leveled_log:log("CDB20", [State#state.filename]); {Handle, delete_pending, WasteFP} -> file:close(Handle), Components = filename:split(State#state.filename), NewName = WasteFP ++ lists:last(Components), + leveled_log:log("CDB19", [State#state.filename, NewName]), file:rename(State#state.filename, NewName); {Handle, _, _} -> file:close(Handle) diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index fe4260b..195c362 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -101,7 +101,6 @@ -define(MAXRUN_COMPACTION_TARGET, 70.0). -define(CRC_SIZE, 4). -define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])). --define(DEFAULT_WASTE_RETENTION_PERIOD, 86400). -define(INTERVALS_PER_HOUR, 4). -record(state, {inker :: pid() | undefined, @@ -150,18 +149,15 @@ init([IClerkOpts]) -> ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy, CDBopts = IClerkOpts#iclerk_options.cdb_options, WP = CDBopts#cdb_options.waste_path, - WRP = case IClerkOpts#iclerk_options.waste_retention_period of - undefined -> - ?DEFAULT_WASTE_RETENTION_PERIOD; - WRP0 -> - WRP0 - end, - MRL = case IClerkOpts#iclerk_options.max_run_length of - undefined -> - ?MAX_COMPACTION_RUN; - MRL0 -> - MRL0 - end, + WRP = IClerkOpts#iclerk_options.waste_retention_period, + + MRL = + case IClerkOpts#iclerk_options.max_run_length of + undefined -> + ?MAX_COMPACTION_RUN; + MRL0 -> + MRL0 + end, {ok, #state{max_run_length = MRL, inker = IClerkOpts#iclerk_options.inker, @@ -616,23 +612,27 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) -> end. 
clear_waste(State) -> - WP = State#state.waste_path, - WRP = State#state.waste_retention_period, - {ok, ClearedJournals} = file:list_dir(WP), - N = calendar:datetime_to_gregorian_seconds(calendar:local_time()), - lists:foreach(fun(DelJ) -> - LMD = filelib:last_modified(WP ++ DelJ), - case N - calendar:datetime_to_gregorian_seconds(LMD) of - LMD_Delta when LMD_Delta >= WRP -> - ok = file:delete(WP ++ DelJ), - leveled_log:log("IC010", [WP ++ DelJ]); - LMD_Delta -> - leveled_log:log("IC011", [WP ++ DelJ, - LMD_Delta]), - ok - end - end, - ClearedJournals). + case State#state.waste_path of + undefined -> + ok; + WP -> + WRP = State#state.waste_retention_period, + {ok, ClearedJournals} = file:list_dir(WP), + N = calendar:datetime_to_gregorian_seconds(calendar:local_time()), + DeleteJournalFun = + fun(DelJ) -> + LMD = filelib:last_modified(WP ++ DelJ), + case N - calendar:datetime_to_gregorian_seconds(LMD) of + LMD_Delta when LMD_Delta >= WRP -> + ok = file:delete(WP ++ DelJ), + leveled_log:log("IC010", [WP ++ DelJ]); + LMD_Delta -> + leveled_log:log("IC011", [WP ++ DelJ, LMD_Delta]), + ok + end + end, + lists:foreach(DeleteJournalFun, ClearedJournals) + end. 
%%%============================================================================ diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 92e644f..2fa154c 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -493,25 +493,21 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ start_from_file(InkOpts) -> - RootPath = InkOpts#inker_options.root_path, - CDBopts = InkOpts#inker_options.cdb_options, + CDBopts = get_cdbopts(InkOpts), + + RootPath = InkOpts#inker_options.root_path, JournalFP = filepath(RootPath, journal_dir), filelib:ensure_dir(JournalFP), CompactFP = filepath(RootPath, journal_compact_dir), filelib:ensure_dir(CompactFP), - WasteFP = filepath(RootPath, journal_waste_dir), - filelib:ensure_dir(WasteFP), ManifestFP = filepath(RootPath, manifest_dir), ok = filelib:ensure_dir(ManifestFP), + IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP}, - {ok, ManifestFilenames} = file:list_dir(ManifestFP), - - IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP, - waste_path = WasteFP}, + WRP = InkOpts#inker_options.waste_retention_period, ReloadStrategy = InkOpts#inker_options.reload_strategy, MRL = InkOpts#inker_options.max_run_length, - WRP = InkOpts#inker_options.waste_retention_period, PressMethod = InkOpts#inker_options.compression_method, PressOnReceipt = InkOpts#inker_options.compress_on_receipt, IClerkOpts = #iclerk_options{inker = self(), @@ -520,8 +516,10 @@ start_from_file(InkOpts) -> reload_strategy = ReloadStrategy, compression_method = PressMethod, max_run_length = MRL}, + {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), + {ok, ManifestFilenames} = file:list_dir(ManifestFP), {Manifest, ManifestSQN, JournalSQN, @@ -533,11 +531,28 @@ start_from_file(InkOpts) -> journal_sqn = JournalSQN, active_journaldb = ActiveJournal, root_path = RootPath, - cdb_options = CDBopts#cdb_options{waste_path=WasteFP}, + cdb_options = CDBopts, compression_method = 
PressMethod, compress_on_receipt = PressOnReceipt, clerk = Clerk}}. +get_cdbopts(InkOpts)-> + CDBopts = InkOpts#inker_options.cdb_options, + WasteFP = + case InkOpts#inker_options.waste_retention_period of + undefined -> + % If the waste retention period is undefined, there will + % be no retention of waste. This is triggered by making + % the waste path undefined + undefined; + _WRP -> + WFP = filepath(InkOpts#inker_options.root_path, + journal_waste_dir), + filelib:ensure_dir(WFP), + WFP + end, + CDBopts#cdb_options{waste_path = WasteFP}. + put_object(LedgerKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, @@ -677,8 +692,8 @@ open_all_manifest(Man0, RootPath, CDBOpts) -> PFN = FN ++ "." ++ ?PENDING_FILEX, case filelib:is_file(CFN) of true -> - {ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN, - LK_RO), + {ok, Pid} = + leveled_cdb:cdb_reopen_reader(CFN, LK_RO, CDBOpts), {LowSQN, FN, Pid, LK_RO}; false -> W = leveled_cdb:cdb_open_writer(PFN, CDBOpts), @@ -920,6 +935,7 @@ build_dummy_journal(KeyConvertF) -> clean_testdir(RootPath) -> clean_subdir(filepath(RootPath, journal_dir)), clean_subdir(filepath(RootPath, journal_compact_dir)), + clean_subdir(filepath(RootPath, journal_waste_dir)), clean_subdir(filepath(RootPath, manifest_dir)). clean_subdir(DirPath) -> @@ -976,19 +992,26 @@ simple_inker_completeactivejournal_test() -> test_ledgerkey(Key) -> {o, "Bucket", Key, null}. -compact_journal_test() -> - {timeout, 60, fun compact_journal_testto/0}. +compact_journal_wasteretained_test_() -> + {timeout, 60, fun() -> compact_journal_testto(300, true) end}. -compact_journal_testto() -> +compact_journal_wastediscarded_test_() -> + {timeout, 60, fun() -> compact_journal_testto(undefined, false) end}. 
+ +compact_journal_testto(WRP, ExpectedFiles) -> RootPath = "../test/journal", - build_dummy_journal(fun test_ledgerkey/1), CDBopts = #cdb_options{max_size=300000}, RStrategy = [{?STD_TAG, recovr}], - {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts, - reload_strategy=RStrategy, - compression_method=native, - compress_on_receipt=false}), + InkOpts = #inker_options{root_path=RootPath, + cdb_options=CDBopts, + reload_strategy=RStrategy, + waste_retention_period=WRP, + compression_method=native, + compress_on_receipt=false}, + + build_dummy_journal(fun test_ledgerkey/1), + {ok, Ink1} = ink_start(InkOpts), + {ok, NewSQN1, _ObjSize} = ink_put(Ink1, test_ledgerkey("KeyAA"), "TestValueAA", @@ -1036,7 +1059,7 @@ compact_journal_testto() -> timer:sleep(1000), CompactedManifest2 = ink_getmanifest(Ink1), R = lists:foldl(fun({_SQN, FN, _P, _LK}, Acc) -> - case string:str(FN, "post_compact") of + case string:str(FN, ?COMPACT_FP) of N when N > 0 -> true; 0 -> @@ -1049,6 +1072,9 @@ compact_journal_testto() -> ink_close(Ink1), % Need to wait for delete_pending files to timeout timer:sleep(10000), + % Are there files in the waste folder after compaction + {ok, WasteFNs} = file:list_dir(filepath(RootPath, journal_waste_dir)), + ?assertMatch(ExpectedFiles, length(WasteFNs) > 0), clean_testdir(RootPath). 
empty_manifest_test() -> diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 8cb6c5e..7d27c2b 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -297,7 +297,7 @@ {"CDB04", {info, "Deletion confirmed for file ~s at ManifestSQN ~w"}}, {"CDB05", - {info, "Closing of filename ~s for reason ~w"}}, + {info, "Closing of filename ~s from state ~w for reason ~w"}}, {"CDB06", {info, "File to be truncated at last position of ~w with end of " ++ "file at ~w"}}, @@ -327,7 +327,11 @@ {info, "After ~w PUTs total write time is ~w total sync time is ~w " ++ "and max write time is ~w and max sync time is ~w"}}, {"CDB18", - {info, "Handled return and write of hashtable"}} + {info, "Handled return and write of hashtable"}}, + {"CDB19", + {info, "Transferring filename ~s to waste ~s"}}, + {"CDB20", + {info, "Deleting filename ~s as no waste retention period defined"}} ]).