Extract deprecated recent_aae

Ready to add other forms of last modified filtering
This commit is contained in:
Martin Sumner 2018-10-29 15:49:50 +00:00
parent 5cbda7c2f7
commit 2e2c35fe1b
3 changed files with 3 additions and 833 deletions

View file

@ -104,7 +104,6 @@
-define(JOURNAL_SIZE_JITTER, 20).
-define(ABSOLUTEMAX_JOURNALSIZE, 4000000000).
-define(LONG_RUNNING, 80000).
-define(RECENT_AAE, false).
-define(COMPRESSION_METHOD, lz4).
-define(COMPRESSION_POINT, on_receipt).
-define(TIMING_SAMPLESIZE, 100).
@ -118,7 +117,6 @@
{cache_size, ?CACHE_SIZE},
{max_journalsize, 1000000000},
{sync_strategy, none},
{recent_aae, ?RECENT_AAE},
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
@ -140,7 +138,6 @@
-record(state, {inker :: pid() | undefined,
penciller :: pid() | undefined,
cache_size :: integer() | undefined,
recent_aae :: recent_aae(),
ledger_cache = #ledger_cache{},
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
@ -186,7 +183,6 @@
-type fold_timings() :: no_timing|#fold_timings{}.
-type head_timings() :: no_timing|#head_timings{}.
-type timing_types() :: head|get|put|fold.
-type recent_aae() :: false|#recent_aae{}|undefined.
-type key() :: binary()|string()|{binary(), binary()}.
% Keys SHOULD be binary()
% string() support is a legacy of old tests
@ -220,12 +216,6 @@
% riak_sync is used for backwards compatability with OTP16 - and
% will manually call sync() after each write (rather than use the
% O_SYNC option on startup
{recent_aae, false|{atom(), list(), integer(), integer()}} |
% DEPRECATED
% Before working on kv_index_tictactree looked at the possibility
% of maintaining AAE just for recent changes. Given the efficiency
% of the kv_index_tictactree approach this is unecessary.
% Should be set to false
{head_only, false|with_lookup|no_lookup} |
% When set to true, there are three fundamental changes as to how
% leveled will work:
@ -1008,16 +998,6 @@ init([Opts]) ->
ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER),
CacheSize =
ConfiguredCacheSize + erlang:phash2(self()) rem CacheJitter,
RecentAAE =
case proplists:get_value(recent_aae, Opts) of
false ->
false;
{FilterType, BucketList, LimitMinutes, UnitMinutes} ->
#recent_aae{filter = FilterType,
buckets = BucketList,
limit_minutes = LimitMinutes,
unit_minutes = UnitMinutes}
end,
{HeadOnly, HeadLookup} =
case proplists:get_value(head_only, Opts) of
@ -1030,7 +1010,6 @@ init([Opts]) ->
end,
State0 = #state{cache_size=CacheSize,
recent_aae=RecentAAE,
is_snapshot=false,
head_only=HeadOnly,
head_lookup = HeadLookup},
@ -1926,14 +1905,12 @@ preparefor_ledgercache(?INKT_KEYD,
{no_lookup, SQN, KeyChanges};
preparefor_ledgercache(_InkTag,
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
State) ->
{Bucket, Key, MetaValue, {KeyH, ObjH}, LastMods} =
_State) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
KeyChanges =
[{LedgerKey, MetaValue}] ++
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++
leveled_codec:aae_indexspecs(State#state.recent_aae,
Bucket, Key, SQN, ObjH, LastMods),
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
{KeyH, SQN, KeyChanges}.

View file

@ -63,7 +63,6 @@
get_keyandobjhash/2,
idx_indexspecs/5,
obj_objectspecs/3,
aae_indexspecs/6,
riak_extract_metadata/2,
segment_hash/1,
to_lookup/1,
@ -76,7 +75,6 @@
-define(NRT_IDX, "$aae.").
-define(ALL_BUCKETS, <<"$all">>).
-type recent_aae() :: #recent_aae{}.
-type riak_metadata() :: {binary()|delete, % Sibling Metadata
binary()|null, % Vclock Metadata
integer()|null, % Hash of vclock - non-exportable
@ -577,103 +575,6 @@ set_status(remove, _TTL) ->
%% TODO: timestamps for delayed reaping
tomb.
-spec aae_indexspecs(false|recent_aae(),
any(), any(),
integer(), integer(),
list())
-> list().
%% @doc
%% Generate an additional index term representing the change, if the last
%% modified date for the change is within the definition of recency.
%%
%% The object may have multiple last modified dates (siblings), and in this
%% case index entries for all dates within the range are added.
%%
%% The index should entry auto-expire in the future (when it is no longer
%% relevant to assessing recent changes)
aae_indexspecs(false, _Bucket, _Key, _SQN, _H, _LastMods) ->
[];
aae_indexspecs(_AAE, _Bucket, _Key, _SQN, _H, []) ->
[];
aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
InList = lists:member(Bucket, AAE#recent_aae.buckets),
Bucket0 =
case AAE#recent_aae.filter of
blacklist ->
case InList of
true ->
false;
false ->
{all, Bucket}
end;
whitelist ->
case InList of
true ->
Bucket;
false ->
false
end
end,
case Bucket0 of
false ->
[];
Bucket0 ->
GenIdxFun =
fun(LMD0, Acc) ->
Dates = parse_date(LMD0,
AAE#recent_aae.unit_minutes,
AAE#recent_aae.limit_minutes,
leveled_util:integer_now()),
case Dates of
no_index ->
Acc;
{LMD1, TTL} ->
TreeSize = AAE#recent_aae.tree_size,
SegID32 = leveled_tictac:keyto_segment32(Key),
SegID =
leveled_tictac:get_segment(SegID32, TreeSize),
IdxFldStr = ?NRT_IDX ++ LMD1 ++ "_bin",
IdxTrmStr =
string:right(integer_to_list(SegID), 8, $0) ++
"." ++
string:right(integer_to_list(H), 8, $0),
{IdxK, IdxV} =
gen_indexspec(Bucket0, Key,
add,
list_to_binary(IdxFldStr),
list_to_binary(IdxTrmStr),
SQN, TTL),
[{IdxK, IdxV}|Acc]
end
end,
lists:foldl(GenIdxFun, [], LastMods)
end.
-spec parse_date(tuple(), integer(), integer(), integer()) ->
no_index|{list(), integer()}.
%% @doc
%% Parse the last modified date and the AAE date configuration to return a
%% binary to be used as the last modified date part of the index, and an
%% integer to be used as the TTL of the index entry.
%% Return no_index if the change is not recent.
parse_date(LMD, UnitMins, LimitMins, Now) ->
LMDsecs = leveled_util:integer_time(LMD),
Recent = (LMDsecs + LimitMins * 60) > Now,
case Recent of
false ->
no_index;
true ->
{{Y, M, D}, {Hour, Minute, _Second}} =
calendar:now_to_datetime(LMD),
RoundMins =
UnitMins * (Minute div UnitMins),
StrTime =
lists:flatten(io_lib:format(?LMD_FORMAT,
[Y, M, D, Hour, RoundMins])),
TTL = min(Now, LMDsecs) + (LimitMins + UnitMins) * 60,
{StrTime, TTL}
end.
-spec generate_ledgerkv(
tuple(), integer(), any(), integer(), tuple()|infinity) ->
{any(), any(), any(),
@ -927,95 +828,6 @@ hashperf_test() ->
io:format(user, "1000 object hashes in ~w microseconds~n",
[timer:now_diff(os:timestamp(), SW)]).
parsedate_test() ->
{MeS, S, MiS} = os:timestamp(),
timer:sleep(100),
Now = leveled_util:integer_now(),
UnitMins = 5,
LimitMins = 60,
PD = parse_date({MeS, S, MiS}, UnitMins, LimitMins, Now),
io:format("Parsed Date ~w~n", [PD]),
?assertMatch(true, is_tuple(PD)),
check_pd(PD, UnitMins),
CheckFun =
fun(Offset) ->
ModDate = {MeS, S + Offset * 60, MiS},
check_pd(parse_date(ModDate, UnitMins, LimitMins, Now), UnitMins)
end,
lists:foreach(CheckFun, lists:seq(1, 60)).
check_pd(PD, UnitMins) ->
{LMDstr, _TTL} = PD,
Minutes = list_to_integer(lists:nthtail(10, LMDstr)),
?assertMatch(0, Minutes rem UnitMins).
parseolddate_test() ->
LMD = os:timestamp(),
timer:sleep(100),
Now = leveled_util:integer_now() + 60 * 60,
UnitMins = 5,
LimitMins = 60,
PD = parse_date(LMD, UnitMins, LimitMins, Now),
io:format("Parsed Date ~w~n", [PD]),
?assertMatch(no_index, PD).
genaaeidx_test() ->
AAE = #recent_aae{filter=blacklist,
buckets=[],
limit_minutes=60,
unit_minutes=5},
Bucket = <<"Bucket1">>,
Key = <<"Key1">>,
SQN = 1,
H = erlang:phash2(null),
LastMods = [os:timestamp(), os:timestamp()],
AAESpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
?assertMatch(2, length(AAESpecs)),
LastMods1 = [os:timestamp()],
AAESpecs1 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods1),
?assertMatch(1, length(AAESpecs1)),
IdxB = element(2, element(1, lists:nth(1, AAESpecs1))),
io:format(user, "AAE IDXSpecs1 ~w~n", [AAESpecs1]),
?assertMatch(<<"$all">>, IdxB),
LastMods0 = [],
AAESpecs0 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods0),
?assertMatch(0, length(AAESpecs0)),
AAE0 = AAE#recent_aae{filter=whitelist,
buckets=[<<"Bucket0">>]},
AAESpecsB0 = aae_indexspecs(AAE0, Bucket, Key, SQN, H, LastMods1),
?assertMatch(0, length(AAESpecsB0)),
AAESpecsB1 = aae_indexspecs(AAE0, <<"Bucket0">>, Key, SQN, H, LastMods1),
?assertMatch(1, length(AAESpecsB1)),
[{{?IDX_TAG, <<"Bucket0">>, {Fld, Term}, <<"Key1">>},
{SQN, {active, TS}, no_lookup, null}}] = AAESpecsB1,
?assertMatch(true, is_integer(TS)),
?assertMatch(17, length(binary_to_list(Term))),
?assertMatch("$aae.", lists:sublist(binary_to_list(Fld), 5)),
AAE1 = AAE#recent_aae{filter=blacklist,
buckets=[<<"Bucket0">>]},
AAESpecsB2 = aae_indexspecs(AAE1, <<"Bucket0">>, Key, SQN, H, LastMods1),
?assertMatch(0, length(AAESpecsB2)).
delayedupdate_aaeidx_test() ->
AAE = #recent_aae{filter=blacklist,
buckets=[],
limit_minutes=60,
unit_minutes=5},
Bucket = <<"Bucket1">>,
Key = <<"Key1">>,
SQN = 1,
H = erlang:phash2(null),
{Mega, Sec, MSec} = os:timestamp(),
LastMods = [{Mega -1, Sec, MSec}],
AAESpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
?assertMatch(0, length(AAESpecs)).
head_segment_compare_test() ->
% Reminder to align native and parallel(leveled_ko) key stores for
% kv_index_tictactree