Add doc/spec to bookie
Improve the comments and dialyzer hints within the bookie.
This commit is contained in:
parent
8123e49c7a
commit
ee5dc43cfd
1 changed files with 158 additions and 70 deletions
|
@ -868,6 +868,98 @@ fetch_value(Inker, {Key, SQN}) ->
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
|
-spec startup(#inker_options{}, #penciller_options{}, book_state())
|
||||||
|
-> {pid(), pid()}.
|
||||||
|
%% @doc
|
||||||
|
%% Startup the Inker and the Penciller, and prompt the loading of the Penciller
|
||||||
|
%% from the Inker. The Penciller may be shutdown without the latest data
|
||||||
|
%% having been persisted: and so the Iker must be able to update the Penciller
|
||||||
|
%% on startup with anything that happened but wasn't flushed to disk.
|
||||||
|
startup(InkerOpts, PencillerOpts, State) ->
|
||||||
|
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
|
||||||
|
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
|
||||||
|
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
|
||||||
|
leveled_log:log("B0005", [LedgerSQN]),
|
||||||
|
ok = leveled_inker:ink_loadpcl(Inker,
|
||||||
|
LedgerSQN + 1,
|
||||||
|
get_loadfun(State),
|
||||||
|
Penciller),
|
||||||
|
{Inker, Penciller}.
|
||||||
|
|
||||||
|
|
||||||
|
-spec set_options(list()) -> {#inker_options{}, #penciller_options{}}.
|
||||||
|
%% @doc
|
||||||
|
%% Take the passed in property list of operations and extract out any relevant
|
||||||
|
%% options to the Inker or the Penciller
|
||||||
|
set_options(Opts) ->
|
||||||
|
MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
|
||||||
|
JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER),
|
||||||
|
MaxJournalSize = MaxJournalSize0 -
|
||||||
|
erlang:phash2(self()) rem JournalSizeJitter,
|
||||||
|
|
||||||
|
SyncStrat = get_opt(sync_strategy, Opts, sync),
|
||||||
|
WRP = get_opt(waste_retention_period, Opts),
|
||||||
|
|
||||||
|
AltStrategy = get_opt(reload_strategy, Opts, []),
|
||||||
|
ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy),
|
||||||
|
|
||||||
|
PCLL0CacheSize = get_opt(max_pencillercachesize, Opts),
|
||||||
|
RootPath = get_opt(root_path, Opts),
|
||||||
|
|
||||||
|
JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
|
||||||
|
LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
|
||||||
|
ok = filelib:ensure_dir(JournalFP),
|
||||||
|
ok = filelib:ensure_dir(LedgerFP),
|
||||||
|
|
||||||
|
CompressionMethod =
|
||||||
|
case get_opt(compression_method, Opts, ?COMPRESSION_METHOD) of
|
||||||
|
native ->
|
||||||
|
% Note native compression will have reduced performance
|
||||||
|
% https://github.com/martinsumner/leveled/issues/95
|
||||||
|
native;
|
||||||
|
lz4 ->
|
||||||
|
% Must include lz4 library in rebar.config
|
||||||
|
lz4
|
||||||
|
end,
|
||||||
|
CompressOnReceipt =
|
||||||
|
case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of
|
||||||
|
on_receipt ->
|
||||||
|
% Note this will add measurable delay to PUT time
|
||||||
|
% https://github.com/martinsumner/leveled/issues/95
|
||||||
|
true;
|
||||||
|
on_compact ->
|
||||||
|
% If using lz4 this is not recommended
|
||||||
|
false
|
||||||
|
end,
|
||||||
|
|
||||||
|
{#inker_options{root_path = JournalFP,
|
||||||
|
reload_strategy = ReloadStrategy,
|
||||||
|
max_run_length = get_opt(max_run_length, Opts),
|
||||||
|
waste_retention_period = WRP,
|
||||||
|
compression_method = CompressionMethod,
|
||||||
|
compress_on_receipt = CompressOnReceipt,
|
||||||
|
cdb_options =
|
||||||
|
#cdb_options{max_size=MaxJournalSize,
|
||||||
|
binary_mode=true,
|
||||||
|
sync_strategy=SyncStrat}},
|
||||||
|
#penciller_options{root_path = LedgerFP,
|
||||||
|
max_inmemory_tablesize = PCLL0CacheSize,
|
||||||
|
levelzero_cointoss = true,
|
||||||
|
compression_method = CompressionMethod}}.
|
||||||
|
|
||||||
|
|
||||||
|
-spec return_snapfun(book_state(), store|ledger,
|
||||||
|
tuple()|no_lookup|undefined,
|
||||||
|
boolean(), boolean()) -> fun().
|
||||||
|
%% @doc
|
||||||
|
%% Generates a function from which a snapshot can be created. The primary
|
||||||
|
%% factor here is the SnapPreFold boolean. If this is true then the snapshot
|
||||||
|
%% will be taken before the Fold function is returned. If SnapPreFold is
|
||||||
|
%% false then the snapshot will be taken when the Fold function is called.
|
||||||
|
%%
|
||||||
|
%% SnapPrefold is to be used when the intention is to queue the fold, and so
|
||||||
|
%% claling of the fold may be delayed, but it is still desired that the fold
|
||||||
|
%% represent the point in time that the query was requested.
|
||||||
return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
|
return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
|
||||||
case SnapPreFold of
|
case SnapPreFold of
|
||||||
true ->
|
true ->
|
||||||
|
@ -883,6 +975,13 @@ return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
|
||||||
fun() -> book_snapshot(Self, SnapType, Query, LongRunning) end
|
fun() -> book_snapshot(Self, SnapType, Query, LongRunning) end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec snaptype_by_presence(boolean()) -> store|ledger.
|
||||||
|
%% @doc
|
||||||
|
%% Folds that traverse over object heads, may also either require to return
|
||||||
|
%% the object,or at least confirm th eobject is present in the Ledger. This
|
||||||
|
%% is achieved by enabling presence - and this will change the type of
|
||||||
|
%% snapshot to one that covers the whole store (i.e. both ledger and journal),
|
||||||
|
%% rather than just the ledger.
|
||||||
snaptype_by_presence(true) ->
|
snaptype_by_presence(true) ->
|
||||||
store;
|
store;
|
||||||
snaptype_by_presence(false) ->
|
snaptype_by_presence(false) ->
|
||||||
|
@ -918,7 +1017,6 @@ get_runner(State, {keylist, Tag, Bucket, FoldAccT}) ->
|
||||||
get_runner(State, {keylist, Tag, Bucket, KeyRange, FoldAccT}) ->
|
get_runner(State, {keylist, Tag, Bucket, KeyRange, FoldAccT}) ->
|
||||||
SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
|
SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
|
||||||
leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, KeyRange, FoldAccT);
|
leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, KeyRange, FoldAccT);
|
||||||
|
|
||||||
%% Set of runners for object or metadata folds
|
%% Set of runners for object or metadata folds
|
||||||
get_runner(State,
|
get_runner(State,
|
||||||
{foldheads_allkeys,
|
{foldheads_allkeys,
|
||||||
|
@ -1078,6 +1176,11 @@ return_ledger_keyrange(Tag, Bucket, KeyRange) ->
|
||||||
{StartKey, EndKey, SnapQuery}.
|
{StartKey, EndKey, SnapQuery}.
|
||||||
|
|
||||||
|
|
||||||
|
-spec maybe_longrunning(erlang:timestamp(), atom()) -> ok.
|
||||||
|
%% @doc
|
||||||
|
%% Check the length of time an operation (named by Aspect) has taken, and
|
||||||
|
%% see if it has crossed the long running threshold. If so log to indicate
|
||||||
|
%% a long running event has occurred.
|
||||||
maybe_longrunning(SW, Aspect) ->
|
maybe_longrunning(SW, Aspect) ->
|
||||||
case timer:now_diff(os:timestamp(), SW) of
|
case timer:now_diff(os:timestamp(), SW) of
|
||||||
N when N > ?LONG_RUNNING ->
|
N when N > ?LONG_RUNNING ->
|
||||||
|
@ -1086,6 +1189,12 @@ maybe_longrunning(SW, Aspect) ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec readycache_forsnapshot(ledger_cache(), tuple()|no_lookup|undefined)
|
||||||
|
-> ledger_cache().
|
||||||
|
%% @doc
|
||||||
|
%% Strip the ledger cach back to only the relevant informaiton needed in
|
||||||
|
%% the query, and to make the cache a snapshot (and so not subject to changes
|
||||||
|
%% such as additions to the ets table)
|
||||||
readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
|
readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
|
||||||
{KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem,
|
{KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem,
|
||||||
StartKey,
|
StartKey,
|
||||||
|
@ -1127,6 +1236,13 @@ readycache_forsnapshot(LedgerCache, Query) ->
|
||||||
max_sqn=LedgerCache#ledger_cache.max_sqn}
|
max_sqn=LedgerCache#ledger_cache.max_sqn}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec scan_table(ets:tab(), tuple(), tuple()) ->
|
||||||
|
{list(), non_neg_integer()|infinity, non_neg_integer()}.
|
||||||
|
%% @doc
|
||||||
|
%% Query the ETS table to find a range of keys (start inclusive). Should also
|
||||||
|
%% return the miniumum and maximum sequence number found in the query. This
|
||||||
|
%% is just then used as a safety check when loading these results into the
|
||||||
|
%% penciller snapshot
|
||||||
scan_table(Table, StartKey, EndKey) ->
|
scan_table(Table, StartKey, EndKey) ->
|
||||||
case ets:lookup(Table, StartKey) of
|
case ets:lookup(Table, StartKey) of
|
||||||
[] ->
|
[] ->
|
||||||
|
@ -1158,74 +1274,11 @@ scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
set_options(Opts) ->
|
-spec fetch_head(tuple(), pid(), ledger_cache()) -> not_present|tuple().
|
||||||
MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
|
%% @doc
|
||||||
JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER),
|
%% Fetch only the head of the object from the Ledger (or the bookie's recent
|
||||||
MaxJournalSize = MaxJournalSize0 -
|
%% ledger cache if it has just been updated). not_present is returned if the
|
||||||
erlang:phash2(self()) rem JournalSizeJitter,
|
%% Key is not found
|
||||||
|
|
||||||
SyncStrat = get_opt(sync_strategy, Opts, sync),
|
|
||||||
WRP = get_opt(waste_retention_period, Opts),
|
|
||||||
|
|
||||||
AltStrategy = get_opt(reload_strategy, Opts, []),
|
|
||||||
ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy),
|
|
||||||
|
|
||||||
PCLL0CacheSize = get_opt(max_pencillercachesize, Opts),
|
|
||||||
RootPath = get_opt(root_path, Opts),
|
|
||||||
|
|
||||||
JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
|
|
||||||
LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
|
|
||||||
ok = filelib:ensure_dir(JournalFP),
|
|
||||||
ok = filelib:ensure_dir(LedgerFP),
|
|
||||||
|
|
||||||
CompressionMethod =
|
|
||||||
case get_opt(compression_method, Opts, ?COMPRESSION_METHOD) of
|
|
||||||
native ->
|
|
||||||
% Note native compression will have reduced performance
|
|
||||||
% https://github.com/martinsumner/leveled/issues/95
|
|
||||||
native;
|
|
||||||
lz4 ->
|
|
||||||
% Must include lz4 library in rebar.config
|
|
||||||
lz4
|
|
||||||
end,
|
|
||||||
CompressOnReceipt =
|
|
||||||
case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of
|
|
||||||
on_receipt ->
|
|
||||||
% Note this will add measurable delay to PUT time
|
|
||||||
% https://github.com/martinsumner/leveled/issues/95
|
|
||||||
true;
|
|
||||||
on_compact ->
|
|
||||||
% If using lz4 this is not recommended
|
|
||||||
false
|
|
||||||
end,
|
|
||||||
|
|
||||||
{#inker_options{root_path = JournalFP,
|
|
||||||
reload_strategy = ReloadStrategy,
|
|
||||||
max_run_length = get_opt(max_run_length, Opts),
|
|
||||||
waste_retention_period = WRP,
|
|
||||||
compression_method = CompressionMethod,
|
|
||||||
compress_on_receipt = CompressOnReceipt,
|
|
||||||
cdb_options =
|
|
||||||
#cdb_options{max_size=MaxJournalSize,
|
|
||||||
binary_mode=true,
|
|
||||||
sync_strategy=SyncStrat}},
|
|
||||||
#penciller_options{root_path = LedgerFP,
|
|
||||||
max_inmemory_tablesize = PCLL0CacheSize,
|
|
||||||
levelzero_cointoss = true,
|
|
||||||
compression_method = CompressionMethod}}.
|
|
||||||
|
|
||||||
startup(InkerOpts, PencillerOpts, State) ->
|
|
||||||
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
|
|
||||||
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
|
|
||||||
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
|
|
||||||
leveled_log:log("B0005", [LedgerSQN]),
|
|
||||||
ok = leveled_inker:ink_loadpcl(Inker,
|
|
||||||
LedgerSQN + 1,
|
|
||||||
get_loadfun(State),
|
|
||||||
Penciller),
|
|
||||||
{Inker, Penciller}.
|
|
||||||
|
|
||||||
|
|
||||||
fetch_head(Key, Penciller, LedgerCache) ->
|
fetch_head(Key, Penciller, LedgerCache) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
CacheResult =
|
CacheResult =
|
||||||
|
@ -1282,6 +1335,13 @@ preparefor_ledgercache(_InkTag,
|
||||||
{KeyH, SQN, KeyChanges}.
|
{KeyH, SQN, KeyChanges}.
|
||||||
|
|
||||||
|
|
||||||
|
-spec addto_ledgercache({integer()|no_lookup,
|
||||||
|
integer(), list()}, ledger_cache())
|
||||||
|
-> ledger_cache().
|
||||||
|
%% @doc
|
||||||
|
%% Add a set of changes associated with a single sequence number (journal
|
||||||
|
%% update) and key to the ledger cache. If the changes are not to be looked
|
||||||
|
%% up directly, then they will not be indexed to accelerate lookup
|
||||||
addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
|
addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
|
||||||
ets:insert(Cache#ledger_cache.mem, KeyChanges),
|
ets:insert(Cache#ledger_cache.mem, KeyChanges),
|
||||||
UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
|
UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
|
||||||
|
@ -1289,6 +1349,15 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
|
||||||
min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
|
min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
|
||||||
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
|
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
|
||||||
|
|
||||||
|
-spec addto_ledgercache({integer()|no_lookup,
|
||||||
|
integer(), list()}, ledger_cache(), loader)
|
||||||
|
-> ledger_cache().
|
||||||
|
%% @doc
|
||||||
|
%% Add a set of changes associated witha single sequence number (journal
|
||||||
|
%% update) to the ledger cache. This is used explicitly when laoding the
|
||||||
|
%% ledger from the Journal (i.e. at startup) - and in this case the ETS insert
|
||||||
|
%% can be bypassed, as all changes will be flushed to the Penciller before the
|
||||||
|
%% load is complete.
|
||||||
addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
|
addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
|
||||||
UpdQ = KeyChanges ++ Cache#ledger_cache.load_queue,
|
UpdQ = KeyChanges ++ Cache#ledger_cache.load_queue,
|
||||||
UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
|
UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
|
||||||
|
@ -1298,6 +1367,18 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
|
||||||
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
|
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
|
||||||
|
|
||||||
|
|
||||||
|
-spec maybepush_ledgercache(integer(), ledger_cache(), pid())
|
||||||
|
-> {ok|returned, ledger_cache()}.
|
||||||
|
%% @doc
|
||||||
|
%% Following an update to the ledger cache, check if this now big enough to be
|
||||||
|
%% pushed down to the Penciller. There is some random jittering here, to
|
||||||
|
%% prevent coordination across leveled instances (e.g. when running in Riak).
|
||||||
|
%%
|
||||||
|
%% The penciller may be too busy, as the LSM tree is backed up with merge
|
||||||
|
%% activity. In this case the update is not made and 'returned' not ok is set
|
||||||
|
%% in the reply. Try again later when it isn't busy (and also potentially
|
||||||
|
%% implement a slow_offer state to slow down the pace at which PUTs are being
|
||||||
|
%% received)
|
||||||
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
|
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
|
||||||
Tab = Cache#ledger_cache.mem,
|
Tab = Cache#ledger_cache.mem,
|
||||||
CacheSize = ets:info(Tab, size),
|
CacheSize = ets:info(Tab, size),
|
||||||
|
@ -1321,7 +1402,10 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
|
||||||
{ok, Cache}
|
{ok, Cache}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec maybe_withjitter(integer(), integer()) -> boolean().
|
||||||
|
%% @doc
|
||||||
|
%% Push down randomly, but the closer to the maximum size, the more likely a
|
||||||
|
%% push should be
|
||||||
maybe_withjitter(CacheSize, MaxCacheSize) ->
|
maybe_withjitter(CacheSize, MaxCacheSize) ->
|
||||||
if
|
if
|
||||||
CacheSize > MaxCacheSize ->
|
CacheSize > MaxCacheSize ->
|
||||||
|
@ -1337,6 +1421,10 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
-spec get_loadfun(book_state()) -> fun().
|
||||||
|
%% @doc
|
||||||
|
%% The LoadFun will be sued by the Inker when walking across the Journal to
|
||||||
|
%% load the Penciller at startup
|
||||||
get_loadfun(State) ->
|
get_loadfun(State) ->
|
||||||
PrepareFun =
|
PrepareFun =
|
||||||
fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
|
fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue