Merge branch 'master' into mas-i311-mergeselector

Martin Sumner 2020-03-30 20:07:05 +01:00
commit 9e56bfa947
13 changed files with 859 additions and 239 deletions

View file

@ -120,7 +120,7 @@ Three potential recovery strategies are supported to provide some flexibility fo
- retain - on compaction KeyDeltas are retained in the Journal, only values are removed.
- recalc (not yet implemented) - the compaction rules assume that on recovery the key changes will be recalculated by comparing the change with the current database state. In recovery the key changes will be recalculated by comparing the change with the current database state.
- recalc - the compaction rules assume that on recovery the key changes will be recalculated by comparing the change with the current database state. A user-defined function should be passed in at startup to achieve this recalculation (to override `leveled_head:diff_indexspecs/3`) - a sketch of such a function is shown below.
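A minimal sketch of such an override, modelled on the bespoke-tag recalculation used in the test suites (the tag name, index field and metadata layout are illustrative assumptions, not part of the leveled API):

```erlang
%% Sketch only: a diff_indexspecs override for a hypothetical bespoke_tag.
%% Assumes the object metadata is a tuple whose third element is
%% {index, ListOfIndexTerms}, as produced by a matching extract_metadata
%% override. Passed at startup via the override_functions option, e.g.
%% {override_functions, [{diff_indexspecs, fun mymod:diff_indexspecs/3}]}.
diff_indexspecs(bespoke_tag, UpdMeta, PrvMeta) ->
    {index, UpdIndexes} = element(3, UpdMeta),
    IndexDeltas =
        case PrvMeta of
            not_present ->
                % No previous version in the Ledger - add all index terms
                UpdIndexes;
            PrvMeta when is_tuple(PrvMeta) ->
                {index, PrvIndexes} = element(3, PrvMeta),
                lists:subtract(UpdIndexes, PrvIndexes)
        end,
    lists:map(fun(I) -> {add, <<"temp_int">>, I} end, IndexDeltas).
```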
### Hot Backups

View file

@ -77,11 +77,11 @@ However, what if the Ledger had been erased? This could happen due to some corr
There are three potential strategies:
- `skip` - don't worry about this scenario, require the Ledger to be backed up;
- `recovr` - don't worry about this scenario, require the Ledger to be backed up;
- `retain` - discard the object itself on compaction but keep the key changes;
- `recalc` - recalculate the indexes on reload by comparing the information on the object with the current state of the Ledger (as would be required by the PUT process when comparing IndexSpecs at PUT time).
There is no code for `recalc` at present it is simply a logical possibility. So to set a reload strategy there should be an entry like `{reload_strategy, [{TagName, skip|retain}]}`. By default tags are pre-set to `retain`. If there is no need to handle a corrupted Ledger, then all tags could be set to `skip`.
Setting a reload strategy requires a list of tuples mapping tag names to strategies: `{reload_strategy, [{TagName, recovr|retain|recalc}]}`. By default tags are pre-set to `retain`. If there is no need to handle a corrupted Ledger, then all tags could be set to `recovr` - this assumes that either the ledger files are protected by some other means from corruption, or an external anti-entropy mechanism will recover the lost data.
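For example, a store could be started with an explicit per-tag strategy (a sketch only - the root path and the chosen strategies are assumptions, not recommendations):

```erlang
%% ?RIAK_TAG and ?STD_TAG are defined in include/leveled.hrl.
StartOpts = [{root_path, "/tmp/leveled_data"},
             {reload_strategy, [{?RIAK_TAG, retain}, {?STD_TAG, recovr}]}],
{ok, Bookie} = leveled_bookie:book_start(StartOpts).
```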
## Compression Method

View file

@ -93,8 +93,6 @@
]).
-export([empty_ledgercache/0,
loadqueue_ledgercache/1,
push_ledgercache/2,
snapshot_store/6,
fetch_value/2,
journal_notfound/4]).
@ -105,6 +103,7 @@
-include_lib("eunit/include/eunit.hrl").
-define(LOADING_PAUSE, 1000).
-define(CACHE_SIZE, 2500).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
@ -166,7 +165,7 @@
-record(state, {inker :: pid() | undefined,
penciller :: pid() | undefined,
cache_size :: integer() | undefined,
ledger_cache = #ledger_cache{},
ledger_cache = #ledger_cache{} :: ledger_cache(),
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
@ -310,12 +309,16 @@
% resilience outside of the store), or retain (retain a history of
% key changes, even when the object value has been compacted).
%
% There is a third, theoretical and untested strategy, which is
% recalc - which would require when reloading the Ledger from the
% Journal, to recalculate the index changes based on the current
% state of the Ledger and the object metadata.
% There is a third strategy, which is recalc, where on reloading
% the Ledger from the Journal, the key changes are recalculated by
% comparing the extracted metadata from the Journal object, with the
% extracted metadata from the current Ledger object it is set to
% replace (should one be present). Implementing the recalc
% strategy requires an override function for
% `leveled_head:diff_indexspecs/3`.
% A function for the ?RIAK_TAG is provided and tested.
%
% reload_strategy ptions are a list - to map from a tag to the
% reload_strategy options are a list - to map from a tag to the
% strategy (recovr|retain|recalc). Default strategies are:
% [{?RIAK_TAG, retain}, {?STD_TAG, retain}]
{max_pencillercachesize, pos_integer()|undefined} |
@ -378,7 +381,16 @@
% true
].
-type initial_loadfun() ::
fun((leveled_codec:journal_key(),
any(),
non_neg_integer(),
{non_neg_integer(), non_neg_integer(), ledger_cache()},
fun((any()) -> {binary(), non_neg_integer()})) ->
{loop|stop,
{non_neg_integer(), non_neg_integer(), ledger_cache()}}).
-export_type([initial_loadfun/0]).
%%%============================================================================
%%% API
@ -1097,7 +1109,7 @@ book_destroy(Pid) ->
%% to store the backup.
%%
%% Backup files are hard-linked. Does not work in head_only mode, or if
%% index changes are used with a `skip` compaction/reload strategy
%% index changes are used with a `recovr` compaction/reload strategy
book_hotbackup(Pid) ->
gen_server:call(Pid, hot_backup, infinity).
@ -1250,8 +1262,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
SQN,
Object,
ObjSize,
{IndexSpecs, TTL},
State),
{IndexSpecs, TTL}),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
{_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
@ -1288,8 +1299,7 @@ handle_call({mput, ObjectSpecs, TTL}, From, State)
Changes =
preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
SQN, null, length(ObjectSpecs),
{ObjectSpecs, TTL},
State),
{ObjectSpecs, TTL}),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
case State#state.slow_offer of
true ->
@ -1537,6 +1547,23 @@ code_change(_OldVsn, State, _Extra) ->
empty_ledgercache() ->
#ledger_cache{mem = ets:new(empty, [ordered_set])}.
-spec push_to_penciller(pid(), ledger_cache()) -> ok.
%% @doc
%% The push to penciller must start as a tree to correctly de-duplicate
%% the list by order before becoming a de-duplicated list for loading
push_to_penciller(Penciller, LedgerCache) ->
push_to_penciller_loop(Penciller, loadqueue_ledgercache(LedgerCache)).
push_to_penciller_loop(Penciller, LedgerCache) ->
case push_ledgercache(Penciller, LedgerCache) of
returned ->
timer:sleep(?LOADING_PAUSE),
push_to_penciller_loop(Penciller, LedgerCache);
ok ->
ok
end.
-spec push_ledgercache(pid(), ledger_cache()) -> ok|returned.
%% @doc
%% Push the ledgercache to the Penciller - which should respond ok or
@ -1642,10 +1669,22 @@ startup(InkerOpts, PencillerOpts, State) ->
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
leveled_log:log("B0005", [LedgerSQN]),
ReloadStrategy = InkerOpts#inker_options.reload_strategy,
LoadFun = get_loadfun(ReloadStrategy, Penciller, State),
BatchFun =
fun(BatchAcc, _Acc) ->
push_to_penciller(Penciller, BatchAcc)
end,
InitAccFun =
fun(FN, CurrentMinSQN) ->
leveled_log:log("I0014", [FN, CurrentMinSQN]),
empty_ledgercache()
end,
ok = leveled_inker:ink_loadpcl(Inker,
LedgerSQN + 1,
get_loadfun(State),
Penciller),
LoadFun,
InitAccFun,
BatchFun),
ok = leveled_inker:ink_checksqn(Inker, LedgerSQN),
{Inker, Penciller}.
@ -2161,30 +2200,26 @@ check_notfound(CheckFrequency, CheckFun) ->
-spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null,
leveled_codec:ledger_key()|?DUMMY,
integer(), any(), integer(),
leveled_codec:journal_keychanges(),
book_state())
-> {integer()|no_lookup,
integer(),
non_neg_integer(), any(), integer(),
leveled_codec:journal_keychanges())
-> {leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())}.
%% @doc
%% Prepare an object and its related key changes for addition to the Ledger
%% via the Ledger Cache.
preparefor_ledgercache(?INKT_MPUT,
?DUMMY, SQN, _O, _S, {ObjSpecs, TTL},
_State) ->
?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}) ->
ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
{no_lookup, SQN, ObjChanges};
preparefor_ledgercache(?INKT_KEYD,
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
_State) ->
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}) ->
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
KeyChanges =
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
{no_lookup, SQN, KeyChanges};
preparefor_ledgercache(_InkTag,
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
_State) ->
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
KeyChanges =
@ -2193,8 +2228,58 @@ preparefor_ledgercache(_InkTag,
{KeyH, SQN, KeyChanges}.
-spec addto_ledgercache({integer()|no_lookup,
integer(),
-spec recalcfor_ledgercache(leveled_codec:journal_key_tag()|null,
leveled_codec:ledger_key()|?DUMMY,
non_neg_integer(), any(), integer(),
leveled_codec:journal_keychanges(),
ledger_cache(),
pid())
-> {leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())}.
%% @doc
%% When loading from the journal to the ledger, the load may hit a key which has the
%% `recalc` strategy. Such a key needs to recalculate the key changes by
%% comparison with the current state of the ledger, assuming it is a full
%% journal entry (i.e. KeyDeltas which may be a result of previously running
%% with a retain strategy should be ignored).
recalcfor_ledgercache(InkTag,
_LedgerKey, SQN, _Obj, _Size, {_IdxSpecs, _TTL},
_LedgerCache,
_Penciller)
when InkTag == ?INKT_MPUT; InkTag == ?INKT_KEYD ->
{no_lookup, SQN, []};
recalcfor_ledgercache(_InkTag,
LK, SQN, Obj, Size, {_IgnoreJournalIdxSpecs, TTL},
LedgerCache,
Penciller) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LK, SQN, Obj, Size, TTL),
OldObject =
case check_in_ledgercache(LK, KeyH, LedgerCache, loader) of
false ->
leveled_penciller:pcl_fetch(Penciller, LK, KeyH, true);
{value, KV} ->
KV
end,
OldMetadata =
case OldObject of
not_present ->
not_present;
{LK, LV} ->
leveled_codec:get_metadata(LV)
end,
UpdMetadata = leveled_codec:get_metadata(MetaValue),
IdxSpecs =
leveled_head:diff_indexspecs(element(1, LK), UpdMetadata, OldMetadata),
{KeyH,
SQN,
[{LK, MetaValue}]
++ leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL)}.
-spec addto_ledgercache({leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())},
ledger_cache())
-> ledger_cache().
@ -2230,6 +2315,32 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
-spec check_in_ledgercache(leveled_codec:ledger_key(),
leveled_codec:segment_hash(),
ledger_cache(),
loader) ->
false | {value, leveled_codec:ledger_kv()}.
%% @doc
%% Check the ledger cache for a Key, when the ledger cache is in loader mode
%% and so is populating a queue not an ETS table
check_in_ledgercache(PK, Hash, Cache, loader) ->
case leveled_pmem:check_index(Hash, Cache#ledger_cache.index) of
[] ->
false;
_ ->
search(fun({K,_V}) -> K == PK end,
lists:reverse(Cache#ledger_cache.load_queue))
end.
-spec search(fun((any()) -> boolean()), list()) -> {value, any()}|false.
search(Pred, [Hd|Tail]) ->
case Pred(Hd) of
true -> {value, Hd};
false -> search(Pred, Tail)
end;
search(Pred, []) when is_function(Pred, 1) ->
false.
-spec maybepush_ledgercache(integer(), ledger_cache(), pid())
-> {ok|returned, ledger_cache()}.
%% @doc
@ -2276,44 +2387,47 @@ maybe_withjitter(_CacheSize, _MaxCacheSize) ->
false.
-spec get_loadfun(book_state()) -> fun().
-spec get_loadfun(leveled_codec:compaction_strategy(), pid(), book_state())
-> initial_loadfun().
%% @doc
%% The LoadFun will be used by the Inker when walking across the Journal to
%% load the Penciller at startup
get_loadfun(State) ->
PrepareFun =
fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, State)
end,
LoadFun =
%% load the Penciller at startup.
get_loadfun(ReloadStrat, Penciller, _State) ->
fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
{MinSQN, MaxSQN, OutputTree} = Acc0,
{MinSQN, MaxSQN, LedgerCache} = Acc0,
{SQN, InkTag, PK} = KeyInJournal,
% VBin may already be a term
{VBin, VSize} = ExtractFun(ValueInJournal),
{Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
case SQN of
SQN when SQN < MinSQN ->
{loop, Acc0};
SQN when SQN < MaxSQN ->
Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
{loop,
{MinSQN,
MaxSQN,
addto_ledgercache(Chngs, OutputTree, loader)}};
MaxSQN ->
leveled_log:log("B0006", [SQN]),
Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
{stop,
{MinSQN,
MaxSQN,
addto_ledgercache(Chngs, OutputTree, loader)}};
SQN when SQN > MaxSQN ->
leveled_log:log("B0007", [MaxSQN, SQN]),
{stop, Acc0}
end
{stop, Acc0};
_ ->
{VBin, ValSize} = ExtractFun(ValueInJournal),
% VBin may already be a term
{Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
Chngs =
case leveled_codec:get_tagstrategy(PK, ReloadStrat) of
recalc ->
recalcfor_ledgercache(InkTag, PK, SQN,
Obj, ValSize, IdxSpecs,
LedgerCache,
Penciller);
_ ->
preparefor_ledgercache(InkTag, PK, SQN,
Obj, ValSize, IdxSpecs)
end,
LoadFun.
case SQN of
MaxSQN ->
leveled_log:log("B0006", [SQN]),
LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
{stop, {MinSQN, MaxSQN, LC0}};
_ ->
LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
{loop, {MinSQN, MaxSQN, LC0}}
end
end
end.
delete_path(DirPath) ->
@ -3166,6 +3280,10 @@ sqnorder_mutatefold_test() ->
ok = book_destroy(Bookie1).
search_test() ->
?assertMatch({value, 5}, search(fun(X) -> X == 5 end, lists:seq(1, 10))),
?assertMatch(false, search(fun(X) -> X == 55 end, lists:seq(1, 10))).
check_notfound_test() ->
ProbablyFun = fun() -> probably end,
MissingFun = fun() -> missing end,

View file

@ -49,7 +49,8 @@
segment_hash/1,
to_lookup/1,
next_key/1,
return_proxy/4]).
return_proxy/4,
get_metadata/1]).
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-define(NRT_IDX, "$aae.").
@ -88,7 +89,7 @@
-type ledger_kv() ::
{ledger_key(), ledger_value()}.
-type compaction_method() ::
retain|skip|recalc.
retain|recovr|recalc.
-type compaction_strategy() ::
list({tag(), compaction_method()}).
-type journal_key_tag() ::
@ -243,6 +244,10 @@ strip_to_indexdetails({_, V}) when tuple_size(V) > 4 ->
striphead_to_v1details(V) ->
{element(1, V), element(2, V), element(3, V), element(4, V)}.
-spec get_metadata(ledger_value()) -> metadata().
get_metadata(LV) ->
element(4, LV).
-spec key_dominates(ledger_kv(), ledger_kv()) ->
left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant.
%% @doc
@ -358,24 +363,24 @@ endkey_passed(EndKey, CheckingKey) ->
-spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy().
%% @doc
%% Take the default startegy for compaction, and override the approach for any
%% Take the default strategy for compaction, and override the approach for any
%% tags passed in
inker_reload_strategy(AltList) ->
ReloadStrategy0 =
DefaultList =
lists:map(fun leveled_head:default_reload_strategy/1,
leveled_head:defined_objecttags()),
lists:foldl(fun({X, Y}, SList) ->
lists:keyreplace(X, 1, SList, {X, Y})
end,
ReloadStrategy0,
AltList).
lists:ukeymerge(1,
lists:ukeysort(1, AltList),
lists:ukeysort(1, DefaultList)).
-spec get_tagstrategy(ledger_key(), compaction_strategy())
-> skip|retain|recalc.
-spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy())
-> compaction_method().
%% @doc
%% Work out the compaction strategy for the key
get_tagstrategy({Tag, _, _, _}, Strategy) ->
get_tagstrategy(Tag, Strategy);
get_tagstrategy(Tag, Strategy) ->
case lists:keyfind(Tag, 1, Strategy) of
{Tag, TagStrat} ->
TagStrat;

View file

@ -22,7 +22,8 @@
-export([key_to_canonicalbinary/1,
build_head/2,
extract_metadata/3
extract_metadata/3,
diff_indexspecs/3
]).
-export([get_size/2,
@ -71,12 +72,15 @@
-type object_metadata() :: riak_metadata()|std_metadata()|head_metadata().
-type appdefinable_function() ::
key_to_canonicalbinary | build_head | extract_metadata.
key_to_canonicalbinary | build_head | extract_metadata | diff_indexspecs.
% Functions for which default behaviour can be over-written for the
% application's own tags
-type appdefinable_function_tuple() ::
{appdefinable_function(), fun()}.
-type index_op() :: add | remove.
-type index_value() :: integer() | binary().
-type head() ::
binary()|tuple().
% TODO:
@ -174,6 +178,41 @@ default_extract_metadata(_Tag, SizeAsStoredInJournal, Obj) ->
{{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}.
-spec diff_indexspecs(object_tag(),
object_metadata(),
object_metadata()|not_present)
-> leveled_codec:index_specs().
%% @doc
%% Take an object metadata part from within the journal, and an object metadata
%% part from the ledger (which should have a lower SQN), and generate index
%% specs by determining the difference between the index specs on the object
%% to be loaded and those on the object already stored.
%%
%% This is only relevant where the journal compaction strategy of `recalc` is
%% used; the KeyChanges will be used when `retain` is the compaction strategy.
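%% For example, if the index entries extracted from the updated (journal)
%% metadata are [{<<"idx1_bin">>, <<"A">>}] and those extracted from the
%% current ledger metadata are [{<<"idx1_bin">>, <<"B">>}], the result is
%% [{add, <<"idx1_bin">>, <<"A">>}, {remove, <<"idx1_bin">>, <<"B">>}]
%% (field and term values here are illustrative only).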
diff_indexspecs(?RIAK_TAG, UpdatedMetadata, OldMetadata) ->
UpdIndexes =
get_indexes_from_siblingmetabin(element(1, UpdatedMetadata), []),
OldIndexes =
case OldMetadata of
not_present ->
[];
_ ->
get_indexes_from_siblingmetabin(element(1, OldMetadata), [])
end,
diff_index_data(OldIndexes, UpdIndexes);
diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata) ->
default_diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata);
diff_indexspecs(Tag, UpdatedMetadata, CurrentMetadata) ->
OverrideFun =
get_appdefined_function(diff_indexspecs,
fun default_diff_indexspecs/3,
3),
OverrideFun(Tag, UpdatedMetadata, CurrentMetadata).
default_diff_indexspecs(_Tag, _UpdatedMetadata, _CurrentMetadata) ->
[].
%%%============================================================================
%%% Standard External Functions
%%%============================================================================
@ -190,7 +229,7 @@ defined_objecttags() ->
leveled_codec:compaction_method()}.
%% @doc
%% State the compaction_method to be used when reloading the Ledger from the
%% journal for each object tag. Note, no compaction startegy required for
%% journal for each object tag. Note, no compaction strategy required for
%% head_only tag
default_reload_strategy(Tag) ->
{Tag, retain}.
@ -317,3 +356,155 @@ get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
MetaLen:32/integer,
MetaBin:MetaLen/binary>>,
[LastMod|LastMods]).
get_indexes_from_siblingmetabin(<<0:32/integer,
MetaLen:32/integer,
MetaBin:MetaLen/binary,
RestBin/binary>>,
Indexes) ->
UpdIndexes = lists:umerge(get_indexes_frommetabin(MetaBin), Indexes),
get_indexes_from_siblingmetabin(RestBin, UpdIndexes);
get_indexes_from_siblingmetabin(<<SibCount:32/integer, RestBin/binary>>,
Indexes) when SibCount > 0 ->
get_indexes_from_siblingmetabin(RestBin, Indexes);
get_indexes_from_siblingmetabin(_, Indexes) ->
Indexes.
%% @doc
%% Parse the metabinary for an individual sibling and return a list of index
%% entries.
get_indexes_frommetabin(<<_LMD1:32/integer, _LMD2:32/integer, _LMD3:32/integer,
VTagLen:8/integer, _VTag:VTagLen/binary,
Deleted:1/binary-unit:8,
MetaRestBin/binary>>) when Deleted /= <<1>> ->
lists:usort(indexes_of_metabinary(MetaRestBin));
get_indexes_frommetabin(_) ->
[].
indexes_of_metabinary(<<>>) ->
[];
indexes_of_metabinary(<<KeyLen:32/integer, KeyBin:KeyLen/binary,
ValueLen:32/integer, ValueBin:ValueLen/binary,
Rest/binary>>) ->
Key = decode_maybe_binary(KeyBin),
case Key of
<<"index">> ->
Value = decode_maybe_binary(ValueBin),
Value;
_ ->
indexes_of_metabinary(Rest)
end.
decode_maybe_binary(<<1, Bin/binary>>) ->
Bin;
decode_maybe_binary(<<0, Bin/binary>>) ->
binary_to_term(Bin);
decode_maybe_binary(<<_Other:8, Bin/binary>>) ->
Bin.
-spec diff_index_data([{binary(), index_value()}],
[{binary(), index_value()}]) ->
[{index_op(), binary(), index_value()}].
diff_index_data(OldIndexes, AllIndexes) ->
OldIndexSet = ordsets:from_list(OldIndexes),
AllIndexSet = ordsets:from_list(AllIndexes),
diff_specs_core(AllIndexSet, OldIndexSet).
diff_specs_core(AllIndexSet, OldIndexSet) ->
NewIndexSet = ordsets:subtract(AllIndexSet, OldIndexSet),
RemoveIndexSet =
ordsets:subtract(OldIndexSet, AllIndexSet),
NewIndexSpecs =
assemble_index_specs(ordsets:subtract(NewIndexSet, OldIndexSet),
add),
RemoveIndexSpecs =
assemble_index_specs(RemoveIndexSet,
remove),
NewIndexSpecs ++ RemoveIndexSpecs.
%% @doc Assemble a list of index specs in the
%% form of triplets of the form
%% {IndexOperation, IndexField, IndexValue}.
-spec assemble_index_specs([{binary(), binary()}], index_op()) ->
[{index_op(), binary(), binary()}].
assemble_index_specs(Indexes, IndexOp) ->
[{IndexOp, Index, Value} || {Index, Value} <- Indexes].
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
index_extract_test() ->
SibMetaBin =
<<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134,
1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68,
86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,
45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,
0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50,
49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50,
48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0,
5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
50,45,53,54,98,51,98,97,57,57,99,55,56,50>>,
Indexes = get_indexes_from_siblingmetabin(SibMetaBin, []),
ExpIndexes = [{"idx1_bin","21521107231730Sophia"},
{"idx1_bin","21820510130146Avery"}],
?assertMatch(ExpIndexes, Indexes),
SibMetaBinNoIdx =
<<0,0,0,1,0,0,0,0,0,0,0,128,0,0,6,48,0,4,130,247,0,1,250,134,
1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68,
86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,
45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0,
5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
50,45,53,54,98,51,98,97,57,57,99,55,56,50>>,
?assertMatch([], get_indexes_from_siblingmetabin(SibMetaBinNoIdx, [])),
SibMetaBinOverhang =
<<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134,
1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68,
86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,
45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,
0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50,
49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50,
48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0,
5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0,0,0,0,0,4,
0,0,0,0>>,
?assertMatch(ExpIndexes,
get_indexes_from_siblingmetabin(SibMetaBinOverhang, [])).
diff_index_test() ->
UpdIndexes =
[{<<"idx1_bin">>,<<"20840930001702Zoe">>},
{<<"idx1_bin">>,<<"20931011172606Emily">>}],
OldIndexes =
[{<<"idx1_bin">>,<<"20231126131808Madison">>},
{<<"idx1_bin">>,<<"20931011172606Emily">>}],
IdxSpecs = diff_index_data(OldIndexes, UpdIndexes),
?assertMatch([{add, <<"idx1_bin">>, <<"20840930001702Zoe">>},
{remove, <<"idx1_bin">>,<<"20231126131808Madison">>}], IdxSpecs).
decode_test() ->
Bin = <<"999">>,
BinTerm = term_to_binary("999"),
?assertMatch("999", binary_to_list(
decode_maybe_binary(<<1:8/integer, Bin/binary>>))),
?assertMatch("999", decode_maybe_binary(<<0:8/integer, BinTerm/binary>>)),
?assertMatch("999", binary_to_list(
decode_maybe_binary(<<2:8/integer, Bin/binary>>))).
-endif.

View file

@ -145,6 +145,15 @@
% released from a compaction run of a single file to make the run
% worthwhile of compaction (released space is 100.0 - target e.g. 70.0
% means that 30.0% should be released)
-type key_size() ::
{{non_neg_integer(),
leveled_codec:journal_key_tag(),
leveled_codec:ledger_key()}, non_neg_integer()}.
-type corrupted_test_key_size() ::
{{non_neg_integer(),
leveled_codec:journal_key_tag(),
leveled_codec:ledger_key(),
null}, non_neg_integer()}.
%%%============================================================================
%%% API
@ -288,6 +297,8 @@ handle_call(stop, _From, State) ->
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
State) ->
leveled_log:log("IC014", [State#state.reload_strategy,
State#state.max_run_length]),
% Empty the waste folder
clear_waste(State),
SW = os:timestamp(),
@ -298,7 +309,11 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
% Don't want to process a queued call waiting on an old manifest
[_Active|Manifest] = Manifest0,
{FilterServer, MaxSQN} = InitiateFun(Checker),
ok = clerk_scorefilelist(self(), Manifest),
NotRollingFun =
fun({_LowSQN, _FN, Pid, _LK}) ->
not leveled_cdb:cdb_isrolling(Pid)
end,
ok = clerk_scorefilelist(self(), lists:filter(NotRollingFun, Manifest)),
ScoringState =
#scoring_state{filter_fun = FilterFun,
filter_server = FilterServer,
@ -315,7 +330,8 @@ handle_cast({score_filelist, [Entry|Tail]}, State) ->
ScoringState#scoring_state.filter_server,
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE),
?BATCH_SIZE,
State#state.reload_strategy),
Candidate =
#candidate{low_sqn = LowSQN,
filename = FN,
@ -493,7 +509,10 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%%% Internal functions
%%%============================================================================
-spec check_single_file(pid(), fun(), any(), non_neg_integer(),
non_neg_integer(), non_neg_integer(),
leveled_codec:compaction_strategy()) ->
float().
%% @doc
%% Get a score for a single CDB file in the journal. This will pull out a bunch
%% of keys and sizes at random in an efficient way (by scanning the hashtable
@ -505,13 +524,19 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%%
%% The score is based on a random sample - so will not be consistent between
%% calls.
check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) ->
check_single_file(CDB, FilterFun, FilterServer, MaxSQN,
SampleSize, BatchSize,
ReloadStrategy) ->
FN = leveled_cdb:cdb_filename(CDB),
SW = os:timestamp(),
PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize),
KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []),
Score =
size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN),
size_comparison_score(KeySizeList,
FilterFun,
FilterServer,
MaxSQN,
ReloadStrategy),
safely_log_filescore(PositionList, FN, Score, SW),
Score.
@ -523,7 +548,15 @@ safely_log_filescore(PositionList, FN, Score, SW) ->
div length(PositionList),
leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW).
size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
-spec size_comparison_score(list(key_size() | corrupted_test_key_size()),
fun(),
any(),
non_neg_integer(),
leveled_codec:compaction_strategy()) ->
float().
size_comparison_score(KeySizeList,
FilterFun, FilterServer, MaxSQN,
RS) ->
FoldFunForSizeCompare =
fun(KS, {ActSize, RplSize}) ->
case KS of
@ -532,7 +565,18 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
leveled_codec:is_full_journalentry({SQN, Type, PK}),
case IsJournalEntry of
false ->
TS = leveled_codec:get_tagstrategy(PK, RS),
% If the strategy is to retain key deltas, then
% scoring must reflect that. Key deltas are
% possible even if the strategy does not allow them, as
% there is support for changing strategy from
% retain to recalc
case TS of
retain ->
{ActSize + Size - ?CRC_SIZE, RplSize};
_ ->
{ActSize, RplSize + Size - ?CRC_SIZE}
end;
true ->
Check = FilterFun(FilterServer, PK, SQN),
case {Check, SQN > MaxSQN} of
@ -567,7 +611,8 @@ fetch_inbatches([], _BatchSize, CDB, CheckedList) ->
ok = leveled_cdb:cdb_clerkcomplete(CDB),
CheckedList;
fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
{Batch, Tail} = if
{Batch, Tail} =
if
length(PositionList) >= BatchSize ->
lists:split(BatchSize, PositionList);
true ->
@ -760,10 +805,11 @@ split_positions_into_batches(Positions, Journal, Batches) ->
%% recalculating the KeyChanges by looking at the object when we reload. So
%% old objects can be discarded.
%%
%% If the strategy is skip, we don't care about KeyDeltas. Note though, that
%% If the strategy is recovr, we don't care about KeyDeltas. Note, though, that
%% if the ledger is deleted it may not be possible to safely rebuild a KeyStore
%% if it contains index entries. The hot_backup approach is also not safe with
%% a `skip` strategy.
%% a `recovr` strategy. The recovr strategy assumes faults in the ledger will
%% be resolved via application-level anti-entropy
filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
FoldFun =
filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy),
@ -998,6 +1044,7 @@ fetch_testcdb(RP) ->
check_single_file_test() ->
RP = "test/test_area/",
RS = leveled_codec:inker_reload_strategy([]),
ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)),
{ok, CDB} = fetch_testcdb(RP),
LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}},
@ -1010,14 +1057,14 @@ check_single_file_test() ->
_ ->
replaced
end end,
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4),
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(37.5, Score1),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4),
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(100.0, Score2),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS),
?assertMatch(37.5, Score3),
Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4),
Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS),
?assertMatch(75.0, Score4),
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB).
@ -1132,6 +1179,7 @@ compact_empty_file_test() ->
RP = "test/test_area/",
ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)),
FN1 = leveled_inker:filepath(RP, 1, new_journal),
RS = leveled_codec:inker_reload_strategy([]),
CDBopts = #cdb_options{binary_mode=true},
{ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBopts),
{ok, FN2} = leveled_cdb:cdb_complete(CDB1),
@ -1140,7 +1188,7 @@ compact_empty_file_test() ->
{2, {o, "Bucket", "Key2", null}},
{3, {o, "Bucket", "Key3", null}}],
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4),
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(0.0, Score1),
ok = leveled_cdb:cdb_deletepending(CDB2),
ok = leveled_cdb:cdb_destroy(CDB2).
@ -1207,15 +1255,22 @@ compact_singlefile_totwosmallfiles_testto() ->
size_score_test() ->
KeySizeList =
[{{1, ?INKT_STND, "Key1"}, 104},
{{2, ?INKT_STND, "Key2"}, 124},
{{3, ?INKT_STND, "Key3"}, 144},
{{4, ?INKT_STND, "Key4"}, 154},
{{5, ?INKT_STND, "Key5", "Subk1"}, 164},
{{6, ?INKT_STND, "Key6"}, 174},
{{7, ?INKT_STND, "Key7"}, 184}],
[{{1, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key1">>, null}}, 104},
{{2, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key2">>, null}}, 124},
{{3, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key3">>, null}}, 144},
{{4, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key4">>, null}}, 154},
{{5,
?INKT_STND,
{?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>}, null},
164},
{{6, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key6">>, null}}, 174},
{{7, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key7">>, null}}, 184}],
MaxSQN = 6,
CurrentList = ["Key1", "Key4", "Key5", "Key6"],
CurrentList =
[{?STD_TAG, <<"B">>, <<"Key1">>, null},
{?STD_TAG, <<"B">>, <<"Key4">>, null},
{?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>},
{?STD_TAG, <<"B">>, <<"Key6">>, null}],
FilterFun =
fun(L, K, _SQN) ->
case lists:member(K, L) of
@ -1223,7 +1278,13 @@ size_score_test() ->
false -> replaced
end
end,
Score = size_comparison_score(KeySizeList, FilterFun, CurrentList, MaxSQN),
Score =
size_comparison_score(KeySizeList,
FilterFun,
CurrentList,
MaxSQN,
leveled_codec:inker_reload_strategy([])),
io:format("Score ~w", [Score]),
?assertMatch(true, Score > 69.0),
?assertMatch(true, Score < 70.0).

View file

@ -101,7 +101,7 @@
ink_fetch/3,
ink_keycheck/3,
ink_fold/4,
ink_loadpcl/4,
ink_loadpcl/5,
ink_registersnapshot/2,
ink_confirmdelete/2,
ink_compactjournal/3,
@ -133,7 +133,6 @@
-define(WASTE_FP, "waste").
-define(JOURNAL_FILEX, "cdb").
-define(PENDING_FILEX, "pnd").
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(TEST_KC, {[], infinity}).
@ -321,7 +320,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
{fold, MinSQN, FoldFuns, Acc, by_runner},
infinity).
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
-spec ink_loadpcl(pid(),
integer(),
leveled_bookie:initial_loadfun(),
fun((string(), non_neg_integer()) -> any()),
fun((any(), any()) -> ok)) -> ok.
%%
%% Function to prompt load of the Ledger at startup. The Penciller should
%% have determined the lowest SQN not present in the Ledger, and the inker
@ -330,20 +333,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
%%
%% The load fun should be a five arity function like:
%% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun)
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
BatchFun =
fun(BatchAcc, _Acc) ->
push_to_penciller(Penciller, BatchAcc)
end,
InitAccFun =
fun(FN, CurrentMinSQN) ->
leveled_log:log("I0014", [FN, CurrentMinSQN]),
leveled_bookie:empty_ledgercache()
end,
ink_loadpcl(Pid, MinSQN, LoadFun, InitAccFun, BatchFun) ->
gen_server:call(Pid,
{fold,
MinSQN,
{FilterFun, InitAccFun, BatchFun},
{LoadFun, InitAccFun, BatchFun},
ok,
as_ink},
infinity).
@ -1197,22 +1191,6 @@ foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
end.
push_to_penciller(Penciller, LedgerCache) ->
% The push to penciller must start as a tree to correctly de-duplicate
% the list by order before becoming a de-duplicated list for loading
LC0 = leveled_bookie:loadqueue_ledgercache(LedgerCache),
push_to_penciller_loop(Penciller, LC0).
push_to_penciller_loop(Penciller, LedgerCache) ->
case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of
returned ->
timer:sleep(?LOADING_PAUSE),
push_to_penciller_loop(Penciller, LedgerCache);
ok ->
ok
end.
sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
lists:foldl(fun(FN, Acc) ->
case re:run(FN,

View file

@ -358,6 +358,8 @@
{"IC013",
{warn, "File with name ~s to be ignored in manifest as scanning for "
++ "first key returned empty - maybe corrupted"}},
{"IC014",
{info, "Compaction to be run with strategy ~w and max_run_length ~w"}},
{"CDB01",
{info, "Opening file for writing with filename ~s"}},

View file

@ -1581,7 +1581,7 @@ accumulate_positions({K, V}, {PosBinAcc, NoHashCount, HashAcc, LMDAcc}) ->
NHC:7/integer,
PosBinAcc/binary>>,
0,
HashAcc,
[H1|HashAcc],
LMDAcc0}
end;
false ->
@ -3501,6 +3501,7 @@ simple_persisted_tester(SSTNewFun) ->
Acc
end
end,
true = [] == MapFun({FirstKey, "V"}, []), % coverage cheat within MapFun
KVList3 = lists:foldl(MapFun, [], KVList2),
SW2 = os:timestamp(),
lists:foreach(fun({K, H, _V}) ->

View file

@ -2,11 +2,14 @@
-include_lib("common_test/include/ct.hrl").
-include("include/leveled.hrl").
-export([all/0]).
-export([application_defined_tag/1
-export([
application_defined_tag/1,
bespoketag_recalc/1
]).
all() -> [
application_defined_tag
application_defined_tag,
bespoketag_recalc
].
@ -62,6 +65,8 @@ application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) ->
StartOpts1 = [{root_path, RootPath},
{sync_strategy, testutil:sync_strategy()},
{log_level, warn},
{reload_strategy,
[{bespoke_tag1, retain}, {bespoke_tag2, retain}]},
{override_functions, Functions}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
Value = leveled_rand:rand_bytes(512),
@ -108,8 +113,6 @@ application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) ->
ok = leveled_bookie:book_close(Bookie2).
object_generator(Count, V) ->
Hash = erlang:phash2({count, V}),
Random = leveled_rand:uniform(1000),
@ -119,3 +122,113 @@ object_generator(Count, V) ->
Key,
[{hash, Hash}, {shard, Count rem 10},
{random, Random}, {value, V}]}.
bespoketag_recalc(_Config) ->
%% Get a sensible behaviour using the recalc compaction strategy with a
%% bespoke tag
RootPath = testutil:reset_filestructure(),
B0 = <<"B0">>,
KeyCount = 7000,
ExtractMDFun =
fun(bespoke_tag, Size, Obj) ->
[{index, IL}, {value, _V}] = Obj,
{{erlang:phash2(term_to_binary(Obj)),
Size,
{index, IL}},
[os:timestamp()]}
end,
CalcIndexFun =
fun(bespoke_tag, UpdMeta, PrvMeta) ->
% io:format("UpdMeta ~w PrvMeta ~w~n", [UpdMeta, PrvMeta]),
{index, UpdIndexes} = element(3, UpdMeta),
IndexDeltas =
case PrvMeta of
not_present ->
UpdIndexes;
PrvMeta when is_tuple(PrvMeta) ->
{index, PrvIndexes} = element(3, PrvMeta),
lists:subtract(UpdIndexes, PrvIndexes)
end,
lists:map(fun(I) -> {add, <<"temp_int">>, I} end, IndexDeltas)
end,
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalobjectcount, 6000},
{max_pencillercachesize, 8000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{bespoke_tag, recalc}]},
{override_functions,
[{extract_metadata, ExtractMDFun},
{diff_indexspecs, CalcIndexFun}]}],
{ok, Book1} = leveled_bookie:book_start(BookOpts),
LoadFun =
fun(Book, MustFind) ->
fun(I) ->
testutil:stdload_object(Book,
B0, integer_to_binary(I rem KeyCount),
I, erlang:phash2({value, I}),
infinity, bespoke_tag, false, MustFind)
end
end,
lists:foreach(LoadFun(Book1, false), lists:seq(1, KeyCount)),
lists:foreach(LoadFun(Book1, true), lists:seq(KeyCount + 1, KeyCount * 2)),
FoldFun =
fun(_B0, {IV0, _K0}, Acc) ->
case IV0 - 1 of
Acc ->
Acc + 1;
_Unexpected ->
% io:format("Eh? - ~w ~w~n", [Unexpected, Acc]),
Acc + 1
end
end,
CountFold =
fun(Book, CurrentCount) ->
leveled_bookie:book_indexfold(Book,
B0,
{FoldFun, 0},
{<<"temp_int">>, 0, CurrentCount},
{true, undefined})
end,
{async, FolderA} = CountFold(Book1, 2 * KeyCount),
CountA = FolderA(),
io:format("Counted double index entries ~w - everything loaded OK~n",
[CountA]),
true = 2 * KeyCount == CountA,
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(BookOpts),
lists:foreach(LoadFun(Book2, true), lists:seq(KeyCount * 2 + 1, KeyCount * 3)),
{async, FolderB} = CountFold(Book2, 3 * KeyCount),
CountB = FolderB(),
true = 3 * KeyCount == CountB,
testutil:compact_and_wait(Book2),
ok = leveled_bookie:book_close(Book2),
io:format("Restart from blank ledger~n"),
leveled_penciller:clean_testdir(proplists:get_value(root_path, BookOpts) ++
"/ledger"),
{ok, Book3} = leveled_bookie:book_start(BookOpts),
{async, FolderC} = CountFold(Book3, 3 * KeyCount),
CountC = FolderC(),
io:format("All index entries ~w present - recalc ok~n",
[CountC]),
true = 3 * KeyCount == CountC,
ok = leveled_bookie:book_close(Book3),
testutil:reset_filestructure().

View file

@ -7,7 +7,10 @@
hot_backup_simple/1,
hot_backup_changes/1,
retain_strategy/1,
recalc_strategy/1,
recalc_transition_strategy/1,
recovr_strategy/1,
stdtag_recalc/1,
aae_missingjournal/1,
aae_bustedjournal/1,
journal_compaction_bustedjournal/1,
@ -21,13 +24,16 @@ all() -> [
hot_backup_simple,
hot_backup_changes,
retain_strategy,
recalc_strategy,
recalc_transition_strategy,
recovr_strategy,
aae_missingjournal,
aae_bustedjournal,
journal_compaction_bustedjournal,
close_duringcompaction,
allkeydelta_journal_multicompact,
recompact_keydeltas
recompact_keydeltas,
stdtag_recalc
].
@ -145,8 +151,6 @@ recovery_with_samekeyupdates(_Config) ->
testutil:reset_filestructure().
hot_backup_simple(_Config) ->
% The journal may have a hot backup. This allows for an online Bookie
% to be sent a message to prepare a backup function, which an asynchronous
@ -233,85 +237,172 @@ hot_backup_changes(_Config) ->
testutil:reset_filestructure().
retain_strategy(_Config) ->
rotate_wipe_compact(retain, retain).
recalc_strategy(_Config) ->
rotate_wipe_compact(recalc, recalc).
recalc_transition_strategy(_Config) ->
rotate_wipe_compact(retain, recalc).
rotate_wipe_compact(Strategy1, Strategy2) ->
RootPath = testutil:reset_filestructure(),
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalobjectcount, 5000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, retain}]}],
{reload_strategy, [{?RIAK_TAG, Strategy1}]}],
BookOptsAlt = [{root_path, RootPath},
{cache_size, 1000},
{max_journalobjectcount, 2000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, retain}]},
{reload_strategy, [{?RIAK_TAG, Strategy2}]},
{max_run_length, 8}],
{ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 800),
{ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 400),
ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}]),
{ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 1600),
{ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 800),
ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3},
{"Bucket4", Spcl4, LastV4}]),
{ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 3200),
ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3},
{"Bucket5", Spcl5, LastV5}]),
{ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 6400),
{ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 1600),
ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3},
{"Bucket4", Spcl4, LastV4},
{"Bucket5", Spcl5, LastV5},
{"Bucket6", Spcl6, LastV6}]),
{"Bucket5", Spcl5, LastV5}]),
{ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 3200),
{ok, Book1} = leveled_bookie:book_start(BookOpts),
compact_and_wait(Book1),
compact_and_wait(Book1),
ok = leveled_bookie:book_close(Book1),
ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3},
ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3},
{"Bucket4", Spcl4, LastV4},
{"Bucket5", Spcl5, LastV5},
{"Bucket6", Spcl6, LastV6}]),
{ok, Book2} = leveled_bookie:book_start(BookOptsAlt),
compact_and_wait(Book2),
ok = leveled_bookie:book_close(Book2),
{KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000),
ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3},
{"Bucket4", Spcl4, LastV4},
{"Bucket5", Spcl5, LastV5},
{"Bucket6", Spcl6, LastV6}]),
{ok, Book3} = leveled_bookie:book_start(BookOptsAlt),
{KSpcL2, _V2} = testutil:put_indexed_objects(Book3, "AltBucket6", 3000),
Q2 = fun(RT) -> {index_query,
"AltBucket6",
{fun testutil:foldkeysfun/3, []},
{"idx1_bin", "#", "|"},
{RT, undefined}}
end,
{async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
{async, KFolder2A} = leveled_bookie:book_returnfolder(Book3, Q2(false)),
KeyList2A = lists:usort(KFolder2A()),
true = length(KeyList2A) == 3000,
DeleteFun =
fun({DK, [{add, DIdx, DTerm}]}) ->
ok = testutil:book_riakdelete(Book2,
ok = testutil:book_riakdelete(Book3,
"AltBucket6",
DK,
[{remove, DIdx, DTerm}])
end,
lists:foreach(DeleteFun, KSpcL2),
{async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
KeyList2AD = lists:usort(KFolder2AD()),
true = length(KeyList2AD) == 0,
ok = leveled_bookie:book_close(Book2),
{ok, Book3} = leveled_bookie:book_start(BookOptsAlt),
io:format("Compact after deletions~n"),
compact_and_wait(Book3),
compact_and_wait(Book3),
{async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)),
KeyList3AD = lists:usort(KFolder3AD()),
true = length(KeyList3AD) == 0,
ok = leveled_bookie:book_close(Book3),
{ok, Book4} = leveled_bookie:book_start(BookOptsAlt),
io:format("Compact after deletions~n"),
compact_and_wait(Book4),
{async, KFolder4AD} = leveled_bookie:book_returnfolder(Book4, Q2(false)),
KeyList4AD = lists:usort(KFolder4AD()),
true = length(KeyList4AD) == 0,
ok = leveled_bookie:book_close(Book4),
testutil:reset_filestructure().
stdtag_recalc(_Config) ->
%% Setting the ?STD_TAG to do recalc, should result in the ?STD_TAG
%% behaving like recovr - as no recalc is done for ?STD_TAG
%% NOTE - This is a test to confirm bad things happen!
RootPath = testutil:reset_filestructure(),
B0 = <<"B0">>,
KeyCount = 7000,
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalobjectcount, 5000},
{max_pencillercachesize, 10000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?STD_TAG, recalc}]}],
{ok, Book1} = leveled_bookie:book_start(BookOpts),
LoadFun =
fun(Book) ->
fun(I) ->
testutil:stdload_object(Book,
B0, erlang:phash2(I rem KeyCount),
I, erlang:phash2({value, I}),
infinity, ?STD_TAG, false, false)
end
end,
lists:foreach(LoadFun(Book1), lists:seq(1, KeyCount)),
lists:foreach(LoadFun(Book1), lists:seq(KeyCount + 1, KeyCount * 2)),
CountFold =
fun(Book, CurrentCount) ->
leveled_bookie:book_indexfold(Book,
B0,
{fun(_BF, _KT, Acc) -> Acc + 1 end,
0},
{<<"temp_int">>, 0, CurrentCount},
{true, undefined})
end,
{async, FolderA} = CountFold(Book1, 2 * KeyCount),
CountA = FolderA(),
io:format("Counted double index entries ~w - everything loaded OK~n",
[CountA]),
true = 2 * KeyCount == CountA,
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(BookOpts),
lists:foreach(LoadFun(Book2), lists:seq(KeyCount * 2 + 1, KeyCount * 3)),
{async, FolderB} = CountFold(Book2, 3 * KeyCount),
CountB = FolderB(),
io:format("Maybe counted less index entries ~w - everything not loaded~n",
[CountB]),
true = 3 * KeyCount >= CountB,
compact_and_wait(Book2),
ok = leveled_bookie:book_close(Book2),
io:format("Restart from blank ledger"),
leveled_penciller:clean_testdir(proplists:get_value(root_path, BookOpts) ++
"/ledger"),
{ok, Book3} = leveled_bookie:book_start(BookOpts),
{async, FolderC} = CountFold(Book3, 3 * KeyCount),
CountC = FolderC(),
io:format("Missing index entries ~w - recalc not supported on ?STD_TAG~n",
[CountC]),
true = 3 * KeyCount > CountC,
ok = leveled_bookie:book_close(Book3),
testutil:reset_filestructure().
@ -845,6 +936,10 @@ rotating_object_check(BookOpts, B, NumberOfObjects) ->
B,
KSpcL2,
false),
ok = testutil:check_indexed_objects(Book1,
B,
KSpcL1 ++ KSpcL2 ++ KSpcL3,
V3),
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(BookOpts),
ok = testutil:check_indexed_objects(Book2,

View file

@ -905,7 +905,7 @@ handoff(_Config) ->
{sync_strategy, sync}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
% Add some noe Riak objects in - which should be ignored in folds.
% Add some non-Riak objects in - which should be ignored in folds.
Hashes = testutil:stdload(Bookie1, 1000),
% Generate 200K objects to be used within the test, and load them into
% the first store (outputting the generated objects as a list of lists)

View file

@ -10,6 +10,7 @@
stdload/2,
stdload_expiring/3,
stdload_object/6,
stdload_object/9,
reset_filestructure/0,
reset_filestructure/1,
check_bucket_stats/2,
@ -59,7 +60,8 @@
get_value_from_objectlistitem/1,
numbered_key/1,
fixed_bin_key/1,
convert_to_seconds/1]).
convert_to_seconds/1,
compact_and_wait/1]).
-define(RETURN_TERMS, {true, undefined}).
-define(SLOWOFFER_DELAY, 5).
@ -68,6 +70,7 @@
-define(MD_VTAG, <<"X-Riak-VTag">>).
-define(MD_LASTMOD, <<"X-Riak-Last-Modified">>).
-define(MD_DELETED, <<"X-Riak-Deleted">>).
-define(MD_INDEX, <<"index">>).
-define(EMPTY_VTAG_BIN, <<"e">>).
-define(ROOT_PATH, "test").
@ -240,17 +243,35 @@ stdload_expiring(Book, KeyCount, TTL, V, Acc) ->
stdload_expiring(Book, KeyCount - 1, TTL, V, [{I, B, K}|Acc]).
stdload_object(Book, B, K, I, V, TTL) ->
Obj = [{index, I}, {value, V}],
IdxSpecs =
case leveled_bookie:book_get(Book, B, K) of
{ok, PrevObj} ->
{index, OldI} = lists:keyfind(index, 1, PrevObj),
io:format("Remove index ~w for ~w~n", [OldI, I]),
[{remove, <<"temp_int">>, OldI}, {add, <<"temp_int">>, I}];
not_found ->
[{add, <<"temp_int">>, I}]
stdload_object(Book, B, K, I, V, TTL, ?STD_TAG, true, false).
stdload_object(Book, B, K, I, V, TTL, Tag, RemovePrev2i, MustFind) ->
Obj = [{index, [I]}, {value, V}],
{IdxSpecs, Obj0} =
case {leveled_bookie:book_get(Book, B, K, Tag), MustFind} of
{{ok, PrevObj}, _} ->
{index, PrevIs} = lists:keyfind(index, 1, PrevObj),
case RemovePrev2i of
true ->
MapFun =
fun(OldI) -> {remove, <<"temp_int">>, OldI} end,
{[{add, <<"temp_int">>, I}|lists:map(MapFun, PrevIs)],
Obj};
false ->
{[{add, <<"temp_int">>, I}],
[{index, [I|PrevIs]}, {value, V}]}
end;
{not_found, false} ->
{[{add, <<"temp_int">>, I}], Obj}
end,
R =
case TTL of
infinity ->
leveled_bookie:book_put(Book, B, K, Obj0, IdxSpecs, Tag);
TTL when is_integer(TTL) ->
leveled_bookie:book_tempput(Book, B, K, Obj0,
IdxSpecs, Tag, TTL)
end,
R = leveled_bookie:book_tempput(Book, B, K, Obj, IdxSpecs, ?STD_TAG, TTL),
case R of
ok ->
ok;
@ -261,6 +282,7 @@ stdload_object(Book, B, K, I, V, TTL) ->
reset_filestructure() ->
reset_filestructure(0, ?ROOT_PATH).
@ -517,23 +539,30 @@ set_object(Bucket, Key, Value, IndexGen) ->
set_object(Bucket, Key, Value, IndexGen, []).
set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
set_object(Bucket, Key, Value, IndexGen, Indexes2Remove, []).
set_object(Bucket, Key, Value, IndexGen, Indexes2Remove, IndexesNotToRemove) ->
IdxSpecs = IndexGen(),
Indexes =
lists:map(fun({add, IdxF, IdxV}) -> {IdxF, IdxV} end,
IdxSpecs ++ IndexesNotToRemove),
Obj = {Bucket,
Key,
Value,
IndexGen() ++ lists:map(fun({add, IdxF, IdxV}) ->
{remove, IdxF, IdxV} end,
IdxSpecs ++
lists:map(fun({add, IdxF, IdxV}) -> {remove, IdxF, IdxV} end,
Indexes2Remove),
[{"MDK", "MDV" ++ Key},
{"MDK2", "MDV" ++ Key},
{?MD_LASTMOD, os:timestamp()}]},
{B1, K1, V1, Spec1, MD} = Obj,
[{<<"MDK">>, "MDV" ++ Key},
{<<"MDK2">>, "MDV" ++ Key},
{?MD_LASTMOD, os:timestamp()},
{?MD_INDEX, Indexes}]},
{B1, K1, V1, DeltaSpecs, MD} = Obj,
Content = #r_content{metadata=dict:from_list(MD), value=V1},
{#r_object{bucket=B1,
key=K1,
contents=[Content],
vclock=generate_vclock()},
Spec1}.
DeltaSpecs}.
get_value_from_objectlistitem({_Int, Obj, _Spc}) ->
[Content] = Obj#r_object.contents,
@ -762,26 +791,39 @@ put_altered_indexed_objects(Book, Bucket, KSpecL) ->
put_altered_indexed_objects(Book, Bucket, KSpecL, true).
put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) ->
IndexGen = testutil:get_randomindexes_generator(1),
V = testutil:get_compressiblevalue(),
RplKSpecL = lists:map(fun({K, Spc}) ->
AddSpc = if
RemoveOld2i == true ->
[lists:keyfind(add, 1, Spc)];
RemoveOld2i == false ->
[]
IndexGen = get_randomindexes_generator(1),
V = get_compressiblevalue(),
FindAdditionFun = fun(SpcItem) -> element(1, SpcItem) == add end,
MapFun =
fun({K, Spc}) ->
OldSpecs = lists:filter(FindAdditionFun, Spc),
{RemoveSpc, AddSpc} =
case RemoveOld2i of
true ->
{OldSpecs, []};
false ->
{[], OldSpecs}
end,
{O, AltSpc} = testutil:set_object(Bucket,
K,
V,
IndexGen,
AddSpc),
case book_riakput(Book, O, AltSpc) of
{O, DeltaSpecs} =
set_object(Bucket, K, V,
IndexGen, RemoveSpc, AddSpc),
% DeltaSpecs should be new indexes added, and any old indexes which
% have been removed by this change where RemoveOld2i is true.
%
% The actual indexes within the object should reflect any history
% of indexes i.e. when RemoveOld2i is false.
%
% The [{Key, SpecL}] returned should accrue additions over loops if
% RemoveOld2i is false
case book_riakput(Book, O, DeltaSpecs) of
ok -> ok;
pause -> timer:sleep(?SLOWOFFER_DELAY)
end,
{K, AltSpc} end,
KSpecL),
% Note that order in the SpecL is important, as
% check_indexed_objects needs to find the latest item added
{K, DeltaSpecs ++ AddSpc}
end,
RplKSpecL = lists:map(MapFun, KSpecL),
{RplKSpecL, V}.
rotating_object_check(RootPath, B, NumberOfObjects) ->
@ -790,16 +832,16 @@ rotating_object_check(RootPath, B, NumberOfObjects) ->
{max_journalsize, 5000000},
{sync_strategy, sync_strategy()}],
{ok, Book1} = leveled_bookie:book_start(BookOpts),
{KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
ok = testutil:check_indexed_objects(Book1, B, KSpcL1, V1),
{KSpcL2, V2} = testutil:put_altered_indexed_objects(Book1, B, KSpcL1),
ok = testutil:check_indexed_objects(Book1, B, KSpcL2, V2),
{KSpcL3, V3} = testutil:put_altered_indexed_objects(Book1, B, KSpcL2),
{KSpcL1, V1} = put_indexed_objects(Book1, B, NumberOfObjects),
ok = check_indexed_objects(Book1, B, KSpcL1, V1),
{KSpcL2, V2} = put_altered_indexed_objects(Book1, B, KSpcL1),
ok = check_indexed_objects(Book1, B, KSpcL2, V2),
{KSpcL3, V3} = put_altered_indexed_objects(Book1, B, KSpcL2),
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(BookOpts),
ok = testutil:check_indexed_objects(Book2, B, KSpcL3, V3),
{KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2, B, KSpcL3),
ok = testutil:check_indexed_objects(Book2, B, KSpcL4, V4),
ok = check_indexed_objects(Book2, B, KSpcL3, V3),
{KSpcL4, V4} = put_altered_indexed_objects(Book2, B, KSpcL3),
ok = check_indexed_objects(Book2, B, KSpcL4, V4),
Query = {keylist, ?RIAK_TAG, B, {fun foldkeysfun/3, []}},
{async, BList} = leveled_bookie:book_returnfolder(Book2, Query),
true = NumberOfObjects == length(BList()),
@ -839,16 +881,9 @@ restore_topending(RootPath, FileName) ->
find_journals(RootPath) ->
{ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
{ok, Regex} = re:compile(".*\.cdb"),
CDBFiles = lists:foldl(fun(FN, Acc) -> case re:run(FN, Regex) of
nomatch ->
Acc;
_ ->
[FN|Acc]
end
end,
[],
FNsA_J),
% Must not return a file with the .pnd extension
CDBFiles =
lists:filter(fun(FN) -> filename:extension(FN) == ".cdb" end, FNsA_J),
CDBFiles.
convert_to_seconds({MegaSec, Seconds, _MicroSec}) ->
@ -863,3 +898,24 @@ get_aae_segment({Type, Bucket}, Key) ->
leveled_tictac:keyto_segment32(<<Type/binary, Bucket/binary, Key/binary>>);
get_aae_segment(Bucket, Key) ->
leveled_tictac:keyto_segment32(<<Bucket/binary, Key/binary>>).
compact_and_wait(Book) ->
compact_and_wait(Book, 20000).
compact_and_wait(Book, WaitForDelete) ->
ok = leveled_bookie:book_compactjournal(Book, 30000),
F = fun leveled_bookie:book_islastcompactionpending/1,
lists:foldl(fun(X, Pending) ->
case Pending of
false ->
false;
true ->
io:format("Loop ~w waiting for journal "
++ "compaction to complete~n", [X]),
timer:sleep(20000),
F(Book)
end end,
true,
lists:seq(1, 15)),
io:format("Waiting for journal deletes~n"),
timer:sleep(WaitForDelete).