Add temporary aae index
Pending ct tests. Entries in the aae index should expire after limit_minutes, and are indexed under a last-modified date rounded down to unit_minutes.
parent 2dd303237b
commit 8da8722b9e
7 changed files with 281 additions and 72 deletions
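
A minimal sketch of how the new option is expected to be supplied, based on the {BucketList, LimitMinutes, UnitMinutes} tuple matched in init/1 in the diff below (the book_start/1 property-list form and the root_path key are assumptions from the existing bookie API; the values are illustrative only):

    %% Illustrative configuration, not part of this commit:
    %% index all buckets, retain entries for at least 60 minutes,
    %% round index dates to 5-minute units.
    RecentAAESpec = {all, 60, 5},
    {ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/ledger"},
                                                {recent_aae, RecentAAESpec}]).
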
@@ -5,6 +5,8 @@
 -define(STD_TAG, o).
 %% Tag used for secondary index keys
 -define(IDX_TAG, i).
+%% Tag used for near real-time anti-entropy index keys
+-define(AAE_TAG, i_aae).

 %% Inker key type used for 'normal' objects
 -define(INKT_STND, stnd).

@@ -67,6 +69,25 @@
         waste_retention_period :: integer(),
         reload_strategy = [] :: list()}).

+-record(recent_aae, {buckets :: list()|all,
+                        % whitelist of buckets to support recent AAE,
+                        % or all to support all buckets
+
+                        limit_minutes :: integer(),
+                        % how long to retain entries in the temporary index
+                        % It will actually be retained for limit + unit minutes
+                        % 60 minutes seems sensible
+
+                        unit_minutes :: integer(),
+                        % What the minimum unit size will be for a query
+                        % e.g. the minimum time duration to be used in range
+                        % queries of the aae index
+                        % 5 minutes seems sensible
+
+                        tree_size = small :: atom()
+                        % Just defaulted to small for now
+                        }).
+
 -record(r_content, {
         metadata,
         value :: term()
@@ -82,6 +82,7 @@
 -define(CACHE_SIZE_JITTER, 25).
 -define(JOURNAL_SIZE_JITTER, 20).
 -define(LONG_RUNNING, 80000).
+-define(RECENT_AAE, false).

 -record(ledger_cache, {mem :: ets:tab(),
                         loader = leveled_tree:empty(?CACHE_TYPE)

@@ -94,6 +95,7 @@
 -record(state, {inker :: pid(),
                 penciller :: pid(),
                 cache_size :: integer(),
+                recent_aae :: false|#recent_aae{},
                 ledger_cache = #ledger_cache{},
                 is_snapshot :: boolean(),
                 slow_offer = false :: boolean(),
@@ -157,7 +159,7 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 %%
 %% TODO:
 %% The reload_strategy is exposed as currently no firm decision has been made
-%% about how recovery should work. For instance if we were to trust evrything
+%% about how recovery should work. For instance if we were to trust everything
 %% as permanent in the Ledger once it is persisted, then there would be no
 %% need to retain a skinny history of key changes in the Journal after
 %% compaction. If, as an alternative we assume the Ledger is never permanent,
@@ -383,15 +385,28 @@ init([Opts]) ->
         undefined ->
             % Start from file not snapshot
             {InkerOpts, PencillerOpts} = set_options(Opts),
-            {Inker, Penciller} = startup(InkerOpts, PencillerOpts),
+
             CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
             CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
                         + erlang:phash2(self()) rem CacheJitter,
+            RecentAAE =
+                case get_opt(recent_aae, Opts, ?RECENT_AAE) of
+                    false ->
+                        false;
+                    {BucketList, LimitMinutes, UnitMinutes} ->
+                        #recent_aae{buckets = BucketList,
+                                    limit_minutes = LimitMinutes,
+                                    unit_minutes = UnitMinutes}
+                end,
+
+            {Inker, Penciller} = startup(InkerOpts, PencillerOpts, RecentAAE),
+
             NewETS = ets:new(mem, [ordered_set]),
             leveled_log:log("B0001", [Inker, Penciller]),
             {ok, #state{inker=Inker,
                         penciller=Penciller,
                         cache_size=CacheSize,
+                        recent_aae=RecentAAE,
                         ledger_cache=#ledger_cache{mem = NewETS},
                         is_snapshot=false}};
         Bookie ->
@@ -418,7 +433,8 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
                                     SQN,
                                     Object,
                                     ObjSize,
-                                    {IndexSpecs, TTL}),
+                                    {IndexSpecs, TTL},
+                                    State#state.recent_aae),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
     T1 = timer:now_diff(os:timestamp(), SW) - T0,
     PutTimes = leveled_log:put_timing(bookie, State#state.put_timing, T0, T1),
@@ -1151,14 +1167,14 @@ set_options(Opts) ->
                     max_inmemory_tablesize = PCLL0CacheSize,
                     levelzero_cointoss = true}}.

-startup(InkerOpts, PencillerOpts) ->
+startup(InkerOpts, PencillerOpts, RecentAAE) ->
     {ok, Inker} = leveled_inker:ink_start(InkerOpts),
     {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
     LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
     leveled_log:log("B0005", [LedgerSQN]),
     ok = leveled_inker:ink_loadpcl(Inker,
                                     LedgerSQN + 1,
-                                    fun load_fun/5,
+                                    get_loadfun(RecentAAE),
                                     Penciller),
     {Inker, Penciller}.

@@ -1383,17 +1399,21 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->


 preparefor_ledgercache(?INKT_KEYD,
-                        LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}) ->
+                        LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
+                        _AAE) ->
     {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
     KeyChanges =
-        leveled_codec:convert_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
+        leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
     {no_lookup, SQN, KeyChanges};
-preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}) ->
-    {Bucket, Key, MetaValue, H, _LastMods} =
+preparefor_ledgercache(_InkTag,
+                        LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
+                        AAE) ->
+    {Bucket, Key, MetaValue, H, LastMods} =
         leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
     KeyChanges =
         [{LedgerKey, MetaValue}] ++
-        leveled_codec:convert_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
+        leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++
+        leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
     {H, SQN, KeyChanges}.


@@ -1452,35 +1472,40 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
     end.


-load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ->
-    {MinSQN, MaxSQN, OutputTree} = Acc0,
-    {SQN, Type, PK} = KeyInJournal,
-    % VBin may already be a term
-    {VBin, VSize} = ExtractFun(ValueInJournal),
-    {Obj, IndexSpecs} = leveled_codec:split_inkvalue(VBin),
-    case SQN of
-        SQN when SQN < MinSQN ->
-            {loop, Acc0};
-        SQN when SQN < MaxSQN ->
-            Changes = preparefor_ledgercache(Type, PK, SQN,
-                                                Obj, VSize, IndexSpecs),
-            {loop,
-                {MinSQN,
-                    MaxSQN,
-                    addto_ledgercache(Changes, OutputTree, loader)}};
-        MaxSQN ->
-            leveled_log:log("B0006", [SQN]),
-            Changes = preparefor_ledgercache(Type, PK, SQN,
-                                                Obj, VSize, IndexSpecs),
-            {stop,
-                {MinSQN,
-                    MaxSQN,
-                    addto_ledgercache(Changes, OutputTree, loader)}};
-        SQN when SQN > MaxSQN ->
-            leveled_log:log("B0007", [MaxSQN, SQN]),
-            {stop, Acc0}
-    end.
+get_loadfun(RecentAAE) ->
+    PrepareFun =
+        fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
+            preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, RecentAAE)
+        end,
+    LoadFun =
+        fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
+            {MinSQN, MaxSQN, OutputTree} = Acc0,
+            {SQN, InkTag, PK} = KeyInJournal,
+            % VBin may already be a term
+            {VBin, VSize} = ExtractFun(ValueInJournal),
+            {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
+            case SQN of
+                SQN when SQN < MinSQN ->
+                    {loop, Acc0};
+                SQN when SQN < MaxSQN ->
+                    Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
+                    {loop,
+                        {MinSQN,
+                            MaxSQN,
+                            addto_ledgercache(Chngs, OutputTree, loader)}};
+                MaxSQN ->
+                    leveled_log:log("B0006", [SQN]),
+                    Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
+                    {stop,
+                        {MinSQN,
+                            MaxSQN,
+                            addto_ledgercache(Chngs, OutputTree, loader)}};
+                SQN when SQN > MaxSQN ->
+                    leveled_log:log("B0007", [MaxSQN, SQN]),
+                    {stop, Acc0}
+            end
+        end,
+    LoadFun.


 get_opt(Key, Opts) ->
@@ -59,7 +59,8 @@
         generate_ledgerkv/5,
         get_size/2,
         get_keyandhash/2,
-        convert_indexspecs/5,
+        idx_indexspecs/5,
+        aae_indexspecs/6,
         generate_uuid/0,
         integer_now/0,
         riak_extract_metadata/2,
@@ -68,22 +69,19 @@

 -define(V1_VERS, 1).
 -define(MAGIC, 53). % riak_kv -> riak_object
+-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").

+-type recent_aae() :: #recent_aae{}.
+
+-spec magic_hash(any()) -> integer().
+%% @doc
 %% Use DJ Bernstein magic hash function. Note, this is more expensive than
 %% phash2 but provides a much more balanced result.
 %%
 %% Hash function contains mysterious constants, some explanation here as to
 %% what they are -
 %% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
-
-to_lookup(Key) ->
-    case element(1, Key) of
-        ?IDX_TAG ->
-            no_lookup;
-        _ ->
-            lookup
-    end.
-
 magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) ->
     magic_hash({Bucket, Key});
 magic_hash({?STD_TAG, Bucket, Key, _SubKey}) ->
@@ -100,7 +98,23 @@ hash1(H, <<B:8/integer, Rest/bytes>>) ->
     H2 = H1 bxor B,
     hash1(H2, Rest).

+%% @doc
+%% Should it be possible to look up a key in the merge tree? This is not
+%% true for keys that should only be read through range queries. Direct
+%% lookup keys will have presence in bloom filters and other lookup
+%% accelerators.
+to_lookup(Key) ->
+    case element(1, Key) of
+        ?IDX_TAG ->
+            no_lookup;
+        ?AAE_TAG ->
+            no_lookup;
+        _ ->
+            lookup
+    end.
+
+-spec generate_uuid() -> list().
+%% @doc
+%% Generate a new globally unique ID as a string.
 %% Credit to
 %% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
 generate_uuid() ->
@@ -363,7 +377,7 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
 endkey_passed(EndKey, CheckingKey) ->
     EndKey < CheckingKey.

-convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
+idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
     lists:map(fun({IndexOp, IdxField, IdxValue}) ->
                     Status = case IndexOp of
                                 add ->
@@ -378,6 +392,85 @@ convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
                 end,
             IndexSpecs).

+-spec aae_indexspecs(false|recent_aae(),
+                        any(), any(),
+                        integer(), integer(),
+                        list()) -> list().
+%% @doc
+%% Generate an additional index term representing the change, if the last
+%% modified date for the change is within the definition of recency.
+%%
+%% The object may have multiple last modified dates (siblings), and in this
+%% case index entries for all dates within the range are added.
+%%
+%% The index entry should auto-expire in the future (when it is no longer
+%% relevant to assessing recent changes).
+aae_indexspecs(false, _Bucket, _Key, _SQN, _H, _LastMods) ->
+    [];
+aae_indexspecs(_AAE, _Bucket, _Key, _SQN, _H, []) ->
+    [];
+aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
+    InBucket =
+        case AAE#recent_aae.buckets of
+            all ->
+                true;
+            ListB ->
+                lists:member(Bucket, ListB)
+        end,
+    case InBucket of
+        true ->
+            GenTagFun =
+                fun(LMD0, Acc) ->
+                    Dates = parse_date(LMD0,
+                                        AAE#recent_aae.unit_minutes,
+                                        AAE#recent_aae.limit_minutes,
+                                        integer_now()),
+                    case Dates of
+                        no_index ->
+                            Acc;
+                        {LMD1, TTL} ->
+                            TreeSize = AAE#recent_aae.tree_size,
+                            SegmentID =
+                                leveled_tictac:get_segment(Key, TreeSize),
+                            IdxK = {?AAE_TAG,
+                                    LMD1,
+                                    {{SegmentID, H}, Bucket},
+                                    Key},
+                            IdxV = {SQN, {active, TTL}, no_lookup, null},
+                            [{IdxK, IdxV}|Acc]
+                    end
+                end,
+            lists:foldl(GenTagFun, [], LastMods);
+        false ->
+            []
+    end.
+
+-spec parse_date(tuple(), integer(), integer(), integer()) ->
+                    no_index|{binary(), integer()}.
+%% @doc
+%% Parse the last modified date and the AAE date configuration to return a
+%% binary to be used as the last modified date part of the index, and an
+%% integer to be used as the TTL of the index entry.
+%% Return no_index if the change is not recent.
+parse_date(LMD, UnitMins, LimitMins, Now) ->
+    LMDsecs = integer_time(LMD),
+    Recent = (LMDsecs + LimitMins * 60) > Now,
+    case Recent of
+        false ->
+            no_index;
+        true ->
+            {{Y, M, D}, {Hour, Minute, _Second}} =
+                calendar:now_to_datetime(LMD),
+            RoundMins =
+                UnitMins * (Minute div UnitMins),
+            StrTime =
+                lists:flatten(io_lib:format(?LMD_FORMAT,
+                                            [Y, M, D, Hour, RoundMins])),
+            TTL = min(Now, LMDsecs) + (LimitMins + UnitMins) * 60,
+            {list_to_binary(StrTime), TTL}
+    end.
+
 -spec generate_ledgerkv(tuple(), integer(), any(),
                         integer(), tuple()|infinity) ->
                         {any(), any(), any(), integer()|no_lookup, list()}.
@@ -532,7 +625,7 @@ indexspecs_test() ->
     IndexSpecs = [{add, "t1_int", 456},
                     {add, "t1_bin", "adbc123"},
                     {remove, "t1_bin", "abdc456"}],
-    Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
+    Changes = idx_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
     ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
                     {1, {active, infinity}, no_lookup, null}},
                     lists:nth(1, Changes)),
@@ -642,5 +735,67 @@ magichashperf_test() ->
     {TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
     io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]).

+parsedate_test() ->
+    {MeS, S, MiS} = os:timestamp(),
+    timer:sleep(100),
+    Now = integer_now(),
+    UnitMins = 5,
+    LimitMins = 60,
+    PD = parse_date({MeS, S, MiS}, UnitMins, LimitMins, Now),
+    io:format("Parsed Date ~w~n", [PD]),
+    ?assertMatch(true, is_tuple(PD)),
+    check_pd(PD, UnitMins),
+    CheckFun =
+        fun(Offset) ->
+            ModDate = {MeS, S + Offset * 60, MiS},
+            check_pd(parse_date(ModDate, UnitMins, LimitMins, Now), UnitMins)
+        end,
+    lists:foreach(CheckFun, lists:seq(1, 60)).
+
+check_pd(PD, UnitMins) ->
+    {LMDbin, _TTL} = PD,
+    LMDstr = binary_to_list(LMDbin),
+    Minutes = list_to_integer(lists:nthtail(10, LMDstr)),
+    ?assertMatch(0, Minutes rem UnitMins).
+
+parseolddate_test() ->
+    LMD = os:timestamp(),
+    timer:sleep(100),
+    Now = integer_now() + 60 * 60,
+    UnitMins = 5,
+    LimitMins = 60,
+    PD = parse_date(LMD, UnitMins, LimitMins, Now),
+    io:format("Parsed Date ~w~n", [PD]),
+    ?assertMatch(no_index, PD).
+
+genaaeidx_test() ->
+    AAE = #recent_aae{buckets=all, limit_minutes=60, unit_minutes=5},
+    Bucket = <<"Bucket1">>,
+    Key = <<"Key1">>,
+    SQN = 1,
+    H = erlang:phash2(null),
+    LastMods = [os:timestamp(), os:timestamp()],
+
+    AAESpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
+    ?assertMatch(2, length(AAESpecs)),
+
+    LastMods1 = [os:timestamp()],
+    AAESpecs1 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods1),
+    ?assertMatch(1, length(AAESpecs1)),
+
+    LastMods0 = [],
+    AAESpecs0 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods0),
+    ?assertMatch(0, length(AAESpecs0)),
+
+    AAE0 = AAE#recent_aae{buckets=[<<"Bucket0">>]},
+    AAESpecsB0 = aae_indexspecs(AAE0, Bucket, Key, SQN, H, LastMods1),
+    ?assertMatch(0, length(AAESpecsB0)),
+    AAESpecsB1 = aae_indexspecs(AAE0, <<"Bucket0">>, Key, SQN, H, LastMods1),
+
+    ?assertMatch(1, length(AAESpecsB1)),
+    [{{?AAE_TAG, _LMD, {{SegID, H}, <<"Bucket0">>}, <<"Key1">>},
+        {SQN, {active, TS}, no_lookup, null}}] = AAESpecsB1,
+    ?assertMatch(true, is_integer(SegID)),
+    ?assertMatch(true, is_integer(TS)).
+
 -endif.
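
To make the rounding and TTL arithmetic in parse_date above concrete, here is a worked sketch with illustrative values (the date is hypothetical, not taken from the commit): with UnitMins = 5 and LimitMins = 60, a last modified time of 2017-04-03 10:17 falls into the 10:15 unit.

    Minute = 17,
    RoundMins = 5 * (Minute div 5),    % = 15, start of the 5-minute unit
    lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
                                [2017, 4, 3, 10, RoundMins])).
    %% -> "201704031015", the date part of the ?AAE_TAG index key.
    %% TTL = min(Now, LMDsecs) + (60 + 5) * 60 = LMDsecs + 3900, so the
    %% entry expires limit + unit minutes after the last modified time,
    %% as the commit message and the record comments describe.
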
@@ -1002,7 +1002,7 @@ plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
     element(1, R).

 fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
     PosList = leveled_pmem:check_index(Hash, L0Index),
     L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
     case L0Check of
         {false, not_found} ->
@@ -1503,11 +1503,11 @@ generate_indexkeys(Count, IndexList) ->

 generate_indexkey(Term, Count) ->
     IndexSpecs = [{add, "t1_int", Term}],
-    leveled_codec:convert_indexspecs(IndexSpecs,
+    leveled_codec:idx_indexspecs(IndexSpecs,
                                     "Bucket",
                                     "Key" ++ integer_to_list(Count),
                                     Count,
                                     infinity).

 form_slot_test() ->
     % If a skip key happens, mustn't switch to loookup by accident as could be
@@ -97,17 +97,7 @@ new_tree(TreeID) ->
     new_tree(TreeID, small).

 new_tree(TreeID, Size) ->
-    {BitWidth, Width, SegmentCount} =
-        case Size of
-            small ->
-                ?SMALL;
-            medium ->
-                ?MEDIUM;
-            large ->
-                ?LARGE;
-            xlarge ->
-                ?XLARGE
-        end,
+    {BitWidth, Width, SegmentCount} = get_size(Size),
     Lv1Width = Width * ?HASH_SIZE * 8,
     Lv1Init = <<0:Lv1Width/integer>>,
     Lv2SegBinSize = Width * ?HASH_SIZE * 8,

@@ -236,14 +226,31 @@ merge_trees(TreeA, TreeB) ->

     MergedTree#tictactree{level1 = NewLevel1, level2 = NewLevel2}.

-get_segment(Key, SegmentCount) ->
-    erlang:phash2(Key) band (SegmentCount - 1).
+-spec get_segment(any(), integer()|small|medium|large|xlarge) -> integer().
+%% @doc
+%% Return the segment ID for a Key. Can pass the tree size, or the actual
+%% segment count derived from the size.
+get_segment(Key, SegmentCount) when is_integer(SegmentCount) ->
+    erlang:phash2(Key) band (SegmentCount - 1);
+get_segment(Key, TreeSize) ->
+    get_segment(Key, element(3, get_size(TreeSize))).

 %%%============================================================================
 %%% Internal functions
 %%%============================================================================

+get_size(Size) ->
+    case Size of
+        small ->
+            ?SMALL;
+        medium ->
+            ?MEDIUM;
+        large ->
+            ?LARGE;
+        xlarge ->
+            ?XLARGE
+    end.
+
 segmentcompare(SrcBin, SinkBin) when byte_size(SrcBin)==byte_size(SinkBin) ->
     segmentcompare(SrcBin, SinkBin, [], 0).
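
A quick usage sketch for the reworked get_segment/2 (values are illustrative; the explicit-count form relies on the count being a power of two, since it is applied as a band mask):

    %% The atom form derives the segment count via get_size/1; the integer
    %% form masks the phash2 value directly, so a hypothetical count of
    %% 65536 yields a segment ID in 0..65535.
    SegA = leveled_tictac:get_segment({<<"B1">>, <<"K1">>}, small),
    SegB = leveled_tictac:get_segment({<<"B1">>, <<"K1">>}, 65536),
    true = (SegB >= 0) andalso (SegB < 65536).
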
@@ -374,7 +374,8 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
                             {remove, IdxF, IdxV} end,
                         Indexes2Remove),
             [{"MDK", "MDV" ++ Key},
-                {"MDK2", "MDV" ++ Key}]},
+                {"MDK2", "MDV" ++ Key},
+                {?MD_LASTMOD, os:timestamp()}]},
     {B1, K1, V1, Spec1, MD} = Obj,
     Content = #r_content{metadata=dict:from_list(MD), value=V1},
     {#r_object{bucket=B1,