diff --git a/include/leveled.hrl b/include/leveled.hrl
index f6b0294..6e1b603 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -5,6 +5,8 @@
 -define(STD_TAG, o).
 %% Tag used for secondary index keys
 -define(IDX_TAG, i).
+%% Tag used for near real-time anti-entropy index keys
+-define(AAE_TAG, i_aae).
 
 %% Inker key type used for 'normal' objects
 -define(INKT_STND, stnd).
@@ -67,6 +69,25 @@
                         waste_retention_period :: integer(),
                         reload_strategy = [] :: list()}).
 
+-record(recent_aae, {buckets :: list()|all,
+                        % whitelist of buckets to support recent AAE
+                        % or all to support all buckets
+
+                        limit_minutes :: integer(),
+                        % how long to retain entries in the temporary index
+                        % It will actually be retained for limit + unit minutes
+                        % 60 minutes seems sensible
+
+                        unit_minutes :: integer(),
+                        % What the minimum unit size will be for a query
+                        % e.g. the minimum time duration to be used in range
+                        % queries of the aae index
+                        % 5 minutes seems sensible
+
+                        tree_size = small :: atom()
+                        % Just defaulted to small for now
+                        }).
+
 -record(r_content, {
           metadata,
           value :: term()
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index f5c6342..8b46d81 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -82,6 +82,7 @@
 -define(CACHE_SIZE_JITTER, 25).
 -define(JOURNAL_SIZE_JITTER, 20).
 -define(LONG_RUNNING, 80000).
+-define(RECENT_AAE, false).
 
 -record(ledger_cache, {mem :: ets:tab(),
                         loader = leveled_tree:empty(?CACHE_TYPE)
@@ -94,6 +95,7 @@
 -record(state, {inker :: pid(),
                 penciller :: pid(),
                 cache_size :: integer(),
+                recent_aae :: false|#recent_aae{},
                 ledger_cache = #ledger_cache{},
                 is_snapshot :: boolean(),
                 slow_offer = false :: boolean(),
@@ -157,7 +159,7 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 %%
 %% TODO:
 %% The reload_strategy is exposed as currently no firm decision has been made
-%% about how recovery should work. For instance if we were to trust evrything
+%% about how recovery should work. For instance if we were to trust everything
 %% as permanent in the Ledger once it is persisted, then there would be no
 %% need to retain a skinny history of key changes in the Journal after
 %% compaction. If, as an alternative we assume the Ledger is never permanent,
@@ -383,15 +385,28 @@ init([Opts]) ->
         undefined ->
             % Start from file not snapshot
             {InkerOpts, PencillerOpts} = set_options(Opts),
-            {Inker, Penciller} = startup(InkerOpts, PencillerOpts),
+
             CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
             CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
                         + erlang:phash2(self()) rem CacheJitter,
+            RecentAAE =
+                case get_opt(recent_aae, Opts, ?RECENT_AAE) of
+                    false ->
+                        false;
+                    {BucketList, LimitMinutes, UnitMinutes} ->
+                        #recent_aae{buckets = BucketList,
+                                    limit_minutes = LimitMinutes,
+                                    unit_minutes = UnitMinutes}
+                end,
+
+            {Inker, Penciller} = startup(InkerOpts, PencillerOpts, RecentAAE),
+
             NewETS = ets:new(mem, [ordered_set]),
             leveled_log:log("B0001", [Inker, Penciller]),
             {ok, #state{inker=Inker,
                         penciller=Penciller,
                         cache_size=CacheSize,
+                        recent_aae=RecentAAE,
                         ledger_cache=#ledger_cache{mem = NewETS},
                         is_snapshot=false}};
         Bookie ->
@@ -418,7 +433,8 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
                                             SQN,
                                             Object,
                                             ObjSize,
-                                            {IndexSpecs, TTL}),
+                                            {IndexSpecs, TTL},
+                                            State#state.recent_aae),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
     T1 = timer:now_diff(os:timestamp(), SW) - T0,
     PutTimes = leveled_log:put_timing(bookie, State#state.put_timing, T0, T1),
@@ -1151,14 +1167,14 @@ set_options(Opts) ->
                             max_inmemory_tablesize = PCLL0CacheSize,
                             levelzero_cointoss = true}}.
 
-startup(InkerOpts, PencillerOpts) ->
+startup(InkerOpts, PencillerOpts, RecentAAE) ->
     {ok, Inker} = leveled_inker:ink_start(InkerOpts),
     {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
     LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
     leveled_log:log("B0005", [LedgerSQN]),
     ok = leveled_inker:ink_loadpcl(Inker,
                                     LedgerSQN + 1,
-                                    fun load_fun/5,
+                                    get_loadfun(RecentAAE),
                                     Penciller),
     {Inker, Penciller}.
 
@@ -1383,17 +1399,21 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
 
 preparefor_ledgercache(?INKT_KEYD,
-                        LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}) ->
+                        LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
+                        _AAE) ->
     {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
     KeyChanges =
-        leveled_codec:convert_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
+        leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
     {no_lookup, SQN, KeyChanges};
-preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}) ->
-    {Bucket, Key, MetaValue, H, _LastMods} =
+preparefor_ledgercache(_InkTag,
+                        LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
+                        AAE) ->
+    {Bucket, Key, MetaValue, H, LastMods} =
         leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
     KeyChanges =
         [{LedgerKey, MetaValue}] ++
-        leveled_codec:convert_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
+        leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++
+        leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
     {H, SQN, KeyChanges}.
 
@@ -1452,35 +1472,40 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
         end.
 
-
-load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ->
-    {MinSQN, MaxSQN, OutputTree} = Acc0,
-    {SQN, Type, PK} = KeyInJournal,
-    % VBin may already be a term
-    {VBin, VSize} = ExtractFun(ValueInJournal),
-    {Obj, IndexSpecs} = leveled_codec:split_inkvalue(VBin),
-    case SQN of
-        SQN when SQN < MinSQN ->
-            {loop, Acc0};
-        SQN when SQN < MaxSQN ->
-            Changes = preparefor_ledgercache(Type, PK, SQN,
-                                                Obj, VSize, IndexSpecs),
-            {loop,
-                {MinSQN,
-                    MaxSQN,
-                    addto_ledgercache(Changes, OutputTree, loader)}};
-        MaxSQN ->
-            leveled_log:log("B0006", [SQN]),
-            Changes = preparefor_ledgercache(Type, PK, SQN,
-                                                Obj, VSize, IndexSpecs),
-            {stop,
-                {MinSQN,
-                    MaxSQN,
-                    addto_ledgercache(Changes, OutputTree, loader)}};
-        SQN when SQN > MaxSQN ->
-            leveled_log:log("B0007", [MaxSQN, SQN]),
-            {stop, Acc0}
-    end.
+get_loadfun(RecentAAE) ->
+    PrepareFun =
+        fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
+            preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, RecentAAE)
+        end,
+    LoadFun =
+        fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
+            {MinSQN, MaxSQN, OutputTree} = Acc0,
+            {SQN, InkTag, PK} = KeyInJournal,
+            % VBin may already be a term
+            {VBin, VSize} = ExtractFun(ValueInJournal),
+            {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
+            case SQN of
+                SQN when SQN < MinSQN ->
+                    {loop, Acc0};
+                SQN when SQN < MaxSQN ->
+                    Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
+                    {loop,
+                        {MinSQN,
+                            MaxSQN,
+                            addto_ledgercache(Chngs, OutputTree, loader)}};
+                MaxSQN ->
+                    leveled_log:log("B0006", [SQN]),
+                    Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
+                    {stop,
+                        {MinSQN,
+                            MaxSQN,
+                            addto_ledgercache(Chngs, OutputTree, loader)}};
+                SQN when SQN > MaxSQN ->
+                    leveled_log:log("B0007", [MaxSQN, SQN]),
+                    {stop, Acc0}
+            end
+        end,
+    LoadFun.
 
 get_opt(Key, Opts) ->
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index a8f57fc..621f997 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -59,7 +59,8 @@
         generate_ledgerkv/5,
         get_size/2,
         get_keyandhash/2,
-        convert_indexspecs/5,
+        idx_indexspecs/5,
+        aae_indexspecs/6,
         generate_uuid/0,
         integer_now/0,
         riak_extract_metadata/2,
@@ -68,22 +69,19 @@
 
 -define(V1_VERS, 1).
 -define(MAGIC, 53). % riak_kv -> riak_object
+-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
+
+-type recent_aae() :: #recent_aae{}.
+
+-spec magic_hash(any()) -> integer().
+%% @doc
 %% Use DJ Bernstein magic hash function. Note, this is more expensive than
 %% phash2 but provides a much more balanced result.
 %%
 %% Hash function contains mysterious constants, some explanation here as to
 %% what they are -
 %% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
 
-
-to_lookup(Key) ->
-    case element(1, Key) of
-        ?IDX_TAG ->
-            no_lookup;
-        _ ->
-            lookup
-    end.
-
 magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) ->
     magic_hash({Bucket, Key});
 magic_hash({?STD_TAG, Bucket, Key, _SubKey}) ->
@@ -100,7 +98,23 @@ hash1(H, <<B:8/integer, Rest/bytes>>) ->
     H2 = H1 bxor B,
     hash1(H2, Rest).
 
+%% @doc
+%% Should it be possible to look up a key in the merge tree? This is not true
+%% for keys that should only be read through range queries. Direct lookup
+%% keys will have presence in bloom filters and other lookup accelerators.
+to_lookup(Key) ->
+    case element(1, Key) of
+        ?IDX_TAG ->
+            no_lookup;
+        ?AAE_TAG ->
+            no_lookup;
+        _ ->
+            lookup
+    end.
+
+-spec generate_uuid() -> list().
+%% @doc
+%% Generate a new globally unique ID as a string.
 %% Credit to
 %% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
 generate_uuid() ->
@@ -363,7 +377,7 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
 endkey_passed(EndKey, CheckingKey) ->
     EndKey < CheckingKey.
 
-convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
+idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
     lists:map(fun({IndexOp, IdxField, IdxValue}) ->
                     Status = case IndexOp of
                                 add ->
@@ -378,6 +392,85 @@
                     end,
                 IndexSpecs).
 
+-spec aae_indexspecs(false|recent_aae(),
+                        any(), any(),
+                        integer(), integer(),
+                        list())
+                            -> list().
+%% @doc
+%% Generate an additional index term representing the change, if the last
+%% modified date for the change is within the definition of recency.
+%%
+%% The object may have multiple last modified dates (siblings), and in this
+%% case index entries for all dates within the range are added.
+%%
+%% The index entry should auto-expire in the future (when it is no longer
+%% relevant to assessing recent changes)
+aae_indexspecs(false, _Bucket, _Key, _SQN, _H, _LastMods) ->
+    [];
+aae_indexspecs(_AAE, _Bucket, _Key, _SQN, _H, []) ->
+    [];
+aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
+    InBucket =
+        case AAE#recent_aae.buckets of
+            all ->
+                true;
+            ListB ->
+                lists:member(Bucket, ListB)
+        end,
+    case InBucket of
+        true ->
+            GenTagFun =
+                fun(LMD0, Acc) ->
+                    Dates = parse_date(LMD0,
+                                        AAE#recent_aae.unit_minutes,
+                                        AAE#recent_aae.limit_minutes,
+                                        integer_now()),
+                    case Dates of
+                        no_index ->
+                            Acc;
+                        {LMD1, TTL} ->
+                            TreeSize = AAE#recent_aae.tree_size,
+                            SegmentID =
+                                leveled_tictac:get_segment(Key, TreeSize),
+                            IdxK = {?AAE_TAG,
+                                    LMD1,
+                                    {{SegmentID, H}, Bucket},
+                                    Key},
+                            IdxV = {SQN, {active, TTL}, no_lookup, null},
+                            [{IdxK, IdxV}|Acc]
+                    end
+                end,
+            lists:foldl(GenTagFun, [], LastMods);
+        false ->
+            []
+    end.
+
+-spec parse_date(tuple(), integer(), integer(), integer()) ->
+                    no_index|{binary(), integer()}.
+%% @doc
+%% Parse the last modified date and the AAE date configuration to return a
+%% binary to be used as the last modified date part of the index, and an
+%% integer to be used as the TTL of the index entry.
+%% Return no_index if the change is not recent.
+parse_date(LMD, UnitMins, LimitMins, Now) ->
+    LMDsecs = integer_time(LMD),
+    Recent = (LMDsecs + LimitMins * 60) > Now,
+    case Recent of
+        false ->
+            no_index;
+        true ->
+            {{Y, M, D}, {Hour, Minute, _Second}} =
+                calendar:now_to_datetime(LMD),
+            RoundMins =
+                UnitMins * (Minute div UnitMins),
+            StrTime =
+                lists:flatten(io_lib:format(?LMD_FORMAT,
+                                            [Y, M, D, Hour, RoundMins])),
+            TTL = min(Now, LMDsecs) + (LimitMins + UnitMins) * 60,
+            {list_to_binary(StrTime), TTL}
+    end.
+
 -spec generate_ledgerkv(tuple(), integer(), any(), integer(),
                     tuple()|infinity) ->
                         {any(), any(), any(), integer()|no_lookup, list()}.
@@ -532,7 +625,7 @@ indexspecs_test() ->
     IndexSpecs = [{add, "t1_int", 456},
                     {add, "t1_bin", "adbc123"},
                     {remove, "t1_bin", "abdc456"}],
-    Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
+    Changes = idx_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
     ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
                     {1, {active, infinity}, no_lookup, null}},
                     lists:nth(1, Changes)),
@@ -642,5 +735,67 @@ magichashperf_test() ->
     {TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
     io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]).
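%% An illustrative walk-through of the additions above (an editorial sketch,
%% not part of the patch; the example timestamp is invented). With
%% UnitMins = 5 and LimitMins = 60, a last modified time of 2017-03-22 14:53
%% rounds down to the 5-minute unit 14:50, so parse_date/4 returns roughly
%%     {<<"201703221450">>, TTL}
%% as ?LMD_FORMAT renders YYYYMMDDHHMM, with
%% TTL = min(Now, LMDsecs) + (60 + 5) * 60, so the entry outlives the recency
%% limit by one unit. aae_indexspecs/6 then folds each such date into an index
%% key of the form {?AAE_TAG, LMD1, {{SegmentID, Hash}, Bucket}, Key} with
%% value {SQN, {active, TTL}, no_lookup, null}, as the tests below exercise.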
 
+parsedate_test() ->
+    {MeS, S, MiS} = os:timestamp(),
+    timer:sleep(100),
+    Now = integer_now(),
+    UnitMins = 5,
+    LimitMins = 60,
+    PD = parse_date({MeS, S, MiS}, UnitMins, LimitMins, Now),
+    io:format("Parsed Date ~w~n", [PD]),
+    ?assertMatch(true, is_tuple(PD)),
+    check_pd(PD, UnitMins),
+    CheckFun =
+        fun(Offset) ->
+            ModDate = {MeS, S + Offset * 60, MiS},
+            check_pd(parse_date(ModDate, UnitMins, LimitMins, Now), UnitMins)
+        end,
+    lists:foreach(CheckFun, lists:seq(1, 60)).
+
+check_pd(PD, UnitMins) ->
+    {LMDbin, _TTL} = PD,
+    LMDstr = binary_to_list(LMDbin),
+    Minutes = list_to_integer(lists:nthtail(10, LMDstr)),
+    ?assertMatch(0, Minutes rem UnitMins).
+
+parseolddate_test() ->
+    LMD = os:timestamp(),
+    timer:sleep(100),
+    Now = integer_now() + 60 * 60,
+    UnitMins = 5,
+    LimitMins = 60,
+    PD = parse_date(LMD, UnitMins, LimitMins, Now),
+    io:format("Parsed Date ~w~n", [PD]),
+    ?assertMatch(no_index, PD).
+
+genaaeidx_test() ->
+    AAE = #recent_aae{buckets=all, limit_minutes=60, unit_minutes=5},
+    Bucket = <<"Bucket1">>,
+    Key = <<"Key1">>,
+    SQN = 1,
+    H = erlang:phash2(null),
+    LastMods = [os:timestamp(), os:timestamp()],
+
+    AAESpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
+    ?assertMatch(2, length(AAESpecs)),
+
+    LastMods1 = [os:timestamp()],
+    AAESpecs1 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods1),
+    ?assertMatch(1, length(AAESpecs1)),
+
+    LastMods0 = [],
+    AAESpecs0 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods0),
+    ?assertMatch(0, length(AAESpecs0)),
+
+    AAE0 = AAE#recent_aae{buckets=[<<"Bucket0">>]},
+    AAESpecsB0 = aae_indexspecs(AAE0, Bucket, Key, SQN, H, LastMods1),
+    ?assertMatch(0, length(AAESpecsB0)),
+    AAESpecsB1 = aae_indexspecs(AAE0, <<"Bucket0">>, Key, SQN, H, LastMods1),
+
+    ?assertMatch(1, length(AAESpecsB1)),
+    [{{?AAE_TAG, _LMD, {{SegID, H}, <<"Bucket0">>}, <<"Key1">>},
+        {SQN, {active, TS}, no_lookup, null}}] = AAESpecsB1,
+    ?assertMatch(true, is_integer(SegID)),
+    ?assertMatch(true, is_integer(TS)).
 
 -endif.
\ No newline at end of file
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index bfb2bff..41e3732 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -1002,7 +1002,7 @@ plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
     element(1, R).
 
 fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
-    PosList = leveled_pmem:check_index(Hash, L0Index),    
+    PosList = leveled_pmem:check_index(Hash, L0Index),
     L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
     case L0Check of
         {false, not_found} ->
diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl
index eb5af40..6e2073a 100644
--- a/src/leveled_sst.erl
+++ b/src/leveled_sst.erl
@@ -1503,11 +1503,11 @@ generate_indexkeys(Count, IndexList) ->
 
 generate_indexkey(Term, Count) ->
     IndexSpecs = [{add, "t1_int", Term}],
-    leveled_codec:convert_indexspecs(IndexSpecs,
-                                        "Bucket",
-                                        "Key" ++ integer_to_list(Count),
-                                        Count,
-                                        infinity).
+    leveled_codec:idx_indexspecs(IndexSpecs,
+                                    "Bucket",
+                                    "Key" ++ integer_to_list(Count),
+                                    Count,
+                                    infinity).
 
 form_slot_test() ->
     % If a skip key happens, mustn't switch to loookup by accident as could be
diff --git a/src/leveled_tictac.erl b/src/leveled_tictac.erl
index 1ba4b08..88d1744 100644
--- a/src/leveled_tictac.erl
+++ b/src/leveled_tictac.erl
@@ -97,17 +97,7 @@ new_tree(TreeID) ->
     new_tree(TreeID, small).
 
 new_tree(TreeID, Size) ->
-    {BitWidth, Width, SegmentCount} =
-        case Size of
-            small ->
-                ?SMALL;
-            medium ->
-                ?MEDIUM;
-            large ->
-                ?LARGE;
-            xlarge ->
-                ?XLARGE
-        end,
+    {BitWidth, Width, SegmentCount} = get_size(Size),
     Lv1Width = Width * ?HASH_SIZE * 8,
     Lv1Init = <<0:Lv1Width/integer>>,
     Lv2SegBinSize = Width * ?HASH_SIZE * 8,
@@ -236,14 +226,31 @@ merge_trees(TreeA, TreeB) ->
     MergedTree#tictactree{level1 = NewLevel1,
                             level2 = NewLevel2}.
 
-get_segment(Key, SegmentCount) ->
-    erlang:phash2(Key) band (SegmentCount - 1).
-
+-spec get_segment(any(), integer()|small|medium|large|xlarge) -> integer().
+%% @doc
+%% Return the segment ID for a Key. Either the tree size or the actual
+%% segment count derived from the size may be passed.
+get_segment(Key, SegmentCount) when is_integer(SegmentCount) ->
+    erlang:phash2(Key) band (SegmentCount - 1);
+get_segment(Key, TreeSize) ->
+    get_segment(Key, element(3, get_size(TreeSize))).
 
 %%%============================================================================
 %%% Internal functions
 %%%============================================================================
 
+get_size(Size) ->
+    case Size of
+        small ->
+            ?SMALL;
+        medium ->
+            ?MEDIUM;
+        large ->
+            ?LARGE;
+        xlarge ->
+            ?XLARGE
+    end.
+
 segmentcompare(SrcBin, SinkBin) when byte_size(SrcBin)==byte_size(SinkBin) ->
     segmentcompare(SrcBin, SinkBin, [], 0).
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index afc9aec..cd143cf 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -374,7 +374,8 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
                                 {remove, IdxF, IdxV} end,
                             Indexes2Remove),
             [{"MDK", "MDV" ++ Key},
-             {"MDK2", "MDV" ++ Key}]},
+             {"MDK2", "MDV" ++ Key},
+             {?MD_LASTMOD, os:timestamp()}]},
     {B1, K1, V1, Spec1, MD} = Obj,
     Content = #r_content{metadata=dict:from_list(MD), value=V1},
     {#r_object{bucket=B1,
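Taken together, the patch threads the new recent_aae option from the bookie
startup options through to the codec and the tictac tree. A minimal usage
sketch follows (assuming leveled_bookie:book_start/1 accepts an options
proplist, as the get_opt calls in init/1 imply; the root path is invented for
illustration):

    %% Keep near real-time AAE index entries for all buckets, retained for
    %% 60 minutes and queryable in 5-minute units - the
    %% {BucketList, LimitMinutes, UnitMinutes} triple matched in init/1.
    {ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/leveled"},
                                              {recent_aae, {all, 60, 5}}]),
    ok = leveled_bookie:book_close(Bookie).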