Allow waste retnetion to be ignored
If wast retention period is undefined, then it should be ignored - and no waste retained (rather than retaining waste for 24 hours as at present). This wasn't working anyway - as reopen reader didn't get the cdb options (which didn't have the waste path on anyway) - so waste would not eb retained if the file had been opened after a stop/start.
This commit is contained in:
parent
e8bd712fb8
commit
22e894c928
5 changed files with 97 additions and 62 deletions
|
@ -157,7 +157,7 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
|
||||||
%% - compression_point
|
%% - compression_point
|
||||||
%%
|
%%
|
||||||
%% Both of the first two options relate to compaction in the Journal. The
|
%% Both of the first two options relate to compaction in the Journal. The
|
||||||
%% retain_strategydetermines if a skinny record of the object should be
|
%% retain_strategy determines if a skinny record of the object should be
|
||||||
%% retained following compaction, and how that should be used when recovering
|
%% retained following compaction, and how that should be used when recovering
|
||||||
%% lost state in the Ledger.
|
%% lost state in the Ledger.
|
||||||
%%
|
%%
|
||||||
|
@ -170,7 +170,9 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
|
||||||
%% Currently compacted records no longer in use are not removed but moved to
|
%% Currently compacted records no longer in use are not removed but moved to
|
||||||
%% a journal_waste folder, and the waste_retention_period determines how long
|
%% a journal_waste folder, and the waste_retention_period determines how long
|
||||||
%% this history should be kept for (for example to allow for it to be backed
|
%% this history should be kept for (for example to allow for it to be backed
|
||||||
%% up before deletion).
|
%% up before deletion). If the waste_retention_period (in seconds) is
|
||||||
|
%% undefined, then there will be no holding of this waste - unused files will
|
||||||
|
%% be immediately deleted.
|
||||||
%%
|
%%
|
||||||
%% Compression method and point allow Leveled to be switched from using bif
|
%% Compression method and point allow Leveled to be switched from using bif
|
||||||
%% based compression (zlib) to suing nif based compression (lz4). The
|
%% based compression (zlib) to suing nif based compression (lz4). The
|
||||||
|
|
|
@ -66,7 +66,7 @@
|
||||||
cdb_open_writer/2,
|
cdb_open_writer/2,
|
||||||
cdb_open_reader/1,
|
cdb_open_reader/1,
|
||||||
cdb_open_reader/2,
|
cdb_open_reader/2,
|
||||||
cdb_reopen_reader/2,
|
cdb_reopen_reader/3,
|
||||||
cdb_get/2,
|
cdb_get/2,
|
||||||
cdb_put/3,
|
cdb_put/3,
|
||||||
cdb_mput/2,
|
cdb_mput/2,
|
||||||
|
@ -138,7 +138,7 @@ cdb_open_writer(Filename, Opts) ->
|
||||||
ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity),
|
ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity),
|
||||||
{ok, Pid}.
|
{ok, Pid}.
|
||||||
|
|
||||||
-spec cdb_reopen_reader(string(), binary()) -> {ok, pid()}.
|
-spec cdb_reopen_reader(string(), binary(), cdb_options()) -> {ok, pid()}.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Open an existing file that has already been moved into read-only mode. The
|
%% Open an existing file that has already been moved into read-only mode. The
|
||||||
%% LastKey should be known, as it has been stored in the manifest. Knowing the
|
%% LastKey should be known, as it has been stored in the manifest. Knowing the
|
||||||
|
@ -147,8 +147,9 @@ cdb_open_writer(Filename, Opts) ->
|
||||||
%%
|
%%
|
||||||
%% The LastKey is the Key of the last object added to the file - and is used to
|
%% The LastKey is the Key of the last object added to the file - and is used to
|
||||||
%% determine when scans over a file have completed.
|
%% determine when scans over a file have completed.
|
||||||
cdb_reopen_reader(Filename, LastKey) ->
|
cdb_reopen_reader(Filename, LastKey, CDBopts) ->
|
||||||
{ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{binary_mode=true}], []),
|
{ok, Pid} =
|
||||||
|
gen_fsm:start(?MODULE, [CDBopts#cdb_options{binary_mode=true}], []),
|
||||||
ok = gen_fsm:sync_send_event(Pid,
|
ok = gen_fsm:sync_send_event(Pid,
|
||||||
{open_reader, Filename, LastKey},
|
{open_reader, Filename, LastKey},
|
||||||
infinity),
|
infinity),
|
||||||
|
@ -692,17 +693,19 @@ handle_info(_Msg, StateName, State) ->
|
||||||
{next_state, StateName, State}.
|
{next_state, StateName, State}.
|
||||||
|
|
||||||
terminate(Reason, StateName, State) ->
|
terminate(Reason, StateName, State) ->
|
||||||
leveled_log:log("CDB05", [State#state.filename, Reason]),
|
leveled_log:log("CDB05", [State#state.filename, StateName, Reason]),
|
||||||
case {State#state.handle, StateName, State#state.waste_path} of
|
case {State#state.handle, StateName, State#state.waste_path} of
|
||||||
{undefined, _, _} ->
|
{undefined, _, _} ->
|
||||||
ok;
|
ok;
|
||||||
{Handle, delete_pending, undefined} ->
|
{Handle, delete_pending, undefined} ->
|
||||||
ok = file:close(Handle),
|
ok = file:close(Handle),
|
||||||
ok = file:delete(State#state.filename);
|
ok = file:delete(State#state.filename),
|
||||||
|
leveled_log:log("CDB20", [State#state.filename]);
|
||||||
{Handle, delete_pending, WasteFP} ->
|
{Handle, delete_pending, WasteFP} ->
|
||||||
file:close(Handle),
|
file:close(Handle),
|
||||||
Components = filename:split(State#state.filename),
|
Components = filename:split(State#state.filename),
|
||||||
NewName = WasteFP ++ lists:last(Components),
|
NewName = WasteFP ++ lists:last(Components),
|
||||||
|
leveled_log:log("CDB19", [State#state.filename, NewName]),
|
||||||
file:rename(State#state.filename, NewName);
|
file:rename(State#state.filename, NewName);
|
||||||
{Handle, _, _} ->
|
{Handle, _, _} ->
|
||||||
file:close(Handle)
|
file:close(Handle)
|
||||||
|
|
|
@ -101,7 +101,6 @@
|
||||||
-define(MAXRUN_COMPACTION_TARGET, 70.0).
|
-define(MAXRUN_COMPACTION_TARGET, 70.0).
|
||||||
-define(CRC_SIZE, 4).
|
-define(CRC_SIZE, 4).
|
||||||
-define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])).
|
-define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])).
|
||||||
-define(DEFAULT_WASTE_RETENTION_PERIOD, 86400).
|
|
||||||
-define(INTERVALS_PER_HOUR, 4).
|
-define(INTERVALS_PER_HOUR, 4).
|
||||||
|
|
||||||
-record(state, {inker :: pid() | undefined,
|
-record(state, {inker :: pid() | undefined,
|
||||||
|
@ -150,18 +149,15 @@ init([IClerkOpts]) ->
|
||||||
ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy,
|
ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy,
|
||||||
CDBopts = IClerkOpts#iclerk_options.cdb_options,
|
CDBopts = IClerkOpts#iclerk_options.cdb_options,
|
||||||
WP = CDBopts#cdb_options.waste_path,
|
WP = CDBopts#cdb_options.waste_path,
|
||||||
WRP = case IClerkOpts#iclerk_options.waste_retention_period of
|
WRP = IClerkOpts#iclerk_options.waste_retention_period,
|
||||||
undefined ->
|
|
||||||
?DEFAULT_WASTE_RETENTION_PERIOD;
|
MRL =
|
||||||
WRP0 ->
|
case IClerkOpts#iclerk_options.max_run_length of
|
||||||
WRP0
|
undefined ->
|
||||||
end,
|
?MAX_COMPACTION_RUN;
|
||||||
MRL = case IClerkOpts#iclerk_options.max_run_length of
|
MRL0 ->
|
||||||
undefined ->
|
MRL0
|
||||||
?MAX_COMPACTION_RUN;
|
end,
|
||||||
MRL0 ->
|
|
||||||
MRL0
|
|
||||||
end,
|
|
||||||
|
|
||||||
{ok, #state{max_run_length = MRL,
|
{ok, #state{max_run_length = MRL,
|
||||||
inker = IClerkOpts#iclerk_options.inker,
|
inker = IClerkOpts#iclerk_options.inker,
|
||||||
|
@ -616,23 +612,27 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
clear_waste(State) ->
|
clear_waste(State) ->
|
||||||
WP = State#state.waste_path,
|
case State#state.waste_path of
|
||||||
WRP = State#state.waste_retention_period,
|
undefined ->
|
||||||
{ok, ClearedJournals} = file:list_dir(WP),
|
ok;
|
||||||
N = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
|
WP ->
|
||||||
lists:foreach(fun(DelJ) ->
|
WRP = State#state.waste_retention_period,
|
||||||
LMD = filelib:last_modified(WP ++ DelJ),
|
{ok, ClearedJournals} = file:list_dir(WP),
|
||||||
case N - calendar:datetime_to_gregorian_seconds(LMD) of
|
N = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
|
||||||
LMD_Delta when LMD_Delta >= WRP ->
|
DeleteJournalFun =
|
||||||
ok = file:delete(WP ++ DelJ),
|
fun(DelJ) ->
|
||||||
leveled_log:log("IC010", [WP ++ DelJ]);
|
LMD = filelib:last_modified(WP ++ DelJ),
|
||||||
LMD_Delta ->
|
case N - calendar:datetime_to_gregorian_seconds(LMD) of
|
||||||
leveled_log:log("IC011", [WP ++ DelJ,
|
LMD_Delta when LMD_Delta >= WRP ->
|
||||||
LMD_Delta]),
|
ok = file:delete(WP ++ DelJ),
|
||||||
ok
|
leveled_log:log("IC010", [WP ++ DelJ]);
|
||||||
end
|
LMD_Delta ->
|
||||||
end,
|
leveled_log:log("IC011", [WP ++ DelJ, LMD_Delta]),
|
||||||
ClearedJournals).
|
ok
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
lists:foreach(DeleteJournalFun, ClearedJournals)
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
|
@ -493,25 +493,21 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
start_from_file(InkOpts) ->
|
start_from_file(InkOpts) ->
|
||||||
RootPath = InkOpts#inker_options.root_path,
|
|
||||||
CDBopts = InkOpts#inker_options.cdb_options,
|
|
||||||
|
|
||||||
|
CDBopts = get_cdbopts(InkOpts),
|
||||||
|
|
||||||
|
RootPath = InkOpts#inker_options.root_path,
|
||||||
JournalFP = filepath(RootPath, journal_dir),
|
JournalFP = filepath(RootPath, journal_dir),
|
||||||
filelib:ensure_dir(JournalFP),
|
filelib:ensure_dir(JournalFP),
|
||||||
CompactFP = filepath(RootPath, journal_compact_dir),
|
CompactFP = filepath(RootPath, journal_compact_dir),
|
||||||
filelib:ensure_dir(CompactFP),
|
filelib:ensure_dir(CompactFP),
|
||||||
WasteFP = filepath(RootPath, journal_waste_dir),
|
|
||||||
filelib:ensure_dir(WasteFP),
|
|
||||||
ManifestFP = filepath(RootPath, manifest_dir),
|
ManifestFP = filepath(RootPath, manifest_dir),
|
||||||
ok = filelib:ensure_dir(ManifestFP),
|
ok = filelib:ensure_dir(ManifestFP),
|
||||||
|
IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP},
|
||||||
|
|
||||||
{ok, ManifestFilenames} = file:list_dir(ManifestFP),
|
WRP = InkOpts#inker_options.waste_retention_period,
|
||||||
|
|
||||||
IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP,
|
|
||||||
waste_path = WasteFP},
|
|
||||||
ReloadStrategy = InkOpts#inker_options.reload_strategy,
|
ReloadStrategy = InkOpts#inker_options.reload_strategy,
|
||||||
MRL = InkOpts#inker_options.max_run_length,
|
MRL = InkOpts#inker_options.max_run_length,
|
||||||
WRP = InkOpts#inker_options.waste_retention_period,
|
|
||||||
PressMethod = InkOpts#inker_options.compression_method,
|
PressMethod = InkOpts#inker_options.compression_method,
|
||||||
PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
|
PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
|
||||||
IClerkOpts = #iclerk_options{inker = self(),
|
IClerkOpts = #iclerk_options{inker = self(),
|
||||||
|
@ -520,8 +516,10 @@ start_from_file(InkOpts) ->
|
||||||
reload_strategy = ReloadStrategy,
|
reload_strategy = ReloadStrategy,
|
||||||
compression_method = PressMethod,
|
compression_method = PressMethod,
|
||||||
max_run_length = MRL},
|
max_run_length = MRL},
|
||||||
|
|
||||||
{ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
|
{ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
|
||||||
|
|
||||||
|
{ok, ManifestFilenames} = file:list_dir(ManifestFP),
|
||||||
{Manifest,
|
{Manifest,
|
||||||
ManifestSQN,
|
ManifestSQN,
|
||||||
JournalSQN,
|
JournalSQN,
|
||||||
|
@ -533,11 +531,28 @@ start_from_file(InkOpts) ->
|
||||||
journal_sqn = JournalSQN,
|
journal_sqn = JournalSQN,
|
||||||
active_journaldb = ActiveJournal,
|
active_journaldb = ActiveJournal,
|
||||||
root_path = RootPath,
|
root_path = RootPath,
|
||||||
cdb_options = CDBopts#cdb_options{waste_path=WasteFP},
|
cdb_options = CDBopts,
|
||||||
compression_method = PressMethod,
|
compression_method = PressMethod,
|
||||||
compress_on_receipt = PressOnReceipt,
|
compress_on_receipt = PressOnReceipt,
|
||||||
clerk = Clerk}}.
|
clerk = Clerk}}.
|
||||||
|
|
||||||
|
get_cdbopts(InkOpts)->
|
||||||
|
CDBopts = InkOpts#inker_options.cdb_options,
|
||||||
|
WasteFP =
|
||||||
|
case InkOpts#inker_options.waste_retention_period of
|
||||||
|
undefined ->
|
||||||
|
% If the waste retention period is undefined, there will
|
||||||
|
% be no retention of waste. This is triggered by making
|
||||||
|
% the waste path undefined
|
||||||
|
undefined;
|
||||||
|
_WRP ->
|
||||||
|
WFP = filepath(InkOpts#inker_options.root_path,
|
||||||
|
journal_waste_dir),
|
||||||
|
filelib:ensure_dir(WFP),
|
||||||
|
WFP
|
||||||
|
end,
|
||||||
|
CDBopts#cdb_options{waste_path = WasteFP}.
|
||||||
|
|
||||||
|
|
||||||
put_object(LedgerKey, Object, KeyChanges, State) ->
|
put_object(LedgerKey, Object, KeyChanges, State) ->
|
||||||
NewSQN = State#state.journal_sqn + 1,
|
NewSQN = State#state.journal_sqn + 1,
|
||||||
|
@ -677,8 +692,8 @@ open_all_manifest(Man0, RootPath, CDBOpts) ->
|
||||||
PFN = FN ++ "." ++ ?PENDING_FILEX,
|
PFN = FN ++ "." ++ ?PENDING_FILEX,
|
||||||
case filelib:is_file(CFN) of
|
case filelib:is_file(CFN) of
|
||||||
true ->
|
true ->
|
||||||
{ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN,
|
{ok, Pid} =
|
||||||
LK_RO),
|
leveled_cdb:cdb_reopen_reader(CFN, LK_RO, CDBOpts),
|
||||||
{LowSQN, FN, Pid, LK_RO};
|
{LowSQN, FN, Pid, LK_RO};
|
||||||
false ->
|
false ->
|
||||||
W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
|
W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
|
||||||
|
@ -920,6 +935,7 @@ build_dummy_journal(KeyConvertF) ->
|
||||||
clean_testdir(RootPath) ->
|
clean_testdir(RootPath) ->
|
||||||
clean_subdir(filepath(RootPath, journal_dir)),
|
clean_subdir(filepath(RootPath, journal_dir)),
|
||||||
clean_subdir(filepath(RootPath, journal_compact_dir)),
|
clean_subdir(filepath(RootPath, journal_compact_dir)),
|
||||||
|
clean_subdir(filepath(RootPath, journal_waste_dir)),
|
||||||
clean_subdir(filepath(RootPath, manifest_dir)).
|
clean_subdir(filepath(RootPath, manifest_dir)).
|
||||||
|
|
||||||
clean_subdir(DirPath) ->
|
clean_subdir(DirPath) ->
|
||||||
|
@ -976,19 +992,26 @@ simple_inker_completeactivejournal_test() ->
|
||||||
test_ledgerkey(Key) ->
|
test_ledgerkey(Key) ->
|
||||||
{o, "Bucket", Key, null}.
|
{o, "Bucket", Key, null}.
|
||||||
|
|
||||||
compact_journal_test() ->
|
compact_journal_wasteretained_test_() ->
|
||||||
{timeout, 60, fun compact_journal_testto/0}.
|
{timeout, 60, fun() -> compact_journal_testto(300, true) end}.
|
||||||
|
|
||||||
compact_journal_testto() ->
|
compact_journal_wastediscarded_test_() ->
|
||||||
|
{timeout, 60, fun() -> compact_journal_testto(undefined, false) end}.
|
||||||
|
|
||||||
|
compact_journal_testto(WRP, ExpectedFiles) ->
|
||||||
RootPath = "../test/journal",
|
RootPath = "../test/journal",
|
||||||
build_dummy_journal(fun test_ledgerkey/1),
|
|
||||||
CDBopts = #cdb_options{max_size=300000},
|
CDBopts = #cdb_options{max_size=300000},
|
||||||
RStrategy = [{?STD_TAG, recovr}],
|
RStrategy = [{?STD_TAG, recovr}],
|
||||||
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
InkOpts = #inker_options{root_path=RootPath,
|
||||||
cdb_options=CDBopts,
|
cdb_options=CDBopts,
|
||||||
reload_strategy=RStrategy,
|
reload_strategy=RStrategy,
|
||||||
compression_method=native,
|
waste_retention_period=WRP,
|
||||||
compress_on_receipt=false}),
|
compression_method=native,
|
||||||
|
compress_on_receipt=false},
|
||||||
|
|
||||||
|
build_dummy_journal(fun test_ledgerkey/1),
|
||||||
|
{ok, Ink1} = ink_start(InkOpts),
|
||||||
|
|
||||||
{ok, NewSQN1, _ObjSize} = ink_put(Ink1,
|
{ok, NewSQN1, _ObjSize} = ink_put(Ink1,
|
||||||
test_ledgerkey("KeyAA"),
|
test_ledgerkey("KeyAA"),
|
||||||
"TestValueAA",
|
"TestValueAA",
|
||||||
|
@ -1036,7 +1059,7 @@ compact_journal_testto() ->
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
CompactedManifest2 = ink_getmanifest(Ink1),
|
CompactedManifest2 = ink_getmanifest(Ink1),
|
||||||
R = lists:foldl(fun({_SQN, FN, _P, _LK}, Acc) ->
|
R = lists:foldl(fun({_SQN, FN, _P, _LK}, Acc) ->
|
||||||
case string:str(FN, "post_compact") of
|
case string:str(FN, ?COMPACT_FP) of
|
||||||
N when N > 0 ->
|
N when N > 0 ->
|
||||||
true;
|
true;
|
||||||
0 ->
|
0 ->
|
||||||
|
@ -1049,6 +1072,9 @@ compact_journal_testto() ->
|
||||||
ink_close(Ink1),
|
ink_close(Ink1),
|
||||||
% Need to wait for delete_pending files to timeout
|
% Need to wait for delete_pending files to timeout
|
||||||
timer:sleep(10000),
|
timer:sleep(10000),
|
||||||
|
% Are there filese in the waste folder after compaction
|
||||||
|
{ok, WasteFNs} = file:list_dir(filepath(RootPath, journal_waste_dir)),
|
||||||
|
?assertMatch(ExpectedFiles, length(WasteFNs) > 0),
|
||||||
clean_testdir(RootPath).
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
empty_manifest_test() ->
|
empty_manifest_test() ->
|
||||||
|
|
|
@ -297,7 +297,7 @@
|
||||||
{"CDB04",
|
{"CDB04",
|
||||||
{info, "Deletion confirmed for file ~s at ManifestSQN ~w"}},
|
{info, "Deletion confirmed for file ~s at ManifestSQN ~w"}},
|
||||||
{"CDB05",
|
{"CDB05",
|
||||||
{info, "Closing of filename ~s for reason ~w"}},
|
{info, "Closing of filename ~s from state ~w for reason ~w"}},
|
||||||
{"CDB06",
|
{"CDB06",
|
||||||
{info, "File to be truncated at last position of ~w with end of "
|
{info, "File to be truncated at last position of ~w with end of "
|
||||||
++ "file at ~w"}},
|
++ "file at ~w"}},
|
||||||
|
@ -327,7 +327,11 @@
|
||||||
{info, "After ~w PUTs total write time is ~w total sync time is ~w "
|
{info, "After ~w PUTs total write time is ~w total sync time is ~w "
|
||||||
++ "and max write time is ~w and max sync time is ~w"}},
|
++ "and max write time is ~w and max sync time is ~w"}},
|
||||||
{"CDB18",
|
{"CDB18",
|
||||||
{info, "Handled return and write of hashtable"}}
|
{info, "Handled return and write of hashtable"}},
|
||||||
|
{"CDB19",
|
||||||
|
{info, "Transferring filename ~s to waste ~s"}},
|
||||||
|
{"CDB20",
|
||||||
|
{info, "Deleting filename ~s as no waste retention period defined"}}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue