TupleBuckets in Riak objects

Adds support with test for tuplebuckets in Riak keys.

This exposed that there was no filter using the seglist on the in-mmemory keys.  This means that if there is no filter applied in the fold_function, many false positives may emerge.

This is probably not a big performance benefit (and indeed for performance it may be better to apply during the leveled_pmem:merge_trees).

Some thought still required as to what is more likely to contribute to future bugs: an extra location using the hash matching found in leveled_sst, or the extra results in the query.
This commit is contained in:
Martin Sumner 2018-11-05 01:21:08 +00:00
parent 37cdb22979
commit e72a946f43
5 changed files with 256 additions and 9 deletions

View file

@ -36,6 +36,7 @@
inker_reload_strategy/1, inker_reload_strategy/1,
strip_to_seqonly/1, strip_to_seqonly/1,
strip_to_statusonly/1, strip_to_statusonly/1,
strip_to_segmentonly/1,
strip_to_keyseqonly/1, strip_to_keyseqonly/1,
strip_to_indexdetails/1, strip_to_indexdetails/1,
striphead_to_v1details/1, striphead_to_v1details/1,
@ -174,12 +175,24 @@ segment_hash(Key) when is_binary(Key) ->
segment_hash({?RIAK_TAG, Bucket, Key, null}) segment_hash({?RIAK_TAG, Bucket, Key, null})
when is_binary(Bucket), is_binary(Key) -> when is_binary(Bucket), is_binary(Key) ->
segment_hash(<<Bucket/binary, Key/binary>>); segment_hash(<<Bucket/binary, Key/binary>>);
segment_hash({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey})
when is_binary(BucketType), is_binary(Bucket) ->
segment_hash({?RIAK_TAG,
<<BucketType/binary, Bucket/binary>>,
Key,
SubKey});
segment_hash({?HEAD_TAG, Bucket, Key, SubK}) segment_hash({?HEAD_TAG, Bucket, Key, SubK})
when is_binary(Bucket), is_binary(Key), is_binary(SubK) -> when is_binary(Bucket), is_binary(Key), is_binary(SubK) ->
segment_hash(<<Bucket/binary, Key/binary, SubK/binary>>); segment_hash(<<Bucket/binary, Key/binary, SubK/binary>>);
segment_hash({?HEAD_TAG, Bucket, Key, _SubK}) segment_hash({?HEAD_TAG, Bucket, Key, _SubK})
when is_binary(Bucket), is_binary(Key) -> when is_binary(Bucket), is_binary(Key) ->
segment_hash(<<Bucket/binary, Key/binary>>); segment_hash(<<Bucket/binary, Key/binary>>);
% segment_hash({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey})
% when is_binary(BucketType), is_binary(Bucket) ->
% segment_hash({?HEAD_TAG,
% <<BucketType/binary, Bucket/binary>>,
% Key,
% SubKey});
segment_hash(Key) -> segment_hash(Key) ->
segment_hash(term_to_binary(Key)). segment_hash(term_to_binary(Key)).
@ -207,6 +220,9 @@ strip_to_statusonly({_, V}) -> element(2, V).
-spec strip_to_seqonly(ledger_kv()) -> non_neg_integer(). -spec strip_to_seqonly(ledger_kv()) -> non_neg_integer().
strip_to_seqonly({_, V}) -> element(1, V). strip_to_seqonly({_, V}) -> element(1, V).
-spec strip_to_segmentonly(ledger_kv()) -> segment_hash().
strip_to_segmentonly({_LK, LV}) -> element(3, LV).
-spec strip_to_keyseqonly(ledger_kv()) -> {ledger_key(), integer()}. -spec strip_to_keyseqonly(ledger_kv()) -> {ledger_key(), integer()}.
strip_to_keyseqonly({LK, V}) -> {LK, element(1, V)}. strip_to_keyseqonly({LK, V}) -> {LK, element(1, V)}.

View file

@ -724,6 +724,22 @@ handle_call({fetch_keys,
List -> List ->
List List
end, end,
FilteredL0 =
case SegmentList of
false ->
L0AsList;
_ ->
TunedList = leveled_sst:tune_seglist(SegmentList),
FilterFun =
fun(LKV) ->
CheckSeg =
leveled_sst:extract_hash(
leveled_codec:strip_to_segmentonly(LKV)),
lists:member(CheckSeg, TunedList)
end,
lists:filter(FilterFun, L0AsList)
end,
leveled_log:log_randomtimer("P0037", leveled_log:log_randomtimer("P0037",
[State#state.levelzero_size], [State#state.levelzero_size],
SW, SW,
@ -742,7 +758,7 @@ handle_call({fetch_keys,
SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)), SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)),
Folder = Folder =
fun() -> fun() ->
keyfolder({L0AsList, SSTiter}, keyfolder({FilteredL0, SSTiter},
{StartKey, EndKey}, {StartKey, EndKey},
{AccFun, InitAcc}, {AccFun, InitAcc},
{SegmentList, LastModRange0, MaxKeys}) {SegmentList, LastModRange0, MaxKeys})

View file

@ -121,7 +121,7 @@
sst_deleteconfirmed/1, sst_deleteconfirmed/1,
sst_close/1]). sst_close/1]).
-export([tune_seglist/1, extract_hash/1]).
-record(slot_index_value, {slot_id :: integer(), -record(slot_index_value, {slot_id :: integer(),
start_position :: integer(), start_position :: integer(),
@ -879,7 +879,7 @@ fetch(LedgerKey, Hash, State, Timings0) ->
State#state{blockindex_cache = BlockIndexCache}, State#state{blockindex_cache = BlockIndexCache},
Timings3}; Timings3};
{BlockLengths, _LMD, PosBin} -> {BlockLengths, _LMD, PosBin} ->
PosList = find_pos(PosBin, extra_hash(Hash), [], 0), PosList = find_pos(PosBin, extract_hash(Hash), [], 0),
case PosList of case PosList of
[] -> [] ->
{_SW3, Timings3} = {_SW3, Timings3} =
@ -1290,7 +1290,7 @@ lookup_slots(StartKey, EndKey, Tree) ->
accumulate_positions({K, V}, {PosBinAcc, NoHashCount, HashAcc, LMDAcc}) -> accumulate_positions({K, V}, {PosBinAcc, NoHashCount, HashAcc, LMDAcc}) ->
{_SQN, H1, LMD} = leveled_codec:strip_to_indexdetails({K, V}), {_SQN, H1, LMD} = leveled_codec:strip_to_indexdetails({K, V}),
LMDAcc0 = take_max_lastmoddate(LMD, LMDAcc), LMDAcc0 = take_max_lastmoddate(LMD, LMDAcc),
PosH1 = extra_hash(H1), PosH1 = extract_hash(H1),
case is_integer(PosH1) of case is_integer(PosH1) of
true -> true ->
case NoHashCount of case NoHashCount of
@ -1725,7 +1725,7 @@ binaryslot_get(FullBin, Key, Hash, PressMethod, IdxModDate) ->
{BlockLengths, _LMD, PosBinIndex} = {BlockLengths, _LMD, PosBinIndex} =
extract_header(Header, IdxModDate), extract_header(Header, IdxModDate),
PosList = find_pos(PosBinIndex, PosList = find_pos(PosBinIndex,
extra_hash(Hash), extract_hash(Hash),
[], [],
0), 0),
{fetch_value(PosList, BlockLengths, Blocks, Key, PressMethod), {fetch_value(PosList, BlockLengths, Blocks, Key, PressMethod),
@ -1926,9 +1926,9 @@ block_offsetandlength(BlockLengths, BlockID) ->
{B1L + B2L + B3L + B4L, B5L} {B1L + B2L + B3L + B4L, B5L}
end. end.
extra_hash({SegHash, _ExtraHash}) when is_integer(SegHash) -> extract_hash({SegHash, _ExtraHash}) when is_integer(SegHash) ->
tune_hash(SegHash); tune_hash(SegHash);
extra_hash(NotHash) -> extract_hash(NotHash) ->
NotHash. NotHash.
cache_hash({_SegHash, ExtraHash}) when is_integer(ExtraHash) -> cache_hash({_SegHash, ExtraHash}) when is_integer(ExtraHash) ->
@ -2658,8 +2658,8 @@ indexed_list_mixedkeys_bitflip_test() ->
ToList = binaryslot_tolist(SlotBin, native, ?INDEX_MODDATE), ToList = binaryslot_tolist(SlotBin, native, ?INDEX_MODDATE),
?assertMatch(Keys, ToList), ?assertMatch(Keys, ToList),
[Pos1] = find_pos(PosBin, extra_hash(MH1), [], 0), [Pos1] = find_pos(PosBin, extract_hash(MH1), [], 0),
[Pos2] = find_pos(PosBin, extra_hash(MH2), [], 0), [Pos2] = find_pos(PosBin, extract_hash(MH2), [], 0),
{BN1, _BP1} = revert_position(Pos1), {BN1, _BP1} = revert_position(Pos1),
{BN2, _BP2} = revert_position(Pos2), {BN2, _BP2} = revert_position(Pos2),
{Offset1, Length1} = block_offsetandlength(Header, BN1), {Offset1, Length1} = block_offsetandlength(Header, BN1),

View file

@ -3,6 +3,7 @@
-include("include/leveled.hrl"). -include("include/leveled.hrl").
-export([all/0]). -export([all/0]).
-export([ -export([
basic_riak/1,
fetchclocks_modifiedbetween/1, fetchclocks_modifiedbetween/1,
crossbucket_aae/1, crossbucket_aae/1,
handoff/1, handoff/1,
@ -11,6 +12,7 @@
]). ]).
all() -> [ all() -> [
basic_riak,
fetchclocks_modifiedbetween, fetchclocks_modifiedbetween,
crossbucket_aae, crossbucket_aae,
handoff, handoff,
@ -21,6 +23,215 @@ all() -> [
-define(MAGIC, 53). % riak_kv -> riak_object -define(MAGIC, 53). % riak_kv -> riak_object
basic_riak(_Config) ->
basic_riak_tester(<<"B0">>, 120000),
basic_riak_tester({<<"Type0">>, <<"B0">>}, 80000).
basic_riak_tester(Bucket, KeyCount) ->
% Key Count should be > 10K and divisible by 5
io:format("Basic riak test with Bucket ~w KeyCount ~w~n",
[Bucket, KeyCount]),
IndexCount = 20,
RootPath = testutil:reset_filestructure("basicRiak"),
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 500000000},
{max_pencillercachesize, 24000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
IndexGenFun =
fun(ListID) ->
fun() ->
RandInt = leveled_rand:uniform(IndexCount),
ID = integer_to_list(ListID),
[{add,
list_to_binary("integer" ++ ID ++ "_int"),
RandInt},
{add,
list_to_binary("binary" ++ ID ++ "_bin"),
<<RandInt:32/integer>>}]
end
end,
CountPerList = KeyCount div 5,
ObjList1 =
testutil:generate_objects(CountPerList,
{fixed_binary, 1}, [],
leveled_rand:rand_bytes(512),
IndexGenFun(1),
Bucket),
ObjList2 =
testutil:generate_objects(CountPerList,
{fixed_binary, CountPerList + 1}, [],
leveled_rand:rand_bytes(512),
IndexGenFun(2),
Bucket),
ObjList3 =
testutil:generate_objects(CountPerList,
{fixed_binary, 2 * CountPerList + 1}, [],
leveled_rand:rand_bytes(512),
IndexGenFun(3),
Bucket),
ObjList4 =
testutil:generate_objects(CountPerList,
{fixed_binary, 3 * CountPerList + 1}, [],
leveled_rand:rand_bytes(512),
IndexGenFun(4),
Bucket),
ObjList5 =
testutil:generate_objects(CountPerList,
{fixed_binary, 4 * CountPerList + 1}, [],
leveled_rand:rand_bytes(512),
IndexGenFun(5),
Bucket),
% Mix with the ordering on the load, just in case ordering hides issues
testutil:riakload(Bookie1, ObjList4),
testutil:riakload(Bookie1, ObjList1),
testutil:riakload(Bookie1, ObjList3),
testutil:riakload(Bookie1, ObjList5),
testutil:riakload(Bookie1, ObjList2),
% This needs to stay last,
% as the last key of this needs to be the last key added
% so that headfold check, checks something in memory
% Take a subset, and do some HEAD/GET requests
SubList1 = lists:sublist(lists:ukeysort(1, ObjList1), 1000),
SubList5 = lists:sublist(lists:ukeysort(1, ObjList5), 1000),
ok = testutil:check_forlist(Bookie1, SubList1),
ok = testutil:check_forlist(Bookie1, SubList5),
ok = testutil:checkhead_forlist(Bookie1, SubList1),
ok = testutil:checkhead_forlist(Bookie1, SubList5),
FoldKeysFun = fun(_B, K, Acc) -> [K|Acc] end,
IntIndexFold =
fun(Idx, Book) ->
fun(IC, CountAcc) ->
ID = integer_to_list(Idx),
Index = list_to_binary("integer" ++ ID ++ "_int"),
{async, R} =
leveled_bookie:book_indexfold(Book,
{Bucket, <<>>},
{FoldKeysFun, []},
{Index,
IC,
IC},
{true, undefined}),
KTL = R(),
CountAcc + length(KTL)
end
end,
BinIndexFold =
fun(Idx, Book) ->
fun(IC, CountAcc) ->
ID = integer_to_list(Idx),
Index = list_to_binary("binary" ++ ID ++ "_bin"),
{async, R} =
leveled_bookie:book_indexfold(Book,
{Bucket, <<>>},
{FoldKeysFun, []},
{Index,
<<IC:32/integer>>,
<<IC:32/integer>>},
{true, undefined}),
KTL = R(),
CountAcc + length(KTL)
end
end,
SWA = os:timestamp(),
TotalIndexEntries2 =
lists:foldl(IntIndexFold(2, Bookie1), 0, lists:seq(1, IndexCount)),
io:format("~w queries returned count=~w in ~w ms~n",
[IndexCount,
TotalIndexEntries2,
timer:now_diff(os:timestamp(), SWA)/1000]),
true = TotalIndexEntries2 == length(ObjList2),
SWB = os:timestamp(),
TotalIndexEntries4 =
lists:foldl(IntIndexFold(4, Bookie1), 0, lists:seq(1, IndexCount)),
io:format("~w queries returned count=~w in ~w ms~n",
[IndexCount,
TotalIndexEntries4,
timer:now_diff(os:timestamp(), SWB)/1000]),
true = TotalIndexEntries4 == length(ObjList4),
SWC = os:timestamp(),
TotalIndexEntries3 =
lists:foldl(BinIndexFold(3, Bookie1), 0, lists:seq(1, IndexCount)),
io:format("~w queries returned count=~w in ~w ms~n",
[IndexCount,
TotalIndexEntries3,
timer:now_diff(os:timestamp(), SWC)/1000]),
true = TotalIndexEntries3 == length(ObjList3),
ok = leveled_bookie:book_close(Bookie1),
StartOpts2 = [{root_path, RootPath},
{max_journalsize, 200000000},
{max_pencillercachesize, 12000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
ok = testutil:check_forlist(Bookie2, SubList5),
ok = testutil:checkhead_forlist(Bookie2, SubList1),
TotalIndexEntries4B =
lists:foldl(IntIndexFold(4, Bookie2), 0, lists:seq(1, IndexCount)),
true = TotalIndexEntries4B == length(ObjList4),
TotalIndexEntries3B =
lists:foldl(BinIndexFold(3, Bookie2), 0, lists:seq(1, IndexCount)),
true = TotalIndexEntries3B == length(ObjList3),
HeadFoldFun = fun(B, K, _Hd, Acc) -> [{B, K}|Acc] end,
[{_I1, Obj1, _Spc1}|_Rest1] = ObjList1,
[{_I2, Obj2, _Spc2}|_Rest2] = ObjList2,
[{_I3, Obj3, _Spc3}|_Rest3] = ObjList3,
[{_I4, Obj4, _Spc4}|_Rest4] = ObjList4,
[{_I5, Obj5, _Spc5}|_Rest5] = ObjList5,
{_I2L, Obj2L, _Spc2L} = lists:last(ObjList2),
SegList =
lists:map(fun(Obj) -> get_aae_segment(Obj) end,
[Obj1, Obj2, Obj3, Obj4, Obj5, Obj2L]),
BKList =
lists:map(fun(Obj) ->
{testutil:get_bucket(Obj), testutil:get_key(Obj)}
end,
[Obj1, Obj2, Obj3, Obj4, Obj5, Obj2L]),
{async, HeadR} =
leveled_bookie:book_headfold(Bookie2,
?RIAK_TAG,
{HeadFoldFun, []},
true, false,
SegList),
KLBySeg = HeadR(),
io:format("SegList Headfold returned ~w heads~n", [length(KLBySeg)]),
true = length(KLBySeg) < KeyCount div 1000, % not too many false answers
KLBySegRem = lists:subtract(KLBySeg, BKList),
true = length(KLBySeg) - length(KLBySegRem) == length(BKList),
ok = leveled_bookie:book_destroy(Bookie2).
get_aae_segment(Obj) ->
get_aae_segment(testutil:get_bucket(Obj), testutil:get_key(Obj)).
get_aae_segment({Type, Bucket}, Key) ->
leveled_tictac:keyto_segment32(<<Type/binary, Bucket/binary, Key/binary>>);
get_aae_segment(Bucket, Key) ->
leveled_tictac:keyto_segment32(<<Bucket/binary, Key/binary>>).
fetchclocks_modifiedbetween(_Config) -> fetchclocks_modifiedbetween(_Config) ->
RootPathA = testutil:reset_filestructure("fetchClockA"), RootPathA = testutil:reset_filestructure("fetchClockA"),
RootPathB = testutil:reset_filestructure("fetchClockB"), RootPathB = testutil:reset_filestructure("fetchClockB"),

View file

@ -25,6 +25,7 @@
generate_objects/5, generate_objects/5,
generate_objects/6, generate_objects/6,
set_object/5, set_object/5,
get_bucket/1,
get_key/1, get_key/1,
get_value/1, get_value/1,
get_vclock/1, get_vclock/1,
@ -536,6 +537,9 @@ actor_list() ->
[{1, albert}, {2, bertie}, {3, clara}, {4, dave}, {5, elton}, [{1, albert}, {2, bertie}, {3, clara}, {4, dave}, {5, elton},
{6, fred}, {7, george}, {8, harry}, {9, isaac}, {10, leila}]. {6, fred}, {7, george}, {8, harry}, {9, isaac}, {10, leila}].
get_bucket(Object) ->
Object#r_object.bucket.
get_key(Object) -> get_key(Object) ->
Object#r_object.key. Object#r_object.key.