Support for recalc

Initial test included for running with recallc, and also transition from retain to recalc.

Moves all logic for startup fold into leveled_bookie - avoid the Inker requiring any direct knowledge about implementation of the Penciller.
This commit is contained in:
Martin Sumner 2020-03-15 22:14:42 +00:00
parent 1242dd4991
commit 694d2c39f8
7 changed files with 522 additions and 187 deletions

View file

@ -93,8 +93,6 @@
]).
-export([empty_ledgercache/0,
loadqueue_ledgercache/1,
push_ledgercache/2,
snapshot_store/6,
fetch_value/2,
journal_notfound/4]).
@ -105,6 +103,7 @@
-include_lib("eunit/include/eunit.hrl").
-define(LOADING_PAUSE, 1000).
-define(CACHE_SIZE, 2500).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
@ -166,7 +165,7 @@
-record(state, {inker :: pid() | undefined,
penciller :: pid() | undefined,
cache_size :: integer() | undefined,
ledger_cache = #ledger_cache{},
ledger_cache = #ledger_cache{} :: ledger_cache(),
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
@ -315,7 +314,7 @@
% Journal, to recalculate the index changes based on the current
% state of the Ledger and the object metadata.
%
% reload_strategy ptions are a list - to map from a tag to the
% reload_strategy options are a list - to map from a tag to the
% strategy (recovr|retain|recalc). Defualt strategies are:
% [{?RIAK_TAG, retain}, {?STD_TAG, retain}]
{max_pencillercachesize, pos_integer()|undefined} |
@ -378,7 +377,16 @@
% true
].
-type initial_loadfun() ::
fun((leveled_codec:journal_key(),
any(),
non_neg_integer(),
{non_neg_integer(), non_neg_integer(), ledger_cache()},
fun((any()) -> {binary(), non_neg_integer()})) ->
{loop|stop,
{non_neg_integer(), non_neg_integer(), ledger_cache()}}).
-export_type([initial_loadfun/0]).
%%%============================================================================
%%% API
@ -1250,8 +1258,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
SQN,
Object,
ObjSize,
{IndexSpecs, TTL},
State),
{IndexSpecs, TTL}),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
{_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
@ -1288,8 +1295,7 @@ handle_call({mput, ObjectSpecs, TTL}, From, State)
Changes =
preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
SQN, null, length(ObjectSpecs),
{ObjectSpecs, TTL},
State),
{ObjectSpecs, TTL}),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
case State#state.slow_offer of
true ->
@ -1537,6 +1543,23 @@ code_change(_OldVsn, State, _Extra) ->
empty_ledgercache() ->
#ledger_cache{mem = ets:new(empty, [ordered_set])}.
-spec push_to_penciller(pid(), ledger_cache()) -> ok.
%% @doc
%% The push to penciller must start as a tree to correctly de-duplicate
%% the list by order before becoming a de-duplicated list for loading
push_to_penciller(Penciller, LedgerCache) ->
push_to_penciller_loop(Penciller, loadqueue_ledgercache(LedgerCache)).
push_to_penciller_loop(Penciller, LedgerCache) ->
case push_ledgercache(Penciller, LedgerCache) of
returned ->
timer:sleep(?LOADING_PAUSE),
push_to_penciller_loop(Penciller, LedgerCache);
ok ->
ok
end.
-spec push_ledgercache(pid(), ledger_cache()) -> ok|returned.
%% @doc
%% Push the ledgercache to the Penciller - which should respond ok or
@ -1642,10 +1665,22 @@ startup(InkerOpts, PencillerOpts, State) ->
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
leveled_log:log("B0005", [LedgerSQN]),
ReloadStrategy = InkerOpts#inker_options.reload_strategy,
LoadFun = get_loadfun(ReloadStrategy, Penciller, State),
BatchFun =
fun(BatchAcc, _Acc) ->
push_to_penciller(Penciller, BatchAcc)
end,
InitAccFun =
fun(FN, CurrentMinSQN) ->
leveled_log:log("I0014", [FN, CurrentMinSQN]),
empty_ledgercache()
end,
ok = leveled_inker:ink_loadpcl(Inker,
LedgerSQN + 1,
get_loadfun(State),
Penciller),
LoadFun,
InitAccFun,
BatchFun),
ok = leveled_inker:ink_checksqn(Inker, LedgerSQN),
{Inker, Penciller}.
@ -2161,30 +2196,26 @@ check_notfound(CheckFrequency, CheckFun) ->
-spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null,
leveled_codec:ledger_key()|?DUMMY,
integer(), any(), integer(),
leveled_codec:journal_keychanges(),
book_state())
-> {integer()|no_lookup,
integer(),
non_neg_integer(), any(), integer(),
leveled_codec:journal_keychanges())
-> {leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())}.
%% @doc
%% Prepare an object and its related key changes for addition to the Ledger
%% via the Ledger Cache.
preparefor_ledgercache(?INKT_MPUT,
?DUMMY, SQN, _O, _S, {ObjSpecs, TTL},
_State) ->
?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}) ->
ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
{no_lookup, SQN, ObjChanges};
preparefor_ledgercache(?INKT_KEYD,
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
_State) ->
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}) ->
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
KeyChanges =
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
{no_lookup, SQN, KeyChanges};
preparefor_ledgercache(_InkTag,
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
_State) ->
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
KeyChanges =
@ -2193,8 +2224,58 @@ preparefor_ledgercache(_InkTag,
{KeyH, SQN, KeyChanges}.
-spec addto_ledgercache({integer()|no_lookup,
integer(),
-spec recalcfor_ledgercache(leveled_codec:journal_key_tag()|null,
leveled_codec:ledger_key()|?DUMMY,
non_neg_integer(), any(), integer(),
leveled_codec:journal_keychanges(),
ledger_cache(),
pid())
-> {leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())}.
%% @doc
%% When loading from the journal to the ledger, may hit a key which has the
%% `recalc` strategy. Such a key needs to recalculate the key changes by
%% comparison with the current state of the ledger, assuming it is a full
%% journal entry (i.e. KeyDeltas which may be a result of previously running
%% with a retain strategy should be ignored).
recalcfor_ledgercache(InkTag,
_LedgerKey, SQN, _Obj, _Size, {_IdxSpecs, _TTL},
_LedgerCache,
_Penciller)
when InkTag == ?INKT_MPUT; InkTag == ?INKT_KEYD ->
{no_lookup, SQN, []};
recalcfor_ledgercache(_InkTag,
LK, SQN, Obj, Size, {_IgnoreJournalIdxSpecs, TTL},
LedgerCache,
Penciller) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LK, SQN, Obj, Size, TTL),
OldObject =
case check_in_ledgercache(LK, KeyH, LedgerCache, loader) of
false ->
leveled_penciller:pcl_fetch(Penciller, LK, KeyH, true);
{value, KV} ->
KV
end,
OldMetadata =
case OldObject of
not_present ->
not_present;
{LK, LV} ->
leveled_codec:get_metadata(LV)
end,
UpdMetadata = leveled_codec:get_metadata(MetaValue),
IdxSpecs =
leveled_head:diff_indexspecs(element(1, LK), UpdMetadata, OldMetadata),
{KeyH,
SQN,
[{LK, MetaValue}]
++ leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL)}.
-spec addto_ledgercache({leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())},
ledger_cache())
-> ledger_cache().
@ -2230,6 +2311,32 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
-spec check_in_ledgercache(leveled_codec:ledger_key(),
leveled_codec:segment_hash(),
ledger_cache(),
loader) ->
false | {value, leveled_codec:ledger_kv()}.
%% @doc
%% Check the ledger cache for a Key, when the ledger cache is in loader mode
%% and so is populating a queue not an ETS table
check_in_ledgercache(PK, Hash, Cache, loader) ->
case leveled_pmem:check_index(Hash, Cache#ledger_cache.index) of
[] ->
false;
_ ->
search(fun({K,_V}) -> K == PK end,
lists:reverse(Cache#ledger_cache.load_queue))
end.
-spec search(fun((any()) -> boolean()), list()) -> {value, any()}|false.
search(Pred, [Hd|Tail]) ->
case Pred(Hd) of
true -> {value, Hd};
false -> search(Pred, Tail)
end;
search(Pred, []) when is_function(Pred, 1) ->
false.
-spec maybepush_ledgercache(integer(), ledger_cache(), pid())
-> {ok|returned, ledger_cache()}.
%% @doc
@ -2276,44 +2383,47 @@ maybe_withjitter(_CacheSize, _MaxCacheSize) ->
false.
-spec get_loadfun(book_state()) -> fun().
-spec get_loadfun(leveled_codec:compaction_strategy(), pid(), book_state())
-> initial_loadfun().
%% @doc
%% The LoadFun will be used by the Inker when walking across the Journal to
%% load the Penciller at startup
get_loadfun(State) ->
PrepareFun =
fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, State)
end,
LoadFun =
fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
{MinSQN, MaxSQN, OutputTree} = Acc0,
{SQN, InkTag, PK} = KeyInJournal,
% VBin may already be a term
{VBin, VSize} = ExtractFun(ValueInJournal),
{Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
case SQN of
SQN when SQN < MinSQN ->
{loop, Acc0};
SQN when SQN < MaxSQN ->
Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
{loop,
{MinSQN,
MaxSQN,
addto_ledgercache(Chngs, OutputTree, loader)}};
MaxSQN ->
leveled_log:log("B0006", [SQN]),
Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
{stop,
{MinSQN,
MaxSQN,
addto_ledgercache(Chngs, OutputTree, loader)}};
SQN when SQN > MaxSQN ->
leveled_log:log("B0007", [MaxSQN, SQN]),
{stop, Acc0}
end
end,
LoadFun.
%% load the Penciller at startup.
get_loadfun(ReloadStrat, Penciller, _State) ->
fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
{MinSQN, MaxSQN, LedgerCache} = Acc0,
{SQN, InkTag, PK} = KeyInJournal,
case SQN of
SQN when SQN < MinSQN ->
{loop, Acc0};
SQN when SQN > MaxSQN ->
leveled_log:log("B0007", [MaxSQN, SQN]),
{stop, Acc0};
_ ->
{VBin, ValSize} = ExtractFun(ValueInJournal),
% VBin may already be a term
{Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
Chngs =
case leveled_codec:get_tagstrategy(PK, ReloadStrat) of
recalc ->
recalcfor_ledgercache(InkTag, PK, SQN,
Obj, ValSize, IdxSpecs,
LedgerCache,
Penciller);
_ ->
preparefor_ledgercache(InkTag, PK, SQN,
Obj, ValSize, IdxSpecs)
end,
case SQN of
MaxSQN ->
leveled_log:log("B0006", [SQN]),
LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
{stop, {MinSQN, MaxSQN, LC0}};
_ ->
LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
{loop, {MinSQN, MaxSQN, LC0}}
end
end
end.
delete_path(DirPath) ->