Merge pull request #310 from martinsumner/mas-i306-implementrecalc

Mas i306 implementrecalc
This commit is contained in:
Martin Sumner 2020-03-30 20:06:48 +01:00 committed by GitHub
commit febdac27f6
13 changed files with 860 additions and 240 deletions

View file

@ -120,7 +120,7 @@ Three potential recovery strategies are supported to provide some flexibility fo
  - retain - on compaction KeyDeltas are retained in the Journal, only values are removed.
- - recalc (not yet implemented) - the compaction rules assume that on recovery the key changes will be recalculated by comparing the change with the current database state. In recovery the key changes will be recalculated by comparing the change with the current database state.
+ - recalc - the compaction rules assume that on recovery the key changes will be recalculated by comparing the change with the current database state. In recovery the key changes will be recalculated by comparing the change with the current database state. A user-defined function should be passed in at startup to achieve this recalculation (to override `leveled_head:diff_indexspecs/3`).
  ### Hot Backups
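For illustration of the recalc option described in the bullet above, here is a rough sketch of what a user-defined recalculation function and the matching startup configuration might look like. The tag name `my_app_tag`, the index field `<<"idx_bin">>`, the root path and the assumed metadata layout are placeholders, not part of leveled's API; it also assumes a paired `extract_metadata` override (not shown) stores the object's index terms as `{index, Terms}` in the third element of the metadata:

```erlang
%% Sketch only - a diff_indexspecs-style override for a hypothetical
%% application tag, returning index specs as {add|remove, Field, Term}.
DiffIndexSpecsFun =
    fun(my_app_tag, UpdMeta, PrvMeta) ->
        {index, UpdIndexes} = element(3, UpdMeta),
        PrvIndexes =
            case PrvMeta of
                not_present ->
                    [];
                _ ->
                    {index, PI} = element(3, PrvMeta),
                    PI
            end,
        [{add, <<"idx_bin">>, T} || T <- UpdIndexes -- PrvIndexes] ++
            [{remove, <<"idx_bin">>, T} || T <- PrvIndexes -- UpdIndexes]
    end,
{ok, Bookie} =
    leveled_bookie:book_start(
        [{root_path, "/tmp/leveled_recalc"},    % placeholder path
         {reload_strategy, [{my_app_tag, recalc}]},
         {override_functions, [{diff_indexspecs, DiffIndexSpecsFun}]}]).
```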

View file

@ -77,11 +77,11 @@ However, what if the Ledger had been erased? This could happen due to some corr
  There are three potential strategies:
- - `skip` - don't worry about this scenario, require the Ledger to be backed up;
+ - `recovr` - don't worry about this scenario, require the Ledger to be backed up;
  - `retain` - discard the object itself on compaction but keep the key changes;
  - `recalc` - recalculate the indexes on reload by comparing the information on the object with the current state of the Ledger (as would be required by the PUT process when comparing IndexSpecs at PUT time).
- There is no code for `recalc` at present it is simply a logical possibility. So to set a reload strategy there should be an entry like `{reload_strategy, [{TagName, skip|retain}]}`. By default tags are pre-set to `retain`. If there is no need to handle a corrupted Ledger, then all tags could be set to `skip`.
+ To set a reload strategy requires a list of tuples to match tag names to strategy `{reload_strategy, [{TagName, recovr|retain|recalc}]}`. By default tags are pre-set to `retain`. If there is no need to handle a corrupted Ledger, then all tags could be set to `recovr` - this assumes that either the ledger files are protected by some other means from corruption, or an external anti-entropy mechanism will recover the lost data.
  ## Compression Method
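As a sketch of the reload strategy setting described above (with a placeholder root path), startup options moving Riak objects to `recalc` while keeping the default `retain` for standard objects might look like:

```erlang
%% ?RIAK_TAG and ?STD_TAG are the object tag macros from include/leveled.hrl
StartOpts =
    [{root_path, "/var/db/leveled"},            % placeholder path
     {reload_strategy,
        [{?RIAK_TAG, recalc},                   % recalculate index changes on reload
         {?STD_TAG, retain}]}],                 % keep KeyDeltas in the Journal
{ok, Bookie} = leveled_bookie:book_start(StartOpts).
```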

View file

@ -93,8 +93,6 @@
]). ]).
-export([empty_ledgercache/0, -export([empty_ledgercache/0,
loadqueue_ledgercache/1,
push_ledgercache/2,
snapshot_store/6, snapshot_store/6,
fetch_value/2, fetch_value/2,
journal_notfound/4]). journal_notfound/4]).
@ -105,6 +103,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(LOADING_PAUSE, 1000).
-define(CACHE_SIZE, 2500). -define(CACHE_SIZE, 2500).
-define(MIN_CACHE_SIZE, 100). -define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400). -define(MIN_PCL_CACHE_SIZE, 400).
@ -166,7 +165,7 @@
-record(state, {inker :: pid() | undefined, -record(state, {inker :: pid() | undefined,
penciller :: pid() | undefined, penciller :: pid() | undefined,
cache_size :: integer() | undefined, cache_size :: integer() | undefined,
ledger_cache = #ledger_cache{}, ledger_cache = #ledger_cache{} :: ledger_cache(),
is_snapshot :: boolean() | undefined, is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(), slow_offer = false :: boolean(),
@ -310,12 +309,16 @@
% resilience outside of the store), or retain (retain a history of % resilience outside of the store), or retain (retain a history of
% key changes, even when the object value has been compacted). % key changes, even when the object value has been compacted).
% %
% There is a third, theoretical and untested strategy, which is % There is a third strategy, which is recalc, where on reloading
% recalc - which would require when reloading the Ledger from the % the Ledger from the Journal, the key changes are recalculated by
% Journal, to recalculate the index changes based on the current % comparing the extracted metadata from the Journal object, with the
% state of the Ledger and the object metadata. % extracted metadata from the current Ledger object it is set to
% replace (should one be present). Implementing the recalc
% strategy requires a override function for
% `leveled_head:diff_indexspecs/3`.
% A function for the ?RIAK_TAG is provided and tested.
% %
% reload_strategy ptions are a list - to map from a tag to the % reload_strategy options are a list - to map from a tag to the
% strategy (recovr|retain|recalc). Defualt strategies are: % strategy (recovr|retain|recalc). Defualt strategies are:
% [{?RIAK_TAG, retain}, {?STD_TAG, retain}] % [{?RIAK_TAG, retain}, {?STD_TAG, retain}]
{max_pencillercachesize, pos_integer()|undefined} | {max_pencillercachesize, pos_integer()|undefined} |
@ -378,7 +381,16 @@
% true % true
]. ].
-type initial_loadfun() ::
fun((leveled_codec:journal_key(),
any(),
non_neg_integer(),
{non_neg_integer(), non_neg_integer(), ledger_cache()},
fun((any()) -> {binary(), non_neg_integer()})) ->
{loop|stop,
{non_neg_integer(), non_neg_integer(), ledger_cache()}}).
-export_type([initial_loadfun/0]).
%%%============================================================================ %%%============================================================================
%%% API %%% API
@ -1097,7 +1109,7 @@ book_destroy(Pid) ->
%% to store the backup. %% to store the backup.
%% %%
%% Backup files are hard-linked. Does not work in head_only mode, or if %% Backup files are hard-linked. Does not work in head_only mode, or if
%% index changes are used with a `skip` compaction/reload strategy %% index changes are used with a `recovr` compaction/reload strategy
book_hotbackup(Pid) -> book_hotbackup(Pid) ->
gen_server:call(Pid, hot_backup, infinity). gen_server:call(Pid, hot_backup, infinity).
@ -1250,8 +1262,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
SQN, SQN,
Object, Object,
ObjSize, ObjSize,
{IndexSpecs, TTL}, {IndexSpecs, TTL}),
State),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
{_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1), {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
@ -1288,8 +1299,7 @@ handle_call({mput, ObjectSpecs, TTL}, From, State)
Changes = Changes =
preparefor_ledgercache(?INKT_MPUT, ?DUMMY, preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
SQN, null, length(ObjectSpecs), SQN, null, length(ObjectSpecs),
{ObjectSpecs, TTL}, {ObjectSpecs, TTL}),
State),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
case State#state.slow_offer of case State#state.slow_offer of
true -> true ->
@ -1537,6 +1547,23 @@ code_change(_OldVsn, State, _Extra) ->
empty_ledgercache() -> empty_ledgercache() ->
#ledger_cache{mem = ets:new(empty, [ordered_set])}. #ledger_cache{mem = ets:new(empty, [ordered_set])}.
-spec push_to_penciller(pid(), ledger_cache()) -> ok.
%% @doc
%% The push to penciller must start as a tree to correctly de-duplicate
%% the list by order before becoming a de-duplicated list for loading
push_to_penciller(Penciller, LedgerCache) ->
push_to_penciller_loop(Penciller, loadqueue_ledgercache(LedgerCache)).
push_to_penciller_loop(Penciller, LedgerCache) ->
case push_ledgercache(Penciller, LedgerCache) of
returned ->
timer:sleep(?LOADING_PAUSE),
push_to_penciller_loop(Penciller, LedgerCache);
ok ->
ok
end.
-spec push_ledgercache(pid(), ledger_cache()) -> ok|returned. -spec push_ledgercache(pid(), ledger_cache()) -> ok|returned.
%% @doc %% @doc
%% Push the ledgercache to the Penciller - which should respond ok or %% Push the ledgercache to the Penciller - which should respond ok or
@ -1642,10 +1669,22 @@ startup(InkerOpts, PencillerOpts, State) ->
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
leveled_log:log("B0005", [LedgerSQN]), leveled_log:log("B0005", [LedgerSQN]),
ReloadStrategy = InkerOpts#inker_options.reload_strategy,
LoadFun = get_loadfun(ReloadStrategy, Penciller, State),
BatchFun =
fun(BatchAcc, _Acc) ->
push_to_penciller(Penciller, BatchAcc)
end,
InitAccFun =
fun(FN, CurrentMinSQN) ->
leveled_log:log("I0014", [FN, CurrentMinSQN]),
empty_ledgercache()
end,
ok = leveled_inker:ink_loadpcl(Inker, ok = leveled_inker:ink_loadpcl(Inker,
LedgerSQN + 1, LedgerSQN + 1,
get_loadfun(State), LoadFun,
Penciller), InitAccFun,
BatchFun),
ok = leveled_inker:ink_checksqn(Inker, LedgerSQN), ok = leveled_inker:ink_checksqn(Inker, LedgerSQN),
{Inker, Penciller}. {Inker, Penciller}.
@ -2161,30 +2200,26 @@ check_notfound(CheckFrequency, CheckFun) ->
-spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null, -spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null,
leveled_codec:ledger_key()|?DUMMY, leveled_codec:ledger_key()|?DUMMY,
integer(), any(), integer(), non_neg_integer(), any(), integer(),
leveled_codec:journal_keychanges(), leveled_codec:journal_keychanges())
book_state()) -> {leveled_codec:segment_hash(),
-> {integer()|no_lookup, non_neg_integer(),
integer(),
list(leveled_codec:ledger_kv())}. list(leveled_codec:ledger_kv())}.
%% @doc %% @doc
%% Prepare an object and its related key changes for addition to the Ledger %% Prepare an object and its related key changes for addition to the Ledger
%% via the Ledger Cache. %% via the Ledger Cache.
preparefor_ledgercache(?INKT_MPUT, preparefor_ledgercache(?INKT_MPUT,
?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}, ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}) ->
_State) ->
ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL), ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
{no_lookup, SQN, ObjChanges}; {no_lookup, SQN, ObjChanges};
preparefor_ledgercache(?INKT_KEYD, preparefor_ledgercache(?INKT_KEYD,
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}, LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}) ->
_State) ->
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey), {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
KeyChanges = KeyChanges =
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL), leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
{no_lookup, SQN, KeyChanges}; {no_lookup, SQN, KeyChanges};
preparefor_ledgercache(_InkTag, preparefor_ledgercache(_InkTag,
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}, LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}) ->
_State) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} = {Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL), leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
KeyChanges = KeyChanges =
@ -2193,8 +2228,58 @@ preparefor_ledgercache(_InkTag,
{KeyH, SQN, KeyChanges}. {KeyH, SQN, KeyChanges}.
-spec addto_ledgercache({integer()|no_lookup, -spec recalcfor_ledgercache(leveled_codec:journal_key_tag()|null,
integer(), leveled_codec:ledger_key()|?DUMMY,
non_neg_integer(), any(), integer(),
leveled_codec:journal_keychanges(),
ledger_cache(),
pid())
-> {leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())}.
%% @doc
%% When loading from the journal to the ledger, may hit a key which has the
%% `recalc` strategy. Such a key needs to recalculate the key changes by
%% comparison with the current state of the ledger, assuming it is a full
%% journal entry (i.e. KeyDeltas which may be a result of previously running
%% with a retain strategy should be ignored).
recalcfor_ledgercache(InkTag,
_LedgerKey, SQN, _Obj, _Size, {_IdxSpecs, _TTL},
_LedgerCache,
_Penciller)
when InkTag == ?INKT_MPUT; InkTag == ?INKT_KEYD ->
{no_lookup, SQN, []};
recalcfor_ledgercache(_InkTag,
LK, SQN, Obj, Size, {_IgnoreJournalIdxSpecs, TTL},
LedgerCache,
Penciller) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LK, SQN, Obj, Size, TTL),
OldObject =
case check_in_ledgercache(LK, KeyH, LedgerCache, loader) of
false ->
leveled_penciller:pcl_fetch(Penciller, LK, KeyH, true);
{value, KV} ->
KV
end,
OldMetadata =
case OldObject of
not_present ->
not_present;
{LK, LV} ->
leveled_codec:get_metadata(LV)
end,
UpdMetadata = leveled_codec:get_metadata(MetaValue),
IdxSpecs =
leveled_head:diff_indexspecs(element(1, LK), UpdMetadata, OldMetadata),
{KeyH,
SQN,
[{LK, MetaValue}]
++ leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL)}.
-spec addto_ledgercache({leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())}, list(leveled_codec:ledger_kv())},
ledger_cache()) ledger_cache())
-> ledger_cache(). -> ledger_cache().
@ -2230,6 +2315,32 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}. max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
-spec check_in_ledgercache(leveled_codec:ledger_key(),
leveled_codec:segment_hash(),
ledger_cache(),
loader) ->
false | {value, leveled_codec:ledger_kv()}.
%% @doc
%% Check the ledger cache for a Key, when the ledger cache is in loader mode
%% and so is populating a queue not an ETS table
check_in_ledgercache(PK, Hash, Cache, loader) ->
case leveled_pmem:check_index(Hash, Cache#ledger_cache.index) of
[] ->
false;
_ ->
search(fun({K,_V}) -> K == PK end,
lists:reverse(Cache#ledger_cache.load_queue))
end.
-spec search(fun((any()) -> boolean()), list()) -> {value, any()}|false.
search(Pred, [Hd|Tail]) ->
case Pred(Hd) of
true -> {value, Hd};
false -> search(Pred, Tail)
end;
search(Pred, []) when is_function(Pred, 1) ->
false.
-spec maybepush_ledgercache(integer(), ledger_cache(), pid()) -spec maybepush_ledgercache(integer(), ledger_cache(), pid())
-> {ok|returned, ledger_cache()}. -> {ok|returned, ledger_cache()}.
%% @doc %% @doc
@ -2276,44 +2387,47 @@ maybe_withjitter(_CacheSize, _MaxCacheSize) ->
false. false.
-spec get_loadfun(book_state()) -> fun(). -spec get_loadfun(leveled_codec:compaction_strategy(), pid(), book_state())
-> initial_loadfun().
%% @doc %% @doc
%% The LoadFun will be used by the Inker when walking across the Journal to %% The LoadFun will be used by the Inker when walking across the Journal to
%% load the Penciller at startup %% load the Penciller at startup.
get_loadfun(State) -> get_loadfun(ReloadStrat, Penciller, _State) ->
PrepareFun = fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
fun(Tag, PK, SQN, Obj, VS, IdxSpecs) -> {MinSQN, MaxSQN, LedgerCache} = Acc0,
preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, State) {SQN, InkTag, PK} = KeyInJournal,
end, case SQN of
LoadFun = SQN when SQN < MinSQN ->
fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) -> {loop, Acc0};
{MinSQN, MaxSQN, OutputTree} = Acc0, SQN when SQN > MaxSQN ->
{SQN, InkTag, PK} = KeyInJournal, leveled_log:log("B0007", [MaxSQN, SQN]),
% VBin may already be a term {stop, Acc0};
{VBin, VSize} = ExtractFun(ValueInJournal), _ ->
{Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin), {VBin, ValSize} = ExtractFun(ValueInJournal),
case SQN of % VBin may already be a term
SQN when SQN < MinSQN -> {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
{loop, Acc0}; Chngs =
SQN when SQN < MaxSQN -> case leveled_codec:get_tagstrategy(PK, ReloadStrat) of
Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs), recalc ->
{loop, recalcfor_ledgercache(InkTag, PK, SQN,
{MinSQN, Obj, ValSize, IdxSpecs,
MaxSQN, LedgerCache,
addto_ledgercache(Chngs, OutputTree, loader)}}; Penciller);
MaxSQN -> _ ->
leveled_log:log("B0006", [SQN]), preparefor_ledgercache(InkTag, PK, SQN,
Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs), Obj, ValSize, IdxSpecs)
{stop, end,
{MinSQN, case SQN of
MaxSQN, MaxSQN ->
addto_ledgercache(Chngs, OutputTree, loader)}}; leveled_log:log("B0006", [SQN]),
SQN when SQN > MaxSQN -> LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
leveled_log:log("B0007", [MaxSQN, SQN]), {stop, {MinSQN, MaxSQN, LC0}};
{stop, Acc0} _ ->
end LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
end, {loop, {MinSQN, MaxSQN, LC0}}
LoadFun. end
end
end.
delete_path(DirPath) -> delete_path(DirPath) ->
@ -3166,6 +3280,10 @@ sqnorder_mutatefold_test() ->
ok = book_destroy(Bookie1). ok = book_destroy(Bookie1).
search_test() ->
?assertMatch({value, 5}, search(fun(X) -> X == 5 end, lists:seq(1, 10))),
?assertMatch(false, search(fun(X) -> X == 55 end, lists:seq(1, 10))).
check_notfound_test() -> check_notfound_test() ->
ProbablyFun = fun() -> probably end, ProbablyFun = fun() -> probably end,
MissingFun = fun() -> missing end, MissingFun = fun() -> missing end,

View file

@ -49,7 +49,8 @@
segment_hash/1, segment_hash/1,
to_lookup/1, to_lookup/1,
next_key/1, next_key/1,
return_proxy/4]). return_proxy/4,
get_metadata/1]).
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-define(NRT_IDX, "$aae."). -define(NRT_IDX, "$aae.").
@ -88,7 +89,7 @@
-type ledger_kv() :: -type ledger_kv() ::
{ledger_key(), ledger_value()}. {ledger_key(), ledger_value()}.
-type compaction_method() :: -type compaction_method() ::
retain|skip|recalc. retain|recovr|recalc.
-type compaction_strategy() :: -type compaction_strategy() ::
list({tag(), compaction_method()}). list({tag(), compaction_method()}).
-type journal_key_tag() :: -type journal_key_tag() ::
@ -243,6 +244,10 @@ strip_to_indexdetails({_, V}) when tuple_size(V) > 4 ->
striphead_to_v1details(V) -> striphead_to_v1details(V) ->
{element(1, V), element(2, V), element(3, V), element(4, V)}. {element(1, V), element(2, V), element(3, V), element(4, V)}.
-spec get_metadata(ledger_value()) -> metadata().
get_metadata(LV) ->
element(4, LV).
-spec key_dominates(ledger_kv(), ledger_kv()) -> -spec key_dominates(ledger_kv(), ledger_kv()) ->
left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant. left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant.
%% @doc %% @doc
@ -358,24 +363,24 @@ endkey_passed(EndKey, CheckingKey) ->
-spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy(). -spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy().
%% @doc %% @doc
%% Take the default startegy for compaction, and override the approach for any %% Take the default strategy for compaction, and override the approach for any
%% tags passed in %% tags passed in
inker_reload_strategy(AltList) -> inker_reload_strategy(AltList) ->
ReloadStrategy0 = DefaultList =
lists:map(fun leveled_head:default_reload_strategy/1, lists:map(fun leveled_head:default_reload_strategy/1,
leveled_head:defined_objecttags()), leveled_head:defined_objecttags()),
lists:foldl(fun({X, Y}, SList) -> lists:ukeymerge(1,
lists:keyreplace(X, 1, SList, {X, Y}) lists:ukeysort(1, AltList),
end, lists:ukeysort(1, DefaultList)).
ReloadStrategy0,
AltList).
-spec get_tagstrategy(ledger_key(), compaction_strategy()) -spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy())
-> skip|retain|recalc. -> compaction_method().
%% @doc %% @doc
%% Work out the compaction strategy for the key %% Work out the compaction strategy for the key
get_tagstrategy({Tag, _, _, _}, Strategy) -> get_tagstrategy({Tag, _, _, _}, Strategy) ->
get_tagstrategy(Tag, Strategy);
get_tagstrategy(Tag, Strategy) ->
case lists:keyfind(Tag, 1, Strategy) of case lists:keyfind(Tag, 1, Strategy) of
{Tag, TagStrat} -> {Tag, TagStrat} ->
TagStrat; TagStrat;
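As a usage sketch (not part of the commit) of the two functions above, with placeholder bucket and key values:

    %% Tags passed to inker_reload_strategy/1 take precedence over the
    %% defaults, which map every defined object tag to retain.
    Strategy = leveled_codec:inker_reload_strategy([{?RIAK_TAG, recalc}]),
    recalc = leveled_codec:get_tagstrategy({?RIAK_TAG, <<"B">>, <<"K">>, null},
                                            Strategy),
    retain = leveled_codec:get_tagstrategy(?STD_TAG, Strategy).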

View file

@ -22,7 +22,8 @@
-export([key_to_canonicalbinary/1, -export([key_to_canonicalbinary/1,
build_head/2, build_head/2,
extract_metadata/3 extract_metadata/3,
diff_indexspecs/3
]). ]).
-export([get_size/2, -export([get_size/2,
@ -71,12 +72,15 @@
-type object_metadata() :: riak_metadata()|std_metadata()|head_metadata(). -type object_metadata() :: riak_metadata()|std_metadata()|head_metadata().
-type appdefinable_function() :: -type appdefinable_function() ::
key_to_canonicalbinary | build_head | extract_metadata. key_to_canonicalbinary | build_head | extract_metadata | diff_indexspecs.
% Functions for which default behaviour can be over-written for the % Functions for which default behaviour can be over-written for the
% application's own tags % application's own tags
-type appdefinable_function_tuple() :: -type appdefinable_function_tuple() ::
{appdefinable_function(), fun()}. {appdefinable_function(), fun()}.
-type index_op() :: add | remove.
-type index_value() :: integer() | binary().
-type head() :: -type head() ::
binary()|tuple(). binary()|tuple().
% TODO: % TODO:
@ -174,6 +178,41 @@ default_extract_metadata(_Tag, SizeAsStoredInJournal, Obj) ->
{{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}. {{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}.
-spec diff_indexspecs(object_tag(),
object_metadata(),
object_metadata()|not_present)
-> leveled_codec:index_specs().
%% @doc
%% Take an object metadata part from within the journal, and an object metadata
%% part from the ledger (which should have a lower SQN), and generate index
%% specs by determining the difference between the index specs on the object
%% to be loaded and that on object already stored.
%%
%% This is only relevant where the journal compaction strategy of `recalc` is
%% used, the Keychanges will be used when `retain` is the compaction strategy
diff_indexspecs(?RIAK_TAG, UpdatedMetadata, OldMetadata) ->
UpdIndexes =
get_indexes_from_siblingmetabin(element(1, UpdatedMetadata), []),
OldIndexes =
case OldMetadata of
not_present ->
[];
_ ->
get_indexes_from_siblingmetabin(element(1, OldMetadata), [])
end,
diff_index_data(OldIndexes, UpdIndexes);
diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata) ->
default_diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata);
diff_indexspecs(Tag, UpdatedMetadata, CurrentMetadata) ->
OverrideFun =
get_appdefined_function(diff_indexspecs,
fun default_diff_indexspecs/3,
3),
OverrideFun(Tag, UpdatedMetadata, CurrentMetadata).
default_diff_indexspecs(_Tag, _UpdatedMetadata, _CurrentMetadata) ->
[].
%%%============================================================================ %%%============================================================================
%%% Standard External Functions %%% Standard External Functions
%%%============================================================================ %%%============================================================================
@ -190,7 +229,7 @@ defined_objecttags() ->
leveled_codec:compaction_method()}. leveled_codec:compaction_method()}.
%% @doc %% @doc
%% State the compaction_method to be used when reloading the Ledger from the %% State the compaction_method to be used when reloading the Ledger from the
%% journal for each object tag. Note, no compaction startegy required for %% journal for each object tag. Note, no compaction strategy required for
%% head_only tag %% head_only tag
default_reload_strategy(Tag) -> default_reload_strategy(Tag) ->
{Tag, retain}. {Tag, retain}.
@ -317,3 +356,155 @@ get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
MetaLen:32/integer, MetaLen:32/integer,
MetaBin:MetaLen/binary>>, MetaBin:MetaLen/binary>>,
[LastMod|LastMods]). [LastMod|LastMods]).
get_indexes_from_siblingmetabin(<<0:32/integer,
MetaLen:32/integer,
MetaBin:MetaLen/binary,
RestBin/binary>>,
Indexes) ->
UpdIndexes = lists:umerge(get_indexes_frommetabin(MetaBin), Indexes),
get_indexes_from_siblingmetabin(RestBin, UpdIndexes);
get_indexes_from_siblingmetabin(<<SibCount:32/integer, RestBin/binary>>,
Indexes) when SibCount > 0 ->
get_indexes_from_siblingmetabin(RestBin, Indexes);
get_indexes_from_siblingmetabin(_, Indexes) ->
Indexes.
%% @doc
%% Parse the metabinary for an individual sibling and return a list of index
%% entries.
get_indexes_frommetabin(<<_LMD1:32/integer, _LMD2:32/integer, _LMD3:32/integer,
VTagLen:8/integer, _VTag:VTagLen/binary,
Deleted:1/binary-unit:8,
MetaRestBin/binary>>) when Deleted /= <<1>> ->
lists:usort(indexes_of_metabinary(MetaRestBin));
get_indexes_frommetabin(_) ->
[].
indexes_of_metabinary(<<>>) ->
[];
indexes_of_metabinary(<<KeyLen:32/integer, KeyBin:KeyLen/binary,
ValueLen:32/integer, ValueBin:ValueLen/binary,
Rest/binary>>) ->
Key = decode_maybe_binary(KeyBin),
case Key of
<<"index">> ->
Value = decode_maybe_binary(ValueBin),
Value;
_ ->
indexes_of_metabinary(Rest)
end.
decode_maybe_binary(<<1, Bin/binary>>) ->
Bin;
decode_maybe_binary(<<0, Bin/binary>>) ->
binary_to_term(Bin);
decode_maybe_binary(<<_Other:8, Bin/binary>>) ->
Bin.
-spec diff_index_data([{binary(), index_value()}],
[{binary(), index_value()}]) ->
[{index_op(), binary(), index_value()}].
diff_index_data(OldIndexes, AllIndexes) ->
OldIndexSet = ordsets:from_list(OldIndexes),
AllIndexSet = ordsets:from_list(AllIndexes),
diff_specs_core(AllIndexSet, OldIndexSet).
diff_specs_core(AllIndexSet, OldIndexSet) ->
NewIndexSet = ordsets:subtract(AllIndexSet, OldIndexSet),
RemoveIndexSet =
ordsets:subtract(OldIndexSet, AllIndexSet),
NewIndexSpecs =
assemble_index_specs(ordsets:subtract(NewIndexSet, OldIndexSet),
add),
RemoveIndexSpecs =
assemble_index_specs(RemoveIndexSet,
remove),
NewIndexSpecs ++ RemoveIndexSpecs.
%% @doc Assemble a list of index specs in the
%% form of triplets of the form
%% {IndexOperation, IndexField, IndexValue}.
-spec assemble_index_specs([{binary(), binary()}], index_op()) ->
[{index_op(), binary(), binary()}].
assemble_index_specs(Indexes, IndexOp) ->
[{IndexOp, Index, Value} || {Index, Value} <- Indexes].
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
index_extract_test() ->
SibMetaBin =
<<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134,
1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68,
86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,
45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,
0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50,
49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50,
48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0,
5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
50,45,53,54,98,51,98,97,57,57,99,55,56,50>>,
Indexes = get_indexes_from_siblingmetabin(SibMetaBin, []),
ExpIndexes = [{"idx1_bin","21521107231730Sophia"},
{"idx1_bin","21820510130146Avery"}],
?assertMatch(ExpIndexes, Indexes),
SibMetaBinNoIdx =
<<0,0,0,1,0,0,0,0,0,0,0,128,0,0,6,48,0,4,130,247,0,1,250,134,
1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68,
86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,
45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0,
5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
50,45,53,54,98,51,98,97,57,57,99,55,56,50>>,
?assertMatch([], get_indexes_from_siblingmetabin(SibMetaBinNoIdx, [])),
SibMetaBinOverhang =
<<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134,
1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68,
86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,
45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,
0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50,
49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50,
48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0,
5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0,0,0,0,0,4,
0,0,0,0>>,
?assertMatch(ExpIndexes,
get_indexes_from_siblingmetabin(SibMetaBinOverhang, [])).
diff_index_test() ->
UpdIndexes =
[{<<"idx1_bin">>,<<"20840930001702Zoe">>},
{<<"idx1_bin">>,<<"20931011172606Emily">>}],
OldIndexes =
[{<<"idx1_bin">>,<<"20231126131808Madison">>},
{<<"idx1_bin">>,<<"20931011172606Emily">>}],
IdxSpecs = diff_index_data(OldIndexes, UpdIndexes),
?assertMatch([{add, <<"idx1_bin">>, <<"20840930001702Zoe">>},
{remove, <<"idx1_bin">>,<<"20231126131808Madison">>}], IdxSpecs).
decode_test() ->
Bin = <<"999">>,
BinTerm = term_to_binary("999"),
?assertMatch("999", binary_to_list(
decode_maybe_binary(<<1:8/integer, Bin/binary>>))),
?assertMatch("999", decode_maybe_binary(<<0:8/integer, BinTerm/binary>>)),
?assertMatch("999", binary_to_list(
decode_maybe_binary(<<2:8/integer, Bin/binary>>))).
-endif.

View file

@ -145,6 +145,15 @@
% released from a compaction run of a single file to make it a run % released from a compaction run of a single file to make it a run
% worthwhile of compaction (released space is 100.0 - target e.g. 70.0 % worthwhile of compaction (released space is 100.0 - target e.g. 70.0
% means that 30.0% should be released) % means that 30.0% should be released)
-type key_size() ::
{{non_neg_integer(),
leveled_codec:journal_key_tag(),
leveled_codec:ledger_key()}, non_neg_integer()}.
-type corrupted_test_key_size() ::
{{non_neg_integer(),
leveled_codec:journal_key_tag(),
leveled_codec:ledger_key(),
null}, non_neg_integer()}.
%%%============================================================================ %%%============================================================================
%%% API %%% API
@ -288,6 +297,8 @@ handle_call(stop, _From, State) ->
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0}, handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
State) -> State) ->
leveled_log:log("IC014", [State#state.reload_strategy,
State#state.max_run_length]),
% Empty the waste folder % Empty the waste folder
clear_waste(State), clear_waste(State),
SW = os:timestamp(), SW = os:timestamp(),
@ -298,7 +309,11 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
% Don't want to process a queued call waiting on an old manifest % Don't want to process a queued call waiting on an old manifest
[_Active|Manifest] = Manifest0, [_Active|Manifest] = Manifest0,
{FilterServer, MaxSQN} = InitiateFun(Checker), {FilterServer, MaxSQN} = InitiateFun(Checker),
ok = clerk_scorefilelist(self(), Manifest), NotRollingFun =
fun({_LowSQN, _FN, Pid, _LK}) ->
not leveled_cdb:cdb_isrolling(Pid)
end,
ok = clerk_scorefilelist(self(), lists:filter(NotRollingFun, Manifest)),
ScoringState = ScoringState =
#scoring_state{filter_fun = FilterFun, #scoring_state{filter_fun = FilterFun,
filter_server = FilterServer, filter_server = FilterServer,
@ -315,7 +330,8 @@ handle_cast({score_filelist, [Entry|Tail]}, State) ->
ScoringState#scoring_state.filter_server, ScoringState#scoring_state.filter_server,
ScoringState#scoring_state.max_sqn, ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE, ?SAMPLE_SIZE,
?BATCH_SIZE), ?BATCH_SIZE,
State#state.reload_strategy),
Candidate = Candidate =
#candidate{low_sqn = LowSQN, #candidate{low_sqn = LowSQN,
filename = FN, filename = FN,
@ -493,7 +509,10 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%%% Internal functions %%% Internal functions
%%%============================================================================ %%%============================================================================
-spec check_single_file(pid(), fun(), any(), non_neg_integer(),
non_neg_integer(), non_neg_integer(),
leveled_codec:compaction_strategy()) ->
float().
%% @doc %% @doc
%% Get a score for a single CDB file in the journal. This will pull out a bunch %% Get a score for a single CDB file in the journal. This will pull out a bunch
%% of keys and sizes at random in an efficient way (by scanning the hashtable %% of keys and sizes at random in an efficient way (by scanning the hashtable
@ -505,13 +524,19 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%% %%
%% The score is based on a random sample - so will not be consistent between %% The score is based on a random sample - so will not be consistent between
%% calls. %% calls.
check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) -> check_single_file(CDB, FilterFun, FilterServer, MaxSQN,
SampleSize, BatchSize,
ReloadStrategy) ->
FN = leveled_cdb:cdb_filename(CDB), FN = leveled_cdb:cdb_filename(CDB),
SW = os:timestamp(), SW = os:timestamp(),
PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize),
KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []),
Score = Score =
size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN), size_comparison_score(KeySizeList,
FilterFun,
FilterServer,
MaxSQN,
ReloadStrategy),
safely_log_filescore(PositionList, FN, Score, SW), safely_log_filescore(PositionList, FN, Score, SW),
Score. Score.
@ -523,7 +548,15 @@ safely_log_filescore(PositionList, FN, Score, SW) ->
div length(PositionList), div length(PositionList),
leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW). leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW).
size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> -spec size_comparison_score(list(key_size() | corrupted_test_key_size()),
fun(),
any(),
non_neg_integer(),
leveled_codec:compaction_strategy()) ->
float().
size_comparison_score(KeySizeList,
FilterFun, FilterServer, MaxSQN,
RS) ->
FoldFunForSizeCompare = FoldFunForSizeCompare =
fun(KS, {ActSize, RplSize}) -> fun(KS, {ActSize, RplSize}) ->
case KS of case KS of
@ -532,7 +565,18 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
leveled_codec:is_full_journalentry({SQN, Type, PK}), leveled_codec:is_full_journalentry({SQN, Type, PK}),
case IsJournalEntry of case IsJournalEntry of
false -> false ->
{ActSize + Size - ?CRC_SIZE, RplSize}; TS = leveled_codec:get_tagstrategy(PK, RS),
% If the strategy is to retain key deltas, then
% scoring must reflect that. Key deltas are
% possible even if strategy does not allow as
% there is support for changing strategy from
% retain to recalc
case TS of
retain ->
{ActSize + Size - ?CRC_SIZE, RplSize};
_ ->
{ActSize, RplSize + Size - ?CRC_SIZE}
end;
true -> true ->
Check = FilterFun(FilterServer, PK, SQN), Check = FilterFun(FilterServer, PK, SQN),
case {Check, SQN > MaxSQN} of case {Check, SQN > MaxSQN} of
@ -567,12 +611,13 @@ fetch_inbatches([], _BatchSize, CDB, CheckedList) ->
ok = leveled_cdb:cdb_clerkcomplete(CDB), ok = leveled_cdb:cdb_clerkcomplete(CDB),
CheckedList; CheckedList;
fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
{Batch, Tail} = if {Batch, Tail} =
length(PositionList) >= BatchSize -> if
lists:split(BatchSize, PositionList); length(PositionList) >= BatchSize ->
true -> lists:split(BatchSize, PositionList);
{PositionList, []} true ->
end, {PositionList, []}
end,
KL_List = leveled_cdb:cdb_directfetch(CDB, Batch, key_size), KL_List = leveled_cdb:cdb_directfetch(CDB, Batch, key_size),
fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List). fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List).
@ -760,10 +805,11 @@ split_positions_into_batches(Positions, Journal, Batches) ->
%% recalculating the KeyChanges by looking at the object when we reload. So %% recalculating the KeyChanges by looking at the object when we reload. So
%% old objects can be discarded. %% old objects can be discarded.
%% %%
%% If the strategy is skip, we don't care about KeyDeltas. Note though, that %% If the strategy is recovr, we don't care about KeyDeltas. Note though, that
%% if the ledger is deleted it may not be possible to safely rebuild a KeyStore %% if the ledger is deleted it may not be possible to safely rebuild a KeyStore
%% if it contains index entries. The hot_backup approach is also not safe with %% if it contains index entries. The hot_backup approach is also not safe with
%% a `skip` strategy. %% a `recovr` strategy. The recovr strategy assumes faults in the ledger will
%% be resolved via application-level anti-entropy
filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
FoldFun = FoldFun =
filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy), filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy),
@ -998,6 +1044,7 @@ fetch_testcdb(RP) ->
check_single_file_test() -> check_single_file_test() ->
RP = "test/test_area/", RP = "test/test_area/",
RS = leveled_codec:inker_reload_strategy([]),
ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)), ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)),
{ok, CDB} = fetch_testcdb(RP), {ok, CDB} = fetch_testcdb(RP),
LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}}, LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}},
@ -1010,14 +1057,14 @@ check_single_file_test() ->
_ -> _ ->
replaced replaced
end end, end end,
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4), Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(37.5, Score1), ?assertMatch(37.5, Score1),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end, LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4), Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(100.0, Score2), ?assertMatch(100.0, Score2),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3), Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS),
?assertMatch(37.5, Score3), ?assertMatch(37.5, Score3),
Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4), Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS),
?assertMatch(75.0, Score4), ?assertMatch(75.0, Score4),
ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB). ok = leveled_cdb:cdb_destroy(CDB).
@ -1132,6 +1179,7 @@ compact_empty_file_test() ->
RP = "test/test_area/", RP = "test/test_area/",
ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)), ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)),
FN1 = leveled_inker:filepath(RP, 1, new_journal), FN1 = leveled_inker:filepath(RP, 1, new_journal),
RS = leveled_codec:inker_reload_strategy([]),
CDBopts = #cdb_options{binary_mode=true}, CDBopts = #cdb_options{binary_mode=true},
{ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBopts), {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBopts),
{ok, FN2} = leveled_cdb:cdb_complete(CDB1), {ok, FN2} = leveled_cdb:cdb_complete(CDB1),
@ -1140,7 +1188,7 @@ compact_empty_file_test() ->
{2, {o, "Bucket", "Key2", null}}, {2, {o, "Bucket", "Key2", null}},
{3, {o, "Bucket", "Key3", null}}], {3, {o, "Bucket", "Key3", null}}],
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end, LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4), Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(0.0, Score1), ?assertMatch(0.0, Score1),
ok = leveled_cdb:cdb_deletepending(CDB2), ok = leveled_cdb:cdb_deletepending(CDB2),
ok = leveled_cdb:cdb_destroy(CDB2). ok = leveled_cdb:cdb_destroy(CDB2).
@ -1207,15 +1255,22 @@ compact_singlefile_totwosmallfiles_testto() ->
size_score_test() -> size_score_test() ->
KeySizeList = KeySizeList =
[{{1, ?INKT_STND, "Key1"}, 104}, [{{1, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key1">>, null}}, 104},
{{2, ?INKT_STND, "Key2"}, 124}, {{2, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key2">>, null}}, 124},
{{3, ?INKT_STND, "Key3"}, 144}, {{3, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key3">>, null}}, 144},
{{4, ?INKT_STND, "Key4"}, 154}, {{4, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key4">>, null}}, 154},
{{5, ?INKT_STND, "Key5", "Subk1"}, 164}, {{5,
{{6, ?INKT_STND, "Key6"}, 174}, ?INKT_STND,
{{7, ?INKT_STND, "Key7"}, 184}], {?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>}, null},
164},
{{6, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key6">>, null}}, 174},
{{7, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key7">>, null}}, 184}],
MaxSQN = 6, MaxSQN = 6,
CurrentList = ["Key1", "Key4", "Key5", "Key6"], CurrentList =
[{?STD_TAG, <<"B">>, <<"Key1">>, null},
{?STD_TAG, <<"B">>, <<"Key4">>, null},
{?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>},
{?STD_TAG, <<"B">>, <<"Key6">>, null}],
FilterFun = FilterFun =
fun(L, K, _SQN) -> fun(L, K, _SQN) ->
case lists:member(K, L) of case lists:member(K, L) of
@ -1223,7 +1278,13 @@ size_score_test() ->
false -> replaced false -> replaced
end end
end, end,
Score = size_comparison_score(KeySizeList, FilterFun, CurrentList, MaxSQN), Score =
size_comparison_score(KeySizeList,
FilterFun,
CurrentList,
MaxSQN,
leveled_codec:inker_reload_strategy([])),
io:format("Score ~w", [Score]),
?assertMatch(true, Score > 69.0), ?assertMatch(true, Score > 69.0),
?assertMatch(true, Score < 70.0). ?assertMatch(true, Score < 70.0).

View file

@ -101,7 +101,7 @@
ink_fetch/3, ink_fetch/3,
ink_keycheck/3, ink_keycheck/3,
ink_fold/4, ink_fold/4,
ink_loadpcl/4, ink_loadpcl/5,
ink_registersnapshot/2, ink_registersnapshot/2,
ink_confirmdelete/2, ink_confirmdelete/2,
ink_compactjournal/3, ink_compactjournal/3,
@ -133,7 +133,6 @@
-define(WASTE_FP, "waste"). -define(WASTE_FP, "waste").
-define(JOURNAL_FILEX, "cdb"). -define(JOURNAL_FILEX, "cdb").
-define(PENDING_FILEX, "pnd"). -define(PENDING_FILEX, "pnd").
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000). -define(LOADING_BATCH, 1000).
-define(TEST_KC, {[], infinity}). -define(TEST_KC, {[], infinity}).
@ -321,7 +320,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
{fold, MinSQN, FoldFuns, Acc, by_runner}, {fold, MinSQN, FoldFuns, Acc, by_runner},
infinity). infinity).
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. -spec ink_loadpcl(pid(),
integer(),
leveled_bookie:initial_loadfun(),
fun((string(), non_neg_integer()) -> any()),
fun((any(), any()) -> ok)) -> ok.
%% %%
%% Function to prompt load of the Ledger at startup. The Penciller should %% Function to prompt load of the Ledger at startup. The Penciller should
%% have determined the lowest SQN not present in the Ledger, and the inker %% have determined the lowest SQN not present in the Ledger, and the inker
@ -330,20 +333,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
%% %%
%% The load fun should be a five arity function like: %% The load fun should be a five arity function like:
%% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) %% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun)
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> ink_loadpcl(Pid, MinSQN, LoadFun, InitAccFun, BatchFun) ->
BatchFun =
fun(BatchAcc, _Acc) ->
push_to_penciller(Penciller, BatchAcc)
end,
InitAccFun =
fun(FN, CurrentMinSQN) ->
leveled_log:log("I0014", [FN, CurrentMinSQN]),
leveled_bookie:empty_ledgercache()
end,
gen_server:call(Pid, gen_server:call(Pid,
{fold, {fold,
MinSQN, MinSQN,
{FilterFun, InitAccFun, BatchFun}, {LoadFun, InitAccFun, BatchFun},
ok, ok,
as_ink}, as_ink},
infinity). infinity).
@ -1197,22 +1191,6 @@ foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
end. end.
push_to_penciller(Penciller, LedgerCache) ->
% The push to penciller must start as a tree to correctly de-duplicate
% the list by order before becoming a de-duplicated list for loading
LC0 = leveled_bookie:loadqueue_ledgercache(LedgerCache),
push_to_penciller_loop(Penciller, LC0).
push_to_penciller_loop(Penciller, LedgerCache) ->
case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of
returned ->
timer:sleep(?LOADING_PAUSE),
push_to_penciller_loop(Penciller, LedgerCache);
ok ->
ok
end.
sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
lists:foldl(fun(FN, Acc) -> lists:foldl(fun(FN, Acc) ->
case re:run(FN, case re:run(FN,

View file

@ -356,6 +356,8 @@
{"IC013", {"IC013",
{warn, "File with name ~s to be ignored in manifest as scanning for " {warn, "File with name ~s to be ignored in manifest as scanning for "
++ "first key returned empty - maybe corrupted"}}, ++ "first key returned empty - maybe corrupted"}},
{"IC014",
{info, "Compaction to be run with strategy ~w and max_run_length ~w"}},
{"CDB01", {"CDB01",
{info, "Opening file for writing with filename ~s"}}, {info, "Opening file for writing with filename ~s"}},

View file

@ -1511,7 +1511,7 @@ accumulate_positions({K, V}, {PosBinAcc, NoHashCount, HashAcc, LMDAcc}) ->
NHC:7/integer, NHC:7/integer,
PosBinAcc/binary>>, PosBinAcc/binary>>,
0, 0,
HashAcc, [H1|HashAcc],
LMDAcc0} LMDAcc0}
end; end;
false -> false ->
@ -2304,7 +2304,7 @@ split_lists(KVList1, SlotLists, N, PressMethod, IdxModDate) ->
-spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) -> -spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) ->
{list(), list(), list(tuple()), tuple()|null}. {list(), list(), list(tuple()), tuple()|null}.
%% @doc %% @doc
%% Merge lists when merging across more thna one file. KVLists that are %% Merge lists when merging across more than one file. KVLists that are
%% provided may include pointers to fetch more Keys/Values from the source %% provided may include pointers to fetch more Keys/Values from the source
%% file %% file
merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) -> merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) ->
@ -3319,6 +3319,7 @@ simple_persisted_tester(SSTNewFun) ->
Acc Acc
end end
end, end,
true = [] == MapFun({FirstKey, "V"}, []), % coverage cheat within MapFun
KVList3 = lists:foldl(MapFun, [], KVList2), KVList3 = lists:foldl(MapFun, [], KVList2),
SW2 = os:timestamp(), SW2 = os:timestamp(),
lists:foreach(fun({K, H, _V}) -> lists:foreach(fun({K, H, _V}) ->

View file

@ -2,11 +2,14 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include("include/leveled.hrl"). -include("include/leveled.hrl").
-export([all/0]). -export([all/0]).
-export([application_defined_tag/1 -export([
application_defined_tag/1,
bespoketag_recalc/1
]). ]).
all() -> [ all() -> [
application_defined_tag application_defined_tag,
bespoketag_recalc
]. ].
@ -62,6 +65,8 @@ application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) ->
StartOpts1 = [{root_path, RootPath}, StartOpts1 = [{root_path, RootPath},
{sync_strategy, testutil:sync_strategy()}, {sync_strategy, testutil:sync_strategy()},
{log_level, warn}, {log_level, warn},
{reload_strategy,
[{bespoke_tag1, retain}, {bespoke_tag2, retain}]},
{override_functions, Functions}], {override_functions, Functions}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
Value = leveled_rand:rand_bytes(512), Value = leveled_rand:rand_bytes(512),
@ -108,8 +113,6 @@ application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) ->
ok = leveled_bookie:book_close(Bookie2). ok = leveled_bookie:book_close(Bookie2).
object_generator(Count, V) -> object_generator(Count, V) ->
Hash = erlang:phash2({count, V}), Hash = erlang:phash2({count, V}),
Random = leveled_rand:uniform(1000), Random = leveled_rand:uniform(1000),
@ -119,3 +122,113 @@ object_generator(Count, V) ->
Key, Key,
[{hash, Hash}, {shard, Count rem 10}, [{hash, Hash}, {shard, Count rem 10},
{random, Random}, {value, V}]}. {random, Random}, {value, V}]}.
bespoketag_recalc(_Config) ->
%% Get a sensible behaviour using the recalc compaction strategy with a
%% bespoke tag
RootPath = testutil:reset_filestructure(),
B0 = <<"B0">>,
KeyCount = 7000,
ExtractMDFun =
fun(bespoke_tag, Size, Obj) ->
[{index, IL}, {value, _V}] = Obj,
{{erlang:phash2(term_to_binary(Obj)),
Size,
{index, IL}},
[os:timestamp()]}
end,
CalcIndexFun =
fun(bespoke_tag, UpdMeta, PrvMeta) ->
% io:format("UpdMeta ~w PrvMeta ~w~n", [UpdMeta, PrvMeta]),
{index, UpdIndexes} = element(3, UpdMeta),
IndexDeltas =
case PrvMeta of
not_present ->
UpdIndexes;
PrvMeta when is_tuple(PrvMeta) ->
{index, PrvIndexes} = element(3, PrvMeta),
lists:subtract(UpdIndexes, PrvIndexes)
end,
lists:map(fun(I) -> {add, <<"temp_int">>, I} end, IndexDeltas)
end,
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalobjectcount, 6000},
{max_pencillercachesize, 8000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{bespoke_tag, recalc}]},
{override_functions,
[{extract_metadata, ExtractMDFun},
{diff_indexspecs, CalcIndexFun}]}],
{ok, Book1} = leveled_bookie:book_start(BookOpts),
LoadFun =
fun(Book, MustFind) ->
fun(I) ->
testutil:stdload_object(Book,
B0, integer_to_binary(I rem KeyCount),
I, erlang:phash2({value, I}),
infinity, bespoke_tag, false, MustFind)
end
end,
lists:foreach(LoadFun(Book1, false), lists:seq(1, KeyCount)),
lists:foreach(LoadFun(Book1, true), lists:seq(KeyCount + 1, KeyCount * 2)),
FoldFun =
fun(_B0, {IV0, _K0}, Acc) ->
case IV0 - 1 of
Acc ->
Acc + 1;
_Unexpected ->
% io:format("Eh? - ~w ~w~n", [Unexpected, Acc]),
Acc + 1
end
end,
CountFold =
fun(Book, CurrentCount) ->
leveled_bookie:book_indexfold(Book,
B0,
{FoldFun, 0},
{<<"temp_int">>, 0, CurrentCount},
{true, undefined})
end,
{async, FolderA} = CountFold(Book1, 2 * KeyCount),
CountA = FolderA(),
io:format("Counted double index entries ~w - everything loaded OK~n",
[CountA]),
true = 2 * KeyCount == CountA,
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(BookOpts),
lists:foreach(LoadFun(Book2, true), lists:seq(KeyCount * 2 + 1, KeyCount * 3)),
{async, FolderB} = CountFold(Book2, 3 * KeyCount),
CountB = FolderB(),
true = 3 * KeyCount == CountB,
testutil:compact_and_wait(Book2),
ok = leveled_bookie:book_close(Book2),
io:format("Restart from blank ledger~n"),
leveled_penciller:clean_testdir(proplists:get_value(root_path, BookOpts) ++
"/ledger"),
{ok, Book3} = leveled_bookie:book_start(BookOpts),
{async, FolderC} = CountFold(Book3, 3 * KeyCount),
CountC = FolderC(),
io:format("All index entries ~w present - recalc ok~n",
[CountC]),
true = 3 * KeyCount == CountC,
ok = leveled_bookie:book_close(Book3),
testutil:reset_filestructure().

View file

@ -7,7 +7,10 @@
hot_backup_simple/1, hot_backup_simple/1,
hot_backup_changes/1, hot_backup_changes/1,
retain_strategy/1, retain_strategy/1,
recalc_strategy/1,
recalc_transition_strategy/1,
recovr_strategy/1, recovr_strategy/1,
stdtag_recalc/1,
aae_missingjournal/1, aae_missingjournal/1,
aae_bustedjournal/1, aae_bustedjournal/1,
journal_compaction_bustedjournal/1, journal_compaction_bustedjournal/1,
@ -21,13 +24,16 @@ all() -> [
hot_backup_simple, hot_backup_simple,
hot_backup_changes, hot_backup_changes,
retain_strategy, retain_strategy,
recalc_strategy,
recalc_transition_strategy,
recovr_strategy, recovr_strategy,
aae_missingjournal, aae_missingjournal,
aae_bustedjournal, aae_bustedjournal,
journal_compaction_bustedjournal, journal_compaction_bustedjournal,
close_duringcompaction, close_duringcompaction,
allkeydelta_journal_multicompact, allkeydelta_journal_multicompact,
recompact_keydeltas recompact_keydeltas,
stdtag_recalc
]. ].
@ -145,8 +151,6 @@ recovery_with_samekeyupdates(_Config) ->
testutil:reset_filestructure(). testutil:reset_filestructure().
hot_backup_simple(_Config) -> hot_backup_simple(_Config) ->
% The journal may have a hot backup. This allows for an online Bookie % The journal may have a hot backup. This allows for an online Bookie
% to be sent a message to prepare a backup function, which an asynchronous % to be sent a message to prepare a backup function, which an asynchronous
@ -233,85 +237,172 @@ hot_backup_changes(_Config) ->
testutil:reset_filestructure(). testutil:reset_filestructure().
retain_strategy(_Config) -> retain_strategy(_Config) ->
rotate_wipe_compact(retain, retain).
recalc_strategy(_Config) ->
rotate_wipe_compact(recalc, recalc).
recalc_transition_strategy(_Config) ->
rotate_wipe_compact(retain, recalc).
rotate_wipe_compact(Strategy1, Strategy2) ->
RootPath = testutil:reset_filestructure(), RootPath = testutil:reset_filestructure(),
BookOpts = [{root_path, RootPath}, BookOpts = [{root_path, RootPath},
{cache_size, 1000}, {cache_size, 1000},
{max_journalobjectcount, 5000}, {max_journalobjectcount, 5000},
{sync_strategy, testutil:sync_strategy()}, {sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, retain}]}], {reload_strategy, [{?RIAK_TAG, Strategy1}]}],
BookOptsAlt = [{root_path, RootPath}, BookOptsAlt = [{root_path, RootPath},
{cache_size, 1000}, {cache_size, 1000},
{max_journalobjectcount, 2000}, {max_journalobjectcount, 2000},
{sync_strategy, testutil:sync_strategy()}, {sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, retain}]}, {reload_strategy, [{?RIAK_TAG, Strategy2}]},
{max_run_length, 8}], {max_run_length, 8}],
{ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 800), {ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 400),
ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}]), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}]),
{ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 1600), {ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 800),
ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3},
{"Bucket4", Spcl4, LastV4}]), {"Bucket4", Spcl4, LastV4}]),
{ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 3200), {ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 1600),
ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3},
{"Bucket5", Spcl5, LastV5}]),
{ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 6400),
ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3},
{"Bucket4", Spcl4, LastV4}, {"Bucket5", Spcl5, LastV5}]),
{"Bucket5", Spcl5, LastV5}, {ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 3200),
{"Bucket6", Spcl6, LastV6}]),
{ok, Book1} = leveled_bookie:book_start(BookOpts), {ok, Book1} = leveled_bookie:book_start(BookOpts),
compact_and_wait(Book1), compact_and_wait(Book1),
compact_and_wait(Book1),
ok = leveled_bookie:book_close(Book1), ok = leveled_bookie:book_close(Book1),
ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3},
{"Bucket4", Spcl4, LastV4}, {"Bucket4", Spcl4, LastV4},
{"Bucket5", Spcl5, LastV5}, {"Bucket5", Spcl5, LastV5},
{"Bucket6", Spcl6, LastV6}]), {"Bucket6", Spcl6, LastV6}]),
{ok, Book2} = leveled_bookie:book_start(BookOptsAlt), {ok, Book2} = leveled_bookie:book_start(BookOptsAlt),
compact_and_wait(Book2),
ok = leveled_bookie:book_close(Book2),
{KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000), ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3},
{"Bucket4", Spcl4, LastV4},
{"Bucket5", Spcl5, LastV5},
{"Bucket6", Spcl6, LastV6}]),
{ok, Book3} = leveled_bookie:book_start(BookOptsAlt),
{KSpcL2, _V2} = testutil:put_indexed_objects(Book3, "AltBucket6", 3000),
Q2 = fun(RT) -> {index_query, Q2 = fun(RT) -> {index_query,
"AltBucket6", "AltBucket6",
{fun testutil:foldkeysfun/3, []}, {fun testutil:foldkeysfun/3, []},
{"idx1_bin", "#", "|"}, {"idx1_bin", "#", "|"},
{RT, undefined}} {RT, undefined}}
end, end,
{async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)), {async, KFolder2A} = leveled_bookie:book_returnfolder(Book3, Q2(false)),
KeyList2A = lists:usort(KFolder2A()), KeyList2A = lists:usort(KFolder2A()),
true = length(KeyList2A) == 3000, true = length(KeyList2A) == 3000,
DeleteFun = DeleteFun =
fun({DK, [{add, DIdx, DTerm}]}) -> fun({DK, [{add, DIdx, DTerm}]}) ->
ok = testutil:book_riakdelete(Book2, ok = testutil:book_riakdelete(Book3,
"AltBucket6", "AltBucket6",
DK, DK,
[{remove, DIdx, DTerm}]) [{remove, DIdx, DTerm}])
end, end,
lists:foreach(DeleteFun, KSpcL2), lists:foreach(DeleteFun, KSpcL2),
{async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
KeyList2AD = lists:usort(KFolder2AD()),
true = length(KeyList2AD) == 0,
ok = leveled_bookie:book_close(Book2),
{ok, Book3} = leveled_bookie:book_start(BookOptsAlt),
io:format("Compact after deletions~n"),
compact_and_wait(Book3),
compact_and_wait(Book3),
{async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)), {async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)),
KeyList3AD = lists:usort(KFolder3AD()), KeyList3AD = lists:usort(KFolder3AD()),
true = length(KeyList3AD) == 0, true = length(KeyList3AD) == 0,
ok = leveled_bookie:book_close(Book3), ok = leveled_bookie:book_close(Book3),
{ok, Book4} = leveled_bookie:book_start(BookOptsAlt),
io:format("Compact after deletions~n"),
compact_and_wait(Book4),
{async, KFolder4AD} = leveled_bookie:book_returnfolder(Book4, Q2(false)),
KeyList4AD = lists:usort(KFolder4AD()),
true = length(KeyList4AD) == 0,
ok = leveled_bookie:book_close(Book4),
testutil:reset_filestructure().
stdtag_recalc(_Config) ->
%% Setting the ?STD_TAG to do recalc should result in the ?STD_TAG
%% behaving like recovr - as no recalc is done for ?STD_TAG (an
%% illustrative sketch of such a recalculation follows this test).
%% NOTE - This is a test to confirm bad things happen!
RootPath = testutil:reset_filestructure(),
B0 = <<"B0">>,
KeyCount = 7000,
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalobjectcount, 5000},
{max_pencillercachesize, 10000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?STD_TAG, recalc}]}],
{ok, Book1} = leveled_bookie:book_start(BookOpts),
LoadFun =
fun(Book) ->
fun(I) ->
testutil:stdload_object(Book,
B0, erlang:phash2(I rem KeyCount),
I, erlang:phash2({value, I}),
infinity, ?STD_TAG, false, false)
end
end,
lists:foreach(LoadFun(Book1), lists:seq(1, KeyCount)),
lists:foreach(LoadFun(Book1), lists:seq(KeyCount + 1, KeyCount * 2)),
CountFold =
fun(Book, CurrentCount) ->
leveled_bookie:book_indexfold(Book,
B0,
{fun(_BF, _KT, Acc) -> Acc + 1 end,
0},
{<<"temp_int">>, 0, CurrentCount},
{true, undefined})
end,
{async, FolderA} = CountFold(Book1, 2 * KeyCount),
CountA = FolderA(),
io:format("Counted double index entries ~w - everything loaded OK~n",
[CountA]),
true = 2 * KeyCount == CountA,
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(BookOpts),
lists:foreach(LoadFun(Book2), lists:seq(KeyCount * 2 + 1, KeyCount * 3)),
{async, FolderB} = CountFold(Book2, 3 * KeyCount),
CountB = FolderB(),
io:format("Maybe counted less index entries ~w - everything not loaded~n",
[CountB]),
true = 3 * KeyCount >= CountB,
compact_and_wait(Book2),
ok = leveled_bookie:book_close(Book2),
io:format("Restart from blank ledger"),
leveled_penciller:clean_testdir(proplists:get_value(root_path, BookOpts) ++
"/ledger"),
{ok, Book3} = leveled_bookie:book_start(BookOpts),
{async, FolderC} = CountFold(Book3, 3 * KeyCount),
CountC = FolderC(),
io:format("Missing index entries ~w - recalc not supported on ?STD_TAG~n",
[CountC]),
true = 3 * KeyCount > CountC,
ok = leveled_bookie:book_close(Book3),
testutil:reset_filestructure(). testutil:reset_filestructure().
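%% For context only: a minimal, hypothetical sketch of the recalculation that
%% a tag supporting recalc would need - diff the index entries held on the
%% object being reloaded against those implied by the current ledger state,
%% and emit the add/remove specs that reconcile them.  The function name and
%% the {Field, Term} argument shape are assumptions for illustration; this is
%% not code in this commit.
sketch_recalc_indexspecs(OldIndexes, NewIndexes) ->
    % Entries present only in the old state need a remove spec; entries
    % present only in the new state need an add spec; unchanged entries
    % need no spec at all.
    Removed = lists:subtract(OldIndexes, NewIndexes),
    Added = lists:subtract(NewIndexes, OldIndexes),
    lists:map(fun({F, T}) -> {remove, F, T} end, Removed) ++
        lists:map(fun({F, T}) -> {add, F, T} end, Added).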
@ -845,6 +936,10 @@ rotating_object_check(BookOpts, B, NumberOfObjects) ->
B, B,
KSpcL2, KSpcL2,
false), false),
ok = testutil:check_indexed_objects(Book1,
B,
KSpcL1 ++ KSpcL2 ++ KSpcL3,
V3),
ok = leveled_bookie:book_close(Book1), ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(BookOpts), {ok, Book2} = leveled_bookie:book_start(BookOpts),
ok = testutil:check_indexed_objects(Book2, ok = testutil:check_indexed_objects(Book2,


@ -905,7 +905,7 @@ handoff(_Config) ->
{sync_strategy, sync}], {sync_strategy, sync}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
% Add some noe Riak objects in - which should be ignored in folds. % Add some non-Riak objects in - which should be ignored in folds.
Hashes = testutil:stdload(Bookie1, 1000), Hashes = testutil:stdload(Bookie1, 1000),
% Generate 200K objects to be used within the test, and load them into % Generate 200K objects to be used within the test, and load them into
% the first store (outputting the generated objects as a list of lists) % the first store (outputting the generated objects as a list of lists)


@ -10,6 +10,7 @@
stdload/2, stdload/2,
stdload_expiring/3, stdload_expiring/3,
stdload_object/6, stdload_object/6,
stdload_object/9,
reset_filestructure/0, reset_filestructure/0,
reset_filestructure/1, reset_filestructure/1,
check_bucket_stats/2, check_bucket_stats/2,
@ -59,7 +60,8 @@
get_value_from_objectlistitem/1, get_value_from_objectlistitem/1,
numbered_key/1, numbered_key/1,
fixed_bin_key/1, fixed_bin_key/1,
convert_to_seconds/1]). convert_to_seconds/1,
compact_and_wait/1]).
-define(RETURN_TERMS, {true, undefined}). -define(RETURN_TERMS, {true, undefined}).
-define(SLOWOFFER_DELAY, 5). -define(SLOWOFFER_DELAY, 5).
@ -68,6 +70,7 @@
-define(MD_VTAG, <<"X-Riak-VTag">>). -define(MD_VTAG, <<"X-Riak-VTag">>).
-define(MD_LASTMOD, <<"X-Riak-Last-Modified">>). -define(MD_LASTMOD, <<"X-Riak-Last-Modified">>).
-define(MD_DELETED, <<"X-Riak-Deleted">>). -define(MD_DELETED, <<"X-Riak-Deleted">>).
-define(MD_INDEX, <<"index">>).
-define(EMPTY_VTAG_BIN, <<"e">>). -define(EMPTY_VTAG_BIN, <<"e">>).
-define(ROOT_PATH, "test"). -define(ROOT_PATH, "test").
@ -240,17 +243,35 @@ stdload_expiring(Book, KeyCount, TTL, V, Acc) ->
stdload_expiring(Book, KeyCount - 1, TTL, V, [{I, B, K}|Acc]). stdload_expiring(Book, KeyCount - 1, TTL, V, [{I, B, K}|Acc]).
stdload_object(Book, B, K, I, V, TTL) -> stdload_object(Book, B, K, I, V, TTL) ->
Obj = [{index, I}, {value, V}], stdload_object(Book, B, K, I, V, TTL, ?STD_TAG, true, false).
IdxSpecs =
case leveled_bookie:book_get(Book, B, K) of stdload_object(Book, B, K, I, V, TTL, Tag, RemovePrev2i, MustFind) ->
{ok, PrevObj} -> Obj = [{index, [I]}, {value, V}],
{index, OldI} = lists:keyfind(index, 1, PrevObj), {IdxSpecs, Obj0} =
io:format("Remove index ~w for ~w~n", [OldI, I]), case {leveled_bookie:book_get(Book, B, K, Tag), MustFind} of
[{remove, <<"temp_int">>, OldI}, {add, <<"temp_int">>, I}]; {{ok, PrevObj}, _} ->
not_found -> {index, PrevIs} = lists:keyfind(index, 1, PrevObj),
[{add, <<"temp_int">>, I}] case RemovePrev2i of
true ->
MapFun =
fun(OldI) -> {remove, <<"temp_int">>, OldI} end,
{[{add, <<"temp_int">>, I}|lists:map(MapFun, PrevIs)],
Obj};
false ->
{[{add, <<"temp_int">>, I}],
[{index, [I|PrevIs]}, {value, V}]}
end;
{not_found, false} ->
{[{add, <<"temp_int">>, I}], Obj}
end,
R =
case TTL of
infinity ->
leveled_bookie:book_put(Book, B, K, Obj0, IdxSpecs, Tag);
TTL when is_integer(TTL) ->
leveled_bookie:book_tempput(Book, B, K, Obj0,
IdxSpecs, Tag, TTL)
end, end,
R = leveled_bookie:book_tempput(Book, B, K, Obj, IdxSpecs, ?STD_TAG, TTL),
case R of case R of
ok -> ok ->
ok; ok;
@ -261,6 +282,7 @@ stdload_object(Book, B, K, I, V, TTL) ->
reset_filestructure() -> reset_filestructure() ->
reset_filestructure(0, ?ROOT_PATH). reset_filestructure(0, ?ROOT_PATH).
@ -517,23 +539,30 @@ set_object(Bucket, Key, Value, IndexGen) ->
set_object(Bucket, Key, Value, IndexGen, []). set_object(Bucket, Key, Value, IndexGen, []).
set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) -> set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
set_object(Bucket, Key, Value, IndexGen, Indexes2Remove, []).
set_object(Bucket, Key, Value, IndexGen, Indexes2Remove, IndexesNotToRemove) ->
IdxSpecs = IndexGen(),
Indexes =
lists:map(fun({add, IdxF, IdxV}) -> {IdxF, IdxV} end,
IdxSpecs ++ IndexesNotToRemove),
Obj = {Bucket, Obj = {Bucket,
Key, Key,
Value, Value,
IndexGen() ++ lists:map(fun({add, IdxF, IdxV}) -> IdxSpecs ++
{remove, IdxF, IdxV} end, lists:map(fun({add, IdxF, IdxV}) -> {remove, IdxF, IdxV} end,
Indexes2Remove), Indexes2Remove),
[{"MDK", "MDV" ++ Key}, [{<<"MDK">>, "MDV" ++ Key},
{"MDK2", "MDV" ++ Key}, {<<"MDK2">>, "MDV" ++ Key},
{?MD_LASTMOD, os:timestamp()}]}, {?MD_LASTMOD, os:timestamp()},
{B1, K1, V1, Spec1, MD} = Obj, {?MD_INDEX, Indexes}]},
{B1, K1, V1, DeltaSpecs, MD} = Obj,
Content = #r_content{metadata=dict:from_list(MD), value=V1}, Content = #r_content{metadata=dict:from_list(MD), value=V1},
{#r_object{bucket=B1, {#r_object{bucket=B1,
key=K1, key=K1,
contents=[Content], contents=[Content],
vclock=generate_vclock()}, vclock=generate_vclock()},
Spec1}. DeltaSpecs}.
get_value_from_objectlistitem({_Int, Obj, _Spc}) -> get_value_from_objectlistitem({_Int, Obj, _Spc}) ->
[Content] = Obj#r_object.contents, [Content] = Obj#r_object.contents,
@ -762,26 +791,39 @@ put_altered_indexed_objects(Book, Bucket, KSpecL) ->
put_altered_indexed_objects(Book, Bucket, KSpecL, true). put_altered_indexed_objects(Book, Bucket, KSpecL, true).
put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) -> put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) ->
IndexGen = testutil:get_randomindexes_generator(1), IndexGen = get_randomindexes_generator(1),
V = testutil:get_compressiblevalue(), V = get_compressiblevalue(),
RplKSpecL = lists:map(fun({K, Spc}) -> FindAdditionFun = fun(SpcItem) -> element(1, SpcItem) == add end,
AddSpc = if MapFun =
RemoveOld2i == true -> fun({K, Spc}) ->
[lists:keyfind(add, 1, Spc)]; OldSpecs = lists:filter(FindAdditionFun, Spc),
RemoveOld2i == false -> {RemoveSpc, AddSpc} =
[] case RemoveOld2i of
end, true ->
{O, AltSpc} = testutil:set_object(Bucket, {OldSpecs, []};
K, false ->
V, {[], OldSpecs}
IndexGen, end,
AddSpc), {O, DeltaSpecs} =
case book_riakput(Book, O, AltSpc) of set_object(Bucket, K, V,
ok -> ok; IndexGen, RemoveSpc, AddSpc),
pause -> timer:sleep(?SLOWOFFER_DELAY) % DeltaSpecs should be new indexes added, and any old indexes which
end, % have been removed by this change where RemoveOld2i is true.
{K, AltSpc} end, %
KSpecL), % The actual indexes within the object should reflect any history
% of indexes i.e. when RemoveOld2i is false.
%
% The [{Key, SpecL}] returned should accrue additions over loops if
% RemoveOld2i is false
case book_riakput(Book, O, DeltaSpecs) of
ok -> ok;
pause -> timer:sleep(?SLOWOFFER_DELAY)
end,
% Note that order in the SpecL is important, as
% check_indexed_objects needs to find the latest item added
{K, DeltaSpecs ++ AddSpc}
end,
RplKSpecL = lists:map(MapFun, KSpecL),
{RplKSpecL, V}. {RplKSpecL, V}.
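%% Illustrative example of the distinction drawn in the comments above, with
%% hypothetical index values rather than output from a test run: if a key
%% previously carried [{add, <<"idx1_bin">>, <<"old">>}], then with
%% RemoveOld2i == true the call produces
%%     DeltaSpecs = [{add, <<"idx1_bin">>, <<"new">>},
%%                   {remove, <<"idx1_bin">>, <<"old">>}]
%% and that is also the spec list returned for the key; with
%% RemoveOld2i == false nothing is removed, so
%%     DeltaSpecs = [{add, <<"idx1_bin">>, <<"new">>}]
%% and the returned spec list accrues to
%%     [{add, <<"idx1_bin">>, <<"new">>}, {add, <<"idx1_bin">>, <<"old">>}].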
rotating_object_check(RootPath, B, NumberOfObjects) -> rotating_object_check(RootPath, B, NumberOfObjects) ->
@ -790,16 +832,16 @@ rotating_object_check(RootPath, B, NumberOfObjects) ->
{max_journalsize, 5000000}, {max_journalsize, 5000000},
{sync_strategy, sync_strategy()}], {sync_strategy, sync_strategy()}],
{ok, Book1} = leveled_bookie:book_start(BookOpts), {ok, Book1} = leveled_bookie:book_start(BookOpts),
{KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects), {KSpcL1, V1} = put_indexed_objects(Book1, B, NumberOfObjects),
ok = testutil:check_indexed_objects(Book1, B, KSpcL1, V1), ok = check_indexed_objects(Book1, B, KSpcL1, V1),
{KSpcL2, V2} = testutil:put_altered_indexed_objects(Book1, B, KSpcL1), {KSpcL2, V2} = put_altered_indexed_objects(Book1, B, KSpcL1),
ok = testutil:check_indexed_objects(Book1, B, KSpcL2, V2), ok = check_indexed_objects(Book1, B, KSpcL2, V2),
{KSpcL3, V3} = testutil:put_altered_indexed_objects(Book1, B, KSpcL2), {KSpcL3, V3} = put_altered_indexed_objects(Book1, B, KSpcL2),
ok = leveled_bookie:book_close(Book1), ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(BookOpts), {ok, Book2} = leveled_bookie:book_start(BookOpts),
ok = testutil:check_indexed_objects(Book2, B, KSpcL3, V3), ok = check_indexed_objects(Book2, B, KSpcL3, V3),
{KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2, B, KSpcL3), {KSpcL4, V4} = put_altered_indexed_objects(Book2, B, KSpcL3),
ok = testutil:check_indexed_objects(Book2, B, KSpcL4, V4), ok = check_indexed_objects(Book2, B, KSpcL4, V4),
Query = {keylist, ?RIAK_TAG, B, {fun foldkeysfun/3, []}}, Query = {keylist, ?RIAK_TAG, B, {fun foldkeysfun/3, []}},
{async, BList} = leveled_bookie:book_returnfolder(Book2, Query), {async, BList} = leveled_bookie:book_returnfolder(Book2, Query),
true = NumberOfObjects == length(BList()), true = NumberOfObjects == length(BList()),
@ -839,16 +881,9 @@ restore_topending(RootPath, FileName) ->
find_journals(RootPath) -> find_journals(RootPath) ->
{ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"), {ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
{ok, Regex} = re:compile(".*\.cdb"), % Must not return a file with the .pnd extension
CDBFiles = lists:foldl(fun(FN, Acc) -> case re:run(FN, Regex) of CDBFiles =
nomatch -> lists:filter(fun(FN) -> filename:extension(FN) == ".cdb" end, FNsA_J),
Acc;
_ ->
[FN|Acc]
end
end,
[],
FNsA_J),
CDBFiles. CDBFiles.
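%% For illustration, with hypothetical file names: filename:extension/1
%% returns ".cdb" for "nonzero_1.cdb" and ".pnd" for "nonzero_2.pnd", so the
%% equality check keeps completed journal files and drops pending ones
%% without the old regex.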
convert_to_seconds({MegaSec, Seconds, _MicroSec}) -> convert_to_seconds({MegaSec, Seconds, _MicroSec}) ->
@ -863,3 +898,24 @@ get_aae_segment({Type, Bucket}, Key) ->
leveled_tictac:keyto_segment32(<<Type/binary, Bucket/binary, Key/binary>>); leveled_tictac:keyto_segment32(<<Type/binary, Bucket/binary, Key/binary>>);
get_aae_segment(Bucket, Key) -> get_aae_segment(Bucket, Key) ->
leveled_tictac:keyto_segment32(<<Bucket/binary, Key/binary>>). leveled_tictac:keyto_segment32(<<Bucket/binary, Key/binary>>).
compact_and_wait(Book) ->
compact_and_wait(Book, 20000).
compact_and_wait(Book, WaitForDelete) ->
ok = leveled_bookie:book_compactjournal(Book, 30000),
F = fun leveled_bookie:book_islastcompactionpending/1,
lists:foldl(fun(X, Pending) ->
case Pending of
false ->
false;
true ->
io:format("Loop ~w waiting for journal "
++ "compaction to complete~n", [X]),
timer:sleep(20000),
F(Book)
end end,
true,
lists:seq(1, 15)),
io:format("Waiting for journal deletes~n"),
timer:sleep(WaitForDelete).
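%% Typical use in these suites, sketched with a placeholder Opts standing in
%% for whatever bookie options the calling test has built:
%%     {ok, Bookie} = leveled_bookie:book_start(Opts),
%%     ... load, alter and delete objects ...
%%     compact_and_wait(Bookie),
%%     ok = leveled_bookie:book_close(Bookie).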