From 694d2c39f878cad2c534189fa04f4fdacea8b9a7 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Sun, 15 Mar 2020 22:14:42 +0000
Subject: [PATCH 01/13] Support for recalc

Initial test included for running with recalc, and also transition from
retain to recalc.

Moves all logic for startup fold into leveled_bookie - avoiding the Inker
requiring any direct knowledge about the implementation of the Penciller.
---
 src/leveled_bookie.erl             | 228 +++++++++++++++++++++--------
 src/leveled_codec.erl              |  13 +-
 src/leveled_head.erl               | 162 +++++++++++++++++++-
 src/leveled_iclerk.erl             | 106 ++++++++++----
 src/leveled_inker.erl              |  38 +----
 test/end_to_end/recovery_SUITE.erl |  80 ++++++----
 test/end_to_end/testutil.erl       |  82 ++++++-----
 7 files changed, 522 insertions(+), 187 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 74707dc..bcbf4ef 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -93,8 +93,6 @@
          ]).

 -export([empty_ledgercache/0,
-         loadqueue_ledgercache/1,
-         push_ledgercache/2,
          snapshot_store/6,
          fetch_value/2,
          journal_notfound/4]).
@@ -105,6 +103,7 @@

 -include_lib("eunit/include/eunit.hrl").

+-define(LOADING_PAUSE, 1000).
 -define(CACHE_SIZE, 2500).
 -define(MIN_CACHE_SIZE, 100).
 -define(MIN_PCL_CACHE_SIZE, 400).
@@ -166,7 +165,7 @@
 -record(state, {inker :: pid() | undefined,
                 penciller :: pid() | undefined,
                 cache_size :: integer() | undefined,
-                ledger_cache = #ledger_cache{},
+                ledger_cache = #ledger_cache{} :: ledger_cache(),
                 is_snapshot :: boolean() | undefined,
                 slow_offer = false :: boolean(),
@@ -315,7 +314,7 @@
     % Journal, to recalculate the index changes based on the current
     % state of the Ledger and the object metadata.
     %
-    % reload_strategy ptions are a list - to map from a tag to the
+    % reload_strategy options are a list - to map from a tag to the
     % strategy (recovr|retain|recalc). Defualt strategies are:
     % [{?RIAK_TAG, retain}, {?STD_TAG, retain}]
     {max_pencillercachesize, pos_integer()|undefined} |
@@ -378,7 +377,16 @@
     % true
     ].

+-type initial_loadfun() ::
+    fun((leveled_codec:journal_key(),
+         any(),
+         non_neg_integer(),
+         {non_neg_integer(), non_neg_integer(), ledger_cache()},
+         fun((any()) -> {binary(), non_neg_integer()})) ->
+            {loop|stop,
+             {non_neg_integer(), non_neg_integer(), ledger_cache()}}).

+-export_type([initial_loadfun/0]).

 %%%============================================================================
 %%% API
@@ -1250,8 +1258,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
                                 SQN,
                                 Object,
                                 ObjSize,
-                                {IndexSpecs, TTL},
-                                State),
+                                {IndexSpecs, TTL}),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
     {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
@@ -1288,8 +1295,7 @@ handle_call({mput, ObjectSpecs, TTL}, From, State)
             Changes = preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
                                              SQN, null, length(ObjectSpecs),
-                                             {ObjectSpecs, TTL},
-                                             State),
+                                             {ObjectSpecs, TTL}),
             Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
             case State#state.slow_offer of
                 true ->
@@ -1537,6 +1543,23 @@ code_change(_OldVsn, State, _Extra) ->
 empty_ledgercache() ->
     #ledger_cache{mem = ets:new(empty, [ordered_set])}.

+
+-spec push_to_penciller(pid(), ledger_cache()) -> ok.
+%% @doc
+%% The push to penciller must start as a tree to correctly de-duplicate
+%% the list by order before becoming a de-duplicated list for loading
+push_to_penciller(Penciller, LedgerCache) ->
+    push_to_penciller_loop(Penciller, loadqueue_ledgercache(LedgerCache)).
+ +push_to_penciller_loop(Penciller, LedgerCache) -> + case push_ledgercache(Penciller, LedgerCache) of + returned -> + timer:sleep(?LOADING_PAUSE), + push_to_penciller_loop(Penciller, LedgerCache); + ok -> + ok + end. + -spec push_ledgercache(pid(), ledger_cache()) -> ok|returned. %% @doc %% Push the ledgercache to the Penciller - which should respond ok or @@ -1642,10 +1665,22 @@ startup(InkerOpts, PencillerOpts, State) -> {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), leveled_log:log("B0005", [LedgerSQN]), + ReloadStrategy = InkerOpts#inker_options.reload_strategy, + LoadFun = get_loadfun(ReloadStrategy, Penciller, State), + BatchFun = + fun(BatchAcc, _Acc) -> + push_to_penciller(Penciller, BatchAcc) + end, + InitAccFun = + fun(FN, CurrentMinSQN) -> + leveled_log:log("I0014", [FN, CurrentMinSQN]), + empty_ledgercache() + end, ok = leveled_inker:ink_loadpcl(Inker, LedgerSQN + 1, - get_loadfun(State), - Penciller), + LoadFun, + InitAccFun, + BatchFun), ok = leveled_inker:ink_checksqn(Inker, LedgerSQN), {Inker, Penciller}. @@ -2161,30 +2196,26 @@ check_notfound(CheckFrequency, CheckFun) -> -spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null, leveled_codec:ledger_key()|?DUMMY, - integer(), any(), integer(), - leveled_codec:journal_keychanges(), - book_state()) - -> {integer()|no_lookup, - integer(), + non_neg_integer(), any(), integer(), + leveled_codec:journal_keychanges()) + -> {leveled_codec:segment_hash(), + non_neg_integer(), list(leveled_codec:ledger_kv())}. %% @doc %% Prepare an object and its related key changes for addition to the Ledger %% via the Ledger Cache. preparefor_ledgercache(?INKT_MPUT, - ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}, - _State) -> + ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}) -> ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL), {no_lookup, SQN, ObjChanges}; preparefor_ledgercache(?INKT_KEYD, - LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}, - _State) -> + LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}) -> {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey), KeyChanges = leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL), {no_lookup, SQN, KeyChanges}; preparefor_ledgercache(_InkTag, - LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}, - _State) -> + LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}) -> {Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} = leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL), KeyChanges = @@ -2193,8 +2224,58 @@ preparefor_ledgercache(_InkTag, {KeyH, SQN, KeyChanges}. --spec addto_ledgercache({integer()|no_lookup, - integer(), +-spec recalcfor_ledgercache(leveled_codec:journal_key_tag()|null, + leveled_codec:ledger_key()|?DUMMY, + non_neg_integer(), any(), integer(), + leveled_codec:journal_keychanges(), + ledger_cache(), + pid()) + -> {leveled_codec:segment_hash(), + non_neg_integer(), + list(leveled_codec:ledger_kv())}. +%% @doc +%% When loading from the journal to the ledger, may hit a key which has the +%% `recalc` strategy. Such a key needs to recalculate the key changes by +%% comparison with the current state of the ledger, assuming it is a full +%% journal entry (i.e. KeyDeltas which may be a result of previously running +%% with a retain strategy should be ignored). 
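+%%
+%% As an illustrative sketch (index values hypothetical): if the Ledger's
+%% current metadata carries the index {<<"idx1_bin">>, <<"A1">>} and the
+%% object in the Journal carries {<<"idx1_bin">>, <<"B2">>}, the
+%% recalculated specs are:
+%%   [{add, <<"idx1_bin">>, <<"B2">>}, {remove, <<"idx1_bin">>, <<"A1">>}]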
+recalcfor_ledgercache(InkTag, + _LedgerKey, SQN, _Obj, _Size, {_IdxSpecs, _TTL}, + _LedgerCache, + _Penciller) + when InkTag == ?INKT_MPUT; InkTag == ?INKT_KEYD -> + {no_lookup, SQN, []}; +recalcfor_ledgercache(_InkTag, + LK, SQN, Obj, Size, {_IgnoreJournalIdxSpecs, TTL}, + LedgerCache, + Penciller) -> + {Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} = + leveled_codec:generate_ledgerkv(LK, SQN, Obj, Size, TTL), + OldObject = + case check_in_ledgercache(LK, KeyH, LedgerCache, loader) of + false -> + leveled_penciller:pcl_fetch(Penciller, LK, KeyH, true); + {value, KV} -> + KV + end, + OldMetadata = + case OldObject of + not_present -> + not_present; + {LK, LV} -> + leveled_codec:get_metadata(LV) + end, + UpdMetadata = leveled_codec:get_metadata(MetaValue), + IdxSpecs = + leveled_head:diff_indexspecs(element(1, LK), UpdMetadata, OldMetadata), + {KeyH, + SQN, + [{LK, MetaValue}] + ++ leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL)}. + + +-spec addto_ledgercache({leveled_codec:segment_hash(), + non_neg_integer(), list(leveled_codec:ledger_kv())}, ledger_cache()) -> ledger_cache(). @@ -2230,6 +2311,32 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) -> max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}. +-spec check_in_ledgercache(leveled_codec:ledger_key(), + leveled_codec:segment_hash(), + ledger_cache(), + loader) -> + false | {value, leveled_codec:ledger_kv()}. +%% @doc +%% Check the ledger cache for a Key, when the ledger cache is in loader mode +%% and so is populating a queue not an ETS table +check_in_ledgercache(PK, Hash, Cache, loader) -> + case leveled_pmem:check_index(Hash, Cache#ledger_cache.index) of + [] -> + false; + _ -> + search(fun({K,_V}) -> K == PK end, + lists:reverse(Cache#ledger_cache.load_queue)) + end. + +-spec search(fun((any()) -> boolean()), list()) -> {value, any()}|false. +search(Pred, [Hd|Tail]) -> + case Pred(Hd) of + true -> {value, Hd}; + false -> search(Pred, Tail) + end; +search(Pred, []) when is_function(Pred, 1) -> + false. + -spec maybepush_ledgercache(integer(), ledger_cache(), pid()) -> {ok|returned, ledger_cache()}. %% @doc @@ -2276,44 +2383,47 @@ maybe_withjitter(_CacheSize, _MaxCacheSize) -> false. --spec get_loadfun(book_state()) -> fun(). +-spec get_loadfun(leveled_codec:compaction_strategy(), pid(), book_state()) + -> initial_loadfun(). %% @doc %% The LoadFun will be used by the Inker when walking across the Journal to -%% load the Penciller at startup -get_loadfun(State) -> - PrepareFun = - fun(Tag, PK, SQN, Obj, VS, IdxSpecs) -> - preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, State) - end, - LoadFun = - fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) -> - {MinSQN, MaxSQN, OutputTree} = Acc0, - {SQN, InkTag, PK} = KeyInJournal, - % VBin may already be a term - {VBin, VSize} = ExtractFun(ValueInJournal), - {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin), - case SQN of - SQN when SQN < MinSQN -> - {loop, Acc0}; - SQN when SQN < MaxSQN -> - Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs), - {loop, - {MinSQN, - MaxSQN, - addto_ledgercache(Chngs, OutputTree, loader)}}; - MaxSQN -> - leveled_log:log("B0006", [SQN]), - Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs), - {stop, - {MinSQN, - MaxSQN, - addto_ledgercache(Chngs, OutputTree, loader)}}; - SQN when SQN > MaxSQN -> - leveled_log:log("B0007", [MaxSQN, SQN]), - {stop, Acc0} - end - end, - LoadFun. +%% load the Penciller at startup. 
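+%%
+%% A sketch of the fold contract implied by initial_loadfun(): the
+%% accumulator is {MinSQN, MaxSQN, LedgerCache}, and for each journal
+%% entry one of the following is returned:
+%%   {loop, {MinSQN, MaxSQN, UpdCache}} % keep folding through the file
+%%   {stop, {MinSQN, MaxSQN, UpdCache}} % MaxSQN reached - load complete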
+get_loadfun(ReloadStrat, Penciller, _State) -> + fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) -> + {MinSQN, MaxSQN, LedgerCache} = Acc0, + {SQN, InkTag, PK} = KeyInJournal, + case SQN of + SQN when SQN < MinSQN -> + {loop, Acc0}; + SQN when SQN > MaxSQN -> + leveled_log:log("B0007", [MaxSQN, SQN]), + {stop, Acc0}; + _ -> + {VBin, ValSize} = ExtractFun(ValueInJournal), + % VBin may already be a term + {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin), + Chngs = + case leveled_codec:get_tagstrategy(PK, ReloadStrat) of + recalc -> + recalcfor_ledgercache(InkTag, PK, SQN, + Obj, ValSize, IdxSpecs, + LedgerCache, + Penciller); + _ -> + preparefor_ledgercache(InkTag, PK, SQN, + Obj, ValSize, IdxSpecs) + end, + case SQN of + MaxSQN -> + leveled_log:log("B0006", [SQN]), + LC0 = addto_ledgercache(Chngs, LedgerCache, loader), + {stop, {MinSQN, MaxSQN, LC0}}; + _ -> + LC0 = addto_ledgercache(Chngs, LedgerCache, loader), + {loop, {MinSQN, MaxSQN, LC0}} + end + end + end. delete_path(DirPath) -> diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 91386fc..95e894e 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -49,7 +49,8 @@ segment_hash/1, to_lookup/1, next_key/1, - return_proxy/4]). + return_proxy/4, + get_metadata/1]). -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). -define(NRT_IDX, "$aae."). @@ -243,6 +244,10 @@ strip_to_indexdetails({_, V}) when tuple_size(V) > 4 -> striphead_to_v1details(V) -> {element(1, V), element(2, V), element(3, V), element(4, V)}. +-spec get_metadata(ledger_value()) -> metadata(). +get_metadata(LV) -> + element(4, LV). + -spec key_dominates(ledger_kv(), ledger_kv()) -> left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant. %% @doc @@ -358,7 +363,7 @@ endkey_passed(EndKey, CheckingKey) -> -spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy(). %% @doc -%% Take the default startegy for compaction, and override the approach for any +%% Take the default strategy for compaction, and override the approach for any %% tags passed in inker_reload_strategy(AltList) -> ReloadStrategy0 = @@ -371,11 +376,13 @@ inker_reload_strategy(AltList) -> AltList). --spec get_tagstrategy(ledger_key(), compaction_strategy()) +-spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy()) -> skip|retain|recalc. %% @doc %% Work out the compaction strategy for the key get_tagstrategy({Tag, _, _, _}, Strategy) -> + get_tagstrategy(Tag, Strategy); +get_tagstrategy(Tag, Strategy) -> case lists:keyfind(Tag, 1, Strategy) of {Tag, TagStrat} -> TagStrat; diff --git a/src/leveled_head.erl b/src/leveled_head.erl index db94074..0669ec5 100644 --- a/src/leveled_head.erl +++ b/src/leveled_head.erl @@ -22,7 +22,8 @@ -export([key_to_canonicalbinary/1, build_head/2, - extract_metadata/3 + extract_metadata/3, + diff_indexspecs/3 ]). -export([get_size/2, @@ -71,12 +72,15 @@ -type object_metadata() :: riak_metadata()|std_metadata()|head_metadata(). -type appdefinable_function() :: - key_to_canonicalbinary | build_head | extract_metadata. + key_to_canonicalbinary | build_head | extract_metadata | diff_indexspecs. % Functions for which default behaviour can be over-written for the % application's own tags -type appdefinable_function_tuple() :: {appdefinable_function(), fun()}. +-type index_op() :: add | remove. +-type index_value() :: integer() | binary(). + -type head() :: binary()|tuple(). 
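
%% An index spec triple built from these types takes the form
%% {IndexOp, IndexField, IndexValue}, e.g. (hypothetical values):
%%   {add, <<"idx1_bin">>, <<"21521107231730Sophia">>}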
% TODO: @@ -174,6 +178,41 @@ default_extract_metadata(_Tag, SizeAsStoredInJournal, Obj) -> {{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}. +-spec diff_indexspecs(object_tag(), + object_metadata(), + object_metadata()|not_present) + -> leveled_codec:index_specs(). +%% @doc +%% Take an object metadata part from within the journal, and an object metadata +%% part from the ledger (which should have a lower SQN), and generate index +%% specs by determining the difference between the index specs on the object +%% to be loaded and that on object already stored. +%% +%% This is only relevant where the journal compaction strategy of `recalc` is +%% used, the Keychanges will be used when `retain` is the compaction strategy +diff_indexspecs(?RIAK_TAG, UpdatedMetadata, OldMetadata) -> + UpdIndexes = + get_indexes_from_siblingmetabin(element(1, UpdatedMetadata), []), + OldIndexes = + case OldMetadata of + not_present -> + []; + _ -> + get_indexes_from_siblingmetabin(element(1, OldMetadata), []) + end, + diff_index_data(OldIndexes, UpdIndexes); +diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata) -> + default_diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata); +diff_indexspecs(Tag, UpdatedMetadata, CurrentMetadata) -> + OverrideFun = + get_appdefined_function(diff_indexspecs, + fun default_diff_indexspecs/3, + 3), + OverrideFun(Tag, UpdatedMetadata, CurrentMetadata). + +default_diff_indexspecs(_Tag, _UpdatedMetadata, _CurrentMetadata) -> + []. + %%%============================================================================ %%% Standard External Functions %%%============================================================================ @@ -190,7 +229,7 @@ defined_objecttags() -> leveled_codec:compaction_method()}. %% @doc %% State the compaction_method to be used when reloading the Ledger from the -%% journal for each object tag. Note, no compaction startegy required for +%% journal for each object tag. Note, no compaction strategy required for %% head_only tag default_reload_strategy(Tag) -> {Tag, retain}. @@ -317,3 +356,120 @@ get_metadata_from_siblings(<>, MetaLen:32/integer, MetaBin:MetaLen/binary>>, [LastMod|LastMods]). + + +get_indexes_from_siblingmetabin(<<0:32/integer, + MetaLen:32/integer, + MetaBin:MetaLen/binary, + RestBin/binary>>, + Indexes) -> + UpdIndexes = lists:umerge(get_indexes_frommetabin(MetaBin), Indexes), + get_indexes_from_siblingmetabin(RestBin, UpdIndexes); +get_indexes_from_siblingmetabin(<>, + Indexes) when SibCount > 0 -> + get_indexes_from_siblingmetabin(RestBin, Indexes); +get_indexes_from_siblingmetabin(_, Indexes) -> + Indexes. + + +%% @doc +%% Parse the metabinary for an individual sibling and return a list of index +%% entries. +get_indexes_frommetabin(<<_LMD1:32/integer, _LMD2:32/integer, _LMD3:32/integer, + VTagLen:8/integer, _VTag:VTagLen/binary, + Deleted:1/binary-unit:8, + MetaRestBin/binary>>) when Deleted /= <<1>> -> + lists:usort(indexes_of_metabinary(MetaRestBin)); +get_indexes_frommetabin(_) -> + []. + + +indexes_of_metabinary(<<>>) -> + []; +indexes_of_metabinary(<>) -> + Key = decode_maybe_binary(KeyBin), + case Key of + <<"index">> -> + Value = decode_maybe_binary(ValueBin), + Value; + _ -> + indexes_of_metabinary(Rest) + end. + + +decode_maybe_binary(<<1, Bin/binary>>) -> + Bin; +decode_maybe_binary(<<0, Bin/binary>>) -> + binary_to_term(Bin); +decode_maybe_binary(<<_Other:8, Bin/binary>>) -> + Bin. 
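+
+%% For example (a sketch mirroring the tag bytes above): a leading byte of
+%% 1 marks a raw binary, so decode_maybe_binary(<<1, "999">>) returns
+%% <<"999">>, while a leading byte of 0 marks an encoded term, so
+%% decode_maybe_binary(<<0, (term_to_binary("999"))/binary>>) returns the
+%% string "999".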
+ +-spec diff_index_data([{binary(), index_value()}], + [{binary(), index_value()}]) -> + [{index_op(), binary(), index_value()}]. +diff_index_data(OldIndexes, AllIndexes) -> + OldIndexSet = ordsets:from_list(OldIndexes), + AllIndexSet = ordsets:from_list(AllIndexes), + diff_specs_core(AllIndexSet, OldIndexSet). + + +diff_specs_core(AllIndexSet, OldIndexSet) -> + NewIndexSet = ordsets:subtract(AllIndexSet, OldIndexSet), + RemoveIndexSet = + ordsets:subtract(OldIndexSet, AllIndexSet), + NewIndexSpecs = + assemble_index_specs(ordsets:subtract(NewIndexSet, OldIndexSet), + add), + RemoveIndexSpecs = + assemble_index_specs(RemoveIndexSet, + remove), + NewIndexSpecs ++ RemoveIndexSpecs. + +%% @doc Assemble a list of index specs in the +%% form of triplets of the form +%% {IndexOperation, IndexField, IndexValue}. +-spec assemble_index_specs([{binary(), binary()}], index_op()) -> + [{index_op(), binary(), binary()}]. +assemble_index_specs(Indexes, IndexOp) -> + [{IndexOp, Index, Value} || {Index, Value} <- Indexes]. + + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + + +index_extract_test() -> + SibMetaBin = <<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134, + 1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68, + 86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55, + 45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0, + 0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2, + 107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50, + 49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2, + 107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50, + 48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0, + 5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55, + 52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102, + 50,45,53,54,98,51,98,97,57,57,99,55,56,50>>, + Indexes = get_indexes_from_siblingmetabin(SibMetaBin, []), + ExpIndexes = [{"idx1_bin","21521107231730Sophia"}, + {"idx1_bin","21820510130146Avery"}], + ?assertMatch(ExpIndexes, Indexes). + +diff_index_test() -> + UpdIndexes = + [{<<"idx1_bin">>,<<"20840930001702Zoe">>}, + {<<"idx1_bin">>,<<"20931011172606Emily">>}], + OldIndexes = + [{<<"idx1_bin">>,<<"20231126131808Madison">>}, + {<<"idx1_bin">>,<<"20931011172606Emily">>}], + IdxSpecs = diff_index_data(OldIndexes, UpdIndexes), + ?assertMatch([{add, <<"idx1_bin">>, <<"20840930001702Zoe">>}, + {remove, <<"idx1_bin">>,<<"20231126131808Madison">>}], IdxSpecs). + +-endif. \ No newline at end of file diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index e996c15..afc2c8c 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -145,6 +145,15 @@ % released from a compaction run of a single file to make it a run % worthwhile of compaction (released space is 100.0 - target e.g. 70.0 % means that 30.0% should be released) +-type key_size() :: + {{non_neg_integer(), + leveled_codec:journal_key_tag(), + leveled_codec:ledger_key()}, non_neg_integer()}. +-type corrupted_test_key_size() :: + {{non_neg_integer(), + leveled_codec:journal_key_tag(), + leveled_codec:ledger_key(), + null}, non_neg_integer()}. 
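+
+%% A key_size() sample drawn from a CDB file is then e.g. (sketch):
+%%   {{1, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key1">>, null}}, 104}
+%% i.e. an SQN, an inker key type and a ledger key, paired with the size
+%% of the stored value in bytes.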
%%%============================================================================ %%% API @@ -315,7 +324,8 @@ handle_cast({score_filelist, [Entry|Tail]}, State) -> ScoringState#scoring_state.filter_server, ScoringState#scoring_state.max_sqn, ?SAMPLE_SIZE, - ?BATCH_SIZE), + ?BATCH_SIZE, + State#state.reload_strategy), Candidate = #candidate{low_sqn = LowSQN, filename = FN, @@ -493,7 +503,10 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) -> %%% Internal functions %%%============================================================================ - +-spec check_single_file(pid(), fun(), any(), non_neg_integer(), + non_neg_integer(), non_neg_integer(), + leveled_codec:compaction_strategy()) -> + float(). %% @doc %% Get a score for a single CDB file in the journal. This will pull out a bunch %% of keys and sizes at random in an efficient way (by scanning the hashtable @@ -505,13 +518,19 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) -> %% %% The score is based on a random sample - so will not be consistent between %% calls. -check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) -> +check_single_file(CDB, FilterFun, FilterServer, MaxSQN, + SampleSize, BatchSize, + ReloadStrategy) -> FN = leveled_cdb:cdb_filename(CDB), SW = os:timestamp(), PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), Score = - size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN), + size_comparison_score(KeySizeList, + FilterFun, + FilterServer, + MaxSQN, + ReloadStrategy), safely_log_filescore(PositionList, FN, Score, SW), Score. @@ -523,7 +542,15 @@ safely_log_filescore(PositionList, FN, Score, SW) -> div length(PositionList), leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW). -size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> +-spec size_comparison_score(list(key_size() | corrupted_test_key_size()), + fun(), + any(), + non_neg_integer(), + leveled_codec:compaction_strategy()) -> + float(). +size_comparison_score(KeySizeList, + FilterFun, FilterServer, MaxSQN, + RS) -> FoldFunForSizeCompare = fun(KS, {ActSize, RplSize}) -> case KS of @@ -532,7 +559,18 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> leveled_codec:is_full_journalentry({SQN, Type, PK}), case IsJournalEntry of false -> - {ActSize + Size - ?CRC_SIZE, RplSize}; + TS = leveled_codec:get_tagstrategy(PK, RS), + % If the strategy is to retain key deltas, then + % scoring must reflect that. Key deltas are + % possible even if strategy does not allow as + % there is support for changing strategy from + % retain to recalc + case TS of + retain -> + {ActSize + Size - ?CRC_SIZE, RplSize}; + _ -> + {ActSize, RplSize + Size - ?CRC_SIZE} + end; true -> Check = FilterFun(FilterServer, PK, SQN), case {Check, SQN > MaxSQN} of @@ -567,12 +605,13 @@ fetch_inbatches([], _BatchSize, CDB, CheckedList) -> ok = leveled_cdb:cdb_clerkcomplete(CDB), CheckedList; fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> - {Batch, Tail} = if - length(PositionList) >= BatchSize -> - lists:split(BatchSize, PositionList); - true -> - {PositionList, []} - end, + {Batch, Tail} = + if + length(PositionList) >= BatchSize -> + lists:split(BatchSize, PositionList); + true -> + {PositionList, []} + end, KL_List = leveled_cdb:cdb_directfetch(CDB, Batch, key_size), fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List). 
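
%% Sketch of the scoring impact (sizes hypothetical): a sampled key-delta
%% entry of 124 bytes adds to ActSize under a retain strategy, as the
%% delta must be kept, but adds to RplSize under recalc - so a file rich
%% in key deltas scores as more compactable once a store has switched
%% from retain to recalc.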
@@ -998,6 +1037,7 @@ fetch_testcdb(RP) -> check_single_file_test() -> RP = "test/test_area/", + RS = leveled_codec:inker_reload_strategy([]), ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)), {ok, CDB} = fetch_testcdb(RP), LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}}, @@ -1010,14 +1050,14 @@ check_single_file_test() -> _ -> replaced end end, - Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4), + Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS), ?assertMatch(37.5, Score1), LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end, - Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4), + Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS), ?assertMatch(100.0, Score2), - Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3), + Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS), ?assertMatch(37.5, Score3), - Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4), + Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS), ?assertMatch(75.0, Score4), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -1132,6 +1172,7 @@ compact_empty_file_test() -> RP = "test/test_area/", ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)), FN1 = leveled_inker:filepath(RP, 1, new_journal), + RS = leveled_codec:inker_reload_strategy([]), CDBopts = #cdb_options{binary_mode=true}, {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBopts), {ok, FN2} = leveled_cdb:cdb_complete(CDB1), @@ -1140,7 +1181,7 @@ compact_empty_file_test() -> {2, {o, "Bucket", "Key2", null}}, {3, {o, "Bucket", "Key3", null}}], LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end, - Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4), + Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS), ?assertMatch(0.0, Score1), ok = leveled_cdb:cdb_deletepending(CDB2), ok = leveled_cdb:cdb_destroy(CDB2). @@ -1207,15 +1248,22 @@ compact_singlefile_totwosmallfiles_testto() -> size_score_test() -> KeySizeList = - [{{1, ?INKT_STND, "Key1"}, 104}, - {{2, ?INKT_STND, "Key2"}, 124}, - {{3, ?INKT_STND, "Key3"}, 144}, - {{4, ?INKT_STND, "Key4"}, 154}, - {{5, ?INKT_STND, "Key5", "Subk1"}, 164}, - {{6, ?INKT_STND, "Key6"}, 174}, - {{7, ?INKT_STND, "Key7"}, 184}], + [{{1, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key1">>, null}}, 104}, + {{2, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key2">>, null}}, 124}, + {{3, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key3">>, null}}, 144}, + {{4, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key4">>, null}}, 154}, + {{5, + ?INKT_STND, + {?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>}, null}, + 164}, + {{6, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key6">>, null}}, 174}, + {{7, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key7">>, null}}, 184}], MaxSQN = 6, - CurrentList = ["Key1", "Key4", "Key5", "Key6"], + CurrentList = + [{?STD_TAG, <<"B">>, <<"Key1">>, null}, + {?STD_TAG, <<"B">>, <<"Key4">>, null}, + {?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>}, + {?STD_TAG, <<"B">>, <<"Key6">>, null}], FilterFun = fun(L, K, _SQN) -> case lists:member(K, L) of @@ -1223,7 +1271,13 @@ size_score_test() -> false -> replaced end end, - Score = size_comparison_score(KeySizeList, FilterFun, CurrentList, MaxSQN), + Score = + size_comparison_score(KeySizeList, + FilterFun, + CurrentList, + MaxSQN, + leveled_codec:inker_reload_strategy([])), + io:format("Score ~w", [Score]), ?assertMatch(true, Score > 69.0), ?assertMatch(true, Score < 70.0). 
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index aa9d14d..ab26ca7 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -101,7 +101,7 @@ ink_fetch/3, ink_keycheck/3, ink_fold/4, - ink_loadpcl/4, + ink_loadpcl/5, ink_registersnapshot/2, ink_confirmdelete/2, ink_compactjournal/3, @@ -133,7 +133,6 @@ -define(WASTE_FP, "waste"). -define(JOURNAL_FILEX, "cdb"). -define(PENDING_FILEX, "pnd"). --define(LOADING_PAUSE, 1000). -define(LOADING_BATCH, 1000). -define(TEST_KC, {[], infinity}). @@ -321,7 +320,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) -> {fold, MinSQN, FoldFuns, Acc, by_runner}, infinity). --spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. +-spec ink_loadpcl(pid(), + integer(), + leveled_bookie:initial_loadfun(), + fun((string(), non_neg_integer()) -> any()), + fun((any(), any()) -> ok)) -> ok. %% %% Function to prompt load of the Ledger at startup. The Penciller should %% have determined the lowest SQN not present in the Ledger, and the inker @@ -330,20 +333,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) -> %% %% The load fun should be a five arity function like: %% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) -ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> - BatchFun = - fun(BatchAcc, _Acc) -> - push_to_penciller(Penciller, BatchAcc) - end, - InitAccFun = - fun(FN, CurrentMinSQN) -> - leveled_log:log("I0014", [FN, CurrentMinSQN]), - leveled_bookie:empty_ledgercache() - end, +ink_loadpcl(Pid, MinSQN, LoadFun, InitAccFun, BatchFun) -> gen_server:call(Pid, {fold, MinSQN, - {FilterFun, InitAccFun, BatchFun}, + {LoadFun, InitAccFun, BatchFun}, ok, as_ink}, infinity). @@ -1195,22 +1189,6 @@ foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns, LastPosition, FN) end. - - -push_to_penciller(Penciller, LedgerCache) -> - % The push to penciller must start as a tree to correctly de-duplicate - % the list by order before becoming a de-duplicated list for loading - LC0 = leveled_bookie:loadqueue_ledgercache(LedgerCache), - push_to_penciller_loop(Penciller, LC0). - -push_to_penciller_loop(Penciller, LedgerCache) -> - case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of - returned -> - timer:sleep(?LOADING_PAUSE), - push_to_penciller_loop(Penciller, LedgerCache); - ok -> - ok - end. sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 640b688..3538cea 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -7,6 +7,8 @@ hot_backup_simple/1, hot_backup_changes/1, retain_strategy/1, + recalc_strategy/1, + recalc_transition_strategy/1, recovr_strategy/1, aae_missingjournal/1, aae_bustedjournal/1, @@ -21,6 +23,8 @@ all() -> [ hot_backup_simple, hot_backup_changes, retain_strategy, + recalc_strategy, + recalc_transition_strategy, recovr_strategy, aae_missingjournal, aae_bustedjournal, @@ -233,85 +237,97 @@ hot_backup_changes(_Config) -> testutil:reset_filestructure(). - retain_strategy(_Config) -> + rotate_wipe_compact(retain, retain). + +recalc_strategy(_Config) -> + rotate_wipe_compact(recalc, recalc). + +recalc_transition_strategy(_Config) -> + rotate_wipe_compact(retain, recalc). 
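+
+%% A transition run sketches a live strategy change: the store is first
+%% run and compacted with reload_strategy [{?RIAK_TAG, retain}], then
+%% re-opened with [{?RIAK_TAG, recalc}] - rotate_wipe_compact/2 below
+%% takes one strategy for each phase.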
+ + +rotate_wipe_compact(Strategy1, Strategy2) -> RootPath = testutil:reset_filestructure(), BookOpts = [{root_path, RootPath}, {cache_size, 1000}, {max_journalobjectcount, 5000}, {sync_strategy, testutil:sync_strategy()}, - {reload_strategy, [{?RIAK_TAG, retain}]}], + {reload_strategy, [{?RIAK_TAG, Strategy1}]}], BookOptsAlt = [{root_path, RootPath}, {cache_size, 1000}, {max_journalobjectcount, 2000}, {sync_strategy, testutil:sync_strategy()}, - {reload_strategy, [{?RIAK_TAG, retain}]}, + {reload_strategy, [{?RIAK_TAG, Strategy2}]}, {max_run_length, 8}], - {ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 800), + {ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 400), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}]), - {ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 1600), + {ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 800), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, {"Bucket4", Spcl4, LastV4}]), - {ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 3200), - ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3}, - {"Bucket5", Spcl5, LastV5}]), - {ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 6400), + {ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 1600), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, - {"Bucket4", Spcl4, LastV4}, - {"Bucket5", Spcl5, LastV5}, - {"Bucket6", Spcl6, LastV6}]), + {"Bucket5", Spcl5, LastV5}]), + {ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 3200), {ok, Book1} = leveled_bookie:book_start(BookOpts), compact_and_wait(Book1), - compact_and_wait(Book1), ok = leveled_bookie:book_close(Book1), - ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, + ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3}, {"Bucket4", Spcl4, LastV4}, {"Bucket5", Spcl5, LastV5}, {"Bucket6", Spcl6, LastV6}]), {ok, Book2} = leveled_bookie:book_start(BookOptsAlt), + compact_and_wait(Book2), + ok = leveled_bookie:book_close(Book2), - {KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000), + ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3}, + {"Bucket4", Spcl4, LastV4}, + {"Bucket5", Spcl5, LastV5}, + {"Bucket6", Spcl6, LastV6}]), + + {ok, Book3} = leveled_bookie:book_start(BookOptsAlt), + + {KSpcL2, _V2} = testutil:put_indexed_objects(Book3, "AltBucket6", 3000), Q2 = fun(RT) -> {index_query, "AltBucket6", {fun testutil:foldkeysfun/3, []}, {"idx1_bin", "#", "|"}, {RT, undefined}} end, - {async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)), + {async, KFolder2A} = leveled_bookie:book_returnfolder(Book3, Q2(false)), KeyList2A = lists:usort(KFolder2A()), true = length(KeyList2A) == 3000, DeleteFun = fun({DK, [{add, DIdx, DTerm}]}) -> - ok = testutil:book_riakdelete(Book2, + ok = testutil:book_riakdelete(Book3, "AltBucket6", DK, [{remove, DIdx, DTerm}]) end, lists:foreach(DeleteFun, KSpcL2), - {async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)), - KeyList2AD = lists:usort(KFolder2AD()), - true = length(KeyList2AD) == 0, - - ok = leveled_bookie:book_close(Book2), - - {ok, Book3} = leveled_bookie:book_start(BookOptsAlt), - - io:format("Compact after deletions~n"), - - compact_and_wait(Book3), - compact_and_wait(Book3), - {async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)), KeyList3AD = lists:usort(KFolder3AD()), true = length(KeyList3AD) == 0, ok = 
leveled_bookie:book_close(Book3), + {ok, Book4} = leveled_bookie:book_start(BookOptsAlt), + + io:format("Compact after deletions~n"), + + compact_and_wait(Book4), + + {async, KFolder4AD} = leveled_bookie:book_returnfolder(Book4, Q2(false)), + KeyList4AD = lists:usort(KFolder4AD()), + true = length(KeyList4AD) == 0, + + ok = leveled_bookie:book_close(Book4), + testutil:reset_filestructure(). @@ -845,6 +861,10 @@ rotating_object_check(BookOpts, B, NumberOfObjects) -> B, KSpcL2, false), + ok = testutil:check_indexed_objects(Book1, + B, + KSpcL1 ++ KSpcL2 ++ KSpcL3, + V3), ok = leveled_bookie:book_close(Book1), {ok, Book2} = leveled_bookie:book_start(BookOpts), ok = testutil:check_indexed_objects(Book2, diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index e2918dc..94ed966 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -68,6 +68,7 @@ -define(MD_VTAG, <<"X-Riak-VTag">>). -define(MD_LASTMOD, <<"X-Riak-Last-Modified">>). -define(MD_DELETED, <<"X-Riak-Deleted">>). +-define(MD_INDEX, <<"index">>). -define(EMPTY_VTAG_BIN, <<"e">>). -define(ROOT_PATH, "test"). @@ -517,23 +518,30 @@ set_object(Bucket, Key, Value, IndexGen) -> set_object(Bucket, Key, Value, IndexGen, []). set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) -> - + set_object(Bucket, Key, Value, IndexGen, Indexes2Remove, []). + +set_object(Bucket, Key, Value, IndexGen, Indexes2Remove, IndexesNotToRemove) -> + IdxSpecs = IndexGen(), + Indexes = + lists:map(fun({add, IdxF, IdxV}) -> {IdxF, IdxV} end, + IdxSpecs ++ IndexesNotToRemove), Obj = {Bucket, Key, Value, - IndexGen() ++ lists:map(fun({add, IdxF, IdxV}) -> - {remove, IdxF, IdxV} end, - Indexes2Remove), - [{"MDK", "MDV" ++ Key}, - {"MDK2", "MDV" ++ Key}, - {?MD_LASTMOD, os:timestamp()}]}, + IdxSpecs ++ + lists:map(fun({add, IdxF, IdxV}) -> {remove, IdxF, IdxV} end, + Indexes2Remove), + [{<<"MDK">>, "MDV" ++ Key}, + {<<"MDK2">>, "MDV" ++ Key}, + {?MD_LASTMOD, os:timestamp()}, + {?MD_INDEX, Indexes}]}, {B1, K1, V1, Spec1, MD} = Obj, Content = #r_content{metadata=dict:from_list(MD), value=V1}, {#r_object{bucket=B1, key=K1, contents=[Content], vclock=generate_vclock()}, - Spec1}. + Spec1 ++ IndexesNotToRemove}. get_value_from_objectlistitem({_Int, Obj, _Spc}) -> [Content] = Obj#r_object.contents, @@ -762,26 +770,28 @@ put_altered_indexed_objects(Book, Bucket, KSpecL) -> put_altered_indexed_objects(Book, Bucket, KSpecL, true). put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) -> - IndexGen = testutil:get_randomindexes_generator(1), - V = testutil:get_compressiblevalue(), - RplKSpecL = lists:map(fun({K, Spc}) -> - AddSpc = if - RemoveOld2i == true -> - [lists:keyfind(add, 1, Spc)]; - RemoveOld2i == false -> - [] - end, - {O, AltSpc} = testutil:set_object(Bucket, - K, - V, - IndexGen, - AddSpc), - case book_riakput(Book, O, AltSpc) of - ok -> ok; - pause -> timer:sleep(?SLOWOFFER_DELAY) - end, - {K, AltSpc} end, - KSpecL), + IndexGen = get_randomindexes_generator(1), + V = get_compressiblevalue(), + FindAdditionFun = fun(SpcItem) -> element(1, SpcItem) == add end, + MapFun = + fun({K, Spc}) -> + {RemoveSpc, AddSpc} = + case RemoveOld2i of + true -> + {lists:filter(FindAdditionFun, Spc), []}; + false -> + {[], lists:filter(FindAdditionFun, Spc)} + end, + {O, AltSpc} = + set_object(Bucket, K, V, + IndexGen, RemoveSpc, AddSpc), + case book_riakput(Book, O, AltSpc) of + ok -> ok; + pause -> timer:sleep(?SLOWOFFER_DELAY) + end, + {K, AltSpc} + end, + RplKSpecL = lists:map(MapFun, KSpecL), {RplKSpecL, V}. 
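
%% Sketch of the two modes (spec values hypothetical): with RemoveOld2i
%% true, a key whose spec list held [{add, "idx1_bin", "A1"}] is re-put
%% with that entry turned into {remove, "idx1_bin", "A1"} alongside one
%% fresh add; with RemoveOld2i false the old entry is instead kept on the
%% object, so the accumulated index history survives each rotation.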
rotating_object_check(RootPath, B, NumberOfObjects) -> @@ -790,16 +800,16 @@ rotating_object_check(RootPath, B, NumberOfObjects) -> {max_journalsize, 5000000}, {sync_strategy, sync_strategy()}], {ok, Book1} = leveled_bookie:book_start(BookOpts), - {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects), - ok = testutil:check_indexed_objects(Book1, B, KSpcL1, V1), - {KSpcL2, V2} = testutil:put_altered_indexed_objects(Book1, B, KSpcL1), - ok = testutil:check_indexed_objects(Book1, B, KSpcL2, V2), - {KSpcL3, V3} = testutil:put_altered_indexed_objects(Book1, B, KSpcL2), + {KSpcL1, V1} = put_indexed_objects(Book1, B, NumberOfObjects), + ok = check_indexed_objects(Book1, B, KSpcL1, V1), + {KSpcL2, V2} = put_altered_indexed_objects(Book1, B, KSpcL1), + ok = check_indexed_objects(Book1, B, KSpcL2, V2), + {KSpcL3, V3} = put_altered_indexed_objects(Book1, B, KSpcL2), ok = leveled_bookie:book_close(Book1), {ok, Book2} = leveled_bookie:book_start(BookOpts), - ok = testutil:check_indexed_objects(Book2, B, KSpcL3, V3), - {KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2, B, KSpcL3), - ok = testutil:check_indexed_objects(Book2, B, KSpcL4, V4), + ok = check_indexed_objects(Book2, B, KSpcL3, V3), + {KSpcL4, V4} = put_altered_indexed_objects(Book2, B, KSpcL3), + ok = check_indexed_objects(Book2, B, KSpcL4, V4), Query = {keylist, ?RIAK_TAG, B, {fun foldkeysfun/3, []}}, {async, BList} = leveled_bookie:book_returnfolder(Book2, Query), true = NumberOfObjects == length(BList()), From 706ba8a6745aa923ef1ff8d186f59fd334283fb8 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Sun, 15 Mar 2020 23:15:09 +0000 Subject: [PATCH 02/13] Resolve issues with passing specs around --- test/end_to_end/testutil.erl | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 94ed966..52db533 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -535,13 +535,13 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove, IndexesNotToRemove) -> {<<"MDK2">>, "MDV" ++ Key}, {?MD_LASTMOD, os:timestamp()}, {?MD_INDEX, Indexes}]}, - {B1, K1, V1, Spec1, MD} = Obj, + {B1, K1, V1, DeltaSpecs, MD} = Obj, Content = #r_content{metadata=dict:from_list(MD), value=V1}, {#r_object{bucket=B1, key=K1, contents=[Content], vclock=generate_vclock()}, - Spec1 ++ IndexesNotToRemove}. + DeltaSpecs}. get_value_from_objectlistitem({_Int, Obj, _Spc}) -> [Content] = Obj#r_object.contents, @@ -775,21 +775,32 @@ put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) -> FindAdditionFun = fun(SpcItem) -> element(1, SpcItem) == add end, MapFun = fun({K, Spc}) -> + OldSpecs = lists:filter(FindAdditionFun, Spc), {RemoveSpc, AddSpc} = case RemoveOld2i of true -> - {lists:filter(FindAdditionFun, Spc), []}; + {OldSpecs, []}; false -> - {[], lists:filter(FindAdditionFun, Spc)} + {[], OldSpecs} end, - {O, AltSpc} = + {O, DeltaSpecs} = set_object(Bucket, K, V, IndexGen, RemoveSpc, AddSpc), - case book_riakput(Book, O, AltSpc) of + % DeltaSpecs should be new indexes added, and any old indexes which + % have been removed by this change where RemoveOld2i is true. + % + % The actual indexes within the object should reflect any history + % of indexes i.e. when RemoveOld2i is false. 
+ % + % The [{Key, SpecL}] returned should accrue additions over loops if + % RemoveOld2i is false + case book_riakput(Book, O, DeltaSpecs) of ok -> ok; pause -> timer:sleep(?SLOWOFFER_DELAY) end, - {K, AltSpc} + % Note that order in the SpecL is important, as + % check_indexed_objects, needs to find the latest item added + {K, DeltaSpecs ++ AddSpc} end, RplKSpecL = lists:map(MapFun, KSpecL), {RplKSpecL, V}. From 9d92ca0773020d2103d8366fb482a0e40913dbc8 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 16 Mar 2020 12:51:14 +0000 Subject: [PATCH 03/13] Add tests for appDefined functions --- src/leveled_bookie.erl | 4 + src/leveled_codec.erl | 10 +- src/leveled_head.erl | 9 ++ test/end_to_end/appdefined_SUITE.erl | 134 ++++++++++++++++++++++++++- test/end_to_end/recovery_SUITE.erl | 81 +++++++++++++++- test/end_to_end/testutil.erl | 77 ++++++++++++--- 6 files changed, 289 insertions(+), 26 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index bcbf4ef..27c158d 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -3276,6 +3276,10 @@ sqnorder_mutatefold_test() -> ok = book_destroy(Bookie1). +search_test() -> + ?assertMatch({value, 5}, search(fun(X) -> X == 5 end, lists:seq(1, 10))), + ?assertMatch(false, search(fun(X) -> X == 55 end, lists:seq(1, 10))). + check_notfound_test() -> ProbablyFun = fun() -> probably end, MissingFun = fun() -> missing end, diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 95e894e..35ab5d1 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -366,14 +366,12 @@ endkey_passed(EndKey, CheckingKey) -> %% Take the default strategy for compaction, and override the approach for any %% tags passed in inker_reload_strategy(AltList) -> - ReloadStrategy0 = + DefaultList = lists:map(fun leveled_head:default_reload_strategy/1, leveled_head:defined_objecttags()), - lists:foldl(fun({X, Y}, SList) -> - lists:keyreplace(X, 1, SList, {X, Y}) - end, - ReloadStrategy0, - AltList). + lists:ukeymerge(1, + lists:ukeysort(1, AltList), + lists:ukeysort(1, DefaultList)). -spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy()) diff --git a/src/leveled_head.erl b/src/leveled_head.erl index 0669ec5..ad4bbfa 100644 --- a/src/leveled_head.erl +++ b/src/leveled_head.erl @@ -472,4 +472,13 @@ diff_index_test() -> ?assertMatch([{add, <<"idx1_bin">>, <<"20840930001702Zoe">>}, {remove, <<"idx1_bin">>,<<"20231126131808Madison">>}], IdxSpecs). +decode_test() -> + Bin = <<"999">>, + BinTerm = term_to_binary("999"), + ?assertMatch("999", binary_to_list( + decode_maybe_binary(<<1:8/integer, Bin/binary>>))), + ?assertMatch("999", decode_maybe_binary(<<0:8/integer, BinTerm/binary>>)), + ?assertMatch("999", binary_to_list( + decode_maybe_binary(<<2:8/integer, Bin/binary>>))). + -endif. \ No newline at end of file diff --git a/test/end_to_end/appdefined_SUITE.erl b/test/end_to_end/appdefined_SUITE.erl index 79301de..c041844 100644 --- a/test/end_to_end/appdefined_SUITE.erl +++ b/test/end_to_end/appdefined_SUITE.erl @@ -2,11 +2,14 @@ -include_lib("common_test/include/ct.hrl"). -include("include/leveled.hrl"). -export([all/0]). --export([application_defined_tag/1 +-export([ + application_defined_tag/1, + bespoketag_recalc/1 ]). all() -> [ - application_defined_tag + % application_defined_tag, + bespoketag_recalc ]. 
@@ -62,6 +65,8 @@ application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) -> StartOpts1 = [{root_path, RootPath}, {sync_strategy, testutil:sync_strategy()}, {log_level, warn}, + {reload_strategy, + [{bespoke_tag1, retain}, {bespoke_tag2, retain}]}, {override_functions, Functions}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), Value = leveled_rand:rand_bytes(512), @@ -107,8 +112,6 @@ application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) -> ok = leveled_bookie:book_close(Bookie2). - - object_generator(Count, V) -> Hash = erlang:phash2({count, V}), @@ -118,4 +121,125 @@ object_generator(Count, V) -> {Bucket, Key, [{hash, Hash}, {shard, Count rem 10}, - {random, Random}, {value, V}]}. \ No newline at end of file + {random, Random}, {value, V}]}. + + + +bespoketag_recalc(_Config) -> + %% Get a sensible behaviour using the recalc compaction strategy with a + %% bespoke tag + + RootPath = testutil:reset_filestructure(), + B0 = <<"B0">>, + KeyCount = 7000, + + ExtractMDFun = + fun(bespoke_tag, Size, Obj) -> + [{index, IL}, {value, _V}] = Obj, + {{erlang:phash2(term_to_binary(Obj)), + Size, + {index, IL}}, + [os:timestamp()]} + end, + CalcIndexFun = + fun(bespoke_tag, UpdMeta, PrvMeta) -> + % io:format("UpdMeta ~w PrvMeta ~w~n", [UpdMeta, PrvMeta]), + {index, UpdIndexes} = element(3, UpdMeta), + IndexDeltas = + case PrvMeta of + not_present -> + UpdIndexes; + PrvMeta when is_tuple(PrvMeta) -> + {index, PrvIndexes} = element(3, PrvMeta), + lists:subtract(UpdIndexes, PrvIndexes) + end, + lists:map(fun(I) -> {add, <<"temp_int">>, I} end, IndexDeltas) + end, + + BookOpts = [{root_path, RootPath}, + {cache_size, 1000}, + {max_journalobjectcount, 6000}, + {max_pencillercachesize, 8000}, + {sync_strategy, testutil:sync_strategy()}, + {reload_strategy, [{bespoke_tag, recalc}]}, + {override_functions, + [{extract_metadata, ExtractMDFun}, + {diff_indexspecs, CalcIndexFun}]}], + + {ok, Book1} = leveled_bookie:book_start(BookOpts), + LoadFun = + fun(Book, MustFind) -> + fun(I) -> + testutil:stdload_object(Book, + B0, list_to_binary(["A"|integer_to_list(I rem KeyCount)]), + I, erlang:phash2({value, I}), + infinity, bespoke_tag, false, MustFind) + end + end, + lists:foreach(LoadFun(Book1, false), lists:seq(1, KeyCount)), + lists:foreach(LoadFun(Book1, true), lists:seq(KeyCount + 1, KeyCount * 2)), + + FoldFun = + fun(_B0, {IV0, _K0}, Acc) -> + case IV0 - 1 of + Acc -> + Acc + 1; + _Unexpected -> + % io:format("Eh? 
- ~w ~w~n", [Unexpected, Acc]), + Acc + 1 + end + end, + + CountFold = + fun(Book, CurrentCount) -> + leveled_bookie:book_indexfold(Book, + B0, + {FoldFun, 0}, + {<<"temp_int">>, 0, CurrentCount}, + {true, undefined}) + end, + + {async, FolderA} = CountFold(Book1, 2 * KeyCount), + CountA = FolderA(), + io:format("Counted double index entries ~w - everything loaded OK~n", + [CountA]), + true = 2 * KeyCount == CountA, + + io:format("Before close looking for Key 999 ~w~n", + [leveled_bookie:book_head(Book1, B0, <<"A999">>, bespoke_tag)]), + + ok = leveled_bookie:book_close(Book1), + + {ok, Book2} = leveled_bookie:book_start(BookOpts), + io:format("After opening looking for Key 999 ~w~n", + [leveled_bookie:book_head(Book2, B0, <<"A999">>, bespoke_tag)]), + + lists:foreach(LoadFun(Book2, true), lists:seq(KeyCount * 2 + 1, KeyCount * 3)), + + io:format("After fresh load looking for Key 999 ~w~n", + [leveled_bookie:book_head(Book2, B0, <<"A999">>, bespoke_tag)]), + + {async, FolderB} = CountFold(Book2, 3 * KeyCount), + CountB = FolderB(), + io:format("Counted triple index entries ~w - everything re-loaded~n", + [CountB]), + true = 3 * KeyCount == CountB, + + testutil:compact_and_wait(Book2), + ok = leveled_bookie:book_close(Book2), + + io:format("Restart from blank ledger~n"), + + leveled_penciller:clean_testdir(proplists:get_value(root_path, BookOpts) ++ + "/ledger"), + {ok, Book3} = leveled_bookie:book_start(BookOpts), + + {async, FolderC} = CountFold(Book3, 3 * KeyCount), + CountC = FolderC(), + io:format("All index entries ~w present - recalc ok~n", + [CountC]), + true = 3 * KeyCount == CountC, + + ok = leveled_bookie:book_close(Book3), + + testutil:reset_filestructure(). \ No newline at end of file diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 3538cea..44d9418 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -10,6 +10,7 @@ recalc_strategy/1, recalc_transition_strategy/1, recovr_strategy/1, + stdtag_recalc/1, aae_missingjournal/1, aae_bustedjournal/1, journal_compaction_bustedjournal/1, @@ -31,7 +32,8 @@ all() -> [ journal_compaction_bustedjournal, close_duringcompaction, allkeydelta_journal_multicompact, - recompact_keydeltas + recompact_keydeltas, + stdtag_recalc ]. @@ -149,8 +151,6 @@ recovery_with_samekeyupdates(_Config) -> testutil:reset_filestructure(). - - hot_backup_simple(_Config) -> % The journal may have a hot backup. This allows for an online Bookie % to be sent a message to prepare a backup function, which an asynchronous @@ -331,6 +331,81 @@ rotate_wipe_compact(Strategy1, Strategy2) -> testutil:reset_filestructure(). +stdtag_recalc(_Config) -> + %% Setting the ?STD_TAG to do recalc, should result in the ?STD_TAG + %% behaving like recovr - as no recalc is done for ?STD_TAG + + %% NOTE -This is a test to confirm bad things happen! 
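+    %%
+    %% The bad behaviour follows from leveled_head:diff_indexspecs/3
+    %% falling back to default_diff_indexspecs/3 for ?STD_TAG, which
+    %% returns [] - so no index changes are rebuilt when the Ledger is
+    %% reloaded from the Journal.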
+ + RootPath = testutil:reset_filestructure(), + B0 = <<"B0">>, + KeyCount = 7000, + BookOpts = [{root_path, RootPath}, + {cache_size, 1000}, + {max_journalobjectcount, 5000}, + {max_pencillercachesize, 10000}, + {sync_strategy, testutil:sync_strategy()}, + {reload_strategy, [{?STD_TAG, recalc}]}], + {ok, Book1} = leveled_bookie:book_start(BookOpts), + LoadFun = + fun(Book) -> + fun(I) -> + testutil:stdload_object(Book, + B0, erlang:phash2(I rem KeyCount), + I, erlang:phash2({value, I}), + infinity, ?STD_TAG, false, false) + end + end, + lists:foreach(LoadFun(Book1), lists:seq(1, KeyCount)), + lists:foreach(LoadFun(Book1), lists:seq(KeyCount + 1, KeyCount * 2)), + + CountFold = + fun(Book, CurrentCount) -> + leveled_bookie:book_indexfold(Book, + B0, + {fun(_BF, _KT, Acc) -> Acc + 1 end, + 0}, + {<<"temp_int">>, 0, CurrentCount}, + {true, undefined}) + end, + + {async, FolderA} = CountFold(Book1, 2 * KeyCount), + CountA = FolderA(), + io:format("Counted double index entries ~w - everything loaded OK~n", + [CountA]), + true = 2 * KeyCount == CountA, + + ok = leveled_bookie:book_close(Book1), + + {ok, Book2} = leveled_bookie:book_start(BookOpts), + lists:foreach(LoadFun(Book2), lists:seq(KeyCount * 2 + 1, KeyCount * 3)), + + {async, FolderB} = CountFold(Book2, 3 * KeyCount), + CountB = FolderB(), + io:format("Maybe counted less index entries ~w - everything not loaded~n", + [CountB]), + true = 3 * KeyCount >= CountB, + + compact_and_wait(Book2), + ok = leveled_bookie:book_close(Book2), + + io:format("Restart from blank ledger"), + + leveled_penciller:clean_testdir(proplists:get_value(root_path, BookOpts) ++ + "/ledger"), + {ok, Book3} = leveled_bookie:book_start(BookOpts), + + {async, FolderC} = CountFold(Book3, 3 * KeyCount), + CountC = FolderC(), + io:format("Missing index entries ~w - recalc not supported on ?STD_TAG~n", + [CountC]), + true = 3 * KeyCount > CountC, + + ok = leveled_bookie:book_close(Book3), + + testutil:reset_filestructure(). + + recovr_strategy(_Config) -> RootPath = testutil:reset_filestructure(), BookOpts = [{root_path, RootPath}, diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 52db533..b84d757 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -10,6 +10,7 @@ stdload/2, stdload_expiring/3, stdload_object/6, + stdload_object/9, reset_filestructure/0, reset_filestructure/1, check_bucket_stats/2, @@ -59,7 +60,8 @@ get_value_from_objectlistitem/1, numbered_key/1, fixed_bin_key/1, - convert_to_seconds/1]). + convert_to_seconds/1, + compact_and_wait/1]). -define(RETURN_TERMS, {true, undefined}). -define(SLOWOFFER_DELAY, 5). @@ -241,17 +243,46 @@ stdload_expiring(Book, KeyCount, TTL, V, Acc) -> stdload_expiring(Book, KeyCount - 1, TTL, V, [{I, B, K}|Acc]). stdload_object(Book, B, K, I, V, TTL) -> - Obj = [{index, I}, {value, V}], - IdxSpecs = - case leveled_bookie:book_get(Book, B, K) of - {ok, PrevObj} -> - {index, OldI} = lists:keyfind(index, 1, PrevObj), - io:format("Remove index ~w for ~w~n", [OldI, I]), - [{remove, <<"temp_int">>, OldI}, {add, <<"temp_int">>, I}]; - not_found -> - [{add, <<"temp_int">>, I}] + stdload_object(Book, B, K, I, V, TTL, ?STD_TAG, true, false). 
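+
+%% e.g. (a sketch of the extended arity) a bespoke-tag load that accrues
+%% rather than replaces 2i entries, asserting the key already exists:
+%%   stdload_object(Book, B, K, I, V, infinity, bespoke_tag, false, true)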
+ +stdload_object(Book, B, K, I, V, TTL, Tag, RemovePrev2i, MustFind) -> + Obj = [{index, [I]}, {value, V}], + {IdxSpecs, Obj0} = + case {leveled_bookie:book_get(Book, B, K, Tag), MustFind} of + {{ok, PrevObj}, _} -> + {index, PrevIs} = lists:keyfind(index, 1, PrevObj), + case RemovePrev2i of + true -> + MapFun = + fun(OldI) -> {remove, <<"temp_int">>, OldI} end, + {[{add, <<"temp_int">>, I}|lists:map(MapFun, PrevIs)], + Obj}; + false -> + {[{add, <<"temp_int">>, I}], + [{index, [I|PrevIs]}, {value, V}]} + end; + {not_found, false} -> + {[{add, <<"temp_int">>, I}], Obj}; + {not_found, true} -> + HR = leveled_bookie:book_head(Book, B, K, Tag), + io:format("Unexpected not_found for key=~w I=~w HR=~w~n ", + [K, I, HR]), + {[{add, <<"temp_int">>, I}], Obj} end, - R = leveled_bookie:book_tempput(Book, B, K, Obj, IdxSpecs, ?STD_TAG, TTL), + R = + case TTL of + infinity -> + leveled_bookie:book_put(Book, B, K, Obj0, IdxSpecs, Tag); + TTL when is_integer(TTL) -> + leveled_bookie:book_tempput(Book, B, K, Obj0, + IdxSpecs, Tag, TTL) + end, + case K of + <<57, 57, 57>> -> + io:format("K ~w I ~w R ~w~n", [K, I, R]); + _ -> + ok + end, case R of ok -> ok; @@ -262,6 +293,7 @@ stdload_object(Book, B, K, I, V, TTL) -> + reset_filestructure() -> reset_filestructure(0, ?ROOT_PATH). @@ -883,4 +915,25 @@ get_aae_segment(Obj) -> get_aae_segment({Type, Bucket}, Key) -> leveled_tictac:keyto_segment32(<>); get_aae_segment(Bucket, Key) -> - leveled_tictac:keyto_segment32(<>). \ No newline at end of file + leveled_tictac:keyto_segment32(<>). + +compact_and_wait(Book) -> + compact_and_wait(Book, 20000). + +compact_and_wait(Book, WaitForDelete) -> + ok = leveled_bookie:book_compactjournal(Book, 30000), + F = fun leveled_bookie:book_islastcompactionpending/1, + lists:foldl(fun(X, Pending) -> + case Pending of + false -> + false; + true -> + io:format("Loop ~w waiting for journal " + ++ "compaction to complete~n", [X]), + timer:sleep(20000), + F(Book) + end end, + true, + lists:seq(1, 15)), + io:format("Waiting for journal deletes~n"), + timer:sleep(WaitForDelete). From 6350302ea8cfa462c98b5d698d5ae8868b345323 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 16 Mar 2020 13:32:52 +0000 Subject: [PATCH 04/13] Uncomment test --- test/end_to_end/appdefined_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end_to_end/appdefined_SUITE.erl b/test/end_to_end/appdefined_SUITE.erl index c041844..8f44560 100644 --- a/test/end_to_end/appdefined_SUITE.erl +++ b/test/end_to_end/appdefined_SUITE.erl @@ -8,7 +8,7 @@ ]). all() -> [ - % application_defined_tag, + application_defined_tag, bespoketag_recalc ]. From dbceda876cea052ea4899114af1e0e1dbfe68870 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 16 Mar 2020 16:35:06 +0000 Subject: [PATCH 05/13] Issue with tag order https://github.com/martinsumner/leveled/issues/309 Resolve issue, and remove test log entries used when discovering issue. 
--- src/leveled_sst.erl | 4 ++-- test/end_to_end/appdefined_SUITE.erl | 13 +------------ test/end_to_end/testutil.erl | 11 ----------- 3 files changed, 3 insertions(+), 25 deletions(-) diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 2eddd34..330d1d4 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -1511,7 +1511,7 @@ accumulate_positions({K, V}, {PosBinAcc, NoHashCount, HashAcc, LMDAcc}) -> NHC:7/integer, PosBinAcc/binary>>, 0, - HashAcc, + [H1|HashAcc], LMDAcc0} end; false -> @@ -2304,7 +2304,7 @@ split_lists(KVList1, SlotLists, N, PressMethod, IdxModDate) -> -spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) -> {list(), list(), list(tuple()), tuple()|null}. %% @doc -%% Merge lists when merging across more thna one file. KVLists that are +%% Merge lists when merging across more than one file. KVLists that are %% provided may include pointers to fetch more Keys/Values from the source %% file merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) -> diff --git a/test/end_to_end/appdefined_SUITE.erl b/test/end_to_end/appdefined_SUITE.erl index 8f44560..9a8cf39 100644 --- a/test/end_to_end/appdefined_SUITE.erl +++ b/test/end_to_end/appdefined_SUITE.erl @@ -171,7 +171,7 @@ bespoketag_recalc(_Config) -> fun(Book, MustFind) -> fun(I) -> testutil:stdload_object(Book, - B0, list_to_binary(["A"|integer_to_list(I rem KeyCount)]), + B0, integer_to_binary(I rem KeyCount), I, erlang:phash2({value, I}), infinity, bespoke_tag, false, MustFind) end @@ -205,24 +205,13 @@ bespoketag_recalc(_Config) -> [CountA]), true = 2 * KeyCount == CountA, - io:format("Before close looking for Key 999 ~w~n", - [leveled_bookie:book_head(Book1, B0, <<"A999">>, bespoke_tag)]), - ok = leveled_bookie:book_close(Book1), {ok, Book2} = leveled_bookie:book_start(BookOpts), - io:format("After opening looking for Key 999 ~w~n", - [leveled_bookie:book_head(Book2, B0, <<"A999">>, bespoke_tag)]), - lists:foreach(LoadFun(Book2, true), lists:seq(KeyCount * 2 + 1, KeyCount * 3)), - io:format("After fresh load looking for Key 999 ~w~n", - [leveled_bookie:book_head(Book2, B0, <<"A999">>, bespoke_tag)]), - {async, FolderB} = CountFold(Book2, 3 * KeyCount), CountB = FolderB(), - io:format("Counted triple index entries ~w - everything re-loaded~n", - [CountB]), true = 3 * KeyCount == CountB, testutil:compact_and_wait(Book2), diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index b84d757..00ad9ad 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -262,11 +262,6 @@ stdload_object(Book, B, K, I, V, TTL, Tag, RemovePrev2i, MustFind) -> [{index, [I|PrevIs]}, {value, V}]} end; {not_found, false} -> - {[{add, <<"temp_int">>, I}], Obj}; - {not_found, true} -> - HR = leveled_bookie:book_head(Book, B, K, Tag), - io:format("Unexpected not_found for key=~w I=~w HR=~w~n ", - [K, I, HR]), {[{add, <<"temp_int">>, I}], Obj} end, R = @@ -277,12 +272,6 @@ stdload_object(Book, B, K, I, V, TTL, Tag, RemovePrev2i, MustFind) -> leveled_bookie:book_tempput(Book, B, K, Obj0, IdxSpecs, Tag, TTL) end, - case K of - <<57, 57, 57>> -> - io:format("K ~w I ~w R ~w~n", [K, I, R]); - _ -> - ok - end, case R of ok -> ok; From b49a5ff53d5cd964a1f1e7cf9925ebc785f20a4b Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 16 Mar 2020 17:35:38 +0000 Subject: [PATCH 06/13] Additional unit tests of MetaBin handling --- src/leveled_head.erl | 51 +++++++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/src/leveled_head.erl 
b/src/leveled_head.erl index ad4bbfa..2db0f72 100644 --- a/src/leveled_head.erl +++ b/src/leveled_head.erl @@ -444,22 +444,47 @@ assemble_index_specs(Indexes, IndexOp) -> index_extract_test() -> - SibMetaBin = <<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134, - 1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68, - 86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55, - 45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0, - 0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2, - 107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50, - 49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2, - 107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50, - 48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0, - 5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55, - 52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102, - 50,45,53,54,98,51,98,97,57,57,99,55,56,50>>, + SibMetaBin = + <<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134, + 1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68, + 86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55, + 45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0, + 0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2, + 107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50, + 49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2, + 107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50, + 48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0, + 5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55, + 52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102, + 50,45,53,54,98,51,98,97,57,57,99,55,56,50>>, Indexes = get_indexes_from_siblingmetabin(SibMetaBin, []), ExpIndexes = [{"idx1_bin","21521107231730Sophia"}, {"idx1_bin","21820510130146Avery"}], - ?assertMatch(ExpIndexes, Indexes). + ?assertMatch(ExpIndexes, Indexes), + SibMetaBinNoIdx = + <<0,0,0,1,0,0,0,0,0,0,0,128,0,0,6,48,0,4,130,247,0,1,250,134, + 1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68, + 86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55, + 45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0, + 5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55, + 52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102, + 50,45,53,54,98,51,98,97,57,57,99,55,56,50>>, + ?assertMatch([], get_indexes_from_siblingmetabin(SibMetaBinNoIdx, [])), + SibMetaBinOverhang = + <<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134, + 1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68, + 86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55, + 45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0, + 0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2, + 107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50, + 49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2, + 107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50, + 48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0, + 5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55, + 52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102, + 50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0,0>>, + ?assertMatch(ExpIndexes, + get_indexes_from_siblingmetabin(SibMetaBinOverhang, [])). 
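%% A hedged summary, not part of the patch: the three binaries above give
%% get_indexes_from_siblingmetabin/2 sibling metadata with index entries,
%% metadata with no index section, and metadata with trailing overhang
%% bytes - the extracted index list should be the same with or without
%% the trailing bytes.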
 diff_index_test() ->
     UpdIndexes =

From 5f7d261a87d19fc9378dd35018822dbe07519b52 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Mon, 16 Mar 2020 18:53:40 +0000
Subject: [PATCH 07/13] Improve test

Genuine overhang
---
 src/leveled_head.erl | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/leveled_head.erl b/src/leveled_head.erl
index 2db0f72..3ee66c2 100644
--- a/src/leveled_head.erl
+++ b/src/leveled_head.erl
@@ -482,7 +482,8 @@ index_extract_test() ->
         48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0,
         5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
         52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
-        50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0,0>>,
+        50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0,0,0,0,0,4,
+        0,0,0,0>>,
     ?assertMatch(ExpIndexes,
                     get_indexes_from_siblingmetabin(SibMetaBinOverhang, [])).

From 808a858d0953330f4c1d0b3f22d97a059611d0b9 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Mon, 16 Mar 2020 21:41:47 +0000
Subject: [PATCH 08/13] Don't score a rolling file

In giving an empty file a score of 0, a race condition was exposed. A file
might not be active, but might still be rolling - and then can get scored
as 0, and immediately compacted. It will then be removed from the journal
manifest.

Check each file is not rolling before making it a candidate for scoring.
---
 src/leveled_iclerk.erl         | 6 +++++-
 test/end_to_end/riak_SUITE.erl | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index afc2c8c..d69dd92 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -307,7 +307,11 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
     % Don't want to process a queued call waiting on an old manifest
     [_Active|Manifest] = Manifest0,
     {FilterServer, MaxSQN} = InitiateFun(Checker),
-    ok = clerk_scorefilelist(self(), Manifest),
+    NotRollingFun =
+        fun({_LowSQN, _FN, Pid, _LK}) ->
+            not leveled_cdb:cdb_isrolling(Pid)
+        end,
+    ok = clerk_scorefilelist(self(), lists:filter(NotRollingFun, Manifest)),
     ScoringState =
         #scoring_state{filter_fun = FilterFun,
                         filter_server = FilterServer,
diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl
index 5e82ce7..da51127 100644
--- a/test/end_to_end/riak_SUITE.erl
+++ b/test/end_to_end/riak_SUITE.erl
@@ -905,7 +905,7 @@ handoff(_Config) ->
                     {sync_strategy, sync}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
 
-    % Add some noe Riak objects in - which should be ignored in folds.
+    % Add some none Riak objects in - which should be ignored in folds.
     Hashes = testutil:stdload(Bookie1, 1000),
     % Generate 200K objects to be used within the test, and load them into
     % the first store (outputting the generated objects as a list of lists)

From 5b4edfebb6c4102de4a41fd0aaf7774ef6b12625 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 17 Mar 2020 14:20:57 +0000
Subject: [PATCH 09/13] Coverage cheat

Very rarely, this line in the tests is not covered - so cheating here to
consistently pass coverage
---
 src/leveled_sst.erl | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl
index 330d1d4..56de69e 100644
--- a/src/leveled_sst.erl
+++ b/src/leveled_sst.erl
@@ -3319,6 +3319,7 @@ simple_persisted_tester(SSTNewFun) ->
                     Acc
             end
         end,
+    true = [] == MapFun({FirstKey, "V"}, []), % coverage cheat within MapFun
     KVList3 = lists:foldl(MapFun, [], KVList2),
     SW2 = os:timestamp(),
     lists:foreach(fun({K, H, _V}) ->

From 50cb98ecdd37b88a32465c51d1de97130d82b0de Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 17 Mar 2020 17:29:59 +0000
Subject: [PATCH 10/13] Resolve intermittent test failure

The previous regex filter still allowed files with cdb in the body of the
name (which can occur, as filenames are GUID-based)
---
 test/end_to_end/testutil.erl | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index 00ad9ad..0fe9a66 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -881,16 +881,9 @@ restore_topending(RootPath, FileName) ->
 
 find_journals(RootPath) ->
     {ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
-    {ok, Regex} = re:compile(".*\.cdb"),
-    CDBFiles = lists:foldl(fun(FN, Acc) -> case re:run(FN, Regex) of
-                                                nomatch ->
-                                                    Acc;
-                                                _ ->
-                                                    [FN|Acc]
-                                            end
-                            end,
-                            [],
-                            FNsA_J),
+    % Must not return a file with the .pnd extension
+    CDBFiles =
+        lists:filter(fun(FN) -> filename:extension(FN) == ".cdb" end, FNsA_J),
     CDBFiles.
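%% A hedged illustration (shell sketch, not part of the patch) of why the
%% extension check is stricter: hex GUID filenames can contain the
%% characters "cdb" without having a ".cdb" extension, and
%% filename:extension/1 only accepts a literal suffix.
%%   1> filename:extension("07acdb9f-3e71.cdb").
%%   ".cdb"
%%   2> filename:extension("07acdb9f-3e71.pnd").
%%   ".pnd"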
 convert_to_seconds({MegaSec, Seconds, _MicroSec}) ->

From 8a9db9e75ed4bd9508bea0abd7d2fc3b2f54e9a9 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Mon, 23 Mar 2020 16:45:28 +0000
Subject: [PATCH 11/13] Add log of strategy when clerk starts compaction

---
 src/leveled_iclerk.erl | 2 ++
 src/leveled_log.erl    | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index d69dd92..0551a31 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -297,6 +297,8 @@ handle_call(stop, _From, State) ->
 
 handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
             State) ->
+    leveled_log:log("IC014", [State#state.reload_strategy,
+                                State#state.max_run_length]),
     % Empty the waste folder
     clear_waste(State),
     SW = os:timestamp(),
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index 51072fa..ea84b51 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -356,6 +356,8 @@
     {"IC013",
         {warn, "File with name ~s to be ignored in manifest as scanning for "
                 ++ "first key returned empty - maybe corrupted"}},
+    {"IC014",
+        {info, "Compaction to be run with strategy ~w and max_run_length ~w"}},
 
     {"CDB01",
         {info, "Opening file for writing with filename ~s"}},

From 20a7a2257108f4a743584528471dc03f32da5a3a Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 24 Mar 2020 20:21:44 +0000
Subject: [PATCH 12/13] Add documentation for recalc option

---
 docs/DESIGN.md          |  2 +-
 docs/STARTUP_OPTIONS.md |  4 ++--
 src/leveled_bookie.erl  | 12 ++++++++----
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/docs/DESIGN.md b/docs/DESIGN.md
index 8e8c142..d095821 100644
--- a/docs/DESIGN.md
+++ b/docs/DESIGN.md
@@ -120,7 +120,7 @@ Three potential recovery strategies are supported to provide some flexibility fo
 
 - retain - on compaction KeyDeltas are retained in the Journal, only values are removed.
 
-- recalc (not yet implemented) - the compaction rules assume that on recovery the key changes will be recalculated by comparing the change with the current database state. In recovery the key changes will be recalculated by comparing the change with the current database state.
+- recalc - the compaction rules assume that on recovery the key changes will be recalculated by comparing the change with the current database state. A user-defined function should be passed in at startup to achieve this recalculation (to override `leveled_head:diff_indexspeacs/3`).
 
 ### Hot Backups

diff --git a/docs/STARTUP_OPTIONS.md b/docs/STARTUP_OPTIONS.md
index e73a021..c802f73 100644
--- a/docs/STARTUP_OPTIONS.md
+++ b/docs/STARTUP_OPTIONS.md
@@ -77,11 +77,11 @@ However, what if the Ledger had been erased? This could happen due to some corr
 
 The are three potential strategies:
 
- - `skip` - don't worry about this scenario, require the Ledger to be backed up;
+ - `recovr` - don't worry about this scenario, require the Ledger to be backed up;
  - `retain` - discard the object itself on compaction but keep the key changes;
  - `recalc` - recalculate the indexes on reload by comparing the information on the object with the current state of the Ledger (as would be required by the PUT process when comparing IndexSpecs at PUT time).
 
-There is no code for `recalc` at present it is simply a logical possibility. So to set a reload strategy there should be an entry like `{reload_strategy, [{TagName, skip|retain}]}`. By default tags are pre-set to `retain`. If there is no need to handle a corrupted Ledger, then all tags could be set to `skip`.
+Setting a reload strategy requires a list of tuples to match tag names to strategy `{reload_strategy, [{TagName, recovr|retain|recalc}]}`. By default tags are pre-set to `retain`. If there is no need to handle a corrupted Ledger, then all tags could be set to `recovr` - this assumes that either the ledger files are protected by some other means from corruption, or an external anti-entropy mechanism will recover the lost data.

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 27c158d..1aeea16 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -309,10 +309,14 @@
             % resilience outside of the store), or retain (retain a history of
             % key changes, even when the object value has been compacted).
             %
-            % There is a third, theoretical and untested strategy, which is
-            % recalc - which would require when reloading the Ledger from the
-            % Journal, to recalculate the index changes based on the current
-            % state of the Ledger and the object metadata.
+            % There is a third strategy, which is recalc, where on reloading
+            % the Ledger from the Journal, the key changes are recalculated by
+            % comparing the extracted metadata from the Journal object with
+            % the extracted metadata from the current Ledger object it is set
+            % to replace (should one be present). Implementing the recalc
+            % strategy requires an override function for
+            % `leveled_head:diff_indexspecs/3`.
+            % A function for the ?RIAK_TAG is provided and tested.
             %
             % reload_strategy options are a list - to map from a tag to the
             % strategy (recovr|retain|recalc). Defualt strategies are:

From e175948378ac8ec7385d58c3dfcb3c4f54bf942b Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Thu, 26 Mar 2020 14:25:09 +0000
Subject: [PATCH 13/13] Remove references to 'skip' strategy

Now called `recovr`
---
 docs/DESIGN.md         | 2 +-
 src/leveled_bookie.erl | 2 +-
 src/leveled_codec.erl  | 4 ++--
 src/leveled_iclerk.erl | 5 +++--
 4 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/docs/DESIGN.md b/docs/DESIGN.md
index d095821..7adfd57 100644
--- a/docs/DESIGN.md
+++ b/docs/DESIGN.md
@@ -120,7 +120,7 @@ Three potential recovery strategies are supported to provide some flexibility fo
 
 - retain - on compaction KeyDeltas are retained in the Journal, only values are removed.
 
-- recalc - the compaction rules assume that on recovery the key changes will be recalculated by comparing the change with the current database state. A user-defined function should be passed in at startup to achieve this recalculation (to override `leveled_head:diff_indexspeacs/3`).
+- recalc - the compaction rules assume that on recovery the key changes will be recalculated by comparing the change with the current database state. A user-defined function should be passed in at startup to achieve this recalculation (to override `leveled_head:diff_indexspecs/3`).
 
 ### Hot Backups

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 1aeea16..e1a2728 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -1109,7 +1109,7 @@ book_destroy(Pid) ->
 %% to store the backup.
 %%
 %% Backup files are hard-linked.
Does not work in head_only mode, or if -%% index changes are used with a `skip` compaction/reload strategy +%% index changes are used with a `recovr` compaction/reload strategy book_hotbackup(Pid) -> gen_server:call(Pid, hot_backup, infinity). diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 35ab5d1..d040723 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -89,7 +89,7 @@ -type ledger_kv() :: {ledger_key(), ledger_value()}. -type compaction_method() :: - retain|skip|recalc. + retain|recovr|recalc. -type compaction_strategy() :: list({tag(), compaction_method()}). -type journal_key_tag() :: @@ -375,7 +375,7 @@ inker_reload_strategy(AltList) -> -spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy()) - -> skip|retain|recalc. + -> compaction_method(). %% @doc %% Work out the compaction strategy for the key get_tagstrategy({Tag, _, _, _}, Strategy) -> diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 0551a31..9f1256d 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -805,10 +805,11 @@ split_positions_into_batches(Positions, Journal, Batches) -> %% recalculating the KeyChanges by looking at the object when we reload. So %% old objects can be discarded. %% -%% If the strategy is skip, we don't care about KeyDeltas. Note though, that +%% If the strategy is recovr, we don't care about KeyDeltas. Note though, that %% if the ledger is deleted it may not be possible to safely rebuild a KeyStore %% if it contains index entries. The hot_backup approach is also not safe with -%% a `skip` strategy. +%% a `recovr` strategy. The recovr strategy assumes faults in the ledger will +%% be resolved via application-level anti-entropy filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> FoldFun = filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy),
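
To round off the strategy documentation above, a minimal startup sketch (the root path and the surrounding test traffic are assumptions for illustration, not taken from the patches):

    %% Open a store with the recalc reload strategy for the Riak tag;
    %% tags not listed keep the default retain strategy. ?RIAK_TAG is
    %% the tag macro from leveled's included header files.
    StartOpts = [{root_path, "/tmp/leveled_recalc"},
                 {reload_strategy, [{?RIAK_TAG, recalc}]}],
    {ok, Bookie} = leveled_bookie:book_start(StartOpts),
    %% ... normal PUT/GET traffic ...
    ok = leveled_bookie:book_close(Bookie)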