Merge pull request #200 from martinsumner/mas-i198-recentaae

Mas i198 recentaae
Martin Sumner 2018-11-04 20:51:13 +00:00 committed by GitHub
commit 37cdb22979
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 1526 additions and 1456 deletions


@ -82,7 +82,8 @@
book_objectfold/5,
book_objectfold/6,
book_headfold/6,
book_headfold/7
book_headfold/7,
book_headfold/9
]).
-export([empty_ledgercache/0,
@ -104,7 +105,6 @@
-define(JOURNAL_SIZE_JITTER, 20).
-define(ABSOLUTEMAX_JOURNALSIZE, 4000000000).
-define(LONG_RUNNING, 80000).
-define(RECENT_AAE, false).
-define(COMPRESSION_METHOD, lz4).
-define(COMPRESSION_POINT, on_receipt).
-define(TIMING_SAMPLESIZE, 100).
@ -112,13 +112,13 @@
-define(DUMMY, dummy). % Dummy key used for mput operations
-define(MAX_KEYCHECK_FREQUENCY, 100).
-define(MIN_KEYCHECK_FREQUENCY, 1).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(OPTION_DEFAULTS,
[{root_path, undefined},
{snapshot_bookie, undefined},
{cache_size, ?CACHE_SIZE},
{max_journalsize, 1000000000},
{sync_strategy, none},
{recent_aae, ?RECENT_AAE},
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
@ -140,7 +140,6 @@
-record(state, {inker :: pid() | undefined,
penciller :: pid() | undefined,
cache_size :: integer() | undefined,
recent_aae :: recent_aae(),
ledger_cache = #ledger_cache{},
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
@ -186,10 +185,7 @@
-type fold_timings() :: no_timing|#fold_timings{}.
-type head_timings() :: no_timing|#head_timings{}.
-type timing_types() :: head|get|put|fold.
-type recent_aae() :: false|#recent_aae{}|undefined.
-type key() :: binary()|string()|{binary(), binary()}.
% Keys SHOULD be binary()
% string() support is a legacy of old tests
-type open_options() ::
%% For full description of options see ../docs/STARTUP_OPTIONS.md
[{root_path, string()|undefined} |
@ -220,12 +216,6 @@
% riak_sync is used for backwards compatibility with OTP16 - and
% will manually call sync() after each write (rather than use the
% O_SYNC option on startup
{recent_aae, false|{atom(), list(), integer(), integer()}} |
% DEPRECATED
% Before working on kv_index_tictactree, the possibility of
% maintaining AAE just for recent changes was explored. Given the
% efficiency of the kv_index_tictactree approach this is unnecessary.
% Should be set to false
{head_only, false|with_lookup|no_lookup} |
% When set to true, there are three fundamental changes as to how
% leveled will work:
@ -310,7 +300,6 @@
% Defaults to ?COMPRESSION_POINT
].
-export_type([key/0]).
%%%============================================================================
@ -371,7 +360,7 @@ book_plainstart(Opts) ->
gen_server:start(?MODULE, [set_defaults(Opts)], []).
-spec book_tempput(pid(), key(), key(), any(),
-spec book_tempput(pid(), leveled_codec:key(), leveled_codec:key(), any(),
leveled_codec:index_specs(),
leveled_codec:tag(), integer()) -> ok|pause.
@ -438,7 +427,7 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).
-spec book_put(pid(), key(), key(), any(),
-spec book_put(pid(), leveled_codec:key(), leveled_codec:key(), any(),
leveled_codec:index_specs(),
leveled_codec:tag(), infinity|integer()) -> ok|pause.
@ -448,7 +437,7 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
infinity).
-spec book_mput(pid(), list(tuple())) -> ok|pause.
-spec book_mput(pid(), list(leveled_codec:object_spec())) -> ok|pause.
%% @doc
%%
%% When the store is being run in head_only mode, batches of object specs may
@ -461,7 +450,8 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
book_mput(Pid, ObjectSpecs) ->
book_mput(Pid, ObjectSpecs, infinity).
-spec book_mput(pid(), list(tuple()), infinity|integer()) -> ok|pause.
-spec book_mput(pid(), list(leveled_codec:object_spec()), infinity|integer())
-> ok|pause.
%% @doc
%%
%% When the store is being run in head_only mode, batches of object specs may
@ -474,8 +464,9 @@ book_mput(Pid, ObjectSpecs) ->
book_mput(Pid, ObjectSpecs, TTL) ->
gen_server:call(Pid, {mput, ObjectSpecs, TTL}, infinity).
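A minimal usage sketch (illustrative only, not part of this diff; the store options and the B/K/SK names are hypothetical, and either call may return pause rather than ok under backpressure):

    {ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/head_only"},
                                              {head_only, with_lookup}]),
    %% v0 object spec - {IdxOp, Bucket, Key, SubKey, Value}
    R0 = leveled_bookie:book_mput(Bookie,
                                  [{add, <<"B">>, <<"K1">>, <<"SK">>, <<"V1">>}]),
    %% v1 object spec - adds the v1 marker and a list of last-modified timestamps
    R1 = leveled_bookie:book_mput(Bookie,
                                  [{add, v1, <<"B">>, <<"K2">>, <<"SK">>,
                                    [os:timestamp()], <<"V2">>}]),
    true = lists:all(fun(R) -> (R == ok) or (R == pause) end, [R0, R1]).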
-spec book_delete(pid(), key(), key(), leveled_codec:index_specs())
-> ok|pause.
-spec book_delete(pid(),
leveled_codec:key(), leveled_codec:key(),
leveled_codec:index_specs()) -> ok|pause.
%% @doc
%%
@ -486,11 +477,15 @@ book_delete(Pid, Bucket, Key, IndexSpecs) ->
book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).
-spec book_get(pid(), key(), key(), leveled_codec:tag())
-spec book_get(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:tag())
-> {ok, any()}|not_found.
-spec book_head(pid(), key(), key(), leveled_codec:tag())
-spec book_head(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:tag())
-> {ok, any()}|not_found.
-spec book_headonly(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:key())
-> {ok, any()}|not_found.
-spec book_headonly(pid(), key(), key(), key()) -> {ok, any()}|not_found.
%% @doc - GET and HEAD requests
%%
@ -869,8 +864,9 @@ book_objectfold(Pid, Tag, Bucket, Limiter, FoldAccT, SnapPreFold) ->
SegmentList :: false | list(integer()),
Runner :: fun(() -> Acc).
book_headfold(Pid, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
RunnerType = {foldheads_allkeys, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList},
book_returnfolder(Pid, RunnerType).
book_headfold(Pid, Tag, all,
FoldAccT, JournalCheck, SnapPreFold,
SegmentList, false, false).
%% @doc as book_headfold/6, but with the addition of a `Limiter' that
%% restricts the set of objects folded over. `Limiter' can either be a
@ -902,11 +898,61 @@ book_headfold(Pid, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
SnapPreFold :: boolean(),
SegmentList :: false | list(integer()),
Runner :: fun(() -> Acc).
book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
RunnerType = {foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT, JournalCheck, SnapPreFold, SegmentList},
book_headfold(Pid, Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
book_headfold(Pid, Tag, Limiter,
FoldAccT, JournalCheck, SnapPreFold,
SegmentList, false, false).
%% @doc as book_headfold/7, but with the addition of a Last Modified Date
%% Range and Max Object Count. For version 2 objects this will filter out
%% all objects whose highest Last Modified Date is outside of the range.
%% All version 1 objects will be included in the result set regardless of Last
%% Modified Date.
%% The Max Object Count will stop the fold once the count has been reached on
%% this store only. If a Max Object Count is provided, the runner will return
%% {RemainingCount, Acc} rather than just Acc.
-spec book_headfold(pid(), Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount) ->
{async, Runner} when
Tag :: leveled_codec:tag(),
Limiter :: BucketList | BucketKeyRange | all,
BucketList :: {bucket_list, list(Bucket)},
BucketKeyRange :: {range, Bucket, KeyRange},
KeyRange :: {StartKey, EndKey} | all,
StartKey :: Key,
EndKey :: Key,
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc),
Acc :: term(),
Bucket :: term(),
Key :: term(),
Value :: term(),
JournalCheck :: boolean(),
SnapPreFold :: boolean(),
SegmentList :: false | list(integer()),
LastModRange :: false | leveled_codec:lastmod_range(),
MaxObjectCount :: false | pos_integer(),
Runner :: fun(() -> ResultingAcc),
ResultingAcc :: Acc | {non_neg_integer(), Acc}.
book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount) ->
RunnerType =
{foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT,
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount},
book_returnfolder(Pid, RunnerType);
book_headfold(Pid, Tag, {range, Bucket, KeyRange}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
RunnerType = {foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT, JournalCheck, SnapPreFold, SegmentList},
book_headfold(Pid, Tag, {range, Bucket, KeyRange}, FoldAccT, JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount) ->
RunnerType =
{foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT,
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount},
book_returnfolder(Pid, RunnerType);
book_headfold(Pid, Tag, all, FoldAccT, JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount) ->
RunnerType = {foldheads_allkeys, Tag, FoldAccT,
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount},
book_returnfolder(Pid, RunnerType).
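A hedged sketch of the new arity-9 fold (Bookie, the bucket name and the epoch-second bounds LowSec/HighSec are assumed names; the tag macro stands in for whichever object tag the caller uses):

    LastModRange = {LowSec, HighSec},
    {async, Runner} =
        leveled_bookie:book_headfold(Bookie,
                                     ?RIAK_TAG,
                                     {bucket_list, [<<"B1">>]},
                                     {fun(_B, _K, _Head, Acc) -> Acc + 1 end, 0},
                                     false,        % JournalCheck
                                     true,         % SnapPreFold
                                     false,        % SegmentList
                                     LastModRange,
                                     1000),        % MaxObjectCount
    %% As MaxObjectCount is an integer, the runner returns {RemainingCount, Acc}
    {Remaining, Count} = Runner().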
-spec book_snapshot(pid(),
@ -1008,16 +1054,6 @@ init([Opts]) ->
ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER),
CacheSize =
ConfiguredCacheSize + erlang:phash2(self()) rem CacheJitter,
RecentAAE =
case proplists:get_value(recent_aae, Opts) of
false ->
false;
{FilterType, BucketList, LimitMinutes, UnitMinutes} ->
#recent_aae{filter = FilterType,
buckets = BucketList,
limit_minutes = LimitMinutes,
unit_minutes = UnitMinutes}
end,
{HeadOnly, HeadLookup} =
case proplists:get_value(head_only, Opts) of
@ -1030,7 +1066,6 @@ init([Opts]) ->
end,
State0 = #state{cache_size=CacheSize,
recent_aae=RecentAAE,
is_snapshot=false,
head_only=HeadOnly,
head_lookup = HeadLookup},
@ -1137,7 +1172,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
not_found;
Head ->
{Seqn, Status, _MH, _MD} =
leveled_codec:striphead_to_details(Head),
leveled_codec:striphead_to_v1details(Head),
case Status of
tomb ->
not_found;
@ -1186,7 +1221,7 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
not_present ->
{not_found, State#state.ink_checking};
Head ->
case leveled_codec:striphead_to_details(Head) of
case leveled_codec:striphead_to_v1details(Head) of
{_SeqN, tomb, _MH, _MD} ->
{not_found, State#state.ink_checking};
{SeqN, {active, TS}, _MH, MD} ->
@ -1574,12 +1609,14 @@ get_runner(State, {keylist, Tag, Bucket, KeyRange, FoldAccT, TermRegex}) ->
get_runner(State,
{foldheads_allkeys,
Tag, FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
JournalCheck, SnapPreFold, SegmentList,
LastModRange, MaxObjectCount}) ->
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_allkeys(SnapFun,
Tag, FoldFun,
JournalCheck, SegmentList);
JournalCheck, SegmentList,
LastModRange, MaxObjectCount);
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) ->
get_runner(State,
@ -1597,7 +1634,8 @@ get_runner(State,
Tag,
BucketList, bucket_list,
FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount}) ->
KeyRangeFun =
fun(Bucket) ->
{StartKey, EndKey, _} = return_ledger_keyrange(Tag, Bucket, all),
@ -1609,13 +1647,16 @@ get_runner(State,
Tag,
lists:map(KeyRangeFun, BucketList),
FoldFun,
JournalCheck, SegmentList);
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
get_runner(State,
{foldheads_bybucket,
Tag,
Bucket, KeyRange,
FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount}) ->
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold),
@ -1623,7 +1664,9 @@ get_runner(State,
Tag,
[{StartKey, EndKey}],
FoldFun,
JournalCheck, SegmentList);
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
get_runner(State,
{foldobjects_bybucket,
Tag, Bucket, KeyRange,
@ -1926,14 +1969,12 @@ preparefor_ledgercache(?INKT_KEYD,
{no_lookup, SQN, KeyChanges};
preparefor_ledgercache(_InkTag,
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
State) ->
{Bucket, Key, MetaValue, {KeyH, ObjH}, LastMods} =
_State) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
KeyChanges =
[{LedgerKey, MetaValue}] ++
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++
leveled_codec:aae_indexspecs(State#state.recent_aae,
Bucket, Key, SQN, ObjH, LastMods),
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
{KeyH, SQN, KeyChanges}.
@ -2449,7 +2490,7 @@ foldobjects_vs_hashtree_testto() ->
{foldheads_allkeys,
?STD_TAG,
FoldHeadsFun,
true, true, false}),
true, true, false, false, false}),
KeyHashList3 = HTFolder3(),
?assertMatch(KeyHashList1, lists:usort(KeyHashList3)),
@ -2468,7 +2509,7 @@ foldobjects_vs_hashtree_testto() ->
{foldheads_allkeys,
?STD_TAG,
FoldHeadsFun2,
false, false, false}),
false, false, false, false, false}),
KeyHashList4 = HTFolder4(),
?assertMatch(KeyHashList1, lists:usort(KeyHashList4)),
@ -2544,7 +2585,8 @@ folder_cache_test(CacheSize) ->
"BucketA",
all,
FoldHeadsFun,
true, true, false}),
true, true,
false, false, false}),
KeyHashList2A = HTFolder2A(),
{async, HTFolder2B} =
book_returnfolder(Bookie1,
@ -2553,7 +2595,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
all,
FoldHeadsFun,
true, false, false}),
true, false,
false, false, false}),
KeyHashList2B = HTFolder2B(),
?assertMatch(true,
@ -2568,7 +2611,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
{"Key", <<"$all">>},
FoldHeadsFun,
true, false, false}),
true, false,
false, false, false}),
KeyHashList2C = HTFolder2C(),
{async, HTFolder2D} =
book_returnfolder(Bookie1,
@ -2577,7 +2621,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
{"Key", "Keyzzzzz"},
FoldHeadsFun,
true, true, false}),
true, true,
false, false, false}),
KeyHashList2D = HTFolder2D(),
?assertMatch(true,
lists:usort(KeyHashList2B) == lists:usort(KeyHashList2C)),
@ -2597,7 +2642,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
{"Key", SplitIntEnd},
FoldHeadsFun,
true, false, false}),
true, false,
false, false, false}),
KeyHashList2E = HTFolder2E(),
{async, HTFolder2F} =
book_returnfolder(Bookie1,
@ -2606,7 +2652,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
{SplitIntStart, "Key|"},
FoldHeadsFun,
true, false, false}),
true, false,
false, false, false}),
KeyHashList2F = HTFolder2F(),
?assertMatch(true, length(KeyHashList2E) > 0),


@ -37,8 +37,8 @@
strip_to_seqonly/1,
strip_to_statusonly/1,
strip_to_keyseqonly/1,
strip_to_seqnhashonly/1,
striphead_to_details/1,
strip_to_indexdetails/1,
striphead_to_v1details/1,
is_active/3,
endkey_passed/2,
key_dominates/2,
@ -63,7 +63,6 @@
get_keyandobjhash/2,
idx_indexspecs/5,
obj_objectspecs/3,
aae_indexspecs/6,
riak_extract_metadata/2,
segment_hash/1,
to_lookup/1,
@ -74,9 +73,7 @@
-define(MAGIC, 53). % riak_kv -> riak_object
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-define(NRT_IDX, "$aae.").
-define(ALL_BUCKETS, <<"$all">>).
-type recent_aae() :: #recent_aae{}.
-type riak_metadata() :: {binary()|delete, % Sibling Metadata
binary()|null, % Vclock Metadata
integer()|null, % Hash of vclock - non-exportable
@ -84,14 +81,35 @@
-type tag() ::
?STD_TAG|?RIAK_TAG|?IDX_TAG|?HEAD_TAG.
-type key() ::
binary()|string()|{binary(), binary()}.
% Keys SHOULD be binary()
% string() support is a legacy of old tests
-type sqn() ::
% SQN of the object in the Journal
pos_integer().
-type segment_hash() ::
% hash of the key to an aae segment - to be used in ledger filters
{integer(), integer()}|no_lookup.
-type metadata() ::
tuple()|null. % null for empty metadata
-type last_moddate() ::
% modified date as determined by the object (not this store)
% if the object has siblings in the store, this will be the maximum of
% those dates
integer()|undefined.
-type lastmod_range() :: {integer(), pos_integer()|infinity}.
-type ledger_status() ::
tomb|{active, non_neg_integer()|infinity}.
-type ledger_key() ::
{tag(), any(), any(), any()}|all.
-type ledger_value() ::
{integer(), ledger_status(), segment_hash(), tuple()|null}.
ledger_value_v1()|ledger_value_v2().
-type ledger_value_v1() ::
{sqn(), ledger_status(), segment_hash(), metadata()}.
-type ledger_value_v2() ::
{sqn(), ledger_status(), segment_hash(), metadata(), last_moddate()}.
-type ledger_kv() ::
{ledger_key(), ledger_value()}.
-type compaction_strategy() ::
@ -100,18 +118,29 @@
?INKT_STND|?INKT_TOMB|?INKT_MPUT|?INKT_KEYD.
-type journal_key() ::
{integer(), journal_key_tag(), ledger_key()}.
-type object_spec_v0() ::
{add|remove, key(), key(), key()|null, any()}.
-type object_spec_v1() ::
{add|remove, v1, key(), key(), key()|null,
list(erlang:timestamp())|undefined, any()}.
-type object_spec() ::
object_spec_v0()|object_spec_v1().
-type compression_method() ::
lz4|native.
-type index_specs() ::
list({add|remove, any(), any()}).
-type journal_keychanges() ::
{index_specs(), infinity|integer()}. % {KeyChanges, TTL}
-type maybe_lookup() ::
lookup|no_lookup.
-type segment_list()
:: list(integer())|false.
-export_type([tag/0,
key/0,
object_spec/0,
segment_hash/0,
ledger_status/0,
ledger_key/0,
@ -123,7 +152,10 @@
compression_method/0,
journal_keychanges/0,
index_specs/0,
segment_list/0]).
segment_list/0,
maybe_lookup/0,
last_moddate/0,
lastmod_range/0]).
%%%============================================================================
@ -152,7 +184,7 @@ segment_hash(Key) ->
segment_hash(term_to_binary(Key)).
-spec to_lookup(ledger_key()) -> lookup|no_lookup.
-spec to_lookup(ledger_key()) -> maybe_lookup().
%% @doc
%% Should it be possible to look up a key in the merge tree? This is not true
%% for keys that should only be read through range queries. Direct lookup
@ -170,36 +202,41 @@ to_lookup(Key) ->
%% Some helper functions to get a sub_components of the key/value
-spec strip_to_statusonly(ledger_kv()) -> ledger_status().
strip_to_statusonly({_, {_, St, _, _}}) -> St.
strip_to_statusonly({_, V}) -> element(2, V).
-spec strip_to_seqonly(ledger_kv()) -> non_neg_integer().
strip_to_seqonly({_, {SeqN, _, _, _}}) -> SeqN.
strip_to_seqonly({_, V}) -> element(1, V).
-spec strip_to_keyseqonly(ledger_kv()) -> {ledger_key(), integer()}.
strip_to_keyseqonly({LK, {SeqN, _, _, _}}) -> {LK, SeqN}.
strip_to_keyseqonly({LK, V}) -> {LK, element(1, V)}.
-spec strip_to_seqnhashonly(ledger_kv()) -> {integer(), segment_hash()}.
strip_to_seqnhashonly({_, {SeqN, _, MH, _}}) -> {SeqN, MH}.
-spec strip_to_indexdetails(ledger_kv()) ->
{integer(), segment_hash(), last_moddate()}.
strip_to_indexdetails({_, V}) when tuple_size(V) == 4 ->
% A v1 value
{element(1, V), element(3, V), undefined};
strip_to_indexdetails({_, V}) when tuple_size(V) > 4 ->
% A v2 value should have a fifth element - Last Modified Date
{element(1, V), element(3, V), element(5, V)}.
-spec striphead_to_details(ledger_value()) -> ledger_value().
striphead_to_details({SeqN, St, MH, MD}) -> {SeqN, St, MH, MD}.
-spec striphead_to_v1details(ledger_value()) -> ledger_value().
striphead_to_v1details(V) ->
{element(1, V), element(2, V), element(3, V), element(4, V)}.
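For illustration (the ledger key LK and the numbers are invented), the two helpers behave as follows on v1 and v2 values:

    %% v1 value - the last-modified slot is reported as undefined:
    %%   strip_to_indexdetails({LK, {5, {active, infinity}, {0, 0}, null}})
    %%     -> {5, {0, 0}, undefined}
    %% v2 value - the fifth element is surfaced as the last modified date:
    %%   strip_to_indexdetails({LK, {5, {active, infinity}, {0, 0}, null, 1541360000}})
    %%     -> {5, {0, 0}, 1541360000}
    %% striphead_to_v1details/1 truncates either shape back to the 4-element
    %% v1 form, which is why callers switch to it from striphead_to_details/1.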
-spec key_dominates(ledger_kv(), ledger_kv()) ->
left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant.
%% @doc
%% When comparing two keys in the ledger we need to find if one key comes
%% before the other, or, if they match, which key is "better" and should be
%% the winner
key_dominates(LeftKey, RightKey) ->
    case {LeftKey, RightKey} of
        {{LK, _LVAL}, {RK, _RVAL}} when LK < RK ->
            left_hand_first;
        {{LK, _LVAL}, {RK, _RVAL}} when RK < LK ->
            right_hand_first;
        {{LK, {LSN, _LST, _LMH, _LMD}}, {RK, {RSN, _RST, _RMH, _RMD}}}
                when LK == RK, LSN >= RSN ->
            left_hand_dominant;
        {{LK, {LSN, _LST, _LMH, _LMD}}, {RK, {RSN, _RST, _RMH, _RMD}}}
                when LK == RK, LSN < RSN ->
            right_hand_dominant
    end.
key_dominates({LK, _LVAL}, {RK, _RVAL}) when LK < RK ->
    left_hand_first;
key_dominates({LK, _LVAL}, {RK, _RVAL}) when RK < LK ->
    right_hand_first;
key_dominates(LObj, RObj) ->
    case strip_to_seqonly(LObj) >= strip_to_seqonly(RObj) of
        true ->
            left_hand_dominant;
        false ->
            right_hand_dominant
    end.
@ -249,8 +286,6 @@ from_ledgerkey(_ExpectedTag, _OtherKey) ->
-spec from_ledgerkey(tuple()) -> tuple().
%% @doc
%% Return identifying information from the LedgerKey
from_ledgerkey({?IDX_TAG, ?ALL_BUCKETS, {_IdxFld, IdxVal}, {Bucket, Key}}) ->
{Bucket, Key, IdxVal};
from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) ->
{Bucket, Key, IdxVal};
from_ledgerkey({?HEAD_TAG, Bucket, Key, SubKey}) ->
@ -528,9 +563,7 @@ hash(Obj) ->
%% @doc
%% Convert object specs to KV entries ready for the ledger
obj_objectspecs(ObjectSpecs, SQN, TTL) ->
lists:map(fun({IdxOp, Bucket, Key, SubKey, Value}) ->
gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL)
end,
lists:map(fun(ObjectSpec) -> gen_headspec(ObjectSpec, SQN, TTL) end,
ObjectSpecs).
-spec idx_indexspecs(index_specs(),
@ -548,27 +581,25 @@ idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) ->
Status = set_status(IdxOp, TTL),
case Bucket of
{all, RealBucket} ->
{to_ledgerkey(?ALL_BUCKETS,
{RealBucket, Key},
?IDX_TAG,
IdxField,
IdxTerm),
{SQN, Status, no_lookup, null}};
_ ->
{to_ledgerkey(Bucket,
Key,
?IDX_TAG,
IdxField,
IdxTerm),
{SQN, Status, no_lookup, null}}
end.
{to_ledgerkey(Bucket, Key, ?IDX_TAG, IdxField, IdxTerm),
{SQN, Status, no_lookup, null}}.
gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL) ->
-spec gen_headspec(object_spec(), integer(), integer()|infinity) -> ledger_kv().
%% @doc
%% Take an object_spec as passed in a book_mput, and convert it into a
%% valid ledger key and value. Supports differently shaped tuples for
%% different versions of the object_spec
gen_headspec({IdxOp, v1, Bucket, Key, SubKey, LMD, Value}, SQN, TTL) ->
% v1 object spec
Status = set_status(IdxOp, TTL),
K = to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG),
{K, {SQN, Status, segment_hash(K), Value}}.
{K, {SQN, Status, segment_hash(K), Value, get_last_lastmodification(LMD)}};
gen_headspec({IdxOp, Bucket, Key, SubKey, Value}, SQN, TTL) ->
% v0 object spec
Status = set_status(IdxOp, TTL),
K = to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG),
{K, {SQN, Status, segment_hash(K), Value, undefined}}.
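For example (hypothetical values; to_ledgerkey/3 produces a {Tag, Bucket, Key, null} tuple):

    %%   gen_headspec({add, v1, <<"B">>, <<"K">>, <<"SK">>,
    %%                 [{1541, 350000, 0}], <<"V">>}, 1, infinity)
    %% returns
    %%   {{?HEAD_TAG, <<"B">>, {<<"K">>, <<"SK">>}, null},
    %%    {1, {active, infinity}, SegHash, <<"V">>, 1541350000}}
    %% where SegHash is segment_hash/1 of the key, and the last modified date
    %% has been folded down to seconds. A v0 spec yields the same value with
    %% undefined in the last modified slot (see headspec_v0v1_test below).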
set_status(add, TTL) ->
@ -577,103 +608,6 @@ set_status(remove, _TTL) ->
%% TODO: timestamps for delayed reaping
tomb.
-spec aae_indexspecs(false|recent_aae(),
any(), any(),
integer(), integer(),
list())
-> list().
%% @doc
%% Generate an additional index term representing the change, if the last
%% modified date for the change is within the definition of recency.
%%
%% The object may have multiple last modified dates (siblings), and in this
%% case index entries for all dates within the range are added.
%%
%% The index entry should auto-expire in the future (when it is no longer
%% relevant to assessing recent changes)
aae_indexspecs(false, _Bucket, _Key, _SQN, _H, _LastMods) ->
[];
aae_indexspecs(_AAE, _Bucket, _Key, _SQN, _H, []) ->
[];
aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
InList = lists:member(Bucket, AAE#recent_aae.buckets),
Bucket0 =
case AAE#recent_aae.filter of
blacklist ->
case InList of
true ->
false;
false ->
{all, Bucket}
end;
whitelist ->
case InList of
true ->
Bucket;
false ->
false
end
end,
case Bucket0 of
false ->
[];
Bucket0 ->
GenIdxFun =
fun(LMD0, Acc) ->
Dates = parse_date(LMD0,
AAE#recent_aae.unit_minutes,
AAE#recent_aae.limit_minutes,
leveled_util:integer_now()),
case Dates of
no_index ->
Acc;
{LMD1, TTL} ->
TreeSize = AAE#recent_aae.tree_size,
SegID32 = leveled_tictac:keyto_segment32(Key),
SegID =
leveled_tictac:get_segment(SegID32, TreeSize),
IdxFldStr = ?NRT_IDX ++ LMD1 ++ "_bin",
IdxTrmStr =
string:right(integer_to_list(SegID), 8, $0) ++
"." ++
string:right(integer_to_list(H), 8, $0),
{IdxK, IdxV} =
gen_indexspec(Bucket0, Key,
add,
list_to_binary(IdxFldStr),
list_to_binary(IdxTrmStr),
SQN, TTL),
[{IdxK, IdxV}|Acc]
end
end,
lists:foldl(GenIdxFun, [], LastMods)
end.
-spec parse_date(tuple(), integer(), integer(), integer()) ->
no_index|{list(), integer()}.
%% @doc
%% Parse the last modified date and the AAE date configuration to return a
%% binary to be used as the last modified date part of the index, and an
%% integer to be used as the TTL of the index entry.
%% Return no_index if the change is not recent.
parse_date(LMD, UnitMins, LimitMins, Now) ->
LMDsecs = leveled_util:integer_time(LMD),
Recent = (LMDsecs + LimitMins * 60) > Now,
case Recent of
false ->
no_index;
true ->
{{Y, M, D}, {Hour, Minute, _Second}} =
calendar:now_to_datetime(LMD),
RoundMins =
UnitMins * (Minute div UnitMins),
StrTime =
lists:flatten(io_lib:format(?LMD_FORMAT,
[Y, M, D, Hour, RoundMins])),
TTL = min(Now, LMDsecs) + (LimitMins + UnitMins) * 60,
{StrTime, TTL}
end.
-spec generate_ledgerkv(
tuple(), integer(), any(), integer(), tuple()|infinity) ->
{any(), any(), any(),
@ -705,9 +639,23 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
Value = {SQN,
Status,
Hash,
MD},
MD,
get_last_lastmodification(LastMods)},
{Bucket, Key, Value, {Hash, ObjHash}, LastMods}.
-spec get_last_lastmodification(list(erlang:timestamp())|undefined)
-> pos_integer()|undefined.
%% @doc
%% Get the highest of the last modifications measured in seconds. This will
%% be stored as 4 bytes (unsigned), so will last for another 80+ years
get_last_lastmodification(undefined) ->
undefined;
get_last_lastmodification([]) ->
undefined;
get_last_lastmodification(LastMods) ->
{Mega, Sec, _Micro} = lists:max(LastMods),
Mega * 1000000 + Sec.
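A quick worked example (timestamps invented): the maximum sibling timestamp is taken and the microsecond element discarded.

    %%   get_last_lastmodification([{1541, 350000, 999}, {1541, 360000, 0}])
    %%     -> 1541 * 1000000 + 360000 = 1541360000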
extract_metadata(Obj, Size, ?RIAK_TAG) ->
riak_extract_metadata(Obj, Size);
@ -716,7 +664,7 @@ extract_metadata(Obj, Size, ?STD_TAG) ->
get_size(PK, Value) ->
{Tag, _Bucket, _Key, _} = PK,
{_, _, _, MD} = Value,
MD = element(4, Value),
case Tag of
?RIAK_TAG ->
{_RMD, _VC, _Hash, Size} = MD,
@ -733,7 +681,7 @@ get_size(PK, Value) ->
%% the sorted vclock)
get_keyandobjhash(LK, Value) ->
{Tag, Bucket, Key, _} = LK,
{_, _, _, MD} = Value,
MD = element(4, Value),
case Tag of
?IDX_TAG ->
from_ledgerkey(LK); % returns {Bucket, Key, IdxValue}
@ -927,95 +875,6 @@ hashperf_test() ->
io:format(user, "1000 object hashes in ~w microseconds~n",
[timer:now_diff(os:timestamp(), SW)]).
parsedate_test() ->
{MeS, S, MiS} = os:timestamp(),
timer:sleep(100),
Now = leveled_util:integer_now(),
UnitMins = 5,
LimitMins = 60,
PD = parse_date({MeS, S, MiS}, UnitMins, LimitMins, Now),
io:format("Parsed Date ~w~n", [PD]),
?assertMatch(true, is_tuple(PD)),
check_pd(PD, UnitMins),
CheckFun =
fun(Offset) ->
ModDate = {MeS, S + Offset * 60, MiS},
check_pd(parse_date(ModDate, UnitMins, LimitMins, Now), UnitMins)
end,
lists:foreach(CheckFun, lists:seq(1, 60)).
check_pd(PD, UnitMins) ->
{LMDstr, _TTL} = PD,
Minutes = list_to_integer(lists:nthtail(10, LMDstr)),
?assertMatch(0, Minutes rem UnitMins).
parseolddate_test() ->
LMD = os:timestamp(),
timer:sleep(100),
Now = leveled_util:integer_now() + 60 * 60,
UnitMins = 5,
LimitMins = 60,
PD = parse_date(LMD, UnitMins, LimitMins, Now),
io:format("Parsed Date ~w~n", [PD]),
?assertMatch(no_index, PD).
genaaeidx_test() ->
AAE = #recent_aae{filter=blacklist,
buckets=[],
limit_minutes=60,
unit_minutes=5},
Bucket = <<"Bucket1">>,
Key = <<"Key1">>,
SQN = 1,
H = erlang:phash2(null),
LastMods = [os:timestamp(), os:timestamp()],
AAESpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
?assertMatch(2, length(AAESpecs)),
LastMods1 = [os:timestamp()],
AAESpecs1 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods1),
?assertMatch(1, length(AAESpecs1)),
IdxB = element(2, element(1, lists:nth(1, AAESpecs1))),
io:format(user, "AAE IDXSpecs1 ~w~n", [AAESpecs1]),
?assertMatch(<<"$all">>, IdxB),
LastMods0 = [],
AAESpecs0 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods0),
?assertMatch(0, length(AAESpecs0)),
AAE0 = AAE#recent_aae{filter=whitelist,
buckets=[<<"Bucket0">>]},
AAESpecsB0 = aae_indexspecs(AAE0, Bucket, Key, SQN, H, LastMods1),
?assertMatch(0, length(AAESpecsB0)),
AAESpecsB1 = aae_indexspecs(AAE0, <<"Bucket0">>, Key, SQN, H, LastMods1),
?assertMatch(1, length(AAESpecsB1)),
[{{?IDX_TAG, <<"Bucket0">>, {Fld, Term}, <<"Key1">>},
{SQN, {active, TS}, no_lookup, null}}] = AAESpecsB1,
?assertMatch(true, is_integer(TS)),
?assertMatch(17, length(binary_to_list(Term))),
?assertMatch("$aae.", lists:sublist(binary_to_list(Fld), 5)),
AAE1 = AAE#recent_aae{filter=blacklist,
buckets=[<<"Bucket0">>]},
AAESpecsB2 = aae_indexspecs(AAE1, <<"Bucket0">>, Key, SQN, H, LastMods1),
?assertMatch(0, length(AAESpecsB2)).
delayedupdate_aaeidx_test() ->
AAE = #recent_aae{filter=blacklist,
buckets=[],
limit_minutes=60,
unit_minutes=5},
Bucket = <<"Bucket1">>,
Key = <<"Key1">>,
SQN = 1,
H = erlang:phash2(null),
{Mega, Sec, MSec} = os:timestamp(),
LastMods = [{Mega -1, Sec, MSec}],
AAESpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
?assertMatch(0, length(AAESpecs)).
head_segment_compare_test() ->
% Reminder to align native and parallel(leveled_ko) key stores for
% kv_index_tictactree
@ -1025,4 +884,13 @@ head_segment_compare_test() ->
?assertMatch(H1, H2),
?assertMatch(H1, H3).
headspec_v0v1_test() ->
% A v0 object spec generates the same outcome as a v1 object spec with the
% last modified date undefined
V1 = {add, v1, <<"B">>, <<"K">>, <<"SK">>, undefined, <<"V">>},
V0 = {add, <<"B">>, <<"K">>, <<"SK">>, <<"V">>},
TTL = infinity,
?assertMatch(true, gen_headspec(V0, 1, TTL) == gen_headspec(V1, 1, TTL)).
-endif.


@ -177,7 +177,7 @@
pcl_fetch/4,
pcl_fetchkeys/5,
pcl_fetchkeys/6,
pcl_fetchkeysbysegment/6,
pcl_fetchkeysbysegment/8,
pcl_fetchnextkey/5,
pcl_checksequencenumber/3,
pcl_workforclerk/1,
@ -237,6 +237,7 @@
-define(SNAPSHOT_TIMEOUT_SHORT, 600).
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(TIMING_SAMPLESIZE, 100).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-record(state, {manifest, % a manifest record from the leveled_manifest module
persisted_sqn = 0 :: integer(), % The highest SQN persisted
@ -248,7 +249,7 @@
levelzero_pending = false :: boolean(),
levelzero_constructor :: pid() | undefined,
levelzero_cache = [] :: list(), % a list of trees
levelzero_cache = [] :: levelzero_cache(),
levelzero_size = 0 :: integer(),
levelzero_maxcachesize :: integer() | undefined,
levelzero_cointoss = false :: boolean(),
@ -293,6 +294,12 @@
integer()}.
-type pcl_state() :: #state{}.
-type pcl_timings() :: no_timing|#pcl_timings{}.
-type levelzero_cacheentry() :: {pos_integer(), leveled_tree:leveled_tree()}.
-type levelzero_cache() :: list(levelzero_cacheentry()).
-type iterator_entry()
:: {pos_integer(),
list(leveled_codec:ledger_kv()|leveled_sst:expandable_pointer())}.
-type iterator() :: list(iterator_entry()).
%%%============================================================================
%%% API
@ -405,7 +412,7 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) ->
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
false, -1,
false, false, -1,
By},
infinity).
@ -414,7 +421,9 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) ->
leveled_codec:ledger_key(),
leveled_codec:ledger_key(),
fun(), any(),
leveled_codec:segment_list()) -> any().
leveled_codec:segment_list(),
false | leveled_codec:lastmod_range(),
boolean()) -> any().
%% @doc
%% Run a range query between StartKey and EndKey (inclusive). This will cover
%% all keys in the range - so must only be run against snapshots of the
@ -427,12 +436,21 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) ->
%% Note that segment must be false unless the object Tag supports additional
%% indexing by segment. This cannot be used on ?IDX_TAG and other tags that
%% use the no_lookup hash
pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) ->
pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc,
SegmentList, LastModRange, LimitByCount) ->
{MaxKeys, InitAcc0} =
case LimitByCount of
true ->
% The passed-in accumulator should be a tuple with the Max Key
% Count as its first element and the actual accumulator as its second
InitAcc;
false ->
{-1, InitAcc}
end,
gen_server:call(Pid,
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
SegmentList, -1,
StartKey, EndKey, AccFun, InitAcc0,
SegmentList, LastModRange, MaxKeys,
as_pcl},
infinity).
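A sketch of a count-limited, date-filtered call (PclSnap, StartKey/EndKey and LowSec are assumed names; per the doc above this must be run against a snapshot):

    AccFun = fun(K, _V, Keys) -> [K|Keys] end,
    {Remaining, Keys} =
        leveled_penciller:pcl_fetchkeysbysegment(
            PclSnap, StartKey, EndKey,
            AccFun,
            {5000, []},          % {MaxKeyCount, InitAcc} as LimitByCount is true
            false,               % no SegmentList acceleration
            {LowSec, infinity},  % only keys modified at or after LowSec
            true).               % LimitByCount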
@ -449,7 +467,7 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
false, 1,
false, false, 1,
as_pcl},
infinity).
@ -684,10 +702,17 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
handle_call({fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
SegmentList, MaxKeys, By},
SegmentList, LastModRange, MaxKeys, By},
_From,
State=#state{snapshot_fully_loaded=Ready})
when Ready == true ->
LastModRange0 =
case LastModRange of
false ->
?OPEN_LASTMOD_RANGE;
R ->
R
end,
SW = os:timestamp(),
L0AsList =
case State#state.levelzero_astree of
@ -720,7 +745,7 @@ handle_call({fetch_keys,
keyfolder({L0AsList, SSTiter},
{StartKey, EndKey},
{AccFun, InitAcc},
{SegmentList, MaxKeys})
{SegmentList, LastModRange0, MaxKeys})
end,
case By of
as_pcl ->
@ -1375,27 +1400,40 @@ keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder({IMMiter, SSTiter},
{StartKey, EndKey},
{AccFun, Acc},
{false, -1}).
{false, {0, infinity}, -1}).
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, {_SegmentList, MaxKeys})
when MaxKeys == 0 ->
Acc;
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, {SegmentList, MaxKeys}) ->
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc},
{_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 ->
{0, Acc};
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc},
{SegmentList, LastModRange, MaxKeys}) ->
{StartKey, EndKey} = KeyRange,
case find_nextkey(SSTiter, StartKey, EndKey, SegmentList) of
case find_nextkey(SSTiter, StartKey, EndKey,
SegmentList, element(1, LastModRange)) of
no_more_keys ->
Acc;
case MaxKeys > 0 of
true ->
% This query had a max count, so we must respond with the
% remainder of the count
{MaxKeys, Acc};
false ->
% This query started with MaxKeys set to -1, so the caller is
% not interested in having MaxKeys in the response
Acc
end;
{NxSSTiter, {SSTKey, SSTVal}} ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
{Acc1, MK1} =
maybe_accumulate(SSTKey, SSTVal, Acc, AccFun,
MaxKeys, LastModRange),
keyfolder({[], NxSSTiter},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1})
{SegmentList, LastModRange, MK1})
end;
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
KeyRange,
{AccFun, Acc},
{SegmentList, MaxKeys}) ->
{SegmentList, LastModRange, MaxKeys}) ->
{StartKey, EndKey} = KeyRange,
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
{false, true} ->
@ -1405,18 +1443,21 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
keyfolder({[], SSTiterator},
KeyRange,
{AccFun, Acc},
{SegmentList, MaxKeys});
{SegmentList, LastModRange, MaxKeys});
{false, false} ->
case find_nextkey(SSTiterator, StartKey, EndKey, SegmentList) of
case find_nextkey(SSTiterator, StartKey, EndKey,
SegmentList, element(1, LastModRange)) of
no_more_keys ->
% No more keys in range in the persisted store, so use the
% in-memory KV as the next
Acc1 = AccFun(IMMKey, IMMVal, Acc),
{Acc1, MK1} =
maybe_accumulate(IMMKey, IMMVal, Acc, AccFun,
MaxKeys, LastModRange),
keyfolder({NxIMMiterator,
[]},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1});
{SegmentList, LastModRange, MK1});
{NxSSTiterator, {SSTKey, SSTVal}} ->
% There is a next key, so need to know which is the
% next key between the two (and handle two keys
@ -1426,7 +1467,9 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
{SSTKey,
SSTVal}) of
left_hand_first ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
{Acc1, MK1} =
maybe_accumulate(IMMKey, IMMVal, Acc, AccFun,
MaxKeys, LastModRange),
% Stow the previous best result away at Level -1
% so that there is no need to iterate to it again
NewEntry = {-1, [{SSTKey, SSTVal}]},
@ -1437,16 +1480,20 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
NewEntry)},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1});
{SegmentList, LastModRange, MK1});
right_hand_first ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
{Acc1, MK1} =
maybe_accumulate(SSTKey, SSTVal, Acc, AccFun,
MaxKeys, LastModRange),
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
NxSSTiterator},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1});
{SegmentList, LastModRange, MK1});
left_hand_dominant ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
{Acc1, MK1} =
maybe_accumulate(IMMKey, IMMVal, Acc, AccFun,
MaxKeys, LastModRange),
% We can add to the accumulator here. As the SST
% key was the most dominant across all SST levels,
% so there is no need to hold off until the IMMKey
@ -1455,30 +1502,55 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
NxSSTiterator},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1})
{SegmentList, LastModRange, MK1})
end
end
end.
-spec maybe_accumulate(leveled_codec:ledger_key(),
leveled_codec:ledger_value(),
any(), fun(), integer(),
{non_neg_integer(), non_neg_integer()|infinity}) ->
any().
%% @doc
%% Make an accumulation decision based on the date range
maybe_accumulate(LK, LV, Acc, AccFun, MaxKeys, {LowLastMod, HighLastMod}) ->
{_SQN, _SH, LMD} = leveled_codec:strip_to_indexdetails({LK, LV}),
RunAcc =
(LMD == undefined) or ((LMD >= LowLastMod) and (LMD =< HighLastMod)),
case RunAcc of
true ->
{AccFun(LK, LV, Acc), MaxKeys - 1};
false ->
{Acc, MaxKeys}
end.
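A worked illustration of the decision, as comments (numbers invented):

    %% A v2 value with LMD = 1541360000 and range {1541000000, infinity} is
    %% passed to AccFun with MaxKeys decremented; with range
    %% {1541360001, infinity} it is skipped and {Acc, MaxKeys} is returned
    %% unchanged. A v1 value (LMD == undefined) is always accumulated, as the
    %% book_headfold/9 documentation describes.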
-spec find_nextkey(iterator(),
leveled_codec:ledger_key(), leveled_codec:ledger_key()) ->
no_more_keys|{iterator(), leveled_codec:ledger_kv()}.
%% @doc
%% Looks to find the best choice for the next key across the levels (other
%% than in-memory table)
%% In finding the best choice, the next key in a given level may be a next
%% block or next file pointer which will need to be expanded
find_nextkey(QueryArray, StartKey, EndKey) ->
find_nextkey(QueryArray, StartKey, EndKey, false).
find_nextkey(QueryArray, StartKey, EndKey, false, 0).
find_nextkey(QueryArray, StartKey, EndKey, SegmentList) ->
find_nextkey(QueryArray, StartKey, EndKey, SegmentList, LowLastMod) ->
find_nextkey(QueryArray,
-1,
{null, null},
StartKey, EndKey,
SegmentList, ?ITERATOR_SCANWIDTH).
SegmentList,
LowLastMod,
?ITERATOR_SCANWIDTH).
find_nextkey(_QueryArray, LCnt,
{null, null},
_StartKey, _EndKey,
_SegList, _Width) when LCnt > ?MAX_LEVELS ->
_SegList, _LowLastMod, _Width) when LCnt > ?MAX_LEVELS ->
% The array has been scanned without finding a best key - must be
% exhausted - respond to indicate no more keys to be found by the
% iterator
@ -1486,7 +1558,7 @@ find_nextkey(_QueryArray, LCnt,
find_nextkey(QueryArray, LCnt,
{BKL, BestKV},
_StartKey, _EndKey,
_SegList, _Width) when LCnt > ?MAX_LEVELS ->
_SegList, _LowLastMod, _Width) when LCnt > ?MAX_LEVELS ->
% All levels have been scanned, so need to remove the best result from
% the array, and return that array along with the best key/sqn/status
% combination
@ -1495,7 +1567,7 @@ find_nextkey(QueryArray, LCnt,
find_nextkey(QueryArray, LCnt,
{BestKeyLevel, BestKV},
StartKey, EndKey,
SegList, Width) ->
SegList, LowLastMod, Width) ->
% Get the next key at this level
{NextKey, RestOfKeys} =
case lists:keyfind(LCnt, 1, QueryArray) of
@ -1514,15 +1586,16 @@ find_nextkey(QueryArray, LCnt,
LCnt + 1,
{BKL, BKV},
StartKey, EndKey,
SegList, Width);
SegList, LowLastMod, Width);
{{next, Owner, _SK}, BKL, BKV} ->
% The first key at this level is pointer to a file - need to query
% the file to expand this level out before proceeding
Pointer = {next, Owner, StartKey, EndKey},
UpdList = leveled_sst:expand_list_by_pointer(Pointer,
RestOfKeys,
Width,
SegList),
UpdList = leveled_sst:sst_expandpointer(Pointer,
RestOfKeys,
Width,
SegList,
LowLastMod),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level
@ -1530,15 +1603,16 @@ find_nextkey(QueryArray, LCnt,
LCnt,
{BKL, BKV},
StartKey, EndKey,
SegList, Width);
SegList, LowLastMod, Width);
{{pointer, SSTPid, Slot, PSK, PEK}, BKL, BKV} ->
% The first key at this level is pointer within a file - need to
% query the file to expand this level out before proceeding
Pointer = {pointer, SSTPid, Slot, PSK, PEK},
UpdList = leveled_sst:expand_list_by_pointer(Pointer,
RestOfKeys,
Width,
SegList),
UpdList = leveled_sst:sst_expandpointer(Pointer,
RestOfKeys,
Width,
SegList,
LowLastMod),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level
@ -1546,7 +1620,7 @@ find_nextkey(QueryArray, LCnt,
LCnt,
{BKL, BKV},
StartKey, EndKey,
SegList, Width);
SegList, LowLastMod, Width);
{{Key, Val}, null, null} ->
% No best key set - so can assume that this key is the best key,
% and check the lower levels
@ -1554,7 +1628,7 @@ find_nextkey(QueryArray, LCnt,
LCnt + 1,
{LCnt, {Key, Val}},
StartKey, EndKey,
SegList, Width);
SegList, LowLastMod, Width);
{{Key, Val}, _BKL, {BestKey, _BestVal}} when Key < BestKey ->
% There is a real key and a best key to compare, and the real key
% at this level is before the best key, and so is now the new best
@ -1564,7 +1638,7 @@ find_nextkey(QueryArray, LCnt,
LCnt + 1,
{LCnt, {Key, Val}},
StartKey, EndKey,
SegList, Width);
SegList, LowLastMod, Width);
{{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey ->
SQN = leveled_codec:strip_to_seqonly({Key, Val}),
BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}),
@ -1579,7 +1653,7 @@ find_nextkey(QueryArray, LCnt,
LCnt + 1,
{BKL, {BestKey, BestVal}},
StartKey, EndKey,
SegList, Width);
SegList, LowLastMod, Width);
SQN > BestSQN ->
% There is a real key at the front of this level and it has
% a higher SQN than the best key, so we should use this as
@ -1595,7 +1669,7 @@ find_nextkey(QueryArray, LCnt,
LCnt + 1,
{LCnt, {Key, Val}},
StartKey, EndKey,
SegList, Width)
SegList, LowLastMod, Width)
end;
{_, BKL, BKV} ->
% This is not the best key
@ -1603,7 +1677,7 @@ find_nextkey(QueryArray, LCnt,
LCnt + 1,
{BKL, BKV},
StartKey, EndKey,
SegList, Width)
SegList, LowLastMod, Width)
end.
@ -1964,84 +2038,98 @@ simple_server_test() ->
simple_findnextkey_test() ->
QueryArray = [
{2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}},
{{o, "Bucket1", "Key5"}, {4, {active, infinity}, null}}]},
{3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}]},
{5, [{{o, "Bucket1", "Key2"}, {2, {active, infinity}, null}}]}
{2, [{{o, "Bucket1", "Key1", null}, {5, {active, infinity}, {0, 0}, null}},
{{o, "Bucket1", "Key5", null}, {4, {active, infinity}, {0, 0}, null}}]},
{3, [{{o, "Bucket1", "Key3", null}, {3, {active, infinity}, {0, 0}, null}}]},
{5, [{{o, "Bucket1", "Key2", null}, {2, {active, infinity}, {0, 0}, null}}]}
],
{Array2, KV1} = find_nextkey(QueryArray,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, KV1),
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key1", null},
{5, {active, infinity}, {0, 0}, null}},
KV1),
{Array3, KV2} = find_nextkey(Array2,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key2"}, {2, {active, infinity}, null}}, KV2),
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key2", null},
{2, {active, infinity}, {0, 0}, null}},
KV2),
{Array4, KV3} = find_nextkey(Array3,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}, KV3),
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key3", null},
{3, {active, infinity}, {0, 0}, null}},
KV3),
{Array5, KV4} = find_nextkey(Array4,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key5"}, {4, {active, infinity}, null}}, KV4),
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key5", null},
{4, {active, infinity}, {0, 0}, null}},
KV4),
ER = find_nextkey(Array5,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch(no_more_keys, ER).
sqnoverlap_findnextkey_test() ->
QueryArray = [
{2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}},
{{o, "Bucket1", "Key5"}, {4, {active, infinity}, 0, null}}]},
{3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]},
{5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]}
{2, [{{o, "Bucket1", "Key1", null}, {5, {active, infinity}, {0, 0}, null}},
{{o, "Bucket1", "Key5", null}, {4, {active, infinity}, {0, 0}, null}}]},
{3, [{{o, "Bucket1", "Key3", null}, {3, {active, infinity}, {0, 0}, null}}]},
{5, [{{o, "Bucket1", "Key5", null}, {2, {active, infinity}, {0, 0}, null}}]}
],
{Array2, KV1} = find_nextkey(QueryArray,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}},
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key1", null},
{5, {active, infinity}, {0, 0}, null}},
KV1),
{Array3, KV2} = find_nextkey(Array2,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}},
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key3", null},
{3, {active, infinity}, {0, 0}, null}},
KV2),
{Array4, KV3} = find_nextkey(Array3,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key5"}, {4, {active, infinity}, 0, null}},
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key5", null},
{4, {active, infinity}, {0, 0}, null}},
KV3),
ER = find_nextkey(Array4,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch(no_more_keys, ER).
sqnoverlap_otherway_findnextkey_test() ->
QueryArray = [
{2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}},
{{o, "Bucket1", "Key5"}, {1, {active, infinity}, 0, null}}]},
{3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]},
{5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]}
{2, [{{o, "Bucket1", "Key1", null}, {5, {active, infinity}, {0, 0}, null}},
{{o, "Bucket1", "Key5", null}, {1, {active, infinity}, {0, 0}, null}}]},
{3, [{{o, "Bucket1", "Key3", null}, {3, {active, infinity}, {0, 0}, null}}]},
{5, [{{o, "Bucket1", "Key5", null}, {2, {active, infinity}, {0, 0}, null}}]}
],
{Array2, KV1} = find_nextkey(QueryArray,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}},
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key1", null},
{5, {active, infinity}, {0, 0}, null}},
KV1),
{Array3, KV2} = find_nextkey(Array2,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}},
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key3", null},
{3, {active, infinity}, {0, 0}, null}},
KV2),
{Array4, KV3} = find_nextkey(Array3,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
?assertMatch({{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}},
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch({{o, "Bucket1", "Key5", null},
{2, {active, infinity}, {0, 0}, null}},
KV3),
ER = find_nextkey(Array4,
{o, "Bucket1", "Key0"},
{o, "Bucket1", "Key5"}),
{o, "Bucket1", "Key0", null},
{o, "Bucket1", "Key5", null}),
?assertMatch(no_more_keys, ER).
foldwithimm_simple_test() ->


@ -30,9 +30,9 @@
bucketkey_query/6,
hashlist_query/3,
tictactree/5,
foldheads_allkeys/5,
foldheads_allkeys/7,
foldobjects_allkeys/4,
foldheads_bybucket/6,
foldheads_bybucket/8,
foldobjects_bybucket/4,
foldobjects_byindex/3
]).
@ -49,6 +49,7 @@
:: {fun(), any()}.
-type term_regex() :: re:mp()|undefined.
%%%============================================================================
%%% External functions
%%%============================================================================
@ -269,12 +270,14 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
{async, Runner}.
-spec foldheads_allkeys(fun(), leveled_codec:tag(),
fun(), boolean(), false|list(integer()))
-> {async, fun()}.
fun(), boolean(), false|list(integer()),
false|leveled_codec:lastmod_range(),
false|pos_integer()) -> {async, fun()}.
%% @doc
%% Fold over all heads in the store for a given tag - applying the passed
%% function to each proxy object
foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck,
SegmentList, LastModRange, MaxObjectCount) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun,
@ -282,7 +285,9 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
[{StartKey, EndKey}],
FoldFun,
{true, JournalCheck},
SegmentList).
SegmentList,
LastModRange,
MaxObjectCount).
-spec foldobjects_allkeys(fun(), leveled_codec:tag(), fun(),
key_order|sqn_order) -> {async, fun()}.
@ -399,7 +404,10 @@ foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) ->
atom(),
list({any(), any()}),
fun(),
boolean(), false|list(integer()))
boolean(),
false|list(integer()),
false|leveled_codec:lastmod_range(),
false|pos_integer())
-> {async, fun()}.
%% @doc
%% Fold over all object metadata within a given key range in a bucket
@ -407,13 +415,16 @@ foldheads_bybucket(SnapFun,
Tag,
KeyRanges,
FoldFun,
JournalCheck, SegmentList) ->
JournalCheck,
SegmentList, LastModRange, MaxObjectCount) ->
foldobjects(SnapFun,
Tag,
KeyRanges,
FoldFun,
{true, JournalCheck},
SegmentList).
SegmentList,
LastModRange,
MaxObjectCount).
-spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}.
%% @doc
@ -454,10 +465,10 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) ->
ExtractFun,
null),
case R of
null ->
{1, null} ->
leveled_log:log("B0008",[]),
BKList;
{{B, K}, V} ->
{0, {{B, K}, V}} ->
case leveled_codec:is_active({Tag, B, K, null}, V, Now) of
true ->
leveled_log:log("B0009",[B]),
@ -484,6 +495,16 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) ->
-spec foldobjects(fun(), atom(), list(), fun(),
false|{true, boolean()}, false|list(integer())) ->
{async, fun()}.
foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) ->
foldobjects(SnapFun, Tag, KeyRanges,
FoldObjFun, DeferredFetch, SegmentList, false, false).
-spec foldobjects(fun(), atom(), list(), fun(),
false|{true, boolean()},
false|list(integer()),
false|leveled_codec:lastmod_range(),
false|pos_integer()) ->
{async, fun()}.
%% @doc
%% The object folder should be passed DeferredFetch.
%% DeferredFetch can either be false (which will return to the fold function
@ -491,7 +512,8 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) ->
%% will be created that if understood by the fold function will allow the fold
%% function to work on the head of the object, and defer fetching the body in
%% case such a fetch is unnecessary.
foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) ->
foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
SegmentList, LastModRange, MaxObjectCount) ->
{FoldFun, InitAcc} =
case is_tuple(FoldObjFun) of
true ->
@ -502,15 +524,23 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) ->
% no initial accumulator passed, and so should be just a list
{FoldObjFun, []}
end,
{LimitByCount, InitAcc0} =
case MaxObjectCount of
false ->
{false, InitAcc};
MOC when is_integer(MOC) ->
{true, {MOC, InitAcc}}
end,
Folder =
fun() ->
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
AccFun = accumulate_objects(FoldFun,
JournalSnapshot,
Tag,
DeferredFetch),
AccFun =
accumulate_objects(FoldFun,
JournalSnapshot,
Tag,
DeferredFetch),
ListFoldFun =
fun({StartKey, EndKey}, FoldAcc) ->
@ -519,9 +549,11 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) ->
EndKey,
AccFun,
FoldAcc,
SegmentList)
SegmentList,
LastModRange,
LimitByCount)
end,
Acc = lists:foldl(ListFoldFun, InitAcc, KeyRanges),
Acc = lists:foldl(ListFoldFun, InitAcc0, KeyRanges),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
case DeferredFetch of
{true, false} ->
@ -612,7 +644,7 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
case leveled_codec:is_active(LK, V, Now) of
true ->
{SQN, _St, _MH, MD} =
leveled_codec:striphead_to_details(V),
leveled_codec:striphead_to_v1details(V),
{B, K} =
case leveled_codec:from_ledgerkey(LK) of
{B0, K0} ->

File diff suppressed because it is too large


@ -189,7 +189,7 @@ import_tree(ExportedTree) ->
level2 = Lv2}.
-spec add_kv(tictactree(), tuple(), tuple(), fun()) -> tictactree().
-spec add_kv(tictactree(), term(), term(), fun()) -> tictactree().
%% @doc
%% Add a Key and value to a tictactree using the BinExtractFun to extract a
%% binary from the Key and value from which to generate the hash. The
@ -198,7 +198,7 @@ import_tree(ExportedTree) ->
add_kv(TicTacTree, Key, Value, BinExtractFun) ->
add_kv(TicTacTree, Key, Value, BinExtractFun, false).
-spec add_kv(tictactree(), tuple(), tuple(), fun(), boolean())
-spec add_kv(tictactree(), term(), term(), fun(), boolean())
-> tictactree()|{tictactree(), integer()}.
%% @doc
%% add_kv with ability to return segment ID of Key added
@ -523,8 +523,15 @@ get_size(Size) ->
?XLARGE
end.
segmentcompare(SrcBin, SinkBin) when byte_size(SrcBin)==byte_size(SinkBin) ->
segmentcompare(SrcBin, SinkBin, [], 0).
segmentcompare(SrcBin, SinkBin) when byte_size(SrcBin) == byte_size(SinkBin) ->
segmentcompare(SrcBin, SinkBin, [], 0);
segmentcompare(<<>>, SinkBin) ->
Size = bit_size(SinkBin),
segmentcompare(<<0:Size/integer>>, SinkBin);
segmentcompare(SrcBin, <<>>) ->
Size = bit_size(SrcBin),
segmentcompare(SrcBin, <<0:Size/integer>>).
segmentcompare(<<>>, <<>>, Acc, _Counter) ->
Acc;
@ -836,6 +843,19 @@ matchbysegment_check(SegList, MatchList, SmallSize, LargeSize) ->
OL = lists:filter(PredFun, MatchList),
{timer:now_diff(os:timestamp(), SW)/1000, OL}.
find_dirtysegments_withanemptytree_test() ->
T1 = new_tree(t1),
T2 = new_tree(t2),
?assertMatch([], find_dirtysegments(fetch_root(T1), fetch_root(T2))),
{T3, DS1} =
add_kv(T2, <<"TestKey">>, <<"V1">>, fun(B, K) -> {B, K} end, true),
ExpectedAnswer = [DS1 div 256],
?assertMatch(ExpectedAnswer, find_dirtysegments(<<>>, fetch_root(T3))),
?assertMatch(ExpectedAnswer, find_dirtysegments(fetch_root(T3), <<>>)).
-endif.


@ -35,6 +35,9 @@
integer(), % length
any()}.
-export_type([leveled_tree/0]).
%%%============================================================================
%%% API
%%%============================================================================


@ -529,30 +529,31 @@ multibucket_fold(_Config) ->
end,
FoldAccT = {FF, []},
{async, R1} = leveled_bookie:book_headfold(Bookie1,
?RIAK_TAG,
{bucket_list,
[{<<"Type1">>, <<"Bucket1">>},
{<<"Type2">>, <<"Bucket4">>}]},
FoldAccT,
false,
true,
false),
{async, R1} =
leveled_bookie:book_headfold(Bookie1,
?RIAK_TAG,
{bucket_list,
[{<<"Type1">>, <<"Bucket1">>},
{<<"Type2">>, <<"Bucket4">>}]},
FoldAccT,
false,
true,
false),
O1 = length(R1()),
io:format("Result R1 of length ~w~n", [O1]),
Q2 = {foldheads_bybucket,
?RIAK_TAG,
[<<"Bucket2">>, <<"Bucket3">>], bucket_list,
{fun(_B, _K, _PO, Acc) ->
Acc +1
end,
0},
false,
true,
false},
{async, R2} = leveled_bookie:book_returnfolder(Bookie1, Q2),
{async, R2} =
leveled_bookie:book_headfold(Bookie1,
?RIAK_TAG,
{bucket_list,
[<<"Bucket2">>,
<<"Bucket3">>]},
{fun(_B, _K, _PO, Acc) ->
Acc +1
end,
0},
false, true, false),
O2 = R2(),
io:format("Result R2 of ~w~n", [O2]),


@ -227,7 +227,8 @@ aae_missingjournal(_Config) ->
{foldheads_allkeys,
?RIAK_TAG,
FoldHeadsFun,
true, true, false}),
true, true, false,
false, false}),
HeadL2 = length(AllHeadF2()),
io:format("Fold head returned ~w objects~n", [HeadL2]),
true = HeadL2 < HeadL1,


@ -3,6 +3,7 @@
-include("include/leveled.hrl").
-export([all/0]).
-export([
fetchclocks_modifiedbetween/1,
crossbucket_aae/1,
handoff/1,
dollar_bucket_index/1,
@ -10,6 +11,7 @@
]).
all() -> [
fetchclocks_modifiedbetween,
crossbucket_aae,
handoff,
dollar_bucket_index,
@ -18,6 +20,315 @@ all() -> [
-define(MAGIC, 53). % riak_kv -> riak_object
fetchclocks_modifiedbetween(_Config) ->
RootPathA = testutil:reset_filestructure("fetchClockA"),
RootPathB = testutil:reset_filestructure("fetchClockB"),
StartOpts1A = [{root_path, RootPathA},
{max_journalsize, 500000000},
{max_pencillercachesize, 8000},
{sync_strategy, testutil:sync_strategy()}],
StartOpts1B = [{root_path, RootPathB},
{max_journalsize, 500000000},
{max_pencillercachesize, 12000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1A} = leveled_bookie:book_start(StartOpts1A),
{ok, Bookie1B} = leveled_bookie:book_start(StartOpts1B),
ObjL1StartTS = testutil:convert_to_seconds(os:timestamp()),
ObjList1 =
testutil:generate_objects(20000,
{fixed_binary, 1}, [],
leveled_rand:rand_bytes(512),
fun() -> [] end,
<<"B0">>),
timer:sleep(1000),
ObjL1EndTS = testutil:convert_to_seconds(os:timestamp()),
timer:sleep(1000),
_ObjL2StartTS = testutil:convert_to_seconds(os:timestamp()),
ObjList2 =
testutil:generate_objects(15000,
{fixed_binary, 20001}, [],
leveled_rand:rand_bytes(512),
fun() -> [] end,
<<"B0">>),
timer:sleep(1000),
_ObjL2EndTS = testutil:convert_to_seconds(os:timestamp()),
timer:sleep(1000),
ObjL3StartTS = testutil:convert_to_seconds(os:timestamp()),
ObjList3 =
testutil:generate_objects(35000,
{fixed_binary, 35001}, [],
leveled_rand:rand_bytes(512),
fun() -> [] end,
<<"B0">>),
timer:sleep(1000),
ObjL3EndTS = testutil:convert_to_seconds(os:timestamp()),
timer:sleep(1000),
ObjL4StartTS = testutil:convert_to_seconds(os:timestamp()),
ObjList4 =
testutil:generate_objects(30000,
{fixed_binary, 70001}, [],
leveled_rand:rand_bytes(512),
fun() -> [] end,
<<"B0">>),
timer:sleep(1000),
_ObjL4EndTS = testutil:convert_to_seconds(os:timestamp()),
timer:sleep(1000),
ObjL5StartTS = testutil:convert_to_seconds(os:timestamp()),
ObjList5 =
testutil:generate_objects(8000,
{fixed_binary, 1}, [],
leveled_rand:rand_bytes(512),
fun() -> [] end,
<<"B1">>),
timer:sleep(1000),
_ObjL5EndTS = testutil:convert_to_seconds(os:timestamp()),
timer:sleep(1000),
_ObjL6StartTS = testutil:convert_to_seconds(os:timestamp()),
ObjList6 =
testutil:generate_objects(7000,
{fixed_binary, 1}, [],
leveled_rand:rand_bytes(512),
fun() -> [] end,
<<"B2">>),
timer:sleep(1000),
ObjL6EndTS = testutil:convert_to_seconds(os:timestamp()),
timer:sleep(1000),
testutil:riakload(Bookie1A, ObjList5),
testutil:riakload(Bookie1A, ObjList1),
testutil:riakload(Bookie1A, ObjList2),
testutil:riakload(Bookie1A, ObjList3),
testutil:riakload(Bookie1A, ObjList4),
testutil:riakload(Bookie1A, ObjList6),
testutil:riakload(Bookie1B, ObjList4),
testutil:riakload(Bookie1B, ObjList5),
testutil:riakload(Bookie1B, ObjList1),
testutil:riakload(Bookie1B, ObjList6),
testutil:riakload(Bookie1B, ObjList3),
RevertFixedBinKey =
fun(FBK) ->
<<$K, $e, $y, KeyNumber:64/integer>> = FBK,
KeyNumber
end,
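% fixed_bin_key/1 builds keys of the form <<"Key", N:64/integer>>, so
% matching off the three-character prefix recovers the original key
% number.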
StoreFoldFun =
fun(_B, K, _V, {_LK, AccC}) ->
{RevertFixedBinKey(K), AccC + 1}
end,
KeyRangeFun =
fun(StartNumber, EndNumber) ->
{range,
<<"B0">>,
{testutil:fixed_bin_key(StartNumber),
testutil:fixed_bin_key(EndNumber)}}
end,
% Count with max object count
FoldRangesFun =
fun(FoldTarget, ModRange, EndNumber, MaxCount) ->
fun(_I, {LKN, KC}) ->
{async, Runner} =
leveled_bookie:book_headfold(FoldTarget,
?RIAK_TAG,
KeyRangeFun(LKN + 1,
EndNumber),
{StoreFoldFun, {LKN, KC}},
false,
true,
false,
ModRange,
MaxCount),
{_, {LKN0, KC0}} = Runner(),
{LKN0, KC0}
end
end,
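% Each rotation of the fold resumes the key range from one past the
% last key number seen (LKN + 1) and carries the running count
% forward, so repeated max-count-limited folds walk the whole range
% in slices.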
R1A = lists:foldl(FoldRangesFun(Bookie1A, false, 50000, 13000),
{0, 0}, lists:seq(1, 4)),
io:format("R1A ~w~n", [R1A]),
true = {50000, 50000} == R1A,
R1B = lists:foldl(FoldRangesFun(Bookie1B, false, 50000, 13000),
{0, 0}, lists:seq(1, 3)),
io:format("R1B ~w~n", [R1B]),
true = {50000, 35000} == R1B,
R2A = lists:foldl(FoldRangesFun(Bookie1A,
{ObjL3StartTS, ObjL3EndTS},
60000,
13000),
{10000, 0}, lists:seq(1, 2)),
io:format("R2A ~w~n", [R2A]),
true = {60000, 25000} == R2A,
R2A_SR = lists:foldl(FoldRangesFun(Bookie1A,
{ObjL3StartTS, ObjL3EndTS},
60000,
13000),
{10000, 0}, lists:seq(1, 1)), % Only single rotation
io:format("R2A_SingleRotation ~w~n", [R2A_SR]),
true = {48000, 13000} == R2A_SR, % Hit at max results
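% With one rotation only: the fold starts at key 10001, the first key
% modified in the ObjL3 window is 35001, and the max object count of
% 13000 is reached at key 48000.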
R2B = lists:foldl(FoldRangesFun(Bookie1B,
{ObjL3StartTS, ObjL3EndTS},
60000,
13000),
{10000, 0}, lists:seq(1, 2)),
io:format("R2B ~w~n", [R1B]),
true = {60000, 25000} == R2B,
CrudeStoreFoldFun =
fun(LowLMD, HighLMD) ->
fun(_B, K, V, {LK, AccC}) ->
% Value is a proxy_object, so we can get the metadata and
% read the last modified date. Then do a non-accelerated
% fold to check that it is slower
{proxy_object, MDBin, _Size, _Fetcher} = binary_to_term(V),
LMDTS = testutil:get_lastmodified(MDBin),
LMD = testutil:convert_to_seconds(LMDTS),
case (LMD >= LowLMD) and (LMD =< HighLMD) of
true ->
{RevertFixedBinKey(K), AccC + 1};
false ->
{LK, AccC}
end
end
end,
io:format("Comparing queries for Obj1 TS range ~w ~w~n",
[ObjL1StartTS, ObjL1EndTS]),
PlusFilterStart = os:timestamp(),
R3A_PlusFilter = lists:foldl(FoldRangesFun(Bookie1A,
{ObjL1StartTS, ObjL1EndTS},
100000,
100000),
{0, 0}, lists:seq(1, 1)),
PlusFilterTime = timer:now_diff(os:timestamp(), PlusFilterStart)/1000,
io:format("R3A_PlusFilter ~w~n", [R3A_PlusFilter]),
true = {20000, 20000} == R3A_PlusFilter,
NoFilterStart = os:timestamp(),
{async, R3A_NoFilterRunner} =
leveled_bookie:book_headfold(Bookie1A,
?RIAK_TAG,
KeyRangeFun(1, 100000),
{CrudeStoreFoldFun(ObjL1StartTS,
ObjL1EndTS),
{0, 0}},
false,
true,
false),
R3A_NoFilter = R3A_NoFilterRunner(),
NoFilterTime = timer:now_diff(os:timestamp(), NoFilterStart)/1000,
io:format("R3A_NoFilter ~w~n", [R3A_NoFilter]),
true = {20000, 20000} == R3A_NoFilter,
io:format("Filtered query ~w ms and unfiltered query ~w ms~n",
[PlusFilterTime, NoFilterTime]),
true = NoFilterTime > PlusFilterTime,
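% The accelerated fold is presumably faster because the store can
% skip parts of the range whose last-modified dates fall wholly
% outside the given window, rather than fetching and filtering every
% head as the crude fold does.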
SimpleCountFun =
fun(_B, _K, _V, AccC) -> AccC + 1 end,
{async, R4A_MultiBucketRunner} =
leveled_bookie:book_headfold(Bookie1A,
?RIAK_TAG,
{bucket_list, [<<"B0">>, <<"B2">>]},
{SimpleCountFun, 0},
false,
true,
false,
{ObjL4StartTS, ObjL6EndTS},
% Range includes ObjL5 LMDs,
% but these are not in bucket list
false),
R4A_MultiBucket = R4A_MultiBucketRunner(),
io:format("R4A_MultiBucket ~w ~n", [R4A_MultiBucket]),
true = R4A_MultiBucket == 37000,
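% 37000 = the 30000 ObjList4 keys in B0 plus the 7000 ObjList6 keys
% in B2; the 8000 ObjList5 keys also fall in the LMD range but are in
% B1, which is not in the bucket list.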
{async, R5A_MultiBucketRunner} =
leveled_bookie:book_headfold(Bookie1A,
?RIAK_TAG,
{bucket_list, [<<"B2">>, <<"B0">>]},
% Reverse the buckets in the bucket
% list
{SimpleCountFun, 0},
false,
true,
false,
{ObjL4StartTS, ObjL6EndTS},
false),
R5A_MultiBucket = R5A_MultiBucketRunner(),
io:format("R5A_MultiBucket ~w ~n", [R5A_MultiBucket]),
true = R5A_MultiBucket == 37000,
{async, R5B_MultiBucketRunner} =
leveled_bookie:book_headfold(Bookie1B,
% Same query - other bookie
?RIAK_TAG,
{bucket_list, [<<"B2">>, <<"B0">>]},
{SimpleCountFun, 0},
false,
true,
false,
{ObjL4StartTS, ObjL6EndTS},
false),
R5B_MultiBucket = R5B_MultiBucketRunner(),
io:format("R5B_MultiBucket ~w ~n", [R5B_MultiBucket]),
true = R5B_MultiBucket == 37000,
testutil:update_some_objects(Bookie1A, ObjList1, 1000),
R6A_PlusFilter = lists:foldl(FoldRangesFun(Bookie1A,
{ObjL1StartTS, ObjL1EndTS},
100000,
100000),
{0, 0}, lists:seq(1, 1)),
io:format("R6A_PlusFilter ~w~n", [R6A_PlusFilter]),
true = 19000 == element(2, R6A_PlusFilter),
% Hit limit of max count before trying next bucket, with and without a
% timestamp filter
{async, R7A_MultiBucketRunner} =
leveled_bookie:book_headfold(Bookie1A,
?RIAK_TAG,
{bucket_list, [<<"B1">>, <<"B2">>]},
{SimpleCountFun, 0},
false,
true,
false,
{ObjL5StartTS, ObjL6EndTS},
5000),
R7A_MultiBucket = R7A_MultiBucketRunner(),
io:format("R7A_MultiBucket ~w ~n", [R7A_MultiBucket]),
true = R7A_MultiBucket == {0, 5000},
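% The max object count is exhausted within B1 (8000 ObjList5 keys
% fall in the range), so B2 is never visited; the runner appears to
% return the remaining object allowance (here 0) alongside the
% accumulator.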
{async, R8A_MultiBucketRunner} =
leveled_bookie:book_headfold(Bookie1A,
?RIAK_TAG,
{bucket_list, [<<"B1">>, <<"B2">>]},
{SimpleCountFun, 0},
false,
true,
false,
false,
5000),
R8A_MultiBucket = R8A_MultiBucketRunner(),
io:format("R8A_MultiBucket ~w ~n", [R8A_MultiBucket]),
true = R8A_MultiBucket == {0, 5000},
ok = leveled_bookie:book_destroy(Bookie1A),
ok = leveled_bookie:book_destroy(Bookie1B).
crossbucket_aae(_Config) ->
% Test requires multiple different databases, so want to mount them all
% on individual file paths
@ -141,7 +452,7 @@ test_segfilter_query(Bookie, CLs) ->
Acc
end
end, 0},
false, true, SegL}
false, true, SegL, false, false}
end,
{async, SL1Folder} =
@ -174,7 +485,7 @@ test_singledelta_stores(BookA, BookB, TreeSize, DeltaKey) ->
?RIAK_TAG,
{fun head_tictac_foldfun/4,
{0, leveled_tictac:new_tree(test, TreeSize)}},
false, true, false},
false, true, false, false, false},
% tictac query by bucket (should be same result as all stores)
TicTacByBucketFolder =
{foldheads_bybucket,
@ -182,7 +493,7 @@ test_singledelta_stores(BookA, BookB, TreeSize, DeltaKey) ->
all,
{fun head_tictac_foldfun/4,
{0, leveled_tictac:new_tree(test, TreeSize)}},
false, false, false},
false, false, false, false, false},
DLs = check_tictacfold(BookA, BookB,
TicTacFolder,
@ -197,7 +508,7 @@ test_singledelta_stores(BookA, BookB, TreeSize, DeltaKey) ->
{foldheads_allkeys,
?RIAK_TAG,
{get_segment_folder(DLs, TreeSize), []},
false, true, false},
false, true, false, false, false},
SW_SL0 = os:timestamp(),
{async, BookASegFolder} =
@ -221,7 +532,7 @@ test_singledelta_stores(BookA, BookB, TreeSize, DeltaKey) ->
{foldheads_allkeys,
?RIAK_TAG,
{get_segment_folder(DLs, TreeSize), []},
false, true, SegFilterList},
false, true, SegFilterList, false, false},
SW_SL1 = os:timestamp(),
{async, BookASegFolder1} =
@ -240,7 +551,7 @@ test_singledelta_stores(BookA, BookB, TreeSize, DeltaKey) ->
{foldheads_allkeys,
?RIAK_TAG,
{get_segment_folder(DLs, TreeSize), []},
true, true, SegFilterList},
true, true, SegFilterList, false, false},
SW_SL1CP = os:timestamp(),
{async, BookASegFolder1CP} =
@ -264,7 +575,7 @@ test_singledelta_stores(BookA, BookB, TreeSize, DeltaKey) ->
{foldheads_allkeys,
?RIAK_TAG,
{get_segment_folder(DLs, TreeSize), []},
false, true, SegFilterListF},
false, true, SegFilterListF, false, false},
SW_SL1F = os:timestamp(),
{async, BookASegFolder1F} =
@ -468,7 +779,7 @@ handoff(_Config) ->
?RIAK_TAG,
{fun head_tictac_foldfun/4,
{0, leveled_tictac:new_tree(test, TreeSize)}},
false, true, false},
false, true, false, false, false},
check_tictacfold(Bookie1, Bookie2, TicTacFolder, none, TreeSize),
check_tictacfold(Bookie2, Bookie3, TicTacFolder, none, TreeSize),
check_tictacfold(Bookie3, Bookie4, TicTacFolder, none, TreeSize),

View file

@ -28,6 +28,7 @@
get_key/1,
get_value/1,
get_vclock/1,
get_lastmodified/1,
get_compressiblevalue/0,
get_compressiblevalue_andinteger/0,
get_randomindexes_generator/1,
@ -51,8 +52,9 @@
sync_strategy/0,
riak_object/4,
get_value_from_objectlistitem/1,
numbered_key/1,
fixed_bin_key/1]).
numbered_key/1,
fixed_bin_key/1,
convert_to_seconds/1]).
-define(RETURN_TERMS, {true, undefined}).
-define(SLOWOFFER_DELAY, 5).
@ -491,7 +493,10 @@ update_some_objects(Bookie, ObjList, SampleSize) ->
VC = Obj#r_object.vclock,
VC0 = update_vclock(VC),
[C] = Obj#r_object.contents,
C0 = C#r_content{value = leveled_rand:rand_bytes(512)},
MD = C#r_content.metadata,
MD0 = dict:store(?MD_LASTMOD, os:timestamp(), MD),
C0 = C#r_content{value = leveled_rand:rand_bytes(512),
metadata = MD0},
UpdObj = Obj#r_object{vclock = VC0, contents = [C0]},
{R, UpdObj, Spec}
end,
@ -551,6 +556,24 @@ get_value(ObjectBin) ->
error
end.
get_lastmodified(ObjectBin) ->
<<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer,
Rest1/binary>> = ObjectBin,
<<_VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest1,
case SibCount of
1 ->
<<SibLength:32/integer, Rest2/binary>> = SibsBin,
<<_ContentBin:SibLength/binary,
MetaLength:32/integer,
MetaBin:MetaLength/binary,
_Rest3/binary>> = Rest2,
<<MegaSec:32/integer,
Sec:32/integer,
MicroSec:32/integer,
_RestMetaBin/binary>> = MetaBin,
{MegaSec, Sec, MicroSec}
end.
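% Note: only the single-sibling case is handled - an object with more
% than one sibling will fail here with a case_clause error.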
get_vclock(ObjectBin) ->
<<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer,
Rest1/binary>> = ObjectBin,
@ -771,3 +794,5 @@ find_journals(RootPath) ->
FNsA_J),
CDBFiles.
convert_to_seconds({MegaSec, Seconds, _MicroSec}) ->
MegaSec * 1000000 + Seconds.
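% Microseconds are dropped - comparisons made with these values are
% at whole-second granularity.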

View file

@ -5,20 +5,12 @@
-export([
many_put_compare/1,
index_compare/1,
recent_aae_noaae/1,
recent_aae_allaae/1,
recent_aae_bucketaae/1,
recent_aae_expiry/1,
basic_headonly/1
]).
all() -> [
many_put_compare,
index_compare,
recent_aae_noaae,
recent_aae_allaae,
recent_aae_bucketaae,
recent_aae_expiry,
basic_headonly
].
@ -164,14 +156,15 @@ many_put_compare(_Config) ->
[timer:now_diff(os:timestamp(), SWB0Obj)]),
true = length(leveled_tictac:find_dirtyleaves(TreeA, TreeAObj0)) == 0,
FoldQ1 = {foldheads_bybucket,
o_rkv,
"Bucket",
all,
{FoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)},
true, true, false},
InitAccTree = leveled_tictac:new_tree(0, TreeSize),
{async, TreeAObjFolder1} =
leveled_bookie:book_returnfolder(Bookie2, FoldQ1),
leveled_bookie:book_headfold(Bookie2,
?RIAK_TAG,
{range, "Bucket", all},
{FoldObjectsFun,
InitAccTree},
true, true, false),
SWB1Obj = os:timestamp(),
TreeAObj1 = TreeAObjFolder1(),
io:format("Build tictac tree via object fold with "++
@ -192,21 +185,26 @@ many_put_compare(_Config) ->
fun(_Bucket, Key, Value, Acc) ->
leveled_tictac:add_kv(Acc, Key, Value, AltExtractFun)
end,
AltFoldQ0 = {foldheads_bybucket,
o_rkv,
"Bucket",
all,
{AltFoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)},
false, true, false},
{async, TreeAAltObjFolder0} =
leveled_bookie:book_returnfolder(Bookie2, AltFoldQ0),
leveled_bookie:book_headfold(Bookie2,
?RIAK_TAG,
{range, "Bucket", all},
{AltFoldObjectsFun,
InitAccTree},
false, true, false),
SWB2Obj = os:timestamp(),
TreeAAltObj = TreeAAltObjFolder0(),
io:format("Build tictac tree via object fold with no "++
"presence check and 200K objects and alt hash in ~w~n",
[timer:now_diff(os:timestamp(), SWB2Obj)]),
{async, TreeBAltObjFolder0} =
leveled_bookie:book_returnfolder(Bookie3, AltFoldQ0),
leveled_bookie:book_headfold(Bookie3,
?RIAK_TAG,
{range, "Bucket", all},
{AltFoldObjectsFun,
InitAccTree},
false, true, false),
SWB3Obj = os:timestamp(),
TreeBAltObj = TreeBAltObjFolder0(),
io:format("Build tictac tree via object fold with no "++
@ -542,478 +540,6 @@ index_compare(_Config) ->
ok = leveled_bookie:book_close(Book2D).
recent_aae_noaae(_Config) ->
% Starts databases with no recent_aae support, and attempts to query
% recent aae trees return empty trees as no index entries are found.
TreeSize = small,
% SegmentCount = 256 * 256,
UnitMins = 2,
% Test requires multiple different databases, so want to mount them all
% on individual file paths
RootPathA = testutil:reset_filestructure("testA"),
RootPathB = testutil:reset_filestructure("testB"),
RootPathC = testutil:reset_filestructure("testC"),
RootPathD = testutil:reset_filestructure("testD"),
StartOptsA = aae_startopts(RootPathA, false),
StartOptsB = aae_startopts(RootPathB, false),
StartOptsC = aae_startopts(RootPathC, false),
StartOptsD = aae_startopts(RootPathD, false),
% Book1A to get all objects
{ok, Book1A} = leveled_bookie:book_start(StartOptsA),
% Book1B/C/D will have objects partitioned across it
{ok, Book1B} = leveled_bookie:book_start(StartOptsB),
{ok, Book1C} = leveled_bookie:book_start(StartOptsC),
{ok, Book1D} = leveled_bookie:book_start(StartOptsD),
{B1, K1, V1, S1, MD} = {"Bucket",
"Key1.1.4567.4321",
"Value1",
[],
[{"MDK1", "MDV1"}]},
{TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
SW_StartLoad = os:timestamp(),
ok = testutil:book_riakput(Book1A, TestObject, TestSpec),
ok = testutil:book_riakput(Book1B, TestObject, TestSpec),
testutil:check_forobject(Book1A, TestObject),
testutil:check_forobject(Book1B, TestObject),
{TicTacTreeJoined, TicTacTreeFull, EmptyTree, _LMDIndexes} =
load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
SW_StartLoad, TreeSize, UnitMins,
false),
% Go compare! Also confirm we're not comparing empty trees
DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
TicTacTreeJoined),
DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
true = DL1_0 == [],
true = length(DL1_1) == 0,
ok = leveled_bookie:book_close(Book1A),
ok = leveled_bookie:book_close(Book1B),
ok = leveled_bookie:book_close(Book1C),
ok = leveled_bookie:book_close(Book1D).
recent_aae_allaae(_Config) ->
% Leveled is started in blacklisted mode with no buckets blacklisted.
%
% A number of changes are then loaded into a store, and also partitioned
% across a separate set of three stores. A merge tree is returned from
% both the single store and the partitioned store, and proven to compare
% the same.
%
% A single change is then made, but into one half of the system only. The
% aae index is then re-queried and it is verified that a single segment
% difference is found.
%
% The segment Id found is then used in a query to find the Keys that make
% up that segment, and the delta discovered should be just that one key
% which was known to have been changed
TreeSize = small,
% SegmentCount = 256 * 256,
UnitMins = 2,
AAE = {blacklist, [], 60, UnitMins},
% Test requires multiple different databases, so want to mount them all
% on individual file paths
RootPathA = testutil:reset_filestructure("testA"),
RootPathB = testutil:reset_filestructure("testB"),
RootPathC = testutil:reset_filestructure("testC"),
RootPathD = testutil:reset_filestructure("testD"),
StartOptsA = aae_startopts(RootPathA, AAE),
StartOptsB = aae_startopts(RootPathB, AAE),
StartOptsC = aae_startopts(RootPathC, AAE),
StartOptsD = aae_startopts(RootPathD, AAE),
% Book1A to get all objects
{ok, Book1A} = leveled_bookie:book_start(StartOptsA),
% Book1B/C/D will have objects partitioned across it
{ok, Book1B} = leveled_bookie:book_start(StartOptsB),
{ok, Book1C} = leveled_bookie:book_start(StartOptsC),
{ok, Book1D} = leveled_bookie:book_start(StartOptsD),
{B1, K1, V1, S1, MD} = {"Bucket",
"Key1.1.4567.4321",
"Value1",
[],
[{"MDK1", "MDV1"}]},
{TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
SW_StartLoad = os:timestamp(),
ok = testutil:book_riakput(Book1A, TestObject, TestSpec),
ok = testutil:book_riakput(Book1B, TestObject, TestSpec),
testutil:check_forobject(Book1A, TestObject),
testutil:check_forobject(Book1B, TestObject),
{TicTacTreeJoined, TicTacTreeFull, EmptyTree, LMDIndexes} =
load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
SW_StartLoad, TreeSize, UnitMins,
false),
% Go compare! Also confirm we're not comparing empty trees
DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
TicTacTreeJoined),
DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
true = DL1_0 == [],
true = length(DL1_1) > 100,
ok = leveled_bookie:book_close(Book1A),
ok = leveled_bookie:book_close(Book1B),
ok = leveled_bookie:book_close(Book1C),
ok = leveled_bookie:book_close(Book1D),
% Book2A to get all objects
{ok, Book2A} = leveled_bookie:book_start(StartOptsA),
% Book2B/C/D will have objects partitioned across it
{ok, Book2B} = leveled_bookie:book_start(StartOptsB),
{ok, Book2C} = leveled_bookie:book_start(StartOptsC),
{ok, Book2D} = leveled_bookie:book_start(StartOptsD),
{TicTacTreeJoined, TicTacTreeFull, EmptyTree, LMDIndexes} =
load_and_check_recentaae(Book2A, Book2B, Book2C, Book2D,
SW_StartLoad, TreeSize, UnitMins,
LMDIndexes),
% Go compare! Also confirm we're not comparing empty trees
DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
TicTacTreeJoined),
DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
true = DL1_0 == [],
true = length(DL1_1) > 100,
V2 = "Value2",
{TestObject2, TestSpec2} =
testutil:generate_testobject(B1, K1, V2, S1, MD),
New_startTS = os:timestamp(),
ok = testutil:book_riakput(Book2B, TestObject2, TestSpec2),
testutil:check_forobject(Book2B, TestObject2),
testutil:check_forobject(Book2A, TestObject),
New_endTS = os:timestamp(),
NewLMDIndexes = determine_lmd_indexes(New_startTS, New_endTS, UnitMins),
{TicTacTreeJoined2, TicTacTreeFull2, _EmptyTree, NewLMDIndexes} =
load_and_check_recentaae(Book2A, Book2B, Book2C, Book2D,
New_startTS, TreeSize, UnitMins,
NewLMDIndexes),
DL2_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull2,
TicTacTreeJoined2),
% DL2_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
true = length(DL2_0) == 1,
[DirtySeg] = DL2_0,
TermPrefix = string:right(integer_to_list(DirtySeg), 8, $0),
LMDSegFolder =
fun(LMD, {Acc, Bookie}) ->
IdxLMD = list_to_binary("$aae." ++ LMD ++ "_bin"),
IdxQ1 =
{index_query,
<<"$all">>,
{fun testutil:foldkeysfun_returnbucket/3, []},
{IdxLMD,
list_to_binary(TermPrefix ++ "."),
list_to_binary(TermPrefix ++ "|")},
{true, undefined}},
{async, IdxFolder} =
leveled_bookie:book_returnfolder(Bookie, IdxQ1),
{Acc ++ IdxFolder(), Bookie}
end,
{KeysTerms2A, _} = lists:foldl(LMDSegFolder,
{[], Book2A},
lists:usort(LMDIndexes ++ NewLMDIndexes)),
true = length(KeysTerms2A) >= 1,
{KeysTerms2B, _} = lists:foldl(LMDSegFolder,
{[], Book2B},
lists:usort(LMDIndexes ++ NewLMDIndexes)),
{KeysTerms2C, _} = lists:foldl(LMDSegFolder,
{[], Book2C},
lists:usort(LMDIndexes ++ NewLMDIndexes)),
{KeysTerms2D, _} = lists:foldl(LMDSegFolder,
{[], Book2D},
lists:usort(LMDIndexes ++ NewLMDIndexes)),
KeysTerms2Joined = KeysTerms2B ++ KeysTerms2C ++ KeysTerms2D,
DeltaX = lists:subtract(KeysTerms2A, KeysTerms2Joined),
DeltaY = lists:subtract(KeysTerms2Joined, KeysTerms2A),
io:format("DeltaX ~w~n", [DeltaX]),
io:format("DeltaY ~w~n", [DeltaY]),
true = length(DeltaX) == 0, % This hasn't seen any extra changes
true = length(DeltaY) == 1, % This has seen an extra change
[{_, {B1, K1}}] = DeltaY,
ok = leveled_bookie:book_close(Book2A),
ok = leveled_bookie:book_close(Book2B),
ok = leveled_bookie:book_close(Book2C),
ok = leveled_bookie:book_close(Book2D).
recent_aae_bucketaae(_Config) ->
% Configure AAE to work only on a single whitelisted bucket
% Confirm that we can spot a delta in this bucket, but not
% in another bucket
TreeSize = small,
% SegmentCount = 256 * 256,
UnitMins = 2,
AAE = {whitelist, [<<"Bucket">>], 60, UnitMins},
% Test requires multiple different databases, so want to mount them all
% on individual file paths
RootPathA = testutil:reset_filestructure("testA"),
RootPathB = testutil:reset_filestructure("testB"),
RootPathC = testutil:reset_filestructure("testC"),
RootPathD = testutil:reset_filestructure("testD"),
StartOptsA = aae_startopts(RootPathA, AAE),
StartOptsB = aae_startopts(RootPathB, AAE),
StartOptsC = aae_startopts(RootPathC, AAE),
StartOptsD = aae_startopts(RootPathD, AAE),
% Book1A to get all objects
{ok, Book1A} = leveled_bookie:book_start(StartOptsA),
% Book1B/C/D will have objects partitioned across it
{ok, Book1B} = leveled_bookie:book_start(StartOptsB),
{ok, Book1C} = leveled_bookie:book_start(StartOptsC),
{ok, Book1D} = leveled_bookie:book_start(StartOptsD),
{B1, K1, V1, S1, MD} = {<<"Bucket">>,
"Key1.1.4567.4321",
"Value1",
[],
[{"MDK1", "MDV1"}]},
{TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
SW_StartLoad = os:timestamp(),
ok = testutil:book_riakput(Book1A, TestObject, TestSpec),
ok = testutil:book_riakput(Book1B, TestObject, TestSpec),
testutil:check_forobject(Book1A, TestObject),
testutil:check_forobject(Book1B, TestObject),
{TicTacTreeJoined, TicTacTreeFull, EmptyTree, LMDIndexes} =
load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
SW_StartLoad, TreeSize, UnitMins,
false, <<"Bucket">>),
% Go compare! Also confirm we're not comparing empty trees
DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
TicTacTreeJoined),
DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
true = DL1_0 == [],
true = length(DL1_1) > 100,
ok = leveled_bookie:book_close(Book1A),
ok = leveled_bookie:book_close(Book1B),
ok = leveled_bookie:book_close(Book1C),
ok = leveled_bookie:book_close(Book1D),
% Book2A to get all objects
{ok, Book2A} = leveled_bookie:book_start(StartOptsA),
% Book2B/C/D will have objects partitioned across it
{ok, Book2B} = leveled_bookie:book_start(StartOptsB),
{ok, Book2C} = leveled_bookie:book_start(StartOptsC),
{ok, Book2D} = leveled_bookie:book_start(StartOptsD),
% Change the value for a key in another bucket
% If we get trees for this period, no difference should be found
V2 = "Value2",
{TestObject2, TestSpec2} =
testutil:generate_testobject(<<"NotBucket">>, K1, V2, S1, MD),
New_startTS2 = os:timestamp(),
ok = testutil:book_riakput(Book2B, TestObject2, TestSpec2),
testutil:check_forobject(Book2B, TestObject2),
testutil:check_forobject(Book2A, TestObject),
New_endTS2 = os:timestamp(),
NewLMDIndexes2 = determine_lmd_indexes(New_startTS2, New_endTS2, UnitMins),
{TicTacTreeJoined2, TicTacTreeFull2, _EmptyTree, NewLMDIndexes2} =
load_and_check_recentaae(Book2A, Book2B, Book2C, Book2D,
New_startTS2, TreeSize, UnitMins,
NewLMDIndexes2, <<"Bucket">>),
DL2_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull2,
TicTacTreeJoined2),
true = length(DL2_0) == 0,
% Now create an object that is a change to an existing key in the
% monitored bucket. A difference should be found
{TestObject3, TestSpec3} =
testutil:generate_testobject(B1, K1, V2, S1, MD),
New_startTS3 = os:timestamp(),
ok = testutil:book_riakput(Book2B, TestObject3, TestSpec3),
testutil:check_forobject(Book2B, TestObject3),
testutil:check_forobject(Book2A, TestObject),
New_endTS3 = os:timestamp(),
NewLMDIndexes3 = determine_lmd_indexes(New_startTS3, New_endTS3, UnitMins),
{TicTacTreeJoined3, TicTacTreeFull3, _EmptyTree, NewLMDIndexes3} =
load_and_check_recentaae(Book2A, Book2B, Book2C, Book2D,
New_startTS3, TreeSize, UnitMins,
NewLMDIndexes3, <<"Bucket">>),
DL3_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull3,
TicTacTreeJoined3),
% DL2_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
true = length(DL3_0) == 1,
% Find the dirty segment, and use that to find the dirty key
%
% Note that unlike when monitoring $all, fold_keys can be used as there
% is no need to return the Bucket (as the bucket is known)
[DirtySeg] = DL3_0,
TermPrefix = string:right(integer_to_list(DirtySeg), 8, $0),
LMDSegFolder =
fun(LMD, {Acc, Bookie}) ->
IdxLMD = list_to_binary("$aae." ++ LMD ++ "_bin"),
IdxQ1 =
{index_query,
<<"Bucket">>,
{fun testutil:foldkeysfun/3, []},
{IdxLMD,
list_to_binary(TermPrefix ++ "."),
list_to_binary(TermPrefix ++ "|")},
{true, undefined}},
{async, IdxFolder} =
leveled_bookie:book_returnfolder(Bookie, IdxQ1),
{Acc ++ IdxFolder(), Bookie}
end,
{KeysTerms2A, _} = lists:foldl(LMDSegFolder,
{[], Book2A},
lists:usort(LMDIndexes ++ NewLMDIndexes3)),
true = length(KeysTerms2A) >= 1,
{KeysTerms2B, _} = lists:foldl(LMDSegFolder,
{[], Book2B},
lists:usort(LMDIndexes ++ NewLMDIndexes3)),
{KeysTerms2C, _} = lists:foldl(LMDSegFolder,
{[], Book2C},
lists:usort(LMDIndexes ++ NewLMDIndexes3)),
{KeysTerms2D, _} = lists:foldl(LMDSegFolder,
{[], Book2D},
lists:usort(LMDIndexes ++ NewLMDIndexes3)),
KeysTerms2Joined = KeysTerms2B ++ KeysTerms2C ++ KeysTerms2D,
DeltaX = lists:subtract(KeysTerms2A, KeysTerms2Joined),
DeltaY = lists:subtract(KeysTerms2Joined, KeysTerms2A),
io:format("DeltaX ~w~n", [DeltaX]),
io:format("DeltaY ~w~n", [DeltaY]),
true = length(DeltaX) == 0, % This hasn't seen any extra changes
true = length(DeltaY) == 1, % This has seen an extra change
[{_, K1}] = DeltaY,
ok = leveled_bookie:book_close(Book2A),
ok = leveled_bookie:book_close(Book2B),
ok = leveled_bookie:book_close(Book2C),
ok = leveled_bookie:book_close(Book2D).
recent_aae_expiry(_Config) ->
% Proof that the index entries are indeed expired
TreeSize = small,
% SegmentCount = 256 * 256,
UnitMins = 1,
TotalMins = 2,
AAE = {blacklist, [], TotalMins, UnitMins},
% Test requires multiple different databases, so want to mount them all
% on individual file paths
RootPathA = testutil:reset_filestructure("testA"),
StartOptsA = aae_startopts(RootPathA, AAE),
% Book1A to get all objects
{ok, Book1A} = leveled_bookie:book_start(StartOptsA),
GenMapFun =
fun(_X) ->
V = testutil:get_compressiblevalue(),
Indexes = testutil:get_randomindexes_generator(8),
testutil:generate_objects(5000,
binary_uuid,
[],
V,
Indexes)
end,
ObjLists = lists:map(GenMapFun, lists:seq(1, 3)),
SW0 = os:timestamp(),
% Load all nine lists into Book1A
lists:foreach(fun(ObjL) -> testutil:riakload(Book1A, ObjL) end,
ObjLists),
SW1 = os:timestamp(),
% sleep for two minutes, so all index entries will have expired
GetTicTacTreeFun =
fun(Bookie) ->
get_tictactree_fun(Bookie, <<"$all">>, TreeSize)
end,
EmptyTree = leveled_tictac:new_tree(empty, TreeSize),
LMDIndexes = determine_lmd_indexes(SW0, SW1, UnitMins),
% Should get a non-empty answer to the query
TicTacTree1_Full =
lists:foldl(GetTicTacTreeFun(Book1A), EmptyTree, LMDIndexes),
DL3_0 = leveled_tictac:find_dirtyleaves(TicTacTree1_Full, EmptyTree),
io:format("Dirty leaves found before expiry ~w~n", [length(DL3_0)]),
true = length(DL3_0) > 0,
SecondsSinceLMD = timer:now_diff(os:timestamp(), SW0) div 1000000,
SecondsToExpiry = (TotalMins + UnitMins) * 60,
io:format("SecondsToExpiry ~w SecondsSinceLMD ~w~n",
[SecondsToExpiry, SecondsSinceLMD]),
io:format("LMDIndexes ~w~n", [LMDIndexes]),
case SecondsToExpiry > SecondsSinceLMD of
true ->
timer:sleep((1 + SecondsToExpiry - SecondsSinceLMD) * 1000);
false ->
timer:sleep(1000)
end,
% Should now get an empty answer - all entries have expired
TicTacTree2_Full =
lists:foldl(GetTicTacTreeFun(Book1A), EmptyTree, LMDIndexes),
DL4_0 = leveled_tictac:find_dirtyleaves(TicTacTree2_Full, EmptyTree),
io:format("Dirty leaves found after expiry ~w~n", [length(DL4_0)]),
timer:sleep(10000),
TicTacTree3_Full =
lists:foldl(GetTicTacTreeFun(Book1A), EmptyTree, LMDIndexes),
DL5_0 = leveled_tictac:find_dirtyleaves(TicTacTree3_Full, EmptyTree),
io:format("Dirty leaves found after expiry plus 10s ~w~n", [length(DL5_0)]),
ok = leveled_bookie:book_close(Book1A),
true = length(DL4_0) == 0.
basic_headonly(_Config) ->
ObjectCount = 200000,
RemoveCount = 100,
@ -1069,7 +595,8 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) ->
InitAcc = {0, 0},
RunnerDefinition =
{foldheads_allkeys, h, {FoldFun, InitAcc}, false, false, false},
{foldheads_allkeys, h, {FoldFun, InitAcc},
false, false, false, false, false},
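% The two trailing false fields are the last-modified date range and
% the max object count now carried by foldheads queries (both unused
% here).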
{async, Runner1} =
leveled_bookie:book_returnfolder(Bookie1, RunnerDefinition),
@ -1196,145 +723,6 @@ load_objectspecs(ObjectSpecL, SliceSize, Bookie) ->
end.
load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
SW_StartLoad, TreeSize, UnitMins,
LMDIndexes_Loaded) ->
load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
SW_StartLoad, TreeSize, UnitMins,
LMDIndexes_Loaded, <<"$all">>).
load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
SW_StartLoad, TreeSize, UnitMins,
LMDIndexes_Loaded, Bucket) ->
LMDIndexes =
case LMDIndexes_Loaded of
false ->
% Generate nine lists of objects
% BucketBin = list_to_binary("Bucket"),
GenMapFun =
fun(_X) ->
V = testutil:get_compressiblevalue(),
Indexes = testutil:get_randomindexes_generator(8),
testutil:generate_objects(5000,
binary_uuid,
[],
V,
Indexes)
end,
ObjLists = lists:map(GenMapFun, lists:seq(1, 9)),
% Load all nine lists into Book1A
lists:foreach(fun(ObjL) -> testutil:riakload(Book1A, ObjL) end,
ObjLists),
% Split nine lists across Book1B to Book1D, three object lists
% in each
lists:foreach(fun(ObjL) -> testutil:riakload(Book1B, ObjL) end,
lists:sublist(ObjLists, 1, 3)),
lists:foreach(fun(ObjL) -> testutil:riakload(Book1C, ObjL) end,
lists:sublist(ObjLists, 4, 3)),
lists:foreach(fun(ObjL) -> testutil:riakload(Book1D, ObjL) end,
lists:sublist(ObjLists, 7, 3)),
SW_EndLoad = os:timestamp(),
determine_lmd_indexes(SW_StartLoad, SW_EndLoad, UnitMins);
_ ->
LMDIndexes_Loaded
end,
EmptyTree = leveled_tictac:new_tree(empty, TreeSize),
GetTicTacTreeFun =
fun(Bookie) ->
get_tictactree_fun(Bookie, Bucket, TreeSize)
end,
% Get a TicTac tree representing one of the indexes in Bucket A
TicTacTree1_Full =
lists:foldl(GetTicTacTreeFun(Book1A), EmptyTree, LMDIndexes),
TicTacTree1_P1 =
lists:foldl(GetTicTacTreeFun(Book1B), EmptyTree, LMDIndexes),
TicTacTree1_P2 =
lists:foldl(GetTicTacTreeFun(Book1C), EmptyTree, LMDIndexes),
TicTacTree1_P3 =
lists:foldl(GetTicTacTreeFun(Book1D), EmptyTree, LMDIndexes),
% Merge the tree across the partitions
TicTacTree1_Joined = lists:foldl(fun leveled_tictac:merge_trees/2,
TicTacTree1_P1,
[TicTacTree1_P2, TicTacTree1_P3]),
{TicTacTree1_Full, TicTacTree1_Joined, EmptyTree, LMDIndexes}.
aae_startopts(RootPath, AAE) ->
LS = 2000,
JS = 50000000,
SS = testutil:sync_strategy(),
[{root_path, RootPath},
{sync_strategy, SS},
{cache_size, LS},
{max_journalsize, JS},
{recent_aae, AAE}].
determine_lmd_indexes(StartTS, EndTS, UnitMins) ->
StartDT = calendar:now_to_datetime(StartTS),
EndDT = calendar:now_to_datetime(EndTS),
StartTimeStr = get_strtime(StartDT, UnitMins),
EndTimeStr = get_strtime(EndDT, UnitMins),
AddTimeFun =
fun(X, Acc) ->
case lists:member(EndTimeStr, Acc) of
true ->
Acc;
false ->
NextTime =
UnitMins * 60 * X +
calendar:datetime_to_gregorian_seconds(StartDT),
NextDT =
calendar:gregorian_seconds_to_datetime(NextTime),
Acc ++ [get_strtime(NextDT, UnitMins)]
end
end,
lists:foldl(AddTimeFun, [StartTimeStr], lists:seq(1, 10)).
get_strtime(DateTime, UnitMins) ->
{{Y, M, D}, {Hour, Minute, _Second}} = DateTime,
RoundMins =
UnitMins * (Minute div UnitMins),
StrTime =
lists:flatten(io_lib:format(?LMD_FORMAT,
[Y, M, D, Hour, RoundMins])),
StrTime.
get_tictactree_fun(Bookie, Bucket, TreeSize) ->
fun(LMD, Acc) ->
SW = os:timestamp(),
ST = <<"0">>,
ET = <<"A">>,
Q = {tictactree_idx,
{Bucket,
list_to_binary("$aae." ++ LMD ++ "_bin"),
ST,
ET},
TreeSize,
fun(_B, _K) -> accumulate end},
{async, Folder} = leveled_bookie:book_returnfolder(Bookie, Q),
R = Folder(),
io:format("TicTac Tree for index ~s took " ++
"~w microseconds~n",
[LMD, timer:now_diff(os:timestamp(), SW)]),
leveled_tictac:merge_trees(R, Acc)
end.
get_segment(K, SegmentCount) ->
BinKey =
case is_binary(K) of