Mas d31 nhskv16sst (#428)

* Add performance/profiling test

Add test to perf_SUITE to do performance tests and also profile different activities in leveled.

This can then be used to highlight functions with unexpectedly high execution times, and prove the impact of changes.

Switch between riak_ctperf and riak_fullperf to change from standard test (with profile option) to full-scale performance test

* Change shape of default perfTest

* Refactor SST

Compare and contrast profile for guess, before and after refactor:

pre

```
lists:map_1/2                                         313370     2.33    32379  [      0.10]

lists:foldl_1/3                                       956590     4.81    66992  [      0.07]

leveled_sst:'-expand_list_by_pointer/5-fun-0-'/4      925020     6.13    85318  [      0.09]

erlang:binary_to_term/1                                 3881     8.55   119012  [     30.67]

erlang:'++'/2                                         974322    11.55   160724  [      0.16]

lists:member/2                                       4000180    15.00   208697  [      0.05]

leveled_sst:find_pos/4                               4029220    21.01   292347  [      0.07]

leveled_sst:member_check/2                           4000000    21.17   294601  [      0.07]

--------------------------------------------------  --------  -------  -------  [----------]

Total:                                              16894665  100.00%  1391759  [      0.08]
```

post

```
lists:map_1/2                                         63800     0.79    6795  [      0.11]

erlang:term_to_binary/1                               15726     0.81    6950  [      0.44]

lists:keyfind/3                                      180967     0.92    7884  [      0.04]

erlang:spawn_link/3                                   15717     1.08    9327  [      0.59]

leveled_sst:'-read_slots/5-fun-1-'/8                  31270     1.15    9895  [      0.32]

gen:do_call/4                                          7881     1.31   11243  [      1.43]

leveled_penciller:find_nextkey/8                     180936     2.01   17293  [      0.10]

prim_file:pread_nif/3                                 15717     3.89   33437  [      2.13]

leveled_sst:find_pos/4                              4028940    17.85  153554  [      0.04]

erlang:binary_to_term/1                               15717    51.97  447048  [     28.44]

--------------------------------------------------  -------  -------  ------  [----------]

Total:                                              6704100  100.00%  860233  [      0.13]

```

* Update leveled_penciller.erl

* Mas d31 nhskv16sstpcl (#426)

Performance updates to leveled:

- Refactoring of pointer expansion when fetching from leveled_sst files to avoid expensive list concatenation.
- Refactoring of leveled_ebloom to make more flexible, reduce code, and improve check time.
- Refactoring of querying within leveled_sst to reduce the number of blocks that need to be de-serialised per query.
- Refactoring of the leveled_penciller's query key comparator, to make use of maps and simplify the filtering.
- General speed-up of frequently called functions.
This commit is contained in:
Martin Sumner 2024-01-22 21:22:54 +00:00 committed by GitHub
parent 49490c38ef
commit c294570bce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1817 additions and 2113 deletions

View file

@ -84,7 +84,7 @@
end_key :: tuple() | undefined,
owner :: pid()|list(),
filename :: string() | undefined,
bloom :: binary() | none | undefined}).
bloom = none :: leveled_ebloom:bloom() | none}).
-record(cdb_options,
{max_size :: pos_integer() | undefined,

View file

@ -18,7 +18,6 @@
strip_to_keyseqonly/1,
strip_to_indexdetails/1,
striphead_to_v1details/1,
is_active/3,
endkey_passed/2,
key_dominates/2,
maybe_reap_expiredkey/2,
@ -48,7 +47,10 @@
to_lookup/1,
next_key/1,
return_proxy/4,
get_metadata/1]).
get_metadata/1,
maybe_accumulate/5,
accumulate_index/2,
count_tombs/2]).
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-define(NRT_IDX, "$aae.").
@ -251,22 +253,79 @@ striphead_to_v1details(V) ->
get_metadata(LV) ->
element(4, LV).
-spec key_dominates(ledger_kv(), ledger_kv()) ->
left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant.
-spec maybe_accumulate(
list(leveled_codec:ledger_kv()),
term(),
non_neg_integer(),
{pos_integer(), {non_neg_integer(), non_neg_integer()|infinity}},
leveled_penciller:pclacc_fun())
-> {term(), non_neg_integer()}.
%% @doc
%% Make an accumulation decision based on the date range and also the expiry
%% status of the ledger key and value Needs to handle v1 and v2 values. When
%% folding over heads -> v2 values, index-keys -> v1 values.
maybe_accumulate([], Acc, Count, _Filter, _Fun) ->
{Acc, Count};
maybe_accumulate(
[{K, {_SQN, {active, TS}, _SH, _MD, undefined}=V}|T],
Acc, Count, {Now, _ModRange}=Filter, AccFun)
when TS >= Now ->
maybe_accumulate(T, AccFun(K, V, Acc), Count + 1, Filter, AccFun);
maybe_accumulate(
[{K, {_SQN, {active, TS}, _SH, _MD}=V}|T],
Acc, Count, {Now, _ModRange}=Filter, AccFun)
when TS >= Now ->
maybe_accumulate(T, AccFun(K, V, Acc), Count + 1, Filter, AccFun);
maybe_accumulate(
[{_K, {_SQN, tomb, _SH, _MD, _LMD}}|T],
Acc, Count, Filter, AccFun) ->
maybe_accumulate(T, Acc, Count, Filter, AccFun);
maybe_accumulate(
[{_K, {_SQN, tomb, _SH, _MD}}|T],
Acc, Count, Filter, AccFun) ->
maybe_accumulate(T, Acc, Count, Filter, AccFun);
maybe_accumulate(
[{K, {_SQN, {active, TS}, _SH, _MD, LMD}=V}|T],
Acc, Count, {Now, {LowDate, HighDate}}=Filter, AccFun)
when TS >= Now, LMD >= LowDate, LMD =< HighDate ->
maybe_accumulate(T, AccFun(K, V, Acc), Count + 1, Filter, AccFun);
maybe_accumulate(
[_LV|T],
Acc, Count, Filter, AccFun) ->
maybe_accumulate(T, Acc, Count, Filter, AccFun).
-spec accumulate_index(
{boolean(), undefined|leveled_runner:mp()}, leveled_runner:acc_fun())
-> any().
accumulate_index({false, undefined}, FoldKeysFun) ->
fun({?IDX_TAG, Bucket, _IndexInfo, ObjKey}, _Value, Acc) ->
FoldKeysFun(Bucket, ObjKey, Acc)
end;
accumulate_index({true, undefined}, FoldKeysFun) ->
fun({?IDX_TAG, Bucket, {_IdxFld, IdxValue}, ObjKey}, _Value, Acc) ->
FoldKeysFun(Bucket, {IdxValue, ObjKey}, Acc)
end;
accumulate_index({AddTerm, TermRegex}, FoldKeysFun) ->
fun({?IDX_TAG, Bucket, {_IdxFld, IdxValue}, ObjKey}, _Value, Acc) ->
case re:run(IdxValue, TermRegex) of
nomatch ->
Acc;
_ ->
case AddTerm of
true ->
FoldKeysFun(Bucket, {IdxValue, ObjKey}, Acc);
false ->
FoldKeysFun(Bucket, ObjKey, Acc)
end
end
end.
-spec key_dominates(ledger_kv(), ledger_kv()) -> boolean().
%% @doc
%% When comparing two keys in the ledger need to find if one key comes before
%% the other, or if the match, which key is "better" and should be the winner
key_dominates({LK, _LVAL}, {RK, _RVAL}) when LK < RK ->
left_hand_first;
key_dominates({LK, _LVAL}, {RK, _RVAL}) when RK < LK ->
right_hand_first;
key_dominates(LObj, RObj) ->
case strip_to_seqonly(LObj) >= strip_to_seqonly(RObj) of
true ->
left_hand_dominant;
false ->
right_hand_dominant
end.
strip_to_seqonly(LObj) >= strip_to_seqonly(RObj).
-spec maybe_reap_expiredkey(ledger_kv(), {boolean(), integer()}) -> boolean().
%% @doc
@ -286,20 +345,18 @@ maybe_reap(tomb, {true, _CurrTS}) ->
maybe_reap(_, _) ->
false.
-spec is_active(ledger_key(), ledger_value(), non_neg_integer()) -> boolean().
%% @doc
%% Is this an active KV pair or has the timestamp expired
is_active(Key, Value, Now) ->
case strip_to_statusonly({Key, Value}) of
{active, infinity} ->
true;
tomb ->
false;
{active, TS} when TS >= Now ->
true;
{active, _TS} ->
false
end.
-spec count_tombs(
list(ledger_kv()), non_neg_integer()|not_counted) ->
non_neg_integer()|not_counted.
count_tombs(_List, not_counted) ->
not_counted;
count_tombs([], Count) ->
Count;
count_tombs([{_K, V}|T], Count) when element(2, V) == tomb ->
count_tombs(T, Count + 1);
count_tombs([_KV|T], Count) ->
count_tombs(T, Count).
-spec from_ledgerkey(atom(), tuple()) -> false|tuple().
%% @doc

View file

@ -1,23 +1,36 @@
%% -------- TinyBloom ---------
%%
%% A fixed size bloom that supports 32K keys only, made to try and minimise
%% the cost of producing the bloom
%% A 1-byte per key bloom filter with a 5% fpr. Pre-prepared segment hashes
%% (a leveled codec type) are, used for building and checking - the filter
%% splits a single hash into a 1 byte slot identifier, and 2 x 12 bit hashes
%% (so k=2, although only a single hash is used).
%%
%% The filter is designed to support a maximum of 64K keys, larger numbers of
%% keys will see higher fprs - with a 40% fpr at 250K keys.
%%
%% The filter uses the second "Extra Hash" part of the segment-hash to ensure
%% no overlap of fpr with the leveled_sst find_pos function.
%%
%% The completed bloom is a binary - to minimise the cost of copying between
%% processes and holding in memory.
-module(leveled_ebloom).
-include("include/leveled.hrl").
-export([
create_bloom/1,
check_hash/2
]).
-define(BLOOM_SIZE_BYTES, 512).
-define(INTEGER_SIZE, 4096).
-define(BAND_MASK, ?INTEGER_SIZE - 1).
-define(BLOOM_SLOTSIZE_BYTES, 512).
-define(INTEGER_SLICE_SIZE, 64).
-define(INTEGER_SLICES, 64).
% i.e. ?INTEGER_SLICES * ?INTEGER_SLICE_SIZE = ?BLOOM_SLOTSIZE_BYTES div 8
-define(MASK_BSR, 6).
% i.e. 2 ^ (12 - 6) = ?INTEGER_SLICES
-define(MASK_BAND, 63).
% i.e. integer slize size - 1
-define(SPLIT_BAND, 4095).
% i.e. (?BLOOM_SLOTSIZE_BYTES * 8) - 1
-type bloom() :: binary().
@ -29,64 +42,39 @@
-spec create_bloom(list(leveled_codec:segment_hash())) -> bloom().
%% @doc
%% Create a binary bloom filter from a list of hashes
%% Create a binary bloom filter from a list of hashes. In the leveled
%% implementation the hashes are leveled_codec:segment_hash/0 type, but only
%% a single 32-bit hash (the second element of the tuple is actually used in
%% the building of the bloom filter
create_bloom(HashList) ->
SlotCount =
case length(HashList) of
0 ->
<<>>;
L when L > 32768 ->
{HL0, HL1} =
lists:partition(fun({_, Hash}) -> Hash band 32 == 0 end,
HashList),
Bin1 =
add_hashlist(HL0,
32,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0),
Bin2 =
add_hashlist(HL1,
32,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0),
<<Bin1/binary, Bin2/binary>>;
L when L > 16384 ->
add_hashlist(HashList,
32,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0);
L when L > 4096 ->
add_hashlist(HashList,
16,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0);
L when L > 2048 ->
add_hashlist(HashList, 4, 0, 0, 0, 0);
_ ->
add_hashlist(HashList, 2, 0, 0)
end.
0;
L ->
min(128, max(2, (L - 1) div 512))
end,
SlotHashes =
map_hashes(
HashList,
list_to_tuple(lists:duplicate(SlotCount, [])),
SlotCount
),
build_bloom(SlotHashes, SlotCount).
-spec check_hash(leveled_codec:segment_hash(), bloom()) -> boolean().
%% @doc
%% Check for the presence of a given hash within a bloom
%% Check for the presence of a given hash within a bloom. Only the second
%% element of the leveled_codec:segment_hash/0 type is used - a 32-bit hash.
check_hash(_Hash, <<>>) ->
false;
check_hash({_SegHash, Hash}, BloomBin) ->
SlotSplit = byte_size(BloomBin) div ?BLOOM_SIZE_BYTES,
{Slot, Hashes} = split_hash(Hash, SlotSplit),
Mask = get_mask(Hashes),
Pos = Slot * ?BLOOM_SIZE_BYTES,
IntSize = ?INTEGER_SIZE,
<<_H:Pos/binary, CheckInt:IntSize/integer, _T/binary>> = BloomBin,
case CheckInt band Mask of
Mask ->
true;
check_hash({_SegHash, Hash}, BloomBin) when is_binary(BloomBin)->
SlotSplit = byte_size(BloomBin) div ?BLOOM_SLOTSIZE_BYTES,
{Slot, [H0, H1]} = split_hash(Hash, SlotSplit),
Pos = ((Slot + 1) * ?BLOOM_SLOTSIZE_BYTES) - 1,
case match_hash(BloomBin, Pos - (H0 div 8), H0 rem 8) of
true ->
match_hash(BloomBin, Pos - (H1 div 8), H1 rem 8);
_ ->
false
end.
@ -95,408 +83,78 @@ check_hash({_SegHash, Hash}, BloomBin) ->
%%% Internal Functions
%%%============================================================================
-type slot_count() :: 0|2..128.
-type bloom_hash() :: 0..16#FFF.
-type external_hash() :: 0..16#FFFFFFFF.
-spec map_hashes(
list(leveled_codec:segment_hash()), tuple(), slot_count()) -> tuple().
map_hashes([], HashListTuple, _SlotCount) ->
HashListTuple;
map_hashes([Hash|Rest], HashListTuple, SlotCount) ->
{Slot, Hashes} = split_hash(element(2, Hash), SlotCount),
SlotHL = element(Slot + 1, HashListTuple),
map_hashes(
Rest,
setelement(Slot + 1, HashListTuple, Hashes ++ SlotHL),
SlotCount).
-spec split_hash(external_hash(), slot_count())
-> {non_neg_integer(), [bloom_hash()]}.
split_hash(Hash, SlotSplit) ->
Slot = (Hash band 255) rem SlotSplit,
H0 = (Hash bsr 8) band (?BAND_MASK),
H1 = (Hash bsr 20) band (?BAND_MASK),
H0 = (Hash bsr 8) band ?SPLIT_BAND,
H1 = (Hash bsr 20) band ?SPLIT_BAND,
{Slot, [H0, H1]}.
get_mask([H0, H1]) ->
(1 bsl H0) bor (1 bsl H1).
-spec match_hash(bloom(), non_neg_integer(), 0..16#FF) -> boolean().
match_hash(BloomBin, Pos, Hash) ->
<<_Pre:Pos/binary, CheckInt:8/integer, _Rest/binary>> = BloomBin,
(CheckInt bsr Hash) band 1 == 1.
-spec build_bloom(tuple(), slot_count()) -> bloom().
build_bloom(_SlotHashes, 0) ->
<<>>;
build_bloom(SlotHashes, SlotCount) when SlotCount > 0 ->
lists:foldr(
fun(I, AccBin) ->
HashList = element(I, SlotHashes),
SlotBin =
add_hashlist(
lists:usort(HashList), 0, 1, ?INTEGER_SLICES, <<>>),
<<SlotBin/binary, AccBin/binary>>
end,
<<>>,
lists:seq(1, SlotCount)
).
%% This looks ugly and clunky, but in tests it was quicker than modifying an
%% Erlang term like an array as it is passed around the loop
add_hashlist([], _S, S0, S1) ->
IntSize = ?INTEGER_SIZE,
<<S0:IntSize/integer, S1:IntSize/integer>>;
add_hashlist([{_SegHash, TopHash}|T], SlotSplit, S0, S1) ->
{Slot, Hashes} = split_hash(TopHash, SlotSplit),
Mask = get_mask(Hashes),
case Slot of
0 ->
add_hashlist(T, SlotSplit, S0 bor Mask, S1);
1 ->
add_hashlist(T, SlotSplit, S0, S1 bor Mask)
end.
add_hashlist([], _S, S0, S1, S2, S3) ->
IntSize = ?INTEGER_SIZE,
<<S0:IntSize/integer, S1:IntSize/integer,
S2:IntSize/integer, S3:IntSize/integer>>;
add_hashlist([{_SegHash, TopHash}|T], SlotSplit, S0, S1, S2, S3) ->
{Slot, Hashes} = split_hash(TopHash, SlotSplit),
Mask = get_mask(Hashes),
case Slot of
0 ->
add_hashlist(T, SlotSplit, S0 bor Mask, S1, S2, S3);
1 ->
add_hashlist(T, SlotSplit, S0, S1 bor Mask, S2, S3);
2 ->
add_hashlist(T, SlotSplit, S0, S1, S2 bor Mask, S3);
3 ->
add_hashlist(T, SlotSplit, S0, S1, S2, S3 bor Mask)
end.
add_hashlist([], _S, S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15) ->
IntSize = ?INTEGER_SIZE,
<<S0:IntSize/integer, S1:IntSize/integer,
S2:IntSize/integer, S3:IntSize/integer,
S4:IntSize/integer, S5:IntSize/integer,
S6:IntSize/integer, S7:IntSize/integer,
S8:IntSize/integer, S9:IntSize/integer,
S10:IntSize/integer, S11:IntSize/integer,
S12:IntSize/integer, S13:IntSize/integer,
S14:IntSize/integer, S15:IntSize/integer>>;
add_hashlist([{_SegHash, TopHash}|T],
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15) ->
{Slot, Hashes} = split_hash(TopHash, SlotSplit),
Mask = get_mask(Hashes),
case Slot of
0 ->
add_hashlist(T,
SlotSplit,
S0 bor Mask, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15);
1 ->
add_hashlist(T,
SlotSplit,
S0, S1 bor Mask, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15);
2 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2 bor Mask, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15);
3 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3 bor Mask, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15);
4 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4 bor Mask, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15);
5 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5 bor Mask, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15);
6 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6 bor Mask, S7,
S8, S9, S10, S11, S12, S13, S14, S15);
7 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7 bor Mask,
S8, S9, S10, S11, S12, S13, S14, S15);
8 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8 bor Mask, S9, S10, S11, S12, S13, S14, S15);
9 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9 bor Mask, S10, S11, S12, S13, S14, S15);
10 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10 bor Mask, S11, S12, S13, S14, S15);
11 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11 bor Mask, S12, S13, S14, S15);
12 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12 bor Mask, S13, S14, S15);
13 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13 bor Mask, S14, S15);
14 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14 bor Mask, S15);
15 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15 bor Mask)
end.
add_hashlist([], _S, S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31) ->
IntSize = ?INTEGER_SIZE,
<<S0:IntSize/integer, S1:IntSize/integer,
S2:IntSize/integer, S3:IntSize/integer,
S4:IntSize/integer, S5:IntSize/integer,
S6:IntSize/integer, S7:IntSize/integer,
S8:IntSize/integer, S9:IntSize/integer,
S10:IntSize/integer, S11:IntSize/integer,
S12:IntSize/integer, S13:IntSize/integer,
S14:IntSize/integer, S15:IntSize/integer,
S16:IntSize/integer, S17:IntSize/integer,
S18:IntSize/integer, S19:IntSize/integer,
S20:IntSize/integer, S21:IntSize/integer,
S22:IntSize/integer, S23:IntSize/integer,
S24:IntSize/integer, S25:IntSize/integer,
S26:IntSize/integer, S27:IntSize/integer,
S28:IntSize/integer, S29:IntSize/integer,
S30:IntSize/integer, S31:IntSize/integer>>;
add_hashlist([{_SegHash, TopHash}|T],
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31) ->
{Slot, Hashes} = split_hash(TopHash, SlotSplit),
Mask = get_mask(Hashes),
case Slot of
0 ->
add_hashlist(T,
SlotSplit,
S0 bor Mask, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
1 ->
add_hashlist(T,
SlotSplit,
S0, S1 bor Mask, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
2 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2 bor Mask, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
3 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3 bor Mask, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
4 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4 bor Mask, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
5 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5 bor Mask, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
6 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6 bor Mask, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
7 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7 bor Mask,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
8 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8 bor Mask, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
9 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9 bor Mask, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
10 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10 bor Mask, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
11 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11 bor Mask, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
12 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12 bor Mask, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
13 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13 bor Mask, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
14 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14 bor Mask, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
15 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15 bor Mask,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
16 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16 bor Mask, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
17 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17 bor Mask, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
18 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18 bor Mask, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
19 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19 bor Mask, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
20 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20 bor Mask, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
21 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21 bor Mask, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
22 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22 bor Mask, S23,
S24, S25, S26, S27, S28, S29, S30, S31);
23 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23 bor Mask,
S24, S25, S26, S27, S28, S29, S30, S31);
24 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24 bor Mask, S25, S26, S27, S28, S29, S30, S31);
25 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25 bor Mask, S26, S27, S28, S29, S30, S31);
26 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26 bor Mask, S27, S28, S29, S30, S31);
27 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27 bor Mask, S28, S29, S30, S31);
28 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28 bor Mask, S29, S30, S31);
29 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29 bor Mask, S30, S31);
30 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30 bor Mask, S31);
31 ->
add_hashlist(T,
SlotSplit,
S0, S1, S2, S3, S4, S5, S6, S7,
S8, S9, S10, S11, S12, S13, S14, S15,
S16, S17, S18, S19, S20, S21, S22, S23,
S24, S25, S26, S27, S28, S29, S30, S31 bor Mask)
end.
-spec add_hashlist(
list(bloom_hash()),
non_neg_integer(),
non_neg_integer(),
0..?INTEGER_SLICES,
binary()) -> bloom().
add_hashlist([], ThisSlice, SliceCount, SliceCount, AccBin) ->
<<ThisSlice:?INTEGER_SLICE_SIZE/integer, AccBin/binary>>;
add_hashlist([], ThisSlice, SliceNumber, SliceCount, AccBin) ->
add_hashlist(
[],
0,
SliceNumber + 1,
SliceCount,
<<ThisSlice:?INTEGER_SLICE_SIZE/integer, AccBin/binary>>);
add_hashlist([H0|Rest], ThisSlice, SliceNumber, SliceCount, AccBin)
when ((H0 bsr ?MASK_BSR) + 1) == SliceNumber ->
Mask0 = 1 bsl (H0 band (?MASK_BAND)),
add_hashlist(
Rest, ThisSlice bor Mask0, SliceNumber, SliceCount, AccBin);
add_hashlist(Rest, ThisSlice, SliceNumber, SliceCount, AccBin) ->
add_hashlist(
Rest,
0,
SliceNumber + 1,
SliceCount,
<<ThisSlice:?INTEGER_SLICE_SIZE/integer, AccBin/binary>>).
%%%============================================================================
%%% Test
@ -507,11 +165,7 @@ add_hashlist([{_SegHash, TopHash}|T],
-include_lib("eunit/include/eunit.hrl").
generate_orderedkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
generate_orderedkeys(Seqn,
Count,
[],
BucketRangeLow,
BucketRangeHigh).
generate_orderedkeys(Seqn, Count, [], BucketRangeLow, BucketRangeHigh).
generate_orderedkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
Acc;
@ -521,17 +175,12 @@ generate_orderedkeys(Seqn, Count, Acc, BucketLow, BucketHigh) ->
io_lib:format("K~4..0B", [BucketLow + BNumber]),
KeyExt =
io_lib:format("K~8..0B", [Seqn * 100 + leveled_rand:uniform(100)]),
LK = leveled_codec:to_ledgerkey("Bucket" ++ BucketExt, "Key" ++ KeyExt, o),
Chunk = leveled_rand:rand_bytes(16),
{_B, _K, MV, _H, _LMs} =
leveled_codec:generate_ledgerkv(LK, Seqn, Chunk, 64, infinity),
generate_orderedkeys(Seqn + 1,
Count - 1,
[{LK, MV}|Acc],
BucketLow,
BucketHigh).
generate_orderedkeys(
Seqn + 1, Count - 1, [{LK, MV}|Acc], BucketLow, BucketHigh).
get_hashlist(N) ->
KVL = generate_orderedkeys(1, N, 1, 20),
@ -560,16 +209,16 @@ check_neg_hashes(BloomBin, HashList, Counters) ->
end,
lists:foldl(CheckFun, Counters, HashList).
empty_bloom_test() ->
BloomBin0 = create_bloom([]),
?assertMatch({0, 4},
check_neg_hashes(BloomBin0, [0, 10, 100, 100000], {0, 0})).
?assertMatch(
{0, 4}, check_neg_hashes(BloomBin0, [0, 10, 100, 100000], {0, 0})).
bloom_test_() ->
{timeout, 120, fun bloom_test_ranges/0}.
bloom_test_ranges() ->
test_bloom(250000, 2),
test_bloom(80000, 4),
test_bloom(60000, 4),
test_bloom(40000, 4),
@ -577,7 +226,8 @@ bloom_test_ranges() ->
test_bloom(20000, 4),
test_bloom(10000, 4),
test_bloom(5000, 4),
test_bloom(2000, 4).
test_bloom(2000, 4),
test_bloom(1000, 4).
test_bloom(N, Runs) ->
ListOfHashLists =
@ -599,22 +249,27 @@ test_bloom(N, Runs) ->
SWa = os:timestamp(),
ListOfBlooms =
lists:map(fun({HL, _ML}) -> create_bloom(HL) end,
SplitListOfHashLists),
lists:map(
fun({HL, _ML}) -> create_bloom(HL) end, SplitListOfHashLists),
TSa = timer:now_diff(os:timestamp(), SWa)/Runs,
SWb = os:timestamp(),
lists:foreach(fun(Nth) ->
PosChecks =
lists:foldl(
fun(Nth, ChecksMade) ->
{HL, _ML} = lists:nth(Nth, SplitListOfHashLists),
BB = lists:nth(Nth, ListOfBlooms),
check_all_hashes(BB, HL)
check_all_hashes(BB, HL),
ChecksMade + length(HL)
end,
0,
lists:seq(1, Runs)),
TSb = timer:now_diff(os:timestamp(), SWb)/Runs,
TSb = timer:now_diff(os:timestamp(), SWb),
SWc = os:timestamp(),
{Pos, Neg} =
lists:foldl(fun(Nth, Acc) ->
lists:foldl(
fun(Nth, Acc) ->
{_HL, ML} = lists:nth(Nth, SplitListOfHashLists),
BB = lists:nth(Nth, ListOfBlooms),
check_neg_hashes(BB, ML, Acc)
@ -622,12 +277,16 @@ test_bloom(N, Runs) ->
{0, 0},
lists:seq(1, Runs)),
FPR = Pos / (Pos + Neg),
TSc = timer:now_diff(os:timestamp(), SWc)/Runs,
TSc = timer:now_diff(os:timestamp(), SWc),
io:format(user,
"Test with size ~w has microsecond timings: -"
++ " build ~w check ~w neg_check ~w and fpr ~w~n",
[N, TSa, TSb, TSc, FPR]).
BytesPerKey =
(lists:sum(lists:map(fun byte_size/1, ListOfBlooms)) div 4) / N,
io:format(
user,
"Test with size ~w has microsecond timings: - "
"build in ~w then ~.3f per pos-check, ~.3f per neg-check, "
"fpr ~.3f with bytes-per-key ~.3f~n",
[N, round(TSa), TSb / PosChecks, TSc / (Pos + Neg), FPR, BytesPerKey]).
-endif.

View file

@ -107,7 +107,7 @@ clerk_removelogs(Pid, ForcedLogs) ->
-spec clerk_close(pid()) -> ok.
clerk_close(Pid) ->
gen_server:call(Pid, close, 20000).
gen_server:call(Pid, close, 60000).
%%%============================================================================
%%% gen_server callbacks

File diff suppressed because it is too large Load diff

View file

@ -451,23 +451,28 @@ key_lookup(Manifest, LevelIdx, Key) ->
-spec query_manifest(
manifest(),
leveled_codec:ledger_key(),
leveled_codec:ledger_key()) -> list().
leveled_codec:ledger_key())
-> list(
{lsm_level(),
list({next, manifest_entry(), leveled_codec:ledger_key()})}).
query_manifest(Manifest, StartKey, EndKey) ->
SetupFoldFun =
fun(Level, Acc) ->
Pointers =
range_lookup(Manifest, Level, StartKey, EndKey),
case Pointers of
[] -> Acc;
PL -> Acc ++ [{Level, PL}]
case range_lookup(Manifest, Level, StartKey, EndKey) of
[] ->
Acc;
Pointers ->
[{Level, Pointers}|Acc]
end
end,
lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)).
-spec range_lookup(manifest(),
-spec range_lookup(
manifest(),
integer(),
leveled_codec:ledger_key(),
leveled_codec:ledger_key()) -> list().
leveled_codec:ledger_key())
-> list({next, manifest_entry(), leveled_codec:ledger_key()}).
%% @doc
%% Return a list of manifest_entry pointers at this level which cover the
%% key query range.
@ -478,10 +483,11 @@ range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
end,
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
-spec merge_lookup(manifest(),
-spec merge_lookup(
manifest(),
integer(),
leveled_codec:ledger_key(),
leveled_codec:ledger_key()) -> list().
leveled_codec:ledger_key()) -> list({next, manifest_entry(), all}).
%% @doc
%% Return a list of manifest_entry pointers at this level which cover the
%% key query range, only all keys in the files should be included in the
@ -494,8 +500,8 @@ merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
-spec mergefile_selector(manifest(), integer(), selector_strategy())
-> manifest_entry().
-spec mergefile_selector(
manifest(), integer(), selector_strategy()) -> manifest_entry().
%% @doc
%% An algorithm for discovering which files to merge ....
%% We can find the most optimal file:
@ -511,13 +517,15 @@ mergefile_selector(Manifest, LevelIdx, _Strategy) when LevelIdx =< 1 ->
Level = array:get(LevelIdx, Manifest#manifest.levels),
lists:nth(leveled_rand:uniform(length(Level)), Level);
mergefile_selector(Manifest, LevelIdx, random) ->
Level = leveled_tree:to_list(array:get(LevelIdx,
Manifest#manifest.levels)),
Level =
leveled_tree:to_list(
array:get(LevelIdx, Manifest#manifest.levels)),
{_SK, ME} = lists:nth(leveled_rand:uniform(length(Level)), Level),
ME;
mergefile_selector(Manifest, LevelIdx, {grooming, ScoringFun}) ->
Level = leveled_tree:to_list(array:get(LevelIdx,
Manifest#manifest.levels)),
Level =
leveled_tree:to_list(
array:get(LevelIdx, Manifest#manifest.levels)),
SelectorFun =
fun(_I, Acc) ->
{_SK, ME} = lists:nth(leveled_rand:uniform(length(Level)), Level),
@ -555,12 +563,12 @@ add_snapshot(Manifest, Pid, Timeout) ->
ManSQN = Manifest#manifest.manifest_sqn,
case Manifest#manifest.min_snapshot_sqn of
0 ->
Manifest#manifest{snapshots = SnapList0,
min_snapshot_sqn = ManSQN};
Manifest#manifest{
snapshots = SnapList0, min_snapshot_sqn = ManSQN};
N ->
N0 = min(N, ManSQN),
Manifest#manifest{snapshots = SnapList0,
min_snapshot_sqn = N0}
Manifest#manifest{
snapshots = SnapList0, min_snapshot_sqn = N0}
end.
-spec release_snapshot(manifest(), pid()|atom()) -> manifest().

View file

@ -71,7 +71,7 @@ prepare_for_index(IndexArray, no_lookup) ->
prepare_for_index(IndexArray, Hash) ->
{Slot, H0} = split_hash(Hash),
Bin = array:get(Slot, IndexArray),
array:set(Slot, <<Bin/binary, 1:1/integer, H0:23/integer>>, IndexArray).
array:set(Slot, <<Bin/binary, H0:24/integer>>, IndexArray).
-spec add_to_index(array:array(), index_array(), integer()) -> index_array().
%% @doc
@ -201,16 +201,16 @@ merge_trees(StartKey, EndKey, TreeList, LevelMinus1) ->
find_pos(<<>>, _Hash) ->
false;
find_pos(<<1:1/integer, Hash:23/integer, _T/binary>>, Hash) ->
find_pos(<<Hash:24/integer, _T/binary>>, Hash) ->
true;
find_pos(<<1:1/integer, _Miss:23/integer, T/binary>>, Hash) ->
find_pos(<<_Miss:24/integer, T/binary>>, Hash) ->
find_pos(T, Hash).
split_hash({SegmentID, ExtraHash}) ->
Slot = SegmentID band 255,
H0 = (SegmentID bsr 8) bor (ExtraHash bsl 8),
{Slot, H0 band 8388607}.
{Slot, H0 band 16#FFFFFF}.
check_slotlist(Key, _Hash, CheckList, TreeList) ->
SlotCheckFun =

View file

@ -65,6 +65,7 @@
-type mp()
:: {re_pattern, term(), term(), term(), term()}.
-export_type([acc_fun/0, mp/0]).
%%%============================================================================
%%% External functions
@ -146,14 +147,6 @@ bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) ->
%% for a timeout
index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
{FoldKeysFun, InitAcc} = FoldAccT,
{ReturnTerms, TermRegex} = TermHandling,
AddFun =
case ReturnTerms of
true ->
fun add_terms/2;
_ ->
fun add_keys/2
end,
Runner =
fun() ->
@ -163,7 +156,7 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
LedgerSnapshot,
StartKey,
EndKey,
accumulate_index(TermRegex, AddFun, FoldKeysFun),
leveled_codec:accumulate_index(TermHandling, FoldKeysFun),
InitAcc,
by_runner),
wrap_runner(Folder, AfterFun)
@ -680,47 +673,20 @@ check_presence(Key, Value, InkerClone) ->
false
end.
accumulate_keys(FoldKeysFun, TermRegex) ->
AccFun =
accumulate_keys(FoldKeysFun, undefined) ->
fun(Key, _Value, Acc) ->
{B, K} = leveled_codec:from_ledgerkey(Key),
case TermRegex of
undefined ->
FoldKeysFun(B, K, Acc);
Re ->
case re:run(K, Re) of
FoldKeysFun(B, K, Acc)
end;
accumulate_keys(FoldKeysFun, TermRegex) ->
fun(Key, _Value, Acc) ->
{B, K} = leveled_codec:from_ledgerkey(Key),
case re:run(K, TermRegex) of
nomatch ->
Acc;
_ ->
FoldKeysFun(B, K, Acc)
end
end
end,
AccFun.
add_keys(ObjKey, _IdxValue) ->
ObjKey.
add_terms(ObjKey, IdxValue) ->
{IdxValue, ObjKey}.
accumulate_index(TermRe, AddFun, FoldKeysFun) ->
case TermRe of
undefined ->
fun(Key, _Value, Acc) ->
{Bucket, ObjKey, IdxValue} = leveled_codec:from_ledgerkey(Key),
FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc)
end;
TermRe ->
fun(Key, _Value, Acc) ->
{Bucket, ObjKey, IdxValue} = leveled_codec:from_ledgerkey(Key),
case re:run(IdxValue, TermRe) of
nomatch ->
Acc;
_ ->
FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc)
end
end
end.
-spec wrap_runner(fun(), fun()) -> any().

File diff suppressed because it is too large Load diff

View file

@ -5,8 +5,6 @@
-module(leveled_util).
-include("include/leveled.hrl").
-export([generate_uuid/0,
integer_now/0,
integer_time/1,
@ -42,7 +40,7 @@ integer_time(TS) ->
calendar:datetime_to_gregorian_seconds(DT).
-spec magic_hash(any()) -> integer().
-spec magic_hash(any()) -> 0..16#FFFFFFFF.
%% @doc
%% Use DJ Bernstein magic hash function. Note, this is more expensive than
%% phash2 but provides a much more balanced result.
@ -52,7 +50,7 @@ integer_time(TS) ->
%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
magic_hash({binary, BinaryKey}) ->
H = 5381,
hash1(H, BinaryKey) band 16#FFFFFFFF;
hash1(H, BinaryKey);
magic_hash(AnyKey) ->
BK = t2b(AnyKey),
magic_hash({binary, BK}).
@ -60,7 +58,7 @@ magic_hash(AnyKey) ->
hash1(H, <<>>) ->
H;
hash1(H, <<B:8/integer, Rest/bytes>>) ->
H1 = H * 33,
H1 = (H * 33) band 16#FFFFFFFF,
H2 = H1 bxor B,
hash1(H2, Rest).

View file

@ -34,8 +34,8 @@ expiring_indexes(_Config) ->
% before). Confirm that replacing an object has the expected outcome, if
% the IndexSpecs are updated as part of the request.
KeyCount = 50000,
Future = 60,
% 1 minute - if running tests on a slow machine, may need to increase
Future = 120,
% 2 minutes - if running tests on a slow machine, may need to increase
% this value
RootPath = testutil:reset_filestructure(),
StartOpts1 =
@ -46,11 +46,28 @@ expiring_indexes(_Config) ->
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
SW1 = os:timestamp(),
timer:sleep(1000),
V9 = testutil:get_compressiblevalue(),
Indexes9 = testutil:get_randomindexes_generator(2),
TempRiakObjects =
testutil:generate_objects(
KeyCount, binary_uuid, [], V9, Indexes9, "riakBucket"),
IBKL1 = testutil:stdload_expiring(Bookie1, KeyCount, Future),
lists:foreach(
fun({_RN, Obj, Spc}) ->
testutil:book_tempriakput(
Bookie1, Obj, Spc, leveled_util:integer_now() + Future)
end,
TempRiakObjects
),
timer:sleep(1000),
% Wait a second after last key so that none loaded in the last second
LoadTime = timer:now_diff(os:timestamp(), SW1)/1000000,
io:format("Load of ~w std objects in ~w seconds~n", [KeyCount, LoadTime]),
timer:sleep(1000),
SW2 = os:timestamp(),
FilterFun = fun({I, _B, _K}) -> lists:member(I, [5, 6, 7, 8]) end,
@ -76,6 +93,25 @@ expiring_indexes(_Config) ->
{async, I0Counter1} = CountI0Fold(),
I0Count1 = I0Counter1(),
HeadFold =
fun(LowTS, HighTS) ->
leveled_bookie:book_headfold(
Bookie1,
?RIAK_TAG,
{range, <<"riakBucket">>, all},
{fun(_B, _K, _V, Acc) -> Acc + 1 end, 0},
false, true, false,
{testutil:convert_to_seconds(LowTS),
testutil:convert_to_seconds(HighTS)},
false
)
end,
{async, HeadCount0Fun} = HeadFold(SW1, SW2),
{async, HeadCount1Fun} = HeadFold(SW2, os:timestamp()),
HeadCounts = {HeadCount0Fun(), HeadCount1Fun()},
io:format("HeadCounts ~w before expiry~n", [HeadCounts]),
{KeyCount, 0} = HeadCounts,
FoldFun = fun(BF, {IdxV, KeyF}, Acc) -> [{IdxV, BF, KeyF}|Acc] end,
InitAcc = [],
IndexFold =
@ -145,6 +181,12 @@ expiring_indexes(_Config) ->
true = QR4 == [],
true = QR5 == [],
{async, HeadCount0ExpFun} = HeadFold(SW1, SW2),
{async, HeadCount1ExpFun} = HeadFold(SW2, os:timestamp()),
HeadCountsExp = {HeadCount0ExpFun(), HeadCount1ExpFun()},
io:format("HeadCounts ~w after expiry~n", [HeadCountsExp]),
{0, 0} = HeadCountsExp,
ok = leveled_bookie:book_close(Bookie1),
testutil:reset_filestructure().
@ -379,7 +421,9 @@ single_object_with2i(_Config) ->
%% @TODO replace all index queries with new Top-Level API if tests
%% pass
{async, IdxFolder1} = leveled_bookie:book_indexfold(Bookie1,
{async, IdxFolder1} =
leveled_bookie:book_indexfold(
Bookie1,
"Bucket1",
{fun testutil:foldkeysfun/3, []},
{list_to_binary("binary_bin"),

View file

@ -3,6 +3,7 @@
-include("../include/leveled.hrl").
-export([book_riakput/3,
book_tempriakput/4,
book_riakdelete/4,
book_riakget/3,
book_riakhead/3,
@ -182,6 +183,16 @@ book_riakput(Pid, RiakObject, IndexSpecs) ->
IndexSpecs,
?RIAK_TAG).
book_tempriakput(Pid, RiakObject, IndexSpecs, TTL) ->
leveled_bookie:book_tempput(
Pid,
RiakObject#r_object.bucket,
RiakObject#r_object.key,
to_binary(v1, RiakObject),
IndexSpecs,
?RIAK_TAG,
TTL).
book_riakdelete(Pid, Bucket, Key, IndexSpecs) ->
leveled_bookie:book_put(Pid, Bucket, Key, delete, IndexSpecs, ?RIAK_TAG).