diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 74707dc..bcbf4ef 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -93,8 +93,6 @@ ]). -export([empty_ledgercache/0, - loadqueue_ledgercache/1, - push_ledgercache/2, snapshot_store/6, fetch_value/2, journal_notfound/4]). @@ -105,6 +103,7 @@ -include_lib("eunit/include/eunit.hrl"). +-define(LOADING_PAUSE, 1000). -define(CACHE_SIZE, 2500). -define(MIN_CACHE_SIZE, 100). -define(MIN_PCL_CACHE_SIZE, 400). @@ -166,7 +165,7 @@ -record(state, {inker :: pid() | undefined, penciller :: pid() | undefined, cache_size :: integer() | undefined, - ledger_cache = #ledger_cache{}, + ledger_cache = #ledger_cache{} :: ledger_cache(), is_snapshot :: boolean() | undefined, slow_offer = false :: boolean(), @@ -315,7 +314,7 @@ % Journal, to recalculate the index changes based on the current % state of the Ledger and the object metadata. % - % reload_strategy ptions are a list - to map from a tag to the + % reload_strategy options are a list - to map from a tag to the % strategy (recovr|retain|recalc). Defualt strategies are: % [{?RIAK_TAG, retain}, {?STD_TAG, retain}] {max_pencillercachesize, pos_integer()|undefined} | @@ -378,7 +377,16 @@ % true ]. +-type initial_loadfun() :: + fun((leveled_codec:journal_key(), + any(), + non_neg_integer(), + {non_neg_integer(), non_neg_integer(), ledger_cache()}, + fun((any()) -> {binary(), non_neg_integer()})) -> + {loop|stop, + {non_neg_integer(), non_neg_integer(), ledger_cache()}}). +-export_type([initial_loadfun/0]). %%%============================================================================ %%% API @@ -1250,8 +1258,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) SQN, Object, ObjSize, - {IndexSpecs, TTL}, - State), + {IndexSpecs, TTL}), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1), @@ -1288,8 +1295,7 @@ handle_call({mput, ObjectSpecs, TTL}, From, State) Changes = preparefor_ledgercache(?INKT_MPUT, ?DUMMY, SQN, null, length(ObjectSpecs), - {ObjectSpecs, TTL}, - State), + {ObjectSpecs, TTL}), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), case State#state.slow_offer of true -> @@ -1537,6 +1543,23 @@ code_change(_OldVsn, State, _Extra) -> empty_ledgercache() -> #ledger_cache{mem = ets:new(empty, [ordered_set])}. + +-spec push_to_penciller(pid(), ledger_cache()) -> ok. +%% @doc +%% The push to penciller must start as a tree to correctly de-duplicate +%% the list by order before becoming a de-duplicated list for loading +push_to_penciller(Penciller, LedgerCache) -> + push_to_penciller_loop(Penciller, loadqueue_ledgercache(LedgerCache)). + +push_to_penciller_loop(Penciller, LedgerCache) -> + case push_ledgercache(Penciller, LedgerCache) of + returned -> + timer:sleep(?LOADING_PAUSE), + push_to_penciller_loop(Penciller, LedgerCache); + ok -> + ok + end. + -spec push_ledgercache(pid(), ledger_cache()) -> ok|returned. 
%% @doc %% Push the ledgercache to the Penciller - which should respond ok or @@ -1642,10 +1665,22 @@ startup(InkerOpts, PencillerOpts, State) -> {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), leveled_log:log("B0005", [LedgerSQN]), + ReloadStrategy = InkerOpts#inker_options.reload_strategy, + LoadFun = get_loadfun(ReloadStrategy, Penciller, State), + BatchFun = + fun(BatchAcc, _Acc) -> + push_to_penciller(Penciller, BatchAcc) + end, + InitAccFun = + fun(FN, CurrentMinSQN) -> + leveled_log:log("I0014", [FN, CurrentMinSQN]), + empty_ledgercache() + end, ok = leveled_inker:ink_loadpcl(Inker, LedgerSQN + 1, - get_loadfun(State), - Penciller), + LoadFun, + InitAccFun, + BatchFun), ok = leveled_inker:ink_checksqn(Inker, LedgerSQN), {Inker, Penciller}. @@ -2161,30 +2196,26 @@ check_notfound(CheckFrequency, CheckFun) -> -spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null, leveled_codec:ledger_key()|?DUMMY, - integer(), any(), integer(), - leveled_codec:journal_keychanges(), - book_state()) - -> {integer()|no_lookup, - integer(), + non_neg_integer(), any(), integer(), + leveled_codec:journal_keychanges()) + -> {leveled_codec:segment_hash(), + non_neg_integer(), list(leveled_codec:ledger_kv())}. %% @doc %% Prepare an object and its related key changes for addition to the Ledger %% via the Ledger Cache. preparefor_ledgercache(?INKT_MPUT, - ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}, - _State) -> + ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}) -> ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL), {no_lookup, SQN, ObjChanges}; preparefor_ledgercache(?INKT_KEYD, - LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}, - _State) -> + LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}) -> {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey), KeyChanges = leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL), {no_lookup, SQN, KeyChanges}; preparefor_ledgercache(_InkTag, - LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}, - _State) -> + LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}) -> {Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} = leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL), KeyChanges = @@ -2193,8 +2224,58 @@ preparefor_ledgercache(_InkTag, {KeyH, SQN, KeyChanges}. --spec addto_ledgercache({integer()|no_lookup, - integer(), +-spec recalcfor_ledgercache(leveled_codec:journal_key_tag()|null, + leveled_codec:ledger_key()|?DUMMY, + non_neg_integer(), any(), integer(), + leveled_codec:journal_keychanges(), + ledger_cache(), + pid()) + -> {leveled_codec:segment_hash(), + non_neg_integer(), + list(leveled_codec:ledger_kv())}. +%% @doc +%% When loading from the journal to the ledger, the load may hit a key which +%% has the `recalc` strategy. Such a key needs its key changes to be +%% recalculated by comparison with the current state of the ledger, assuming +%% it is a full journal entry (i.e. KeyDeltas, which may be a result of +%% previously running with a retain strategy, should be ignored).
+recalcfor_ledgercache(InkTag, + _LedgerKey, SQN, _Obj, _Size, {_IdxSpecs, _TTL}, + _LedgerCache, + _Penciller) + when InkTag == ?INKT_MPUT; InkTag == ?INKT_KEYD -> + {no_lookup, SQN, []}; +recalcfor_ledgercache(_InkTag, + LK, SQN, Obj, Size, {_IgnoreJournalIdxSpecs, TTL}, + LedgerCache, + Penciller) -> + {Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} = + leveled_codec:generate_ledgerkv(LK, SQN, Obj, Size, TTL), + OldObject = + case check_in_ledgercache(LK, KeyH, LedgerCache, loader) of + false -> + leveled_penciller:pcl_fetch(Penciller, LK, KeyH, true); + {value, KV} -> + KV + end, + OldMetadata = + case OldObject of + not_present -> + not_present; + {LK, LV} -> + leveled_codec:get_metadata(LV) + end, + UpdMetadata = leveled_codec:get_metadata(MetaValue), + IdxSpecs = + leveled_head:diff_indexspecs(element(1, LK), UpdMetadata, OldMetadata), + {KeyH, + SQN, + [{LK, MetaValue}] + ++ leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL)}. + + +-spec addto_ledgercache({leveled_codec:segment_hash(), + non_neg_integer(), list(leveled_codec:ledger_kv())}, ledger_cache()) -> ledger_cache(). @@ -2230,6 +2311,32 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) -> max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}. +-spec check_in_ledgercache(leveled_codec:ledger_key(), + leveled_codec:segment_hash(), + ledger_cache(), + loader) -> + false | {value, leveled_codec:ledger_kv()}. +%% @doc +%% Check the ledger cache for a Key, when the ledger cache is in loader mode +%% and so is populating a queue not an ETS table +check_in_ledgercache(PK, Hash, Cache, loader) -> + case leveled_pmem:check_index(Hash, Cache#ledger_cache.index) of + [] -> + false; + _ -> + search(fun({K,_V}) -> K == PK end, + lists:reverse(Cache#ledger_cache.load_queue)) + end. + +-spec search(fun((any()) -> boolean()), list()) -> {value, any()}|false. +search(Pred, [Hd|Tail]) -> + case Pred(Hd) of + true -> {value, Hd}; + false -> search(Pred, Tail) + end; +search(Pred, []) when is_function(Pred, 1) -> + false. + -spec maybepush_ledgercache(integer(), ledger_cache(), pid()) -> {ok|returned, ledger_cache()}. %% @doc @@ -2276,44 +2383,47 @@ maybe_withjitter(_CacheSize, _MaxCacheSize) -> false. --spec get_loadfun(book_state()) -> fun(). +-spec get_loadfun(leveled_codec:compaction_strategy(), pid(), book_state()) + -> initial_loadfun(). %% @doc %% The LoadFun will be used by the Inker when walking across the Journal to -%% load the Penciller at startup -get_loadfun(State) -> - PrepareFun = - fun(Tag, PK, SQN, Obj, VS, IdxSpecs) -> - preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, State) - end, - LoadFun = - fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) -> - {MinSQN, MaxSQN, OutputTree} = Acc0, - {SQN, InkTag, PK} = KeyInJournal, - % VBin may already be a term - {VBin, VSize} = ExtractFun(ValueInJournal), - {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin), - case SQN of - SQN when SQN < MinSQN -> - {loop, Acc0}; - SQN when SQN < MaxSQN -> - Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs), - {loop, - {MinSQN, - MaxSQN, - addto_ledgercache(Chngs, OutputTree, loader)}}; - MaxSQN -> - leveled_log:log("B0006", [SQN]), - Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs), - {stop, - {MinSQN, - MaxSQN, - addto_ledgercache(Chngs, OutputTree, loader)}}; - SQN when SQN > MaxSQN -> - leveled_log:log("B0007", [MaxSQN, SQN]), - {stop, Acc0} - end - end, - LoadFun. +%% load the Penciller at startup. 
+get_loadfun(ReloadStrat, Penciller, _State) -> + fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) -> + {MinSQN, MaxSQN, LedgerCache} = Acc0, + {SQN, InkTag, PK} = KeyInJournal, + case SQN of + SQN when SQN < MinSQN -> + {loop, Acc0}; + SQN when SQN > MaxSQN -> + leveled_log:log("B0007", [MaxSQN, SQN]), + {stop, Acc0}; + _ -> + {VBin, ValSize} = ExtractFun(ValueInJournal), + % VBin may already be a term + {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin), + Chngs = + case leveled_codec:get_tagstrategy(PK, ReloadStrat) of + recalc -> + recalcfor_ledgercache(InkTag, PK, SQN, + Obj, ValSize, IdxSpecs, + LedgerCache, + Penciller); + _ -> + preparefor_ledgercache(InkTag, PK, SQN, + Obj, ValSize, IdxSpecs) + end, + case SQN of + MaxSQN -> + leveled_log:log("B0006", [SQN]), + LC0 = addto_ledgercache(Chngs, LedgerCache, loader), + {stop, {MinSQN, MaxSQN, LC0}}; + _ -> + LC0 = addto_ledgercache(Chngs, LedgerCache, loader), + {loop, {MinSQN, MaxSQN, LC0}} + end + end + end. delete_path(DirPath) -> diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 91386fc..95e894e 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -49,7 +49,8 @@ segment_hash/1, to_lookup/1, next_key/1, - return_proxy/4]). + return_proxy/4, + get_metadata/1]). -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). -define(NRT_IDX, "$aae."). @@ -243,6 +244,10 @@ strip_to_indexdetails({_, V}) when tuple_size(V) > 4 -> striphead_to_v1details(V) -> {element(1, V), element(2, V), element(3, V), element(4, V)}. +-spec get_metadata(ledger_value()) -> metadata(). +get_metadata(LV) -> + element(4, LV). + -spec key_dominates(ledger_kv(), ledger_kv()) -> left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant. %% @doc @@ -358,7 +363,7 @@ endkey_passed(EndKey, CheckingKey) -> -spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy(). %% @doc -%% Take the default startegy for compaction, and override the approach for any +%% Take the default strategy for compaction, and override the approach for any %% tags passed in inker_reload_strategy(AltList) -> ReloadStrategy0 = @@ -371,11 +376,13 @@ inker_reload_strategy(AltList) -> AltList). --spec get_tagstrategy(ledger_key(), compaction_strategy()) +-spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy()) -> skip|retain|recalc. %% @doc %% Work out the compaction strategy for the key get_tagstrategy({Tag, _, _, _}, Strategy) -> + get_tagstrategy(Tag, Strategy); +get_tagstrategy(Tag, Strategy) -> case lists:keyfind(Tag, 1, Strategy) of {Tag, TagStrat} -> TagStrat; diff --git a/src/leveled_head.erl b/src/leveled_head.erl index db94074..0669ec5 100644 --- a/src/leveled_head.erl +++ b/src/leveled_head.erl @@ -22,7 +22,8 @@ -export([key_to_canonicalbinary/1, build_head/2, - extract_metadata/3 + extract_metadata/3, + diff_indexspecs/3 ]). -export([get_size/2, @@ -71,12 +72,15 @@ -type object_metadata() :: riak_metadata()|std_metadata()|head_metadata(). -type appdefinable_function() :: - key_to_canonicalbinary | build_head | extract_metadata. + key_to_canonicalbinary | build_head | extract_metadata | diff_indexspecs. % Functions for which default behaviour can be over-written for the % application's own tags -type appdefinable_function_tuple() :: {appdefinable_function(), fun()}. +-type index_op() :: add | remove. +-type index_value() :: integer() | binary(). + -type head() :: binary()|tuple(). 
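For orientation, the index_specs() returned by the new diff_indexspecs/3 are {index_op(), Field, Value} triples, with adds listed before removes. A minimal sketch of the expected shape, using placeholder metadata variables (UpdMeta/OldMeta are not bound here) and values borrowed from the tests added below:

    %% UpdMeta/OldMeta stand for sibling metadata binaries whose only
    %% difference is one replaced idx1_bin entry
    [{add, <<"idx1_bin">>, <<"20840930001702Zoe">>},
     {remove, <<"idx1_bin">>, <<"20231126131808Madison">>}] =
        leveled_head:diff_indexspecs(?RIAK_TAG, UpdMeta, OldMeta).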
% TODO: @@ -174,6 +178,41 @@ default_extract_metadata(_Tag, SizeAsStoredInJournal, Obj) -> {{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}. +-spec diff_indexspecs(object_tag(), + object_metadata(), + object_metadata()|not_present) + -> leveled_codec:index_specs(). +%% @doc +%% Take an object metadata part from within the journal, and an object metadata +%% part from the ledger (which should have a lower SQN), and generate index +%% specs by determining the difference between the index specs on the object +%% to be loaded and those on the object already stored. +%% +%% This is only relevant where the journal compaction strategy of `recalc` is +%% used; the Keychanges will be used when `retain` is the compaction strategy +diff_indexspecs(?RIAK_TAG, UpdatedMetadata, OldMetadata) -> + UpdIndexes = + get_indexes_from_siblingmetabin(element(1, UpdatedMetadata), []), + OldIndexes = + case OldMetadata of + not_present -> + []; + _ -> + get_indexes_from_siblingmetabin(element(1, OldMetadata), []) + end, + diff_index_data(OldIndexes, UpdIndexes); +diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata) -> + default_diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata); +diff_indexspecs(Tag, UpdatedMetadata, CurrentMetadata) -> + OverrideFun = + get_appdefined_function(diff_indexspecs, + fun default_diff_indexspecs/3, + 3), + OverrideFun(Tag, UpdatedMetadata, CurrentMetadata). + +default_diff_indexspecs(_Tag, _UpdatedMetadata, _CurrentMetadata) -> + []. + %%%============================================================================ %%% Standard External Functions %%%============================================================================ @@ -190,7 +229,7 @@ defined_objecttags() -> leveled_codec:compaction_method()}. %% @doc %% State the compaction_method to be used when reloading the Ledger from the -%% journal for each object tag. Note, no compaction startegy required for +%% journal for each object tag. Note, no compaction strategy required for %% head_only tag default_reload_strategy(Tag) -> {Tag, retain}. @@ -317,3 +356,120 @@ get_metadata_from_siblings(<>, MetaLen:32/integer, MetaBin:MetaLen/binary>>, [LastMod|LastMods]). + + +get_indexes_from_siblingmetabin(<<0:32/integer, + MetaLen:32/integer, + MetaBin:MetaLen/binary, + RestBin/binary>>, + Indexes) -> + UpdIndexes = lists:umerge(get_indexes_frommetabin(MetaBin), Indexes), + get_indexes_from_siblingmetabin(RestBin, UpdIndexes); +get_indexes_from_siblingmetabin(<<SibCount:32/integer, RestBin/binary>>, + Indexes) when SibCount > 0 -> + get_indexes_from_siblingmetabin(RestBin, Indexes); +get_indexes_from_siblingmetabin(_, Indexes) -> + Indexes. + + +%% @doc +%% Parse the metabinary for an individual sibling and return a list of index +%% entries. +get_indexes_frommetabin(<<_LMD1:32/integer, _LMD2:32/integer, _LMD3:32/integer, + VTagLen:8/integer, _VTag:VTagLen/binary, + Deleted:1/binary-unit:8, + MetaRestBin/binary>>) when Deleted /= <<1>> -> + lists:usort(indexes_of_metabinary(MetaRestBin)); +get_indexes_frommetabin(_) -> + []. + + +indexes_of_metabinary(<<>>) -> + []; +indexes_of_metabinary(<<KeyLen:32/integer, KeyBin:KeyLen/binary, + ValueLen:32/integer, ValueBin:ValueLen/binary, + Rest/binary>>) -> + Key = decode_maybe_binary(KeyBin), + case Key of + <<"index">> -> + Value = decode_maybe_binary(ValueBin), + Value; + _ -> + indexes_of_metabinary(Rest) + end. + + +decode_maybe_binary(<<1, Bin/binary>>) -> + Bin; +decode_maybe_binary(<<0, Bin/binary>>) -> + binary_to_term(Bin); +decode_maybe_binary(<<_Other:8, Bin/binary>>) -> + Bin.
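A usage note on the parser above: decode_maybe_binary/1 (internal to leveled_head) reads a one-byte marker, 1 for a raw binary and 0 for a term encoded with term_to_binary/1. A minimal sketch of both paths:

    <<"raw">> = decode_maybe_binary(<<1, "raw">>),
    [{<<"idx1_bin">>, <<"21521107231730Sophia">>}] =
        decode_maybe_binary(
            <<0, (term_to_binary([{<<"idx1_bin">>, <<"21521107231730Sophia">>}]))/binary>>).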
+ +-spec diff_index_data([{binary(), index_value()}], + [{binary(), index_value()}]) -> + [{index_op(), binary(), index_value()}]. +diff_index_data(OldIndexes, AllIndexes) -> + OldIndexSet = ordsets:from_list(OldIndexes), + AllIndexSet = ordsets:from_list(AllIndexes), + diff_specs_core(AllIndexSet, OldIndexSet). + + +diff_specs_core(AllIndexSet, OldIndexSet) -> + NewIndexSet = ordsets:subtract(AllIndexSet, OldIndexSet), + RemoveIndexSet = + ordsets:subtract(OldIndexSet, AllIndexSet), + NewIndexSpecs = + assemble_index_specs(ordsets:subtract(NewIndexSet, OldIndexSet), + add), + RemoveIndexSpecs = + assemble_index_specs(RemoveIndexSet, + remove), + NewIndexSpecs ++ RemoveIndexSpecs. + +%% @doc Assemble a list of index specs in the +%% form of triplets of the form +%% {IndexOperation, IndexField, IndexValue}. +-spec assemble_index_specs([{binary(), binary()}], index_op()) -> + [{index_op(), binary(), binary()}]. +assemble_index_specs(Indexes, IndexOp) -> + [{IndexOp, Index, Value} || {Index, Value} <- Indexes]. + + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + + +index_extract_test() -> + SibMetaBin = <<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134, + 1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68, + 86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55, + 45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0, + 0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2, + 107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50, + 49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2, + 107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50, + 48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0, + 5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55, + 52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102, + 50,45,53,54,98,51,98,97,57,57,99,55,56,50>>, + Indexes = get_indexes_from_siblingmetabin(SibMetaBin, []), + ExpIndexes = [{"idx1_bin","21521107231730Sophia"}, + {"idx1_bin","21820510130146Avery"}], + ?assertMatch(ExpIndexes, Indexes). + +diff_index_test() -> + UpdIndexes = + [{<<"idx1_bin">>,<<"20840930001702Zoe">>}, + {<<"idx1_bin">>,<<"20931011172606Emily">>}], + OldIndexes = + [{<<"idx1_bin">>,<<"20231126131808Madison">>}, + {<<"idx1_bin">>,<<"20931011172606Emily">>}], + IdxSpecs = diff_index_data(OldIndexes, UpdIndexes), + ?assertMatch([{add, <<"idx1_bin">>, <<"20840930001702Zoe">>}, + {remove, <<"idx1_bin">>,<<"20231126131808Madison">>}], IdxSpecs). + +-endif. \ No newline at end of file diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index e996c15..afc2c8c 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -145,6 +145,15 @@ % released from a compaction run of a single file to make it a run % worthwhile of compaction (released space is 100.0 - target e.g. 70.0 % means that 30.0% should be released) +-type key_size() :: + {{non_neg_integer(), + leveled_codec:journal_key_tag(), + leveled_codec:ledger_key()}, non_neg_integer()}. +-type corrupted_test_key_size() :: + {{non_neg_integer(), + leveled_codec:journal_key_tag(), + leveled_codec:ledger_key(), + null}, non_neg_integer()}. 
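The two types above mirror what leveled_cdb:cdb_directfetch(CDB, Batch, key_size) returns to the scoring fold: a journal key paired with its stored size. A sketch of each shape, with illustrative values:

    %% key_size() - a well-formed entry
    KS1 = {{1, ?INKT_STND, {?RIAK_TAG, <<"B">>, <<"K1">>, null}}, 104},
    %% corrupted_test_key_size() - the journal key carries a spurious fourth
    %% element, the malformed shape exercised by size_score_test/0
    KS2 = {{5, ?INKT_STND, {?RIAK_TAG, <<"B">>, <<"K5">>, <<"Sub">>}, null}, 164}.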
%%%============================================================================ %%% API %%%============================================================================ @@ -315,7 +324,8 @@ handle_cast({score_filelist, [Entry|Tail]}, State) -> ScoringState#scoring_state.filter_server, ScoringState#scoring_state.max_sqn, ?SAMPLE_SIZE, - ?BATCH_SIZE), + ?BATCH_SIZE, + State#state.reload_strategy), Candidate = #candidate{low_sqn = LowSQN, filename = FN, @@ -493,7 +503,10 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) -> %%% Internal functions %%%============================================================================ - +-spec check_single_file(pid(), fun(), any(), non_neg_integer(), + non_neg_integer(), non_neg_integer(), + leveled_codec:compaction_strategy()) -> + float(). %% @doc %% Get a score for a single CDB file in the journal. This will pull out a bunch %% of keys and sizes at random in an efficient way (by scanning the hashtable %% %% The score is based on a random sample - so will not be consistent between %% calls. -check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) -> +check_single_file(CDB, FilterFun, FilterServer, MaxSQN, + SampleSize, BatchSize, + ReloadStrategy) -> FN = leveled_cdb:cdb_filename(CDB), SW = os:timestamp(), PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), Score = - size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN), + size_comparison_score(KeySizeList, + FilterFun, + FilterServer, + MaxSQN, + ReloadStrategy), safely_log_filescore(PositionList, FN, Score, SW), Score. @@ -523,7 +542,15 @@ safely_log_filescore(PositionList, FN, Score, SW) -> div length(PositionList), leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW). -size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> +-spec size_comparison_score(list(key_size() | corrupted_test_key_size()), + fun(), + any(), + non_neg_integer(), + leveled_codec:compaction_strategy()) -> + float(). +size_comparison_score(KeySizeList, + FilterFun, FilterServer, MaxSQN, + RS) -> FoldFunForSizeCompare = fun(KS, {ActSize, RplSize}) -> case KS of {{SQN, Type, PK}, Size} -> IsJournalEntry = leveled_codec:is_full_journalentry({SQN, Type, PK}), case IsJournalEntry of false -> - {ActSize + Size - ?CRC_SIZE, RplSize}; + TS = leveled_codec:get_tagstrategy(PK, RS), + % If the strategy is to retain key deltas, then + % scoring must reflect that. Key deltas may + % still be present even when the strategy does + % not produce them, as a store may have been + % switched from retain to recalc + case TS of + retain -> + {ActSize + Size - ?CRC_SIZE, RplSize}; + _ -> + {ActSize, RplSize + Size - ?CRC_SIZE} + end; true -> Check = FilterFun(FilterServer, PK, SQN), case {Check, SQN > MaxSQN} of @@ -567,12 +605,13 @@ fetch_inbatches([], _BatchSize, CDB, CheckedList) -> ok = leveled_cdb:cdb_clerkcomplete(CDB), CheckedList; fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> - {Batch, Tail} = if - length(PositionList) >= BatchSize -> - lists:split(BatchSize, PositionList); - true -> - {PositionList, []} - end, + {Batch, Tail} = + if + length(PositionList) >= BatchSize -> + lists:split(BatchSize, PositionList); + true -> + {PositionList, []} + end, KL_List = leveled_cdb:cdb_directfetch(CDB, Batch, key_size), fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List).
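To make the new scoring branch concrete: for a key-delta entry the accumulator now depends on the tag's reload strategy. A sketch for a 124-byte delta, assuming ?CRC_SIZE is 4 (the macro is not shown in this hunk):

    %% retain - deltas will be needed again at reload, so score as active:
    %%     {ActSize + 124 - 4, RplSize}
    %% recalc (or skip) - deltas can be dropped, so score as replaceable:
    %%     {ActSize, RplSize + 124 - 4}
    %% The strategy itself comes from get_tagstrategy/2, e.g. the default:
    retain = leveled_codec:get_tagstrategy({?RIAK_TAG, <<"B">>, <<"K">>, null},
                                           leveled_codec:inker_reload_strategy([])).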
@@ -998,6 +1037,7 @@ fetch_testcdb(RP) -> check_single_file_test() -> RP = "test/test_area/", + RS = leveled_codec:inker_reload_strategy([]), ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)), {ok, CDB} = fetch_testcdb(RP), LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}}, @@ -1010,14 +1050,14 @@ check_single_file_test() -> _ -> replaced end end, - Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4), + Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS), ?assertMatch(37.5, Score1), LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end, - Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4), + Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS), ?assertMatch(100.0, Score2), - Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3), + Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS), ?assertMatch(37.5, Score3), - Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4), + Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS), ?assertMatch(75.0, Score4), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -1132,6 +1172,7 @@ compact_empty_file_test() -> RP = "test/test_area/", ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)), FN1 = leveled_inker:filepath(RP, 1, new_journal), + RS = leveled_codec:inker_reload_strategy([]), CDBopts = #cdb_options{binary_mode=true}, {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBopts), {ok, FN2} = leveled_cdb:cdb_complete(CDB1), @@ -1140,7 +1181,7 @@ compact_empty_file_test() -> {2, {o, "Bucket", "Key2", null}}, {3, {o, "Bucket", "Key3", null}}], LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end, - Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4), + Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS), ?assertMatch(0.0, Score1), ok = leveled_cdb:cdb_deletepending(CDB2), ok = leveled_cdb:cdb_destroy(CDB2). @@ -1207,15 +1248,22 @@ compact_singlefile_totwosmallfiles_testto() -> size_score_test() -> KeySizeList = - [{{1, ?INKT_STND, "Key1"}, 104}, - {{2, ?INKT_STND, "Key2"}, 124}, - {{3, ?INKT_STND, "Key3"}, 144}, - {{4, ?INKT_STND, "Key4"}, 154}, - {{5, ?INKT_STND, "Key5", "Subk1"}, 164}, - {{6, ?INKT_STND, "Key6"}, 174}, - {{7, ?INKT_STND, "Key7"}, 184}], + [{{1, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key1">>, null}}, 104}, + {{2, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key2">>, null}}, 124}, + {{3, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key3">>, null}}, 144}, + {{4, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key4">>, null}}, 154}, + {{5, + ?INKT_STND, + {?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>}, null}, + 164}, + {{6, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key6">>, null}}, 174}, + {{7, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key7">>, null}}, 184}], MaxSQN = 6, - CurrentList = ["Key1", "Key4", "Key5", "Key6"], + CurrentList = + [{?STD_TAG, <<"B">>, <<"Key1">>, null}, + {?STD_TAG, <<"B">>, <<"Key4">>, null}, + {?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>}, + {?STD_TAG, <<"B">>, <<"Key6">>, null}], FilterFun = fun(L, K, _SQN) -> case lists:member(K, L) of @@ -1223,7 +1271,13 @@ size_score_test() -> false -> replaced end end, - Score = size_comparison_score(KeySizeList, FilterFun, CurrentList, MaxSQN), + Score = + size_comparison_score(KeySizeList, + FilterFun, + CurrentList, + MaxSQN, + leveled_codec:inker_reload_strategy([])), + io:format("Score ~w", [Score]), ?assertMatch(true, Score > 69.0), ?assertMatch(true, Score < 70.0). 
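The bounds asserted at the end of size_score_test/0 can be checked by hand, assuming the score is the percentage of sampled space still active net of a 4-byte CRC per entry, that entries beyond MaxSQN count as active, and that the corrupted Key5 entry drops out of both totals:

    %% Active: Key1, Key4, Key6 (current) plus Key7 (SQN 7 > MaxSQN of 6)
    %%     (104-4) + (154-4) + (174-4) + (184-4) = 600
    %% Replaced: Key2, Key3
    %%     (124-4) + (144-4) = 260
    %% Score = 100 * 600 / (600 + 260) = 69.767... so 69.0 < Score < 70.0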
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index aa9d14d..ab26ca7 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -101,7 +101,7 @@ ink_fetch/3, ink_keycheck/3, ink_fold/4, - ink_loadpcl/4, + ink_loadpcl/5, ink_registersnapshot/2, ink_confirmdelete/2, ink_compactjournal/3, @@ -133,7 +133,6 @@ -define(WASTE_FP, "waste"). -define(JOURNAL_FILEX, "cdb"). -define(PENDING_FILEX, "pnd"). --define(LOADING_PAUSE, 1000). -define(LOADING_BATCH, 1000). -define(TEST_KC, {[], infinity}). @@ -321,7 +320,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) -> {fold, MinSQN, FoldFuns, Acc, by_runner}, infinity). --spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. +-spec ink_loadpcl(pid(), + integer(), + leveled_bookie:initial_loadfun(), + fun((string(), non_neg_integer()) -> any()), + fun((any(), any()) -> ok)) -> ok. %% %% Function to prompt load of the Ledger at startup. The Penciller should %% have determined the lowest SQN not present in the Ledger, and the inker @@ -330,20 +333,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) -> %% %% The load fun should be a five arity function like: %% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) -ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> - BatchFun = - fun(BatchAcc, _Acc) -> - push_to_penciller(Penciller, BatchAcc) - end, - InitAccFun = - fun(FN, CurrentMinSQN) -> - leveled_log:log("I0014", [FN, CurrentMinSQN]), - leveled_bookie:empty_ledgercache() - end, +ink_loadpcl(Pid, MinSQN, LoadFun, InitAccFun, BatchFun) -> gen_server:call(Pid, {fold, MinSQN, - {FilterFun, InitAccFun, BatchFun}, + {LoadFun, InitAccFun, BatchFun}, ok, as_ink}, infinity). @@ -1195,22 +1189,6 @@ foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns, LastPosition, FN) end. - - -push_to_penciller(Penciller, LedgerCache) -> - % The push to penciller must start as a tree to correctly de-duplicate - % the list by order before becoming a de-duplicated list for loading - LC0 = leveled_bookie:loadqueue_ledgercache(LedgerCache), - push_to_penciller_loop(Penciller, LC0). - -push_to_penciller_loop(Penciller, LedgerCache) -> - case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of - returned -> - timer:sleep(?LOADING_PAUSE), - push_to_penciller_loop(Penciller, LedgerCache); - ok -> - ok - end. sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 640b688..3538cea 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -7,6 +7,8 @@ hot_backup_simple/1, hot_backup_changes/1, retain_strategy/1, + recalc_strategy/1, + recalc_transition_strategy/1, recovr_strategy/1, aae_missingjournal/1, aae_bustedjournal/1, @@ -21,6 +23,8 @@ all() -> [ hot_backup_simple, hot_backup_changes, retain_strategy, + recalc_strategy, + recalc_transition_strategy, recovr_strategy, aae_missingjournal, aae_bustedjournal, @@ -233,85 +237,97 @@ hot_backup_changes(_Config) -> testutil:reset_filestructure(). - retain_strategy(_Config) -> + rotate_wipe_compact(retain, retain). + +recalc_strategy(_Config) -> + rotate_wipe_compact(recalc, recalc). + +recalc_transition_strategy(_Config) -> + rotate_wipe_compact(retain, recalc). 
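The new recalc_transition_strategy case exercises the fact that reload_strategy is a start-up option, so moving a store from retain to recalc is just a restart with a changed option list. A minimal sketch (path hypothetical):

    Opts =
        fun(Strategy) ->
            [{root_path, "/tmp/leveled_t"},
             {reload_strategy, [{?RIAK_TAG, Strategy}]}]
        end,
    {ok, Book1} = leveled_bookie:book_start(Opts(retain)),  % journal written under retain
    ok = leveled_bookie:book_close(Book1),
    {ok, Book2} = leveled_bookie:book_start(Opts(recalc)),  % reloaded under recalc
    ok = leveled_bookie:book_close(Book2).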
+ + +rotate_wipe_compact(Strategy1, Strategy2) -> RootPath = testutil:reset_filestructure(), BookOpts = [{root_path, RootPath}, {cache_size, 1000}, {max_journalobjectcount, 5000}, {sync_strategy, testutil:sync_strategy()}, - {reload_strategy, [{?RIAK_TAG, retain}]}], + {reload_strategy, [{?RIAK_TAG, Strategy1}]}], BookOptsAlt = [{root_path, RootPath}, {cache_size, 1000}, {max_journalobjectcount, 2000}, {sync_strategy, testutil:sync_strategy()}, - {reload_strategy, [{?RIAK_TAG, retain}]}, + {reload_strategy, [{?RIAK_TAG, Strategy2}]}, {max_run_length, 8}], - {ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 800), + {ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 400), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}]), - {ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 1600), + {ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 800), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, {"Bucket4", Spcl4, LastV4}]), - {ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 3200), - ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3}, - {"Bucket5", Spcl5, LastV5}]), - {ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 6400), + {ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 1600), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, - {"Bucket4", Spcl4, LastV4}, - {"Bucket5", Spcl5, LastV5}, - {"Bucket6", Spcl6, LastV6}]), + {"Bucket5", Spcl5, LastV5}]), + {ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 3200), {ok, Book1} = leveled_bookie:book_start(BookOpts), compact_and_wait(Book1), - compact_and_wait(Book1), ok = leveled_bookie:book_close(Book1), - ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, + ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3}, {"Bucket4", Spcl4, LastV4}, {"Bucket5", Spcl5, LastV5}, {"Bucket6", Spcl6, LastV6}]), {ok, Book2} = leveled_bookie:book_start(BookOptsAlt), + compact_and_wait(Book2), + ok = leveled_bookie:book_close(Book2), - {KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000), + ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3}, + {"Bucket4", Spcl4, LastV4}, + {"Bucket5", Spcl5, LastV5}, + {"Bucket6", Spcl6, LastV6}]), + + {ok, Book3} = leveled_bookie:book_start(BookOptsAlt), + + {KSpcL2, _V2} = testutil:put_indexed_objects(Book3, "AltBucket6", 3000), Q2 = fun(RT) -> {index_query, "AltBucket6", {fun testutil:foldkeysfun/3, []}, {"idx1_bin", "#", "|"}, {RT, undefined}} end, - {async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)), + {async, KFolder2A} = leveled_bookie:book_returnfolder(Book3, Q2(false)), KeyList2A = lists:usort(KFolder2A()), true = length(KeyList2A) == 3000, DeleteFun = fun({DK, [{add, DIdx, DTerm}]}) -> - ok = testutil:book_riakdelete(Book2, + ok = testutil:book_riakdelete(Book3, "AltBucket6", DK, [{remove, DIdx, DTerm}]) end, lists:foreach(DeleteFun, KSpcL2), - {async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)), - KeyList2AD = lists:usort(KFolder2AD()), - true = length(KeyList2AD) == 0, - - ok = leveled_bookie:book_close(Book2), - - {ok, Book3} = leveled_bookie:book_start(BookOptsAlt), - - io:format("Compact after deletions~n"), - - compact_and_wait(Book3), - compact_and_wait(Book3), - {async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)), KeyList3AD = lists:usort(KFolder3AD()), true = length(KeyList3AD) == 0, ok = 
leveled_bookie:book_close(Book3), + {ok, Book4} = leveled_bookie:book_start(BookOptsAlt), + + io:format("Compact after deletions~n"), + + compact_and_wait(Book4), + + {async, KFolder4AD} = leveled_bookie:book_returnfolder(Book4, Q2(false)), + KeyList4AD = lists:usort(KFolder4AD()), + true = length(KeyList4AD) == 0, + + ok = leveled_bookie:book_close(Book4), + testutil:reset_filestructure(). @@ -845,6 +861,10 @@ rotating_object_check(BookOpts, B, NumberOfObjects) -> B, KSpcL2, false), + ok = testutil:check_indexed_objects(Book1, + B, + KSpcL1 ++ KSpcL2 ++ KSpcL3, + V3), ok = leveled_bookie:book_close(Book1), {ok, Book2} = leveled_bookie:book_start(BookOpts), ok = testutil:check_indexed_objects(Book2, diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index e2918dc..94ed966 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -68,6 +68,7 @@ -define(MD_VTAG, <<"X-Riak-VTag">>). -define(MD_LASTMOD, <<"X-Riak-Last-Modified">>). -define(MD_DELETED, <<"X-Riak-Deleted">>). +-define(MD_INDEX, <<"index">>). -define(EMPTY_VTAG_BIN, <<"e">>). -define(ROOT_PATH, "test"). @@ -517,23 +518,30 @@ set_object(Bucket, Key, Value, IndexGen) -> set_object(Bucket, Key, Value, IndexGen, []). set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) -> - + set_object(Bucket, Key, Value, IndexGen, Indexes2Remove, []). + +set_object(Bucket, Key, Value, IndexGen, Indexes2Remove, IndexesNotToRemove) -> + IdxSpecs = IndexGen(), + Indexes = + lists:map(fun({add, IdxF, IdxV}) -> {IdxF, IdxV} end, + IdxSpecs ++ IndexesNotToRemove), Obj = {Bucket, Key, Value, - IndexGen() ++ lists:map(fun({add, IdxF, IdxV}) -> - {remove, IdxF, IdxV} end, - Indexes2Remove), - [{"MDK", "MDV" ++ Key}, - {"MDK2", "MDV" ++ Key}, - {?MD_LASTMOD, os:timestamp()}]}, + IdxSpecs ++ + lists:map(fun({add, IdxF, IdxV}) -> {remove, IdxF, IdxV} end, + Indexes2Remove), + [{<<"MDK">>, "MDV" ++ Key}, + {<<"MDK2">>, "MDV" ++ Key}, + {?MD_LASTMOD, os:timestamp()}, + {?MD_INDEX, Indexes}]}, {B1, K1, V1, Spec1, MD} = Obj, Content = #r_content{metadata=dict:from_list(MD), value=V1}, {#r_object{bucket=B1, key=K1, contents=[Content], vclock=generate_vclock()}, - Spec1}. + Spec1 ++ IndexesNotToRemove}. get_value_from_objectlistitem({_Int, Obj, _Spc}) -> [Content] = Obj#r_object.contents, @@ -762,26 +770,28 @@ put_altered_indexed_objects(Book, Bucket, KSpecL) -> put_altered_indexed_objects(Book, Bucket, KSpecL, true). put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) -> - IndexGen = testutil:get_randomindexes_generator(1), - V = testutil:get_compressiblevalue(), - RplKSpecL = lists:map(fun({K, Spc}) -> - AddSpc = if - RemoveOld2i == true -> - [lists:keyfind(add, 1, Spc)]; - RemoveOld2i == false -> - [] - end, - {O, AltSpc} = testutil:set_object(Bucket, - K, - V, - IndexGen, - AddSpc), - case book_riakput(Book, O, AltSpc) of - ok -> ok; - pause -> timer:sleep(?SLOWOFFER_DELAY) - end, - {K, AltSpc} end, - KSpecL), + IndexGen = get_randomindexes_generator(1), + V = get_compressiblevalue(), + FindAdditionFun = fun(SpcItem) -> element(1, SpcItem) == add end, + MapFun = + fun({K, Spc}) -> + {RemoveSpc, AddSpc} = + case RemoveOld2i of + true -> + {lists:filter(FindAdditionFun, Spc), []}; + false -> + {[], lists:filter(FindAdditionFun, Spc)} + end, + {O, AltSpc} = + set_object(Bucket, K, V, + IndexGen, RemoveSpc, AddSpc), + case book_riakput(Book, O, AltSpc) of + ok -> ok; + pause -> timer:sleep(?SLOWOFFER_DELAY) + end, + {K, AltSpc} + end, + RplKSpecL = lists:map(MapFun, KSpecL), {RplKSpecL, V}. 
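The IndexesNotToRemove argument threaded through set_object/6 is what allows put_altered_indexed_objects/4 to keep earlier index entries live across an alteration: entries in that list are recorded in the object's ?MD_INDEX metadata and appended to the returned spec, rather than being converted into removes. A sketch with hypothetical index values:

    V = testutil:get_compressiblevalue(),
    IndexGen = testutil:get_randomindexes_generator(1),
    {Obj, Spec} =
        testutil:set_object("Bucket", "Key1", V, IndexGen,
                            [{add, "idx1_bin", "OldV"}],    % becomes {remove, ...}
                            [{add, "idx1_bin", "KeepV"}]),  % kept and echoed in Spec
    %% Spec = NewIdxSpecs ++ [{remove, "idx1_bin", "OldV"},
    %%                        {add, "idx1_bin", "KeepV"}]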
rotating_object_check(RootPath, B, NumberOfObjects) -> @@ -790,16 +800,16 @@ rotating_object_check(RootPath, B, NumberOfObjects) -> {max_journalsize, 5000000}, {sync_strategy, sync_strategy()}], {ok, Book1} = leveled_bookie:book_start(BookOpts), - {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects), - ok = testutil:check_indexed_objects(Book1, B, KSpcL1, V1), - {KSpcL2, V2} = testutil:put_altered_indexed_objects(Book1, B, KSpcL1), - ok = testutil:check_indexed_objects(Book1, B, KSpcL2, V2), - {KSpcL3, V3} = testutil:put_altered_indexed_objects(Book1, B, KSpcL2), + {KSpcL1, V1} = put_indexed_objects(Book1, B, NumberOfObjects), + ok = check_indexed_objects(Book1, B, KSpcL1, V1), + {KSpcL2, V2} = put_altered_indexed_objects(Book1, B, KSpcL1), + ok = check_indexed_objects(Book1, B, KSpcL2, V2), + {KSpcL3, V3} = put_altered_indexed_objects(Book1, B, KSpcL2), ok = leveled_bookie:book_close(Book1), {ok, Book2} = leveled_bookie:book_start(BookOpts), - ok = testutil:check_indexed_objects(Book2, B, KSpcL3, V3), - {KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2, B, KSpcL3), - ok = testutil:check_indexed_objects(Book2, B, KSpcL4, V4), + ok = check_indexed_objects(Book2, B, KSpcL3, V3), + {KSpcL4, V4} = put_altered_indexed_objects(Book2, B, KSpcL3), + ok = check_indexed_objects(Book2, B, KSpcL4, V4), Query = {keylist, ?RIAK_TAG, B, {fun foldkeysfun/3, []}}, {async, BList} = leveled_bookie:book_returnfolder(Book2, Query), true = NumberOfObjects == length(BList()),