From ee5dc43cfd45e9d2ab2b88b24ff9c517363d4e94 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 1 May 2018 12:03:24 +0100 Subject: [PATCH] Add doc/spec to bookie Improve the comments and dialyzer hints within the bookie. --- src/leveled_bookie.erl | 228 ++++++++++++++++++++++++++++------------- 1 file changed, 158 insertions(+), 70 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 86d7fe8..10114ab 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -868,6 +868,98 @@ fetch_value(Inker, {Key, SQN}) -> %%% Internal functions %%%============================================================================ +-spec startup(#inker_options{}, #penciller_options{}, book_state()) + -> {pid(), pid()}. +%% @doc +%% Startup the Inker and the Penciller, and prompt the loading of the Penciller +%% from the Inker. The Penciller may be shutdown without the latest data +%% having been persisted: and so the Inker must be able to update the Penciller +%% on startup with anything that happened but wasn't flushed to disk. +startup(InkerOpts, PencillerOpts, State) -> + {ok, Inker} = leveled_inker:ink_start(InkerOpts), + {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), + LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), + leveled_log:log("B0005", [LedgerSQN]), + ok = leveled_inker:ink_loadpcl(Inker, + LedgerSQN + 1, + get_loadfun(State), + Penciller), + {Inker, Penciller}. + + +-spec set_options(list()) -> {#inker_options{}, #penciller_options{}}. 
+%% @doc +%% Take the passed in property list of operations and extract out any relevant +%% options to the Inker or the Penciller +set_options(Opts) -> + MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000), + JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER), + MaxJournalSize = MaxJournalSize0 - + erlang:phash2(self()) rem JournalSizeJitter, + + SyncStrat = get_opt(sync_strategy, Opts, sync), + WRP = get_opt(waste_retention_period, Opts), + + AltStrategy = get_opt(reload_strategy, Opts, []), + ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy), + + PCLL0CacheSize = get_opt(max_pencillercachesize, Opts), + RootPath = get_opt(root_path, Opts), + + JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP, + LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP, + ok = filelib:ensure_dir(JournalFP), + ok = filelib:ensure_dir(LedgerFP), + + CompressionMethod = + case get_opt(compression_method, Opts, ?COMPRESSION_METHOD) of + native -> + % Note native compression will have reduced performance + % https://github.com/martinsumner/leveled/issues/95 + native; + lz4 -> + % Must include lz4 library in rebar.config + lz4 + end, + CompressOnReceipt = + case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of + on_receipt -> + % Note this will add measurable delay to PUT time + % https://github.com/martinsumner/leveled/issues/95 + true; + on_compact -> + % If using lz4 this is not recommended + false + end, + + {#inker_options{root_path = JournalFP, + reload_strategy = ReloadStrategy, + max_run_length = get_opt(max_run_length, Opts), + waste_retention_period = WRP, + compression_method = CompressionMethod, + compress_on_receipt = CompressOnReceipt, + cdb_options = + #cdb_options{max_size=MaxJournalSize, + binary_mode=true, + sync_strategy=SyncStrat}}, + #penciller_options{root_path = LedgerFP, + max_inmemory_tablesize = PCLL0CacheSize, + levelzero_cointoss = true, + compression_method = CompressionMethod}}. 
+ + +-spec return_snapfun(book_state(), store|ledger, + tuple()|no_lookup|undefined, + boolean(), boolean()) -> fun(). +%% @doc +%% Generates a function from which a snapshot can be created. The primary +%% factor here is the SnapPreFold boolean. If this is true then the snapshot +%% will be taken before the Fold function is returned. If SnapPreFold is +%% false then the snapshot will be taken when the Fold function is called. +%% +%% SnapPreFold is to be used when the intention is to queue the fold, and so +%% calling of the fold may be delayed, but it is still desired that the fold +%% represent the point in time that the query was requested. return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) -> case SnapPreFold of true -> @@ -883,6 +975,13 @@ return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) -> fun() -> book_snapshot(Self, SnapType, Query, LongRunning) end end. +-spec snaptype_by_presence(boolean()) -> store|ledger. +%% @doc +%% Folds that traverse over object heads, may also either require to return +%% the object, or at least confirm the object is present in the Ledger. This +%% is achieved by enabling presence - and this will change the type of +%% snapshot to one that covers the whole store (i.e. both ledger and journal), +%% rather than just the ledger. snaptype_by_presence(true) -> store; snaptype_by_presence(false) -> @@ -918,7 +1017,6 @@ get_runner(State, {keylist, Tag, Bucket, FoldAccT}) -> get_runner(State, {keylist, Tag, Bucket, KeyRange, FoldAccT}) -> SnapFun = return_snapfun(State, ledger, no_lookup, true, true), leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, KeyRange, FoldAccT); - %% Set of runners for object or metadata folds get_runner(State, {foldheads_allkeys, @@ -1078,6 +1176,11 @@ return_ledger_keyrange(Tag, Bucket, KeyRange) -> {StartKey, EndKey, SnapQuery}. +-spec maybe_longrunning(erlang:timestamp(), atom()) -> ok. 
+%% @doc +%% Check the length of time an operation (named by Aspect) has taken, and +%% see if it has crossed the long running threshold. If so log to indicate +%% a long running event has occurred. maybe_longrunning(SW, Aspect) -> case timer:now_diff(os:timestamp(), SW) of N when N > ?LONG_RUNNING -> @@ -1086,6 +1189,12 @@ maybe_longrunning(SW, Aspect) -> ok end. +-spec readycache_forsnapshot(ledger_cache(), tuple()|no_lookup|undefined) + -> ledger_cache(). +%% @doc +%% Strip the ledger cache back to only the relevant information needed in +%% the query, and to make the cache a snapshot (and so not subject to changes +%% such as additions to the ets table) readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) -> {KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem, StartKey, @@ -1127,6 +1236,13 @@ readycache_forsnapshot(LedgerCache, Query) -> max_sqn=LedgerCache#ledger_cache.max_sqn} end. +-spec scan_table(ets:tab(), tuple(), tuple()) -> + {list(), non_neg_integer()|infinity, non_neg_integer()}. +%% @doc +%% Query the ETS table to find a range of keys (start inclusive). Should also +%% return the minimum and maximum sequence number found in the query. This +%% is just then used as a safety check when loading these results into the +%% penciller snapshot scan_table(Table, StartKey, EndKey) -> case ets:lookup(Table, StartKey) of [] -> @@ -1158,74 +1274,11 @@ scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) -> end. 
-set_options(Opts) -> - MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000), - JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER), - MaxJournalSize = MaxJournalSize0 - - erlang:phash2(self()) rem JournalSizeJitter, - - SyncStrat = get_opt(sync_strategy, Opts, sync), - WRP = get_opt(waste_retention_period, Opts), - - AltStrategy = get_opt(reload_strategy, Opts, []), - ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy), - - PCLL0CacheSize = get_opt(max_pencillercachesize, Opts), - RootPath = get_opt(root_path, Opts), - - JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP, - LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP, - ok = filelib:ensure_dir(JournalFP), - ok = filelib:ensure_dir(LedgerFP), - - CompressionMethod = - case get_opt(compression_method, Opts, ?COMPRESSION_METHOD) of - native -> - % Note native compression will have reduced performance - % https://github.com/martinsumner/leveled/issues/95 - native; - lz4 -> - % Must include lz4 library in rebar.config - lz4 - end, - CompressOnReceipt = - case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of - on_receipt -> - % Note this will add measurable delay to PUT time - % https://github.com/martinsumner/leveled/issues/95 - true; - on_compact -> - % If using lz4 this is not recommended - false - end, - - {#inker_options{root_path = JournalFP, - reload_strategy = ReloadStrategy, - max_run_length = get_opt(max_run_length, Opts), - waste_retention_period = WRP, - compression_method = CompressionMethod, - compress_on_receipt = CompressOnReceipt, - cdb_options = - #cdb_options{max_size=MaxJournalSize, - binary_mode=true, - sync_strategy=SyncStrat}}, - #penciller_options{root_path = LedgerFP, - max_inmemory_tablesize = PCLL0CacheSize, - levelzero_cointoss = true, - compression_method = CompressionMethod}}. 
- -startup(InkerOpts, PencillerOpts, State) -> - {ok, Inker} = leveled_inker:ink_start(InkerOpts), - {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), - LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), - leveled_log:log("B0005", [LedgerSQN]), - ok = leveled_inker:ink_loadpcl(Inker, - LedgerSQN + 1, - get_loadfun(State), - Penciller), - {Inker, Penciller}. - - +-spec fetch_head(tuple(), pid(), ledger_cache()) -> not_present|tuple(). +%% @doc +%% Fetch only the head of the object from the Ledger (or the bookie's recent +%% ledger cache if it has just been updated). not_present is returned if the +%% Key is not found fetch_head(Key, Penciller, LedgerCache) -> SW = os:timestamp(), CacheResult = @@ -1282,6 +1335,13 @@ preparefor_ledgercache(_InkTag, {KeyH, SQN, KeyChanges}. +-spec addto_ledgercache({integer()|no_lookup, + integer(), list()}, ledger_cache()) + -> ledger_cache(). +%% @doc +%% Add a set of changes associated with a single sequence number (journal +%% update) and key to the ledger cache. If the changes are not to be looked +%% up directly, then they will not be indexed to accelerate lookup addto_ledgercache({H, SQN, KeyChanges}, Cache) -> ets:insert(Cache#ledger_cache.mem, KeyChanges), UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H), @@ -1289,6 +1349,15 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache) -> min_sqn=min(SQN, Cache#ledger_cache.min_sqn), max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}. +-spec addto_ledgercache({integer()|no_lookup, + integer(), list()}, ledger_cache(), loader) + -> ledger_cache(). +%% @doc +%% Add a set of changes associated with a single sequence number (journal +%% update) to the ledger cache. This is used explicitly when loading the +%% ledger from the Journal (i.e. at startup) - and in this case the ETS insert +%% can be bypassed, as all changes will be flushed to the Penciller before the +%% load is complete. 
addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) -> UpdQ = KeyChanges ++ Cache#ledger_cache.load_queue, UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H), @@ -1298,6 +1367,18 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) -> max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}. +-spec maybepush_ledgercache(integer(), ledger_cache(), pid()) + -> {ok|returned, ledger_cache()}. +%% @doc +%% Following an update to the ledger cache, check if this is now big enough to be +%% pushed down to the Penciller. There is some random jittering here, to +%% prevent coordination across leveled instances (e.g. when running in Riak). +%% +%% The penciller may be too busy, as the LSM tree is backed up with merge +%% activity. In this case the update is not made and 'returned' not ok is set +%% in the reply. Try again later when it isn't busy (and also potentially +%% implement a slow_offer state to slow down the pace at which PUTs are being +%% received) maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> Tab = Cache#ledger_cache.mem, CacheSize = ets:info(Tab, size), @@ -1321,7 +1402,10 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> {ok, Cache} end. - +-spec maybe_withjitter(integer(), integer()) -> boolean(). +%% @doc +%% Push down randomly, but the closer to the maximum size, the more likely a +%% push should be maybe_withjitter(CacheSize, MaxCacheSize) -> if CacheSize > MaxCacheSize -> @@ -1337,6 +1421,10 @@ maybe_withjitter(CacheSize, MaxCacheSize) -> end. +-spec get_loadfun(book_state()) -> fun(). +%% @doc +%% The LoadFun will be used by the Inker when walking across the Journal to +%% load the Penciller at startup get_loadfun(State) -> PrepareFun = fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->