From 5cee3a8e4e736c757798a72649e512285686c3a6 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 7 Nov 2017 19:41:39 +0000
Subject: [PATCH 1/5] Tidy up spec

Also remove leveled_app and leveled_sup, which had originally been added
in the mistaken belief that they were needed for dialyzer.
---
 rebar.lock             |  4 ++++
 src/leveled_app.erl    | 19 ----------------
 src/leveled_bookie.erl | 51 ++++++++++++++++++++++++++++++++++++------
 src/leveled_sup.erl    | 30 -------------------------
 4 files changed, 48 insertions(+), 56 deletions(-)
 create mode 100644 rebar.lock
 delete mode 100644 src/leveled_app.erl
 delete mode 100644 src/leveled_sup.erl

diff --git a/rebar.lock b/rebar.lock
new file mode 100644
index 0000000..d315af2
--- /dev/null
+++ b/rebar.lock
@@ -0,0 +1,4 @@
+[{<<"lz4">>,
+  {git,"https://github.com/martinsumner/erlang-lz4",
+       {ref,"09d539685e616b614e851926384439384601ee5a"}},
+  0}].
diff --git a/src/leveled_app.erl b/src/leveled_app.erl
deleted file mode 100644
index 3f40412..0000000
--- a/src/leveled_app.erl
+++ /dev/null
@@ -1,19 +0,0 @@
--module(leveled_app).
-
--behaviour(application).
-
-%% Application callbacks
--export([start/2, stop/1]).
-
-%% ===================================================================
-%% Application callbacks
-%% ===================================================================
-
-%% Currently this is just to keep dialyzer happy
-%% Run the store diretcly using leveled_bookie:book_start/4 or bookie_start/1
-
-start(_StartType, _StartArgs) ->
-    leveled_sup:start_link().
-
-stop(_State) ->
-    ok.
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 1e291d1..642676a 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -104,11 +104,14 @@
 
 -type book_state() :: #state{}.
 
+-type sync_mode() :: sync|none|riak_sync.
 
 %%%============================================================================
 %%% API
 %%%============================================================================
 
+-spec book_start(string(), integer(), integer(), sync_mode()) -> {ok, pid()}.
+
 %% @doc Start a Leveled Key/Value store - limited options support.
 %%
 %% The most common startup parameters are extracted out from the options to
@@ -142,6 +145,8 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
                 {max_journalsize, JournalSize},
                 {sync_strategy, SyncStrategy}]).
 
+-spec book_start(list(tuple())) -> {ok, pid()}.
+
 %% @doc Start a Leveled Key/Value store - full options support.
 %%
 %% Allows an options proplists to be passed for setting options.  There are
@@ -195,6 +200,10 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 book_start(Opts) ->
     gen_server:start(?MODULE, [Opts], []).
 
+
+-spec book_tempput(pid(), any(), any(), any(), list(), atom(), integer()) ->
+                                                                    ok|pause.
+
 %% @doc Put an object with an expiry time
 %%
 %% Put an item in the store but with a Time To Live - the time when the object
@@ -258,12 +267,18 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
 book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
     book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).
 
+-spec book_put(pid(), any(), any(), any(), list(), atom(), infinity|integer())
+                                                                -> ok|pause.
+
 book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
     gen_server:call(Pid,
                     {put, Bucket, Key, Object, IndexSpecs, Tag, TTL},
                     infinity).
 
-%% @doc - Standard PUT
+
+-spec book_delete(pid(), any(), any(), list()) -> ok|pause.
+
+%% @doc
 %%
 %% A thin wrap around the put of a special tombstone object.  There is no
 %% immediate reclaim of space, simply the addition of a more recent tombstone.
 
@@ -271,7 +286,11 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
 book_delete(Pid, Bucket, Key, IndexSpecs) ->
     book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).
 
-%% @doc - GET and HAD requests
+
+-spec book_get(pid(), any(), any(), atom()) -> {ok, any()}|not_found.
+-spec book_head(pid(), any(), any(), atom()) -> {ok, any()}|not_found.
+
+%% @doc - GET and HEAD requests
 %%
 %% The Bookie supports both GET and HEAD requests, with the HEAD request
 %% returning only the metadata and not the actual object value.  The HEAD
 %% requests,
@@ -280,11 +299,6 @@ book_delete(Pid, Bucket, Key, IndexSpecs) ->
 %% GET requests first follow the path of a HEAD request, and if an object is
 %% found, then fetch the value from the Journal via the Inker.
 
-book_get(Pid, Bucket, Key) ->
-    book_get(Pid, Bucket, Key, ?STD_TAG).
-
-book_head(Pid, Bucket, Key) ->
-    book_head(Pid, Bucket, Key, ?STD_TAG).
 
 book_get(Pid, Bucket, Key, Tag) ->
     gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).
@@ -292,6 +306,15 @@ book_get(Pid, Bucket, Key, Tag) ->
 book_head(Pid, Bucket, Key, Tag) ->
     gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity).
 
+book_get(Pid, Bucket, Key) ->
+    book_get(Pid, Bucket, Key, ?STD_TAG).
+
+book_head(Pid, Bucket, Key) ->
+    book_head(Pid, Bucket, Key, ?STD_TAG).
+
+
+-spec book_returnfolder(pid(), tuple()) -> {async, fun()}.
+
 %% @doc Snapshots/Clones
 %%
 %% If there is a snapshot request (e.g. to iterate over the keys) the Bookie
@@ -343,6 +366,12 @@ book_head(Pid, Bucket, Key, Tag) ->
 book_returnfolder(Pid, RunnerType) ->
     gen_server:call(Pid, {return_runner, RunnerType}, infinity).
 
+
+-spec book_snapshot(pid(),
+                    store|ledger,
+                    tuple()|undefined,
+                    boolean()|undefined) -> {ok, pid(), pid()|null}.
+
 %% @doc create a snapshot of the store
 %%
 %% Snapshot can be based on a pre-defined query (which will be used to filter
@@ -353,6 +382,10 @@ book_returnfolder(Pid, RunnerType) ->
 book_snapshot(Pid, SnapType, Query, LongRunning) ->
     gen_server:call(Pid, {snapshot, SnapType, Query, LongRunning}, infinity).
 
+
+-spec book_compactjournal(pid(), integer()) -> ok.
+-spec book_islastcompactionpending(pid()) -> boolean().
+
 %% @doc Call for compaction of the Journal
 %%
 %% the scheduling of Journla compaction is called externally, so it is assumed
@@ -366,6 +399,10 @@ book_compactjournal(Pid, Timeout) ->
 book_islastcompactionpending(Pid) ->
     gen_server:call(Pid, confirm_compact, infinity).
 
+
+-spec book_close(pid()) -> ok.
+-spec book_destroy(pid()) -> ok.
+
 %% @doc Clean shutdown
 %%
 %% A clean shutdown will persist all the information in the Penciller memory
diff --git a/src/leveled_sup.erl b/src/leveled_sup.erl
deleted file mode 100644
index 58d4b71..0000000
--- a/src/leveled_sup.erl
+++ /dev/null
@@ -1,30 +0,0 @@
--module(leveled_sup).
-
--behaviour(supervisor).
-
-%% API
--export([start_link/0]).
-
-%% Supervisor callbacks
--export([init/1]).
-
-%% Helper macro for declaring children of supervisor
--define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-
-%% ===================================================================
-%% API functions
-%% ===================================================================
-
-%% Currently this is just to keep dialyzer happy
-%% Run the store directly using leveled_bookie:book_start/4 or bookie_start/1
-
-start_link() ->
-    supervisor:start_link({local, leveled_bookie}, ?MODULE, []).
-
-%% ===================================================================
-%% Supervisor callbacks
-%% ===================================================================
-
-init([]) ->
-    {ok, { {one_for_one, 5, 10}, []} }.
-

From e8bd712fb875fd2c0766aa28946cde7b691a0042 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Wed, 8 Nov 2017 11:20:22 +0000
Subject: [PATCH 2/5] Tidy up test shutdown

---
 src/leveled_iclerk.erl | 4 +++-
 src/leveled_inker.erl  | 7 ++++++-
 src/leveled_log.erl    | 2 +-
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index f60de64..fe4260b 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -923,7 +923,9 @@ compact_empty_file_test() ->
                     {3, {o, "Bucket", "Key3", null}}],
     LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> false end,
     Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4),
-    ?assertMatch(100.0, Score1).
+    ?assertMatch(100.0, Score1),
+    ok = leveled_cdb:cdb_deletepending(CDB2),
+    ok = leveled_cdb:cdb_destroy(CDB2).
 
 compare_candidate_test() ->
     Candidate1 = #candidate{low_sqn=1},
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 9859aa2..92e644f 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -299,6 +299,7 @@ ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) ->
                                     FilterFun,
                                     Timeout},
                         infinity).
+
 -spec ink_compactioncomplete(pid()) -> ok.
 %% @doc
 %% Used by a clerk to state that a compaction process is over, only change
@@ -933,7 +934,6 @@ clean_subdir(DirPath) ->
                   end,
                   Files).
 
-
 simple_inker_test() ->
     RootPath = "../test/journal",
     build_dummy_journal(),
@@ -977,6 +977,9 @@ test_ledgerkey(Key) ->
     {o, "Bucket", Key, null}.
 
 compact_journal_test() ->
+    {timeout, 60, fun compact_journal_testto/0}.
+
+compact_journal_testto() ->
     RootPath = "../test/journal",
     build_dummy_journal(fun test_ledgerkey/1),
     CDBopts = #cdb_options{max_size=300000},
@@ -1044,6 +1047,8 @@ compact_journal_test() ->
     ?assertMatch(false, R),
     ?assertMatch(2, length(CompactedManifest2)),
     ink_close(Ink1),
+    % Need to wait for delete_pending files to timeout
+    timer:sleep(10000),
     clean_testdir(RootPath).
 
 empty_manifest_test() ->
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index c3508b0..8cb6c5e 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -297,7 +297,7 @@
     {"CDB04",
         {info, "Deletion confirmed for file ~s at ManifestSQN ~w"}},
     {"CDB05",
-        {info, "Closing of filename ~s for Reason ~w"}},
+        {info, "Closing of filename ~s for reason ~w"}},
     {"CDB06",
         {info, "File to be truncated at last position of ~w with end of " ++
                     "file at ~w"}},

From 22e894c928fb9ef5156c741af6164c8614d643f1 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Wed, 8 Nov 2017 12:58:09 +0000
Subject: [PATCH 3/5] Allow waste retention to be ignored

If the waste retention period is undefined, then it should be ignored,
and no waste retained (rather than retaining waste for 24 hours, as at
present).

This wasn't working anyway: the reopened reader did not get the cdb
options (which did not have the waste path set in any case), so waste
would not be retained if the file had been opened after a stop/start.
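Editorial note (not part of the patch): the two behaviours described above
are selected through the bookie startup options. A minimal Erlang sketch,
assuming only option names that appear elsewhere in this series; the root
paths are illustrative:

    %% Waste retained: files made redundant by compaction are moved to the
    %% journal_waste folder and only cleared after 3600 seconds.
    {ok, BookieRetain} =
        leveled_bookie:book_start([{root_path, "/tmp/store_retain"},
                                   {max_journalsize, 10000000},
                                   {sync_strategy, none},
                                   {waste_retention_period, 3600}]).

    %% Waste ignored: with no waste_retention_period defined, files made
    %% redundant by compaction are deleted rather than moved to waste.
    {ok, BookieDiscard} =
        leveled_bookie:book_start([{root_path, "/tmp/store_discard"},
                                   {max_journalsize, 10000000},
                                   {sync_strategy, none}]).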
---
 src/leveled_bookie.erl |  6 ++--
 src/leveled_cdb.erl    | 15 +++++----
 src/leveled_iclerk.erl | 60 ++++++++++++++++++------------------
 src/leveled_inker.erl  | 70 +++++++++++++++++++++++++++++-------------
 src/leveled_log.erl    |  8 +++--
 5 files changed, 97 insertions(+), 62 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 642676a..43205c0 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -157,7 +157,7 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 %% - compression_point
 %%
 %% Both of the first two options relate to compaction in the Journal.  The
-%% retain_strategydetermines if a skinny record of the object should be
+%% retain_strategy determines if a skinny record of the object should be
 %% retained following compaction, and how that should be used when recovering
 %% lost state in the Ledger.
 %%
@@ -170,7 +170,9 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 %% Currently compacted records no longer in use are not removed but moved to
 %% a journal_waste folder, and the waste_retention_period determines how long
 %% this history should be kept for (for example to allow for it to be backed
-%% up before deletion).
+%% up before deletion).  If the waste_retention_period (in seconds) is
+%% undefined, then there will be no holding of this waste - unused files will
+%% be immediately deleted.
 %%
 %% Compression method and point allow Leveled to be switched from using bif
 %% based compression (zlib) to suing nif based compression (lz4).  The
diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index 73959b1..b92efd1 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -66,7 +66,7 @@
             cdb_open_writer/2,
             cdb_open_reader/1,
             cdb_open_reader/2,
-            cdb_reopen_reader/2,
+            cdb_reopen_reader/3,
             cdb_get/2,
             cdb_put/3,
             cdb_mput/2,
@@ -138,7 +138,7 @@ cdb_open_writer(Filename, Opts) ->
     ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity),
     {ok, Pid}.
 
--spec cdb_reopen_reader(string(), binary()) -> {ok, pid()}.
+-spec cdb_reopen_reader(string(), binary(), cdb_options()) -> {ok, pid()}.
 %% @doc
 %% Open an existing file that has already been moved into read-only mode. The
 %% LastKey should be known, as it has been stored in the manifest. Knowing the
 %%
 %% The LastKey is the Key of the last object added to the file - and is used to
 %% determine when scans over a file have completed.
-cdb_reopen_reader(Filename, LastKey) ->
-    {ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{binary_mode=true}], []),
+cdb_reopen_reader(Filename, LastKey, CDBopts) ->
+    {ok, Pid} =
+        gen_fsm:start(?MODULE, [CDBopts#cdb_options{binary_mode=true}], []),
     ok = gen_fsm:sync_send_event(Pid,
                                     {open_reader, Filename, LastKey},
                                     infinity),
@@ -692,17 +693,19 @@ handle_info(_Msg, StateName, State) ->
     {next_state, StateName, State}.
 terminate(Reason, StateName, State) ->
-    leveled_log:log("CDB05", [State#state.filename, Reason]),
+    leveled_log:log("CDB05", [State#state.filename, StateName, Reason]),
     case {State#state.handle, StateName, State#state.waste_path} of
         {undefined, _, _} ->
             ok;
         {Handle, delete_pending, undefined} ->
             ok = file:close(Handle),
-            ok = file:delete(State#state.filename);
+            ok = file:delete(State#state.filename),
+            leveled_log:log("CDB20", [State#state.filename]);
         {Handle, delete_pending, WasteFP} ->
             file:close(Handle),
             Components = filename:split(State#state.filename),
             NewName = WasteFP ++ lists:last(Components),
+            leveled_log:log("CDB19", [State#state.filename, NewName]),
             file:rename(State#state.filename, NewName);
         {Handle, _, _} ->
             file:close(Handle)
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index fe4260b..195c362 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -101,7 +101,6 @@
 -define(MAXRUN_COMPACTION_TARGET, 70.0).
 -define(CRC_SIZE, 4).
 -define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])).
--define(DEFAULT_WASTE_RETENTION_PERIOD, 86400).
 -define(INTERVALS_PER_HOUR, 4).
 
 -record(state, {inker :: pid() | undefined,
@@ -150,18 +149,15 @@ init([IClerkOpts]) ->
     ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy,
     CDBopts = IClerkOpts#iclerk_options.cdb_options,
     WP = CDBopts#cdb_options.waste_path,
-    WRP = case IClerkOpts#iclerk_options.waste_retention_period of
-                undefined ->
-                    ?DEFAULT_WASTE_RETENTION_PERIOD;
-                WRP0 ->
-                    WRP0
-            end,
-    MRL = case IClerkOpts#iclerk_options.max_run_length of
-                undefined ->
-                    ?MAX_COMPACTION_RUN;
-                MRL0 ->
-                    MRL0
-            end,
+    WRP = IClerkOpts#iclerk_options.waste_retention_period,
+
+    MRL =
+        case IClerkOpts#iclerk_options.max_run_length of
+            undefined ->
+                ?MAX_COMPACTION_RUN;
+            MRL0 ->
+                MRL0
+        end,
+
     {ok, #state{max_run_length = MRL,
                 inker = IClerkOpts#iclerk_options.inker,
@@ -616,23 +612,27 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
     end.
 
 clear_waste(State) ->
-    WP = State#state.waste_path,
-    WRP = State#state.waste_retention_period,
-    {ok, ClearedJournals} = file:list_dir(WP),
-    N = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
-    lists:foreach(fun(DelJ) ->
-                        LMD = filelib:last_modified(WP ++ DelJ),
-                        case N - calendar:datetime_to_gregorian_seconds(LMD) of
-                            LMD_Delta when LMD_Delta >= WRP ->
-                                ok = file:delete(WP ++ DelJ),
-                                leveled_log:log("IC010", [WP ++ DelJ]);
-                            LMD_Delta ->
-                                leveled_log:log("IC011", [WP ++ DelJ,
-                                                            LMD_Delta]),
-                                ok
-                        end
-                    end,
-                    ClearedJournals).
+    case State#state.waste_path of
+        undefined ->
+            ok;
+        WP ->
+            WRP = State#state.waste_retention_period,
+            {ok, ClearedJournals} = file:list_dir(WP),
+            N = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
+            DeleteJournalFun =
+                fun(DelJ) ->
+                    LMD = filelib:last_modified(WP ++ DelJ),
+                    case N - calendar:datetime_to_gregorian_seconds(LMD) of
+                        LMD_Delta when LMD_Delta >= WRP ->
+                            ok = file:delete(WP ++ DelJ),
+                            leveled_log:log("IC010", [WP ++ DelJ]);
+                        LMD_Delta ->
+                            leveled_log:log("IC011", [WP ++ DelJ, LMD_Delta]),
+                            ok
+                    end
+                end,
+            lists:foreach(DeleteJournalFun, ClearedJournals)
+    end.
 
 %%%============================================================================
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 92e644f..2fa154c 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -493,25 +493,21 @@ code_change(_OldVsn, State, _Extra) ->
 %%%============================================================================
 
 start_from_file(InkOpts) ->
-    RootPath = InkOpts#inker_options.root_path,
-    CDBopts = InkOpts#inker_options.cdb_options,
+    CDBopts = get_cdbopts(InkOpts),
+
+    RootPath = InkOpts#inker_options.root_path,
     JournalFP = filepath(RootPath, journal_dir),
     filelib:ensure_dir(JournalFP),
     CompactFP = filepath(RootPath, journal_compact_dir),
     filelib:ensure_dir(CompactFP),
-    WasteFP = filepath(RootPath, journal_waste_dir),
-    filelib:ensure_dir(WasteFP),
     ManifestFP = filepath(RootPath, manifest_dir),
     ok = filelib:ensure_dir(ManifestFP),
+    IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP},
 
-    {ok, ManifestFilenames} = file:list_dir(ManifestFP),
-
-    IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP,
-                                        waste_path = WasteFP},
+    WRP = InkOpts#inker_options.waste_retention_period,
     ReloadStrategy = InkOpts#inker_options.reload_strategy,
     MRL = InkOpts#inker_options.max_run_length,
-    WRP = InkOpts#inker_options.waste_retention_period,
     PressMethod = InkOpts#inker_options.compression_method,
     PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
     IClerkOpts = #iclerk_options{inker = self(),
@@ -520,8 +516,10 @@ start_from_file(InkOpts) ->
                                     reload_strategy = ReloadStrategy,
                                     compression_method = PressMethod,
                                     max_run_length = MRL},
+
     {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
 
+    {ok, ManifestFilenames} = file:list_dir(ManifestFP),
     {Manifest,
         ManifestSQN,
         JournalSQN,
@@ -533,11 +531,28 @@ start_from_file(InkOpts) ->
                     journal_sqn = JournalSQN,
                     active_journaldb = ActiveJournal,
                     root_path = RootPath,
-                    cdb_options = CDBopts#cdb_options{waste_path=WasteFP},
+                    cdb_options = CDBopts,
                     compression_method = PressMethod,
                     compress_on_receipt = PressOnReceipt,
                     clerk = Clerk}}.
 
+get_cdbopts(InkOpts)->
+    CDBopts = InkOpts#inker_options.cdb_options,
+    WasteFP =
+        case InkOpts#inker_options.waste_retention_period of
+            undefined ->
+                % If the waste retention period is undefined, there will
+                % be no retention of waste.  This is triggered by making
+                % the waste path undefined
+                undefined;
+            _WRP ->
+                WFP = filepath(InkOpts#inker_options.root_path,
+                                journal_waste_dir),
+                filelib:ensure_dir(WFP),
+                WFP
+        end,
+    CDBopts#cdb_options{waste_path = WasteFP}.
+
 
 put_object(LedgerKey, Object, KeyChanges, State) ->
     NewSQN = State#state.journal_sqn + 1,
@@ -677,8 +692,8 @@ open_all_manifest(Man0, RootPath, CDBOpts) ->
             PFN = FN ++ "." ++ ?PENDING_FILEX,
             case filelib:is_file(CFN) of
                 true ->
-                    {ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN,
-                                                                LK_RO),
+                    {ok, Pid} =
+                        leveled_cdb:cdb_reopen_reader(CFN, LK_RO, CDBOpts),
                     {LowSQN, FN, Pid, LK_RO};
                 false ->
                     W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
@@ -920,6 +935,7 @@ build_dummy_journal(KeyConvertF) ->
 clean_testdir(RootPath) ->
     clean_subdir(filepath(RootPath, journal_dir)),
     clean_subdir(filepath(RootPath, journal_compact_dir)),
+    clean_subdir(filepath(RootPath, journal_waste_dir)),
     clean_subdir(filepath(RootPath, manifest_dir)).
 
 clean_subdir(DirPath) ->
@@ -976,19 +992,26 @@ simple_inker_completeactivejournal_test() ->
 test_ledgerkey(Key) ->
     {o, "Bucket", Key, null}.
 
-compact_journal_test() ->
-    {timeout, 60, fun compact_journal_testto/0}.
+compact_journal_wasteretained_test_() ->
+    {timeout, 60, fun() -> compact_journal_testto(300, true) end}.
+
+compact_journal_wastediscarded_test_() ->
+    {timeout, 60, fun() -> compact_journal_testto(undefined, false) end}.
+
+compact_journal_testto(WRP, ExpectedFiles) ->
     RootPath = "../test/journal",
-    build_dummy_journal(fun test_ledgerkey/1),
     CDBopts = #cdb_options{max_size=300000},
     RStrategy = [{?STD_TAG, recovr}],
-    {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
-                                            cdb_options=CDBopts,
-                                            reload_strategy=RStrategy,
-                                            compression_method=native,
-                                            compress_on_receipt=false}),
+    InkOpts = #inker_options{root_path=RootPath,
+                                cdb_options=CDBopts,
+                                reload_strategy=RStrategy,
+                                waste_retention_period=WRP,
+                                compression_method=native,
+                                compress_on_receipt=false},
+
+    build_dummy_journal(fun test_ledgerkey/1),
+    {ok, Ink1} = ink_start(InkOpts),
+
     {ok, NewSQN1, _ObjSize} = ink_put(Ink1,
                                         test_ledgerkey("KeyAA"),
                                         "TestValueAA",
@@ -1036,7 +1059,7 @@ compact_journal_testto() ->
     timer:sleep(1000),
     CompactedManifest2 = ink_getmanifest(Ink1),
     R = lists:foldl(fun({_SQN, FN, _P, _LK}, Acc) ->
-                            case string:str(FN, "post_compact") of
+                            case string:str(FN, ?COMPACT_FP) of
                                 N when N > 0 ->
                                     true;
                                 0 ->
@@ -1049,6 +1072,9 @@ compact_journal_testto() ->
     ink_close(Ink1),
     % Need to wait for delete_pending files to timeout
     timer:sleep(10000),
+    % Are there filese in the waste folder after compaction
+    {ok, WasteFNs} = file:list_dir(filepath(RootPath, journal_waste_dir)),
+    ?assertMatch(ExpectedFiles, length(WasteFNs) > 0),
     clean_testdir(RootPath).
 
 empty_manifest_test() ->
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index 8cb6c5e..7d27c2b 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -297,7 +297,7 @@
     {"CDB04",
         {info, "Deletion confirmed for file ~s at ManifestSQN ~w"}},
     {"CDB05",
-        {info, "Closing of filename ~s for reason ~w"}},
+        {info, "Closing of filename ~s from state ~w for reason ~w"}},
     {"CDB06",
         {info, "File to be truncated at last position of ~w with end of " ++
                     "file at ~w"}},
@@ -327,7 +327,11 @@
     {info, "After ~w PUTs total write time is ~w total sync time is ~w " ++
             "and max write time is ~w and max sync time is ~w"}},
     {"CDB18",
-        {info, "Handled return and write of hashtable"}}
+        {info, "Handled return and write of hashtable"}},
+    {"CDB19",
+        {info, "Transferring filename ~s to waste ~s"}},
+    {"CDB20",
+        {info, "Deleting filename ~s as no waste retention period defined"}}
     ]).

From 1d2effc773a1f51216ff8b0e8d2aae1ba2ef5cde Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Wed, 8 Nov 2017 15:09:23 +0000
Subject: [PATCH 4/5] Improve docs and specs

Focus on leveled_cdb
---
 src/leveled_cdb.erl   | 109 ++++++++++++++++++++++++++++--------------
 src/leveled_inker.erl |  13 +++--
 2 files changed, 84 insertions(+), 38 deletions(-)

diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index b92efd1..7a35b51 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -753,25 +753,8 @@ set_writeops(SyncStrategy) ->
 -endif.
 
-%% from_dict(FileName,ListOfKeyValueTuples)
-%% Given a filename and a dictionary, create a cdb
-%% using the key value pairs from the dict.
-from_dict(FileName,Dict) ->
-    KeyValueList = dict:to_list(Dict),
-    create(FileName, KeyValueList).
-
-
-%%
-%% create(FileName,ListOfKeyValueTuples) -> ok
-%% Given a filename and a list of {key,value} tuples,
-%% this function creates a CDB
-%%
-create(FileName,KeyValueList) ->
-    {ok, Handle} = file:open(FileName, ?WRITE_OPS),
-    {ok, _} = file:position(Handle, {bof, ?BASE_POSITION}),
-    {BasePos, HashTree} = write_key_value_pairs(Handle, KeyValueList),
-    close_file(Handle, HashTree, BasePos).
-
-
+-spec open_active_file(list()) -> {integer(), ets:tid(), any()}.
+%% @doc
 %% Open an active file - one for which it is assumed the hash tables have not
 %% yet been written
 %%
@@ -797,6 +780,11 @@ open_active_file(FileName) when is_list(FileName) ->
     end,
     {LastPosition, HashTree, LastKey}.
 
+-spec put(list()|file:io_device(),
+            any(), any(),
+            {integer(), ets:tid()}, boolean(), integer())
+                -> roll|{file:io_device(), integer(), ets:tid()}.
+%% @doc
 %% put(Handle, Key, Value, {LastPosition, HashDict}) -> {NewPosition, KeyDict}
 %% Append to an active file a new key/value pair returning an updated
 %% dictionary of Keys and positions.  Returns an updated Position
@@ -822,6 +810,14 @@ put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize) ->
             put_hashtree(Key, LastPosition, HashTree)}
     end.
 
+
+-spec mput(file:io_device(),
+            list(tuple()),
+            {integer(), ets:tid()}, boolean(), integer())
+                -> roll|{file:io_device(), integer(), ets:tid(), any()}.
+%% @doc
+%% Multiple puts - either all will succeed or it will return roll with none
+%% succeeding.
 mput(Handle, KVList, {LastPosition, HashTree0}, BinaryMode, MaxSize) ->
     {KPList, Bin, LastKey} = multi_key_value_to_record(KVList,
                                                         BinaryMode,
@@ -840,18 +836,11 @@ mput(Handle, KVList, {LastPosition, HashTree0}, BinaryMode, MaxSize) ->
         {Handle, PotentialNewSize, HashTree1, LastKey}
     end.
 
-%% Should not be used for non-test PUTs by the inker - as the Max File Size
-%% should be taken from the startup options not the default
-put(FileName, Key, Value, {LastPosition, HashTree}) ->
-    put(FileName, Key, Value, {LastPosition, HashTree},
-        ?BINARY_MODE, ?MAX_FILE_SIZE).
-
-%%
-%% get(FileName,Key) -> {key,value}
-%% Given a filename and a key, returns a key and value tuple.
-%%
-
+-spec get_withcache(file:io_device(), any(), tuple(), boolean()) -> tuple().
+%% @doc
+%% Using a cache of the Index array - get a K/V pair from the file using the
+%% Key
 get_withcache(Handle, Key, Cache, BinaryMode) ->
     get(Handle, Key, Cache, true, BinaryMode).
 
@@ -861,6 +850,16 @@ get_withcache(Handle, Key, Cache, QuickCheck, BinaryMode) ->
 get(FileNameOrHandle, Key, BinaryMode) ->
     get(FileNameOrHandle, Key, no_cache, true, BinaryMode).
 
+
+-spec get(list()|file:io_device(),
+            any(), no_cache|tuple(),
+            loose_presence|any(), boolean())
+                -> tuple()|probably|missing.
+%% @doc
+%% Get a K/V pair from the file using the Key.  QuickCheck can be set to
+%% loose_presence if all that is required is a loose check of presence (that
+%% the Key is probably present as there is a hash in the hash table which
+%% matches that Key)
 get(FileName, Key, Cache, QuickCheck, BinaryMode) when is_list(FileName) ->
     {ok, Handle} = file:open(FileName,[binary, raw, read]),
     get(Handle, Key, Cache, QuickCheck, BinaryMode);
@@ -896,11 +895,12 @@ get_index(Handle, Index, no_cache) ->
     % Get location of hashtable and number of entries in the hash
     read_next_2_integers(Handle);
 get_index(_Handle, Index, Cache) ->
-    element(Index + 1, Cache). 
+    element(Index + 1, Cache).
 
+-spec get_mem(any(), list()|file:io_device(), ets:tid(), boolean()) ->
+                                                    tuple()|probably|missing.
+%% @doc
 %% Get a Key/Value pair from an active CDB file (with no hash table written)
-%% This requires a key dictionary to be passed in (mapping keys to positions)
-%% Will return {Key, Value} or missing
 get_mem(Key, FNOrHandle, HashTree, BinaryMode) ->
     get_mem(Key, FNOrHandle, HashTree, BinaryMode, true).
 
@@ -915,11 +915,18 @@ get_mem(Key, Handle, HashTree, BinaryMode, QuickCheck) ->
         {loose_presence, _L} ->
             probably;
         _ ->
-            extract_kvpair(Handle, ListToCheck, Key, BinaryMode) 
+            extract_kvpair(Handle, ListToCheck, Key, BinaryMode)
     end.
 
+-spec get_nextkey(list()|file:io_device()) ->
+                        nomorekeys|
+                        {any(), nomorekeys}|
+                        {any(), file:io_device(), {integer(), integer()}}.
+%% @doc
 %% Get the next key at a position in the file (or the first key if no position
-%% is passed).  Will return both a key and the next position
+%% is passed). Will return both a key and the next position, or nomorekeys if
+%% the end has been reached (either in place of the result if there are no
+%% more keys, or in place of the position if the returned key is the last key)
 get_nextkey(Filename) when is_list(Filename) ->
     {ok, Handle} = file:open(Filename, [binary, raw, read]),
     get_nextkey(Handle);
@@ -944,6 +951,10 @@ get_nextkey(Handle, {Position, FirstHashPosition}) ->
             nomorekeys
     end.
 
+-spec hashtable_calc(ets:tid(), integer()) -> {list(), binary()}.
+%% @doc
+%% Create a binary representation of the hash table to be written to the end
+%% of the file
 hashtable_calc(HashTree, StartPos) ->
     Seq = lists:seq(0, 255),
     SWC = os:timestamp(),
@@ -1599,6 +1610,34 @@ write_hash_tables([Index|Rest], HashTree, CurrPos, BasePos,
 %% of {key,value} tuples from the CDB.
 %%
 
+
+%% from_dict(FileName,ListOfKeyValueTuples)
+%% Given a filename and a dictionary, create a cdb
+%% using the key value pairs from the dict.
+from_dict(FileName,Dict) ->
+    KeyValueList = dict:to_list(Dict),
+    create(FileName, KeyValueList).
+
+
+%%
+%% create(FileName,ListOfKeyValueTuples) -> ok
+%% Given a filename and a list of {key,value} tuples,
+%% this function creates a CDB
+%%
+create(FileName,KeyValueList) ->
+    {ok, Handle} = file:open(FileName, ?WRITE_OPS),
+    {ok, _} = file:position(Handle, {bof, ?BASE_POSITION}),
+    {BasePos, HashTree} = write_key_value_pairs(Handle, KeyValueList),
+    close_file(Handle, HashTree, BasePos).
+
+
+%% Should not be used for non-test PUTs by the inker - as the Max File Size
+%% should be taken from the startup options not the default
+put(FileName, Key, Value, {LastPosition, HashTree}) ->
+    put(FileName, Key, Value, {LastPosition, HashTree},
+        ?BINARY_MODE, ?MAX_FILE_SIZE).
+
+
 dump(FileName) ->
     {ok, Handle} = file:open(FileName, [binary, raw, read]),
     Fn = fun(Index, Acc) ->
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 2fa154c..06ff861 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -493,9 +493,12 @@ code_change(_OldVsn, State, _Extra) ->
 %%%============================================================================
 
 start_from_file(InkOpts) ->
-    
+    % Setting the correct CDB options is important when starting the inker, in
+    % particular for waste retention which is determined by the CDB options
+    % with which the file was last opened
     CDBopts = get_cdbopts(InkOpts),
 
+    % Determine filepaths
     RootPath = InkOpts#inker_options.root_path,
     JournalFP = filepath(RootPath, journal_dir),
     filelib:ensure_dir(JournalFP),
@@ -503,6 +506,8 @@ start_from_file(InkOpts) ->
     filelib:ensure_dir(CompactFP),
     ManifestFP = filepath(RootPath, manifest_dir),
     ok = filelib:ensure_dir(ManifestFP),
+    % The IClerk must start files with the compaction file path so that they
+    % will be stored correctly in this folder
    IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP},
 
     WRP = InkOpts#inker_options.waste_retention_period,
@@ -519,6 +524,8 @@ start_from_file(InkOpts) ->
 
     {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
 
+    % The building of the manifest will load all the CDB files, starting a
+    % new leveled_cdb process for each file
     {ok, ManifestFilenames} = file:list_dir(ManifestFP),
     {Manifest,
         ManifestSQN,
@@ -1071,8 +1078,8 @@ compact_journal_testto(WRP, ExpectedFiles) ->
     ?assertMatch(2, length(CompactedManifest2)),
     ink_close(Ink1),
     % Need to wait for delete_pending files to timeout
-    timer:sleep(10000),
-    % Are there filese in the waste folder after compaction
+    timer:sleep(12000),
+    % Are there files in the waste folder after compaction
     {ok, WasteFNs} = file:list_dir(filepath(RootPath, journal_waste_dir)),
     ?assertMatch(ExpectedFiles, length(WasteFNs) > 0),
     clean_testdir(RootPath).

From 7de4dccbd98bdddd6ac37ca2baa44db7cd2c3b3e Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Wed, 8 Nov 2017 16:18:48 +0000
Subject: [PATCH 5/5] Extend journal compaction test to cover with and
 without waste retention

Also makes sure that CDB files in a restarted store will respect the
waste retention period set.
---
 src/leveled_bookie.erl          | 25 ++++++++++---
 test/end_to_end/basic_SUITE.erl | 66 +++++++++++++++++++++++----------
 2 files changed, 66 insertions(+), 25 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 43205c0..e711009 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -105,6 +105,7 @@
 
 -type book_state() :: #state{}.
 -type sync_mode() :: sync|none|riak_sync.
+-type ledger_cache() :: #ledger_cache{}.
 
 %%%============================================================================
 %%% API
@@ -606,11 +607,15 @@ code_change(_OldVsn, State, _Extra) ->
 %%% External functions
 %%%============================================================================
 
-%% @doc Empty the ledger cache table following a push
+-spec empty_ledgercache() -> ledger_cache().
+%% @doc
+%% Empty the ledger cache table following a push
 empty_ledgercache() ->
     #ledger_cache{mem = ets:new(empty, [ordered_set])}.
 
-%% @doc push the ledgercache to the Penciller - which should respond ok or
+-spec push_ledgercache(pid(), ledger_cache()) -> ok|returned.
+%% @doc
+%% Push the ledgercache to the Penciller - which should respond ok or
 %% returned.  If the response is ok the cache can be flushed, but if the
 %% response is returned the cache should continue to build and it should try
 %% to flush at a later date
@@ -621,8 +626,10 @@ push_ledgercache(Penciller, Cache) ->
                     Cache#ledger_cache.max_sqn},
     leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
 
-%% @doc the ledger cache can be built from a queue, for example when
-%% loading the ledger from the head of the journal on startup
+-spec loadqueue_ledgercache(ledger_cache()) -> ledger_cache().
+%% @doc
+%% The ledger cache can be built from a queue, for example when loading the
+%% ledger from the head of the journal on startup
 %%
 %% The queue should be build using [NewKey|Acc] so that the most recent
 %% key is kept in the sort
@@ -631,7 +638,12 @@ loadqueue_ledgercache(Cache) ->
     T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
     Cache#ledger_cache{load_queue = [], loader = T}.
 
-%% @doc Allow all a snapshot to be created from part of the store, preferably
+-spec snapshot_store(ledger_cache(),
+                        pid(), null|pid(), store|ledger,
+                        undefined|tuple(), undefined|boolean()) ->
+                            {ok, pid(), pid()|null}.
+%% @doc
+%% Allow a snapshot to be created from part of the store, preferably
 %% passing in a query filter so that all of the LoopState does not need to
 %% be copied from the real actor to the clone
 %%
@@ -688,6 +700,9 @@ snapshot_store(State, SnapType, Query, LongRunning) ->
                     Query,
                     LongRunning).
 
+-spec fetch_value(pid(), {any(), integer()}) -> not_present|any().
+%% @doc
+%% Fetch a value from the Journal
 fetch_value(Inker, {Key, SQN}) ->
     SW = os:timestamp(),
     case leveled_inker:ink_fetch(Inker, Key, SQN) of
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index ec59e54..e58b433 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -112,21 +112,27 @@ many_put_fetch_head(_Config) ->
     ok = leveled_bookie:book_destroy(Bookie3).
 
 journal_compaction(_Config) ->
+    journal_compaction_tester(false, 3600),
+    journal_compaction_tester(false, undefined),
+    journal_compaction_tester(true, 3600).
+
+journal_compaction_tester(Restart, WRP) ->
     RootPath = testutil:reset_filestructure(),
     StartOpts1 = [{root_path, RootPath},
                   {max_journalsize, 10000000},
                   {max_run_length, 1},
-                  {sync_strategy, testutil:sync_strategy()}],
-    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
-    ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
+                  {sync_strategy, testutil:sync_strategy()},
+                  {waste_retention_period, WRP}],
+    {ok, Bookie0} = leveled_bookie:book_start(StartOpts1),
+    ok = leveled_bookie:book_compactjournal(Bookie0, 30000),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
-    testutil:check_forobject(Bookie1, TestObject),
+    ok = testutil:book_riakput(Bookie0, TestObject, TestSpec),
+    testutil:check_forobject(Bookie0, TestObject),
     ObjList1 = testutil:generate_objects(20000, 2),
-    testutil:riakload(Bookie1, ObjList1),
+    testutil:riakload(Bookie0, ObjList1),
     ChkList1 = lists:sublist(lists:sort(ObjList1), 10000),
-    testutil:check_forlist(Bookie1, ChkList1),
-    testutil:check_forobject(Bookie1, TestObject),
+    testutil:check_forlist(Bookie0, ChkList1),
+    testutil:check_forobject(Bookie0, TestObject),
     {B2, K2, V2, Spec2, MD} = {"Bucket2",
                                "Key2",
                                "Value2",
@@ -134,18 +140,18 @@ journal_compaction(_Config) ->
                                [{"MDK2", "MDV2"}]},
     {TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2,
                                                             V2, Spec2, MD),
-    ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2),
-    ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
-    testutil:check_forlist(Bookie1, ChkList1),
-    testutil:check_forobject(Bookie1, TestObject),
-    testutil:check_forobject(Bookie1, TestObject2),
-    testutil:check_forlist(Bookie1, ChkList1),
-    testutil:check_forobject(Bookie1, TestObject),
-    testutil:check_forobject(Bookie1, TestObject2),
+    ok = testutil:book_riakput(Bookie0, TestObject2, TestSpec2),
+    ok = leveled_bookie:book_compactjournal(Bookie0, 30000),
+    testutil:check_forlist(Bookie0, ChkList1),
+    testutil:check_forobject(Bookie0, TestObject),
+    testutil:check_forobject(Bookie0, TestObject2),
+    testutil:check_forlist(Bookie0, ChkList1),
+    testutil:check_forobject(Bookie0, TestObject),
+    testutil:check_forobject(Bookie0, TestObject2),
     %% Delete some of the objects
     ObjListD = testutil:generate_objects(10000, 2),
     lists:foreach(fun({_R, O, _S}) ->
-                        testutil:book_riakdelete(Bookie1,
+                        testutil:book_riakdelete(Bookie0,
                                                  O#r_object.bucket,
                                                  O#r_object.key,
                                                  [])
@@ -154,7 +160,17 @@ journal_compaction(_Config) ->
 
     %% Now replace all the other objects
     ObjList2 = testutil:generate_objects(40000, 10002),
-    testutil:riakload(Bookie1, ObjList2),
+    testutil:riakload(Bookie0, ObjList2),
+
+    Bookie1 =
+        case Restart of
+            true ->
+                ok = leveled_bookie:book_close(Bookie0),
+                {ok, RestartedB} = leveled_bookie:book_start(StartOpts1),
+                RestartedB;
+            false ->
+                Bookie0
+        end,
 
     ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
 
@@ -184,7 +200,12 @@ journal_compaction(_Config) ->
                                      [2000,2000,2000,2000,2000,2000]),
     {ok, ClearedJournals} = file:list_dir(WasteFP),
     io:format("~w ClearedJournals found~n", [length(ClearedJournals)]),
-    true = length(ClearedJournals) > 0,
+    case is_integer(WRP) of
+        true ->
+            true = length(ClearedJournals) > 0;
+        false ->
+            true = length(ClearedJournals) == 0
+    end,
 
     ChkList3 = lists:sublist(lists:sort(ObjList2), 500),
     testutil:check_forlist(Bookie1, ChkList3),
@@ -212,7 +233,12 @@ journal_compaction(_Config) ->
 
     {ok, ClearedJournalsPC} = file:list_dir(WasteFP),
     io:format("~w ClearedJournals found~n", [length(ClearedJournalsPC)]),
-    true = length(ClearedJournalsPC) == 0,
+    case is_integer(WRP) of
+        true ->
+            true = length(ClearedJournalsPC) > 0;
+        false ->
+            true = length(ClearedJournalsPC) == 0
+    end,
 
     testutil:reset_filestructure(10000).
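Editorial note (not part of the patch series): the compaction cycle exercised
by these tests can also be driven directly through the bookie API, using the
two functions given specs in patch 1. A minimal sketch; the retry count and
poll interval below are assumptions for illustration, not values taken from
the source:

    %% Trigger journal compaction, then poll until the last compaction run
    %% is no longer pending (or give up after a number of polls).
    compact_and_wait(Bookie) ->
        ok = leveled_bookie:book_compactjournal(Bookie, 30000),
        wait_for_compaction(Bookie, 20).

    wait_for_compaction(_Bookie, 0) ->
        {error, timeout};
    wait_for_compaction(Bookie, Retries) ->
        case leveled_bookie:book_islastcompactionpending(Bookie) of
            true ->
                timer:sleep(1000),
                wait_for_compaction(Bookie, Retries - 1);
            false ->
                ok
        end.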