Merge branch 'master' into mas-i134-compileflag

Martin Sumner 2018-05-04 17:14:21 +01:00
commit eb5950a179
14 changed files with 556 additions and 308 deletions

src/leveled_bookie.erl

@@ -248,14 +248,15 @@ book_start(Opts) ->
     gen_server:start(?MODULE, [Opts], []).
 
--spec book_tempput(pid(), any(), any(), any(), list(), atom(), integer()) ->
-        ok|pause.
+-spec book_tempput(pid(), any(), any(), any(),
+                    leveled_codec:index_specs(),
+                    leveled_codec:tag(), integer()) -> ok|pause.
 
 %% @doc Put an object with an expiry time
 %%
 %% Put an item in the store but with a Time To Live - the time when the object
 %% should expire, in gregorian_seconds (add the required number of seconds to
-%% leveled_codec:integer_time/1).
+%% leveled_util:integer_time/1).
 %%
 %% There exists the possibility of per object expiry times, not just whole
 %% store expiry times as has traditionally been the feature in Riak. Care
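
The reworked spec does not change the calling convention: the TTL argument is still an absolute expiry time in gregorian seconds. A minimal usage sketch (assumes a started bookie pid Bookie, leveled's header for ?STD_TAG, and the relocated leveled_util helpers referenced above):

    %% Expire the object five minutes from now; book_tempput may also
    %% return pause when the store is applying backpressure.
    TTL = leveled_util:integer_now() + 300,
    ok = leveled_bookie:book_tempput(Bookie, <<"Bucket">>, <<"Key">>,
                                        {value, <<"V">>}, [], ?STD_TAG, TTL).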
@@ -314,8 +315,9 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
 book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
     book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).
 
--spec book_put(pid(), any(), any(), any(), list(), atom(), infinity|integer())
-        -> ok|pause.
+-spec book_put(pid(), any(), any(), any(),
+                leveled_codec:index_specs(),
+                leveled_codec:tag(), infinity|integer()) -> ok|pause.
 
 book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
     gen_server:call(Pid,
@@ -349,7 +351,8 @@ book_mput(Pid, ObjectSpecs) ->
 book_mput(Pid, ObjectSpecs, TTL) ->
     gen_server:call(Pid, {mput, ObjectSpecs, TTL}, infinity).
 
--spec book_delete(pid(), any(), any(), list()) -> ok|pause.
+-spec book_delete(pid(), any(), any(), leveled_codec:index_specs())
+        -> ok|pause.
 
 %% @doc
 %%
@@ -360,8 +363,10 @@ book_delete(Pid, Bucket, Key, IndexSpecs) ->
     book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).
 
--spec book_get(pid(), any(), any(), atom()) -> {ok, any()}|not_found.
--spec book_head(pid(), any(), any(), atom()) -> {ok, any()}|not_found.
+-spec book_get(pid(), any(), any(), leveled_codec:tag())
+        -> {ok, any()}|not_found.
+-spec book_head(pid(), any(), any(), leveled_codec:tag())
+        -> {ok, any()}|not_found.
 
 %% @doc - GET and HEAD requests
 %%
@@ -496,7 +501,7 @@ book_destroy(Pid) ->
     gen_server:call(Pid, destroy, infinity).
 
--spec book_isempty(pid(), atom()) -> boolean().
+-spec book_isempty(pid(), leveled_codec:tag()) -> boolean().
 %% @doc
 %% Confirm if the store is empty, or if it contains a Key and Value for a
 %% given tag
@@ -575,7 +580,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
                             {IndexSpecs, TTL}),
     {SW1, Timings1} =
         update_timings(SW0, {put, {inker, ObjSize}}, State#state.put_timings),
-    Changes = preparefor_ledgercache(no_type_assigned,
+    Changes = preparefor_ledgercache(null,
                                         LedgerKey,
                                         SQN,
                                         Object,
@@ -654,7 +659,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
                 tomb ->
                     not_found;
                 {active, TS} ->
-                    case TS >= leveled_codec:integer_now() of
+                    case TS >= leveled_util:integer_now() of
                         false ->
                             not_found;
                         true ->
@@ -695,7 +700,7 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
         {_SeqN, tomb, _MH, _MD} ->
             {reply, not_found, State};
         {_SeqN, {active, TS}, _MH, MD} ->
-            case TS >= leveled_codec:integer_now() of
+            case TS >= leveled_util:integer_now() of
                 true ->
                     {SWr, UpdTimingsP} =
                         update_timings(SWp,
@@ -1242,8 +1247,11 @@ readycache_forsnapshot(LedgerCache, Query) ->
                     max_sqn=LedgerCache#ledger_cache.max_sqn}
     end.
 
--spec scan_table(ets:tab(), tuple(), tuple()) ->
-        {list(), non_neg_integer()|infinity, non_neg_integer()}.
+-spec scan_table(ets:tab(),
+                    leveled_codec:ledger_key(), leveled_codec:ledger_key())
+                        -> {list(leveled_codec:ledger_kv()),
+                            non_neg_integer()|infinity,
+                            non_neg_integer()}.
 %% @doc
 %% Query the ETS table to find a range of keys (start inclusive). Should also
 %% return the minimum and maximum sequence number found in the query. This
@@ -1280,7 +1288,8 @@ scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) ->
     end.
 
--spec fetch_head(tuple(), pid(), ledger_cache()) -> not_present|tuple().
+-spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache())
+        -> not_present|leveled_codec:ledger_value().
 %% @doc
 %% Fetch only the head of the object from the Ledger (or the bookie's recent
 %% ledger cache if it has just been updated). not_present is returned if the
@@ -1310,9 +1319,14 @@ fetch_head(Key, Penciller, LedgerCache) ->
     end.
 
--spec preparefor_ledgercache(atom(), any(), integer(), any(),
-                                integer(), tuple(), book_state()) ->
-                                {integer()|no_lookup, integer(), list()}.
+-spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null,
+                                leveled_codec:ledger_key()|?DUMMY,
+                                integer(), any(), integer(),
+                                leveled_codec:key_changes(),
+                                book_state())
+                                    -> {integer()|no_lookup,
+                                        integer(),
+                                        list(leveled_codec:ledger_kv())}.
 %% @doc
 %% Prepare an object and its related key changes for addition to the Ledger
 %% via the Ledger Cache.
@@ -1342,8 +1356,10 @@ preparefor_ledgercache(_InkTag,
 
 -spec addto_ledgercache({integer()|no_lookup,
-                            integer(), list()}, ledger_cache())
-                                -> ledger_cache().
+                            integer(),
+                            list(leveled_codec:ledger_kv())},
+                        ledger_cache())
+                            -> ledger_cache().
 %% @doc
 %% Add a set of changes associated with a single sequence number (journal
 %% update) and key to the ledger cache. If the changes are not to be looked
@@ -1356,8 +1372,11 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
                         max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
 
 -spec addto_ledgercache({integer()|no_lookup,
-                            integer(), list()}, ledger_cache(), loader)
-                                -> ledger_cache().
+                            integer(),
+                            list(leveled_codec:ledger_kv())},
+                        ledger_cache(),
+                        loader)
+                            -> ledger_cache().
 %% @doc
 %% Add a set of changes associated with a single sequence number (journal
 %% update) to the ledger cache. This is used explicitly when loading the
@@ -1649,7 +1668,7 @@ ttl_test() ->
     {ok, Bookie1} = book_start([{root_path, RootPath}]),
     ObjL1 = generate_multiple_objects(100, 1),
     % Put in all the objects with a TTL in the future
-    Future = leveled_codec:integer_now() + 300,
+    Future = leveled_util:integer_now() + 300,
     lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                         "Bucket", K, V, S,
                                                         ?STD_TAG,
@@ -1665,7 +1684,7 @@ ttl_test() ->
                     ObjL1),
     ObjL2 = generate_multiple_objects(100, 101),
-    Past = leveled_codec:integer_now() - 300,
+    Past = leveled_util:integer_now() - 300,
     lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                         "Bucket", K, V, S,
                                                         ?STD_TAG,
@@ -1741,7 +1760,7 @@ hashlist_query_testto() ->
                                 {cache_size, 500}]),
     ObjL1 = generate_multiple_objects(1200, 1),
     % Put in all the objects with a TTL in the future
-    Future = leveled_codec:integer_now() + 300,
+    Future = leveled_util:integer_now() + 300,
     lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                         "Bucket", K, V, S,
                                                         ?STD_TAG,
@@ -1749,7 +1768,7 @@ hashlist_query_testto() ->
                     ObjL1),
     ObjL2 = generate_multiple_objects(20, 1201),
     % Put in a few objects with a TTL in the past
-    Past = leveled_codec:integer_now() - 300,
+    Past = leveled_util:integer_now() - 300,
     lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                         "Bucket", K, V, S,
                                                         ?STD_TAG,
@@ -1793,7 +1812,7 @@ hashlist_query_withjournalcheck_testto() ->
                                 {cache_size, 500}]),
     ObjL1 = generate_multiple_objects(800, 1),
     % Put in all the objects with a TTL in the future
-    Future = leveled_codec:integer_now() + 300,
+    Future = leveled_util:integer_now() + 300,
     lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                         "Bucket", K, V, S,
                                                         ?STD_TAG,
@@ -1822,7 +1841,7 @@ foldobjects_vs_hashtree_testto() ->
                                 {cache_size, 500}]),
     ObjL1 = generate_multiple_objects(800, 1),
     % Put in all the objects with a TTL in the future
-    Future = leveled_codec:integer_now() + 300,
+    Future = leveled_util:integer_now() + 300,
     lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                         "Bucket", K, V, S,
                                                         ?STD_TAG,
@@ -1896,7 +1915,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
     ObjL1 = generate_multiple_objects(400, 1),
     ObjL2 = generate_multiple_objects(400, 1),
     % Put in all the objects with a TTL in the future
-    Future = leveled_codec:integer_now() + 300,
+    Future = leveled_util:integer_now() + 300,
     lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                         "BucketA", K, V, S,
                                                         ?STD_TAG,
@@ -2034,7 +2053,7 @@ is_empty_test() ->
                                 {max_journalsize, 1000000},
                                 {cache_size, 500}]),
     % Put in an object with a TTL in the future
-    Future = leveled_codec:integer_now() + 300,
+    Future = leveled_util:integer_now() + 300,
     ?assertMatch(true, leveled_bookie:book_isempty(Bookie1, ?STD_TAG)),
     ok = book_tempput(Bookie1,
                         <<"B">>, <<"K">>, {value, <<"V">>}, [],

src/leveled_cdb.erl

@@ -1554,7 +1554,7 @@ endian_flip(Int) ->
     X.
 
 hash(Key) ->
-    leveled_codec:magic_hash(Key).
+    leveled_util:magic_hash(Key).
 
 % Get the least significant 8 bits from the hash.
 hash_to_index(Hash) ->
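
The retained comment pins down what hash_to_index/1 does with the relocated hash. A sketch under the assumption that the body is the usual 8-bit mask (the implementation itself falls outside this hunk):

    hash_to_index(Hash) ->
        Hash band 255.    % least significant 8 bits, one of 256 CDB buckets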

src/leveled_codec.erl

@@ -47,7 +47,7 @@
         to_ledgerkey/5,
         from_ledgerkey/1,
         from_ledgerkey/2,
-        to_inkerkv/3,
+        to_inkerkey/2,
         to_inkerkv/6,
         from_inkerkv/1,
         from_inkerkv/2,
@@ -64,10 +64,7 @@
         idx_indexspecs/5,
         obj_objectspecs/3,
         aae_indexspecs/6,
-        generate_uuid/0,
-        integer_now/0,
         riak_extract_metadata/2,
-        magic_hash/1,
         segment_hash/1,
         to_lookup/1,
         riak_metadata_to_binary/2]).
@@ -84,8 +81,54 @@
                     integer()|null, % Hash of vclock - non-exportable
                     integer()}. % Size in bytes of real object
 
--spec segment_hash(any()) -> {integer(), integer()}.
+-type tag() ::
+        ?STD_TAG|?RIAK_TAG|?IDX_TAG|?HEAD_TAG.
+-type segment_hash() ::
+        {integer(), integer()}|no_lookup.
+-type ledger_status() ::
+        tomb|{active, non_neg_integer()|infinity}.
+-type ledger_key() ::
+        {tag(), any(), any(), any()}.
+-type ledger_value() ::
+        {integer(), ledger_status(), segment_hash(), tuple()|null}.
+-type ledger_kv() ::
+        {ledger_key(), ledger_value()}.
+-type compaction_strategy() ::
+        list({tag(), retain|skip|recalc}).
+-type journal_key_tag() ::
+        ?INKT_STND|?INKT_TOMB|?INKT_MPUT|?INKT_KEYD.
+-type journal_key() ::
+        {integer(), journal_key_tag(), ledger_key()}.
+-type compression_method() ::
+        lz4|native.
+-type journal_keychanges() ::
+        {list(), infinity|integer()}. % {KeyChanges, TTL}
+-type index_specs() ::
+        list({add|remove, any(), any()}).
+-type segment_list()
+        :: list(integer())|false.
+
+-export_type([tag/0,
+                segment_hash/0,
+                ledger_status/0,
+                ledger_key/0,
+                ledger_value/0,
+                ledger_kv/0,
+                compaction_strategy/0,
+                journal_key_tag/0,
+                journal_key/0,
+                compression_method/0,
+                journal_keychanges/0,
+                index_specs/0,
+                segment_list/0]).
+
+%%%============================================================================
+%%% Ledger Key Manipulation
+%%%============================================================================
+
+-spec segment_hash(ledger_key()|binary()) -> {integer(), integer()}.
 %% @doc
 %% Return two 16 bit integers - the segment ID and a second integer for spare
 %% entropy. The hash should be used in blooms or indexes such that some
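
Exporting these types lets dependent modules write dialyzer-checkable specs rather than tuple()/atom(), which is exactly what the spec changes across this commit do. An illustrative helper built only on the declarations above (the module and function names here are hypothetical):

    -module(key_sketch).
    -export([new_std_kv/3]).

    %% Build a ledger KV for a standard object; the atom o is the value
    %% behind ?STD_TAG in leveled's header.
    -spec new_std_kv(any(), any(), integer()) -> leveled_codec:ledger_kv().
    new_std_kv(Bucket, Key, SQN) ->
        LK = {o, Bucket, Key, null},                        % ledger_key()
        LV = {SQN, {active, infinity}, no_lookup, null},    % ledger_value()
        {LK, LV}.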
@@ -107,28 +150,7 @@ segment_hash(Key) ->
     segment_hash(term_to_binary(Key)).
 
--spec magic_hash(any()) -> integer().
-%% @doc
-%% Use DJ Bernstein magic hash function. Note, this is more expensive than
-%% phash2 but provides a much more balanced result.
-%%
-%% Hash function contains mysterious constants, some explanation here as to
-%% what they are -
-%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
-magic_hash({binary, BinaryKey}) ->
-    H = 5381,
-    hash1(H, BinaryKey) band 16#FFFFFFFF;
-magic_hash(AnyKey) ->
-    BK = term_to_binary(AnyKey),
-    magic_hash({binary, BK}).
-
-hash1(H, <<>>) ->
-    H;
-hash1(H, <<B:8/integer, Rest/bytes>>) ->
-    H1 = H * 33,
-    H2 = H1 bxor B,
-    hash1(H2, Rest).
-
+-spec to_lookup(ledger_key()) -> lookup|no_lookup.
 %% @doc
 %% Should it be possible to lookup a key in the merge tree. This is not true
 %% For keys that should only be read through range queries. Direct lookup
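
The hash is removed here rather than deleted from the codebase: leveled_cdb now calls it via leveled_util (see the hash/1 change above). As the removed lines are self-contained, they can be reproduced verbatim as a standalone sketch:

    -module(magic_hash_sketch).
    -export([magic_hash/1]).

    %% DJ Bernstein style hash: seed 5381, multiply by 33, xor each byte,
    %% masked to 32 bits.
    magic_hash({binary, BinaryKey}) ->
        H = 5381,
        hash1(H, BinaryKey) band 16#FFFFFFFF;
    magic_hash(AnyKey) ->
        BK = term_to_binary(AnyKey),
        magic_hash({binary, BK}).

    hash1(H, <<>>) ->
        H;
    hash1(H, <<B:8/integer, Rest/bytes>>) ->
        H1 = H * 33,
        H2 = H1 bxor B,
        hash1(H2, Rest).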
@@ -141,36 +163,30 @@ to_lookup(Key) ->
             lookup
     end.
 
--spec generate_uuid() -> list().
-%% @doc
-%% Generate a new globally unique ID as a string.
-%% Credit to
-%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
-generate_uuid() ->
-    <<A:32, B:16, C:16, D:16, E:48>> = leveled_rand:rand_bytes(16),
-    L = io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
-                        [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]),
-    binary_to_list(list_to_binary(L)).
-
-inker_reload_strategy(AltList) ->
-    ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
-    lists:foldl(fun({X, Y}, SList) ->
-                        lists:keyreplace(X, 1, SList, {X, Y})
-                end,
-                ReloadStrategy0,
-                AltList).
+%% @doc
+%% Some helper functions to get sub-components of the key/value
 
+-spec strip_to_statusonly(ledger_kv()) -> ledger_status().
 strip_to_statusonly({_, {_, St, _, _}}) -> St.
+-spec strip_to_seqonly(ledger_kv()) -> non_neg_integer().
 strip_to_seqonly({_, {SeqN, _, _, _}}) -> SeqN.
+-spec strip_to_keyseqonly(ledger_kv()) -> {ledger_key(), integer()}.
 strip_to_keyseqonly({LK, {SeqN, _, _, _}}) -> {LK, SeqN}.
+-spec strip_to_seqnhashonly(ledger_kv()) -> {integer(), segment_hash()}.
 strip_to_seqnhashonly({_, {SeqN, _, MH, _}}) -> {SeqN, MH}.
+-spec striphead_to_details(ledger_value()) -> ledger_value().
 striphead_to_details({SeqN, St, MH, MD}) -> {SeqN, St, MH, MD}.
 
+-spec key_dominates(ledger_kv(), ledger_kv()) ->
+    left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant.
+%% @doc
+%% When comparing two keys in the ledger need to find if one key comes before
+%% the other, or if they match, which key is "better" and should be the winner
 key_dominates(LeftKey, RightKey) ->
     case {LeftKey, RightKey} of
         {{LK, _LVAL}, {RK, _RVAL}} when LK < RK ->
@@ -185,7 +201,11 @@ key_dominates(LeftKey, RightKey) ->
             right_hand_dominant
     end.
 
+-spec maybe_reap_expiredkey(ledger_kv(), {boolean(), integer()}) -> boolean().
+%% @doc
+%% Make a reap decision based on the level in the ledger (needs to be expired
+%% and in the basement). The level is a tuple of the is_basement boolean, and
+%% a timestamp passed into the calling function
 maybe_reap_expiredkey(KV, LevelD) ->
     Status = strip_to_statusonly(KV),
     maybe_reap(Status, LevelD).
@@ -199,6 +219,9 @@ maybe_reap(tomb, {true, _CurrTS}) ->
 maybe_reap(_, _) ->
     false.
 
+-spec is_active(ledger_key(), ledger_value(), non_neg_integer()) -> boolean().
+%% @doc
+%% Is this an active KV pair or has the timestamp expired
 is_active(Key, Value, Now) ->
     case strip_to_statusonly({Key, Value}) of
         {active, infinity} ->
@@ -231,21 +254,111 @@ from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) ->
 from_ledgerkey({_Tag, Bucket, Key, _SubKey}) ->
     {Bucket, Key}.
 
+-spec to_ledgerkey(any(), any(), tag(), any(), any()) -> ledger_key().
+%% @doc
+%% Convert something into a ledger key
 to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
     {?IDX_TAG, Bucket, {Field, Value}, Key}.
 
+-spec to_ledgerkey(any(), any(), tag()) -> ledger_key().
+%% @doc
+%% Convert something into a ledger key
 to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG) ->
     {?HEAD_TAG, Bucket, Key, SubKey};
 to_ledgerkey(Bucket, Key, Tag) ->
     {Tag, Bucket, Key, null}.
 
+-spec endkey_passed(ledger_key(), ledger_key()) -> boolean().
+%% @doc
+%% Compare a key against a query key, only comparing elements that are non-null
+%% in the Query key. This is used for comparing against end keys in queries.
+endkey_passed(all, _) ->
+    false;
+endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) ->
+    EK1 < CK1;
+endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) ->
+    {EK1, EK2} < {CK1, CK2};
+endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
+    {EK1, EK2, EK3} < {CK1, CK2, CK3};
+endkey_passed(EndKey, CheckingKey) ->
+    EndKey < CheckingKey.
+
-%% Return the Key, Value and Hash Option for this object. The hash option
-%% indicates whether the key would ever be looked up directly, and so if it
-%% requires an entry in the hash table
-to_inkerkv(LedgerKey, SQN, to_fetch) ->
-    {{SQN, ?INKT_STND, LedgerKey}, null, true}.
+%%%============================================================================
+%%% Journal Compaction functions
+%%%============================================================================
+
+-spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy().
+%% @doc
+%% Take the default strategy for compaction, and override the approach for any
+%% tags passed in
+inker_reload_strategy(AltList) ->
+    ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
+    lists:foldl(fun({X, Y}, SList) ->
+                        lists:keyreplace(X, 1, SList, {X, Y})
+                end,
+                ReloadStrategy0,
+                AltList).
+
+-spec compact_inkerkvc({journal_key(), any(), boolean()},
+                        compaction_strategy()) ->
+                            skip|{retain, any()}|{recalc, null}.
+%% @doc
+%% Decide whether a superseded object should be replicated in the compacted
+%% file and in what format.
+compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
+    skip;
+compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
+    skip;
+compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
+    case get_tagstrategy(LK, Strategy) of
+        skip ->
+            skip;
+        retain ->
+            {retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}};
+        TagStrat ->
+            {TagStrat, null}
+    end;
+compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
+    case get_tagstrategy(LK, Strategy) of
+        skip ->
+            skip;
+        retain ->
+            {_V, KeyDeltas} = revert_value_from_journal(V),
+            {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
+        TagStrat ->
+            {TagStrat, null}
+    end.
+
+-spec get_tagstrategy(ledger_key(), compaction_strategy())
+                        -> skip|retain|recalc.
+%% @doc
+%% Work out the compaction strategy for the key
+get_tagstrategy({Tag, _, _, _}, Strategy) ->
+    case lists:keyfind(Tag, 1, Strategy) of
+        {Tag, TagStrat} ->
+            TagStrat;
+        false ->
+            leveled_log:log("IC012", [Tag, Strategy]),
+            skip
+    end.
+
+%%%============================================================================
+%%% Manipulate Journal Key and Value
+%%%============================================================================
+
+-spec to_inkerkey(ledger_key(), non_neg_integer()) -> journal_key().
+%% @doc
+%% Conversion from ledger_key to journal_key to allow for the key to be fetched
+to_inkerkey(LedgerKey, SQN) ->
+    {SQN, ?INKT_STND, LedgerKey}.
+
+-spec to_inkerkv(ledger_key(), non_neg_integer(), any(), journal_keychanges(),
+                    compression_method(), boolean()) -> {journal_key(), any()}.
+%% @doc
+%% Convert to the correct format of a Journal key and value
 to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
     InkerType = check_forinkertype(LedgerKey, Object),
     Value =
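
A worked example of endkey_passed/2 as relocated above. null fields in the end key act as wildcards, and a true result means the checking key has gone past the end of the query range (keys shown are illustrative o/?STD_TAG 4-tuples):

    false = leveled_codec:endkey_passed(all, {o, <<"B1">>, <<"K1">>, null}),
    %% An end key of {o, <<"B1">>, null, null} covers every key in bucket B1,
    false = leveled_codec:endkey_passed({o, <<"B1">>, null, null},
                                        {o, <<"B1">>, <<"K9">>, null}),
    %% but a key in bucket B2 is past the end of that range.
    true = leveled_codec:endkey_passed({o, <<"B1">>, null, null},
                                        {o, <<"B2">>, <<"K0">>, null}).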
@@ -264,6 +377,11 @@ from_inkerkv(Object, ToIgnoreKeyChanges) ->
             Object
     end.
 
+-spec create_value_for_journal({any(), journal_keychanges()|binary()},
+                                boolean(), compression_method()) -> binary().
+%% @doc
+%% Serialise the value to be stored in the Journal
 create_value_for_journal({Object, KeyChanges}, Compress, Method)
                                             when not is_binary(KeyChanges) ->
     KeyChangeBin = term_to_binary(KeyChanges, [compressed]),
@@ -311,6 +429,10 @@ serialise_object(Object, false, _Method) ->
 serialise_object(Object, true, _Method) ->
     term_to_binary(Object, [compressed]).
 
+-spec revert_value_from_journal(binary()) -> {any(), journal_keychanges()}.
+%% @doc
+%% Revert the object back to its deserialised state, along with the list of
+%% key changes associated with the change
 revert_value_from_journal(JournalBin) ->
     revert_value_from_journal(JournalBin, false).
@@ -324,7 +446,8 @@ revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) ->
     case ToIgnoreKeyChanges of
         true ->
             <<OBin2:Length1/binary, _KCBin2:KeyChangeLength/binary>> = JBin0,
-            {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), []};
+            {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
+                {[], infinity}};
         false ->
             <<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
             {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
@@ -359,54 +482,22 @@ encode_valuetype(IsBinary, IsCompressed, Method) ->
         end,
     Bit1 + Bit2 + Bit3.
 
+-spec decode_valuetype(integer()) -> {boolean(), boolean(), boolean()}.
+%% @doc
+%% Check bit flags to confirm how the object has been serialised
 decode_valuetype(TypeInt) ->
     IsCompressed = TypeInt band 1 == 1,
     IsBinary = TypeInt band 2 == 2,
     IsLz4 = TypeInt band 4 == 4,
     {IsBinary, IsCompressed, IsLz4}.
 
+-spec from_journalkey(journal_key()) -> {integer(), ledger_key()}.
+%% @doc
+%% Return just SQN and Ledger Key
 from_journalkey({SQN, _Type, LedgerKey}) ->
     {SQN, LedgerKey}.
 
-compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
-    skip;
-compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
-    skip;
-compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
-    case get_tagstrategy(LK, Strategy) of
-        skip ->
-            skip;
-        retain ->
-            {retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}};
-        TagStrat ->
-            {TagStrat, null}
-    end;
-compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
-    case get_tagstrategy(LK, Strategy) of
-        skip ->
-            skip;
-        retain ->
-            {_V, KeyDeltas} = revert_value_from_journal(V),
-            {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
-        TagStrat ->
-            {TagStrat, null}
-    end;
-compact_inkerkvc(_KVC, _Strategy) ->
-    skip.
-
-get_tagstrategy(LK, Strategy) ->
-    case LK of
-        {Tag, _, _, _} ->
-            case lists:keyfind(Tag, 1, Strategy) of
-                {Tag, TagStrat} ->
-                    TagStrat;
-                false ->
-                    leveled_log:log("IC012", [Tag, Strategy]),
-                    skip
-            end;
-        _ ->
-            skip
-    end.
-
 split_inkvalue(VBin) when is_binary(VBin) ->
     revert_value_from_journal(VBin).
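
The value-type flags decode by simple bit tests, so a couple of worked values follow directly from the function above (decode_valuetype/1 is module-internal, hence a sketch rather than an external API call):

    %% 3 = 1 (compressed) + 2 (binary): a compressed binary, native method.
    {true, true, false} = decode_valuetype(3),
    %% 7 = 1 + 2 + 4: a compressed binary with the lz4 flag also set.
    {true, true, true} = decode_valuetype(7).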
@@ -422,27 +513,27 @@ hash(Obj) ->
     erlang:phash2(term_to_binary(Obj)).
 
-% Compare a key against a query key, only comparing elements that are non-null
-% in the Query key. This is used for comparing against end keys in queries.
-endkey_passed(all, _) ->
-    false;
-endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) ->
-    EK1 < CK1;
-endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) ->
-    {EK1, EK2} < {CK1, CK2};
-endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
-    {EK1, EK2, EK3} < {CK1, CK2, CK3};
-endkey_passed(EndKey, CheckingKey) ->
-    EndKey < CheckingKey.
+%%%============================================================================
+%%% Other Ledger Functions
+%%%============================================================================
 
+-spec obj_objectspecs(list(tuple()), integer(), integer()|infinity)
+                        -> list(ledger_kv()).
+%% @doc
+%% Convert object specs to KV entries ready for the ledger
 obj_objectspecs(ObjectSpecs, SQN, TTL) ->
     lists:map(fun({IdxOp, Bucket, Key, SubKey, Value}) ->
                     gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL)
                 end,
                 ObjectSpecs).
 
+-spec idx_indexspecs(index_specs(),
+                        any(), any(), integer(), integer()|infinity)
+                            -> list(ledger_kv()).
+%% @doc
+%% Convert index specs to KV entries ready for the ledger
 idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
     lists:map(
         fun({IdxOp, IdxFld, IdxTrm}) ->
@@ -528,7 +619,7 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
                     Dates = parse_date(LMD0,
                                         AAE#recent_aae.unit_minutes,
                                         AAE#recent_aae.limit_minutes,
-                                        integer_now()),
+                                        leveled_util:integer_now()),
                     case Dates of
                         no_index ->
                             Acc;
@@ -557,12 +648,12 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
 
 -spec parse_date(tuple(), integer(), integer(), integer()) ->
                     no_index|{binary(), integer()}.
 %% @doc
-%% Parse the lat modified date and the AAE date configuration to return a
+%% Parse the last modified date and the AAE date configuration to return a
 %% binary to be used as the last modified date part of the index, and an
 %% integer to be used as the TTL of the index entry.
 %% Return no_index if the change is not recent.
 parse_date(LMD, UnitMins, LimitMins, Now) ->
-    LMDsecs = integer_time(LMD),
+    LMDsecs = leveled_util:integer_time(LMD),
     Recent = (LMDsecs + LimitMins * 60) > Now,
     case Recent of
         false ->
@@ -614,13 +705,6 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
     {Bucket, Key, Value, {Hash, ObjHash}, LastMods}.
 
-integer_now() ->
-    integer_time(os:timestamp()).
-
-integer_time(TS) ->
-    DT = calendar:now_to_universal_time(TS),
-    calendar:datetime_to_gregorian_seconds(DT).
-
 extract_metadata(Obj, Size, ?RIAK_TAG) ->
     riak_extract_metadata(Obj, Size);
 extract_metadata(Obj, Size, ?STD_TAG) ->
@@ -782,37 +866,18 @@ endkey_passed_test() ->
     ?assertMatch(true, endkey_passed(TestKey, K2)).
 
-corrupted_ledgerkey_test() ->
-    % When testing for compacted journal which has been corrupted, there may
-    % be a corruptes ledger key. Always skip these keys
-    % Key has become a 3-tuple not a 4-tuple
-    TagStrat1 = compact_inkerkvc({{1,
-                                    ?INKT_STND,
-                                    {?STD_TAG, "B1", "K1andSK"}},
-                                    {},
-                                    true},
-                                    [{?STD_TAG, retain}]),
-    ?assertMatch(skip, TagStrat1),
-    TagStrat2 = compact_inkerkvc({{1,
-                                    ?INKT_KEYD,
-                                    {?STD_TAG, "B1", "K1andSK"}},
-                                    {},
-                                    true},
-                                    [{?STD_TAG, retain}]),
-    ?assertMatch(skip, TagStrat2).
-
 general_skip_strategy_test() ->
     % Confirm that we will skip if the strategy says so
     TagStrat1 = compact_inkerkvc({{1,
                                     ?INKT_STND,
-                                    {?STD_TAG, "B1", "K1andSK"}},
+                                    {?STD_TAG, "B1", "K1andSK", null}},
                                     {},
                                     true},
                                     [{?STD_TAG, skip}]),
     ?assertMatch(skip, TagStrat1),
     TagStrat2 = compact_inkerkvc({{1,
                                     ?INKT_KEYD,
-                                    {?STD_TAG, "B1", "K1andSK"}},
+                                    {?STD_TAG, "B1", "K1andSK", null}},
                                     {},
                                     true},
                                     [{?STD_TAG, skip}]),
@@ -839,15 +904,6 @@ general_skip_strategy_test() ->
                                     [{?STD_TAG, skip}, {?IDX_TAG, recalc}]),
     ?assertMatch(skip, TagStrat5).
 
-corrupted_inker_tag_test() ->
-    % Confirm that we will skip on unknown inker tag
-    TagStrat1 = compact_inkerkvc({{1,
-                                    foo,
-                                    {?STD_TAG, "B1", "K1andSK"}},
-                                    {},
-                                    true},
-                                    [{?STD_TAG, retain}]),
-    ?assertMatch(skip, TagStrat1).
-
 %% Test below proved that the overhead of performing hashes was trivial
 %% Maybe 5 microseconds per hash
 
@@ -859,24 +915,10 @@ hashperf_test() ->
     io:format(user, "1000 object hashes in ~w microseconds~n",
                 [timer:now_diff(os:timestamp(), SW)]).
 
-magichashperf_test() ->
-    KeyFun =
-        fun(X) ->
-            K = {o, "Bucket", "Key" ++ integer_to_list(X), null},
-            {K, X}
-        end,
-    KL = lists:map(KeyFun, lists:seq(1, 1000)),
-    {TimeMH, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
-    io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH]),
-    {TimePH, _Hl2} = timer:tc(lists, map, [fun(K) -> erlang:phash2(K) end, KL]),
-    io:format(user, "1000 keys phash2 hashed in ~w microseconds~n", [TimePH]),
-    {TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
-    io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]).
-
 parsedate_test() ->
     {MeS, S, MiS} = os:timestamp(),
     timer:sleep(100),
-    Now = integer_now(),
+    Now = leveled_util:integer_now(),
     UnitMins = 5,
     LimitMins = 60,
     PD = parse_date({MeS, S, MiS}, UnitMins, LimitMins, Now),
@@ -898,7 +940,7 @@ check_pd(PD, UnitMins) ->
 
 parseolddate_test() ->
     LMD = os:timestamp(),
     timer:sleep(100),
-    Now = integer_now() + 60 * 60,
+    Now = leveled_util:integer_now() + 60 * 60,
     UnitMins = 5,
     LimitMins = 60,
     PD = parse_date(LMD, UnitMins, LimitMins, Now),
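
The compaction-strategy helpers gathered into this module can be exercised directly. Given the defaults in inker_reload_strategy/1 (retain for both ?RIAK_TAG and ?STD_TAG), an override replaces only the matching tag:

    [{?RIAK_TAG, retain}, {?STD_TAG, recalc}] =
        leveled_codec:inker_reload_strategy([{?STD_TAG, recalc}]).

get_tagstrategy/2 then selects from that list per ledger key, logging IC012 and skipping for any tag with no entry.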

src/leveled_iclerk.erl

@@ -821,14 +821,14 @@ fetch_testcdb(RP) ->
     {ok,
         CDB1} = leveled_cdb:cdb_open_writer(FN1,
                                             #cdb_options{binary_mode=true}),
-    {K1, V1} = test_inkerkv(1, "Key1", "Value1", []),
-    {K2, V2} = test_inkerkv(2, "Key2", "Value2", []),
-    {K3, V3} = test_inkerkv(3, "Key3", "Value3", []),
-    {K4, V4} = test_inkerkv(4, "Key1", "Value4", []),
-    {K5, V5} = test_inkerkv(5, "Key1", "Value5", []),
-    {K6, V6} = test_inkerkv(6, "Key1", "Value6", []),
-    {K7, V7} = test_inkerkv(7, "Key1", "Value7", []),
-    {K8, V8} = test_inkerkv(8, "Key1", "Value8", []),
+    {K1, V1} = test_inkerkv(1, "Key1", "Value1", {[], infinity}),
+    {K2, V2} = test_inkerkv(2, "Key2", "Value2", {[], infinity}),
+    {K3, V3} = test_inkerkv(3, "Key3", "Value3", {[], infinity}),
+    {K4, V4} = test_inkerkv(4, "Key1", "Value4", {[], infinity}),
+    {K5, V5} = test_inkerkv(5, "Key1", "Value5", {[], infinity}),
+    {K6, V6} = test_inkerkv(6, "Key1", "Value6", {[], infinity}),
+    {K7, V7} = test_inkerkv(7, "Key1", "Value7", {[], infinity}),
+    {K8, V8} = test_inkerkv(8, "Key1", "Value8", {[], infinity}),
     ok = leveled_cdb:cdb_put(CDB1, K1, V1),
     ok = leveled_cdb:cdb_put(CDB1, K2, V2),
     ok = leveled_cdb:cdb_put(CDB1, K3, V3),
@@ -920,7 +920,8 @@ compact_single_file_recovr_test() ->
                         {2,
                             stnd,
                             test_ledgerkey("Key2")}),
-    ?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)),
+    ?assertMatch({{_, _}, {"Value2", {[], infinity}}},
+                    leveled_codec:from_inkerkv(RKV1)),
     ok = leveled_cdb:cdb_deletepending(CDB),
     ok = leveled_cdb:cdb_destroy(CDB).
 
@@ -958,7 +959,8 @@ compact_single_file_retain_test() ->
                         {2,
                             stnd,
                             test_ledgerkey("Key2")}),
-    ?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)),
+    ?assertMatch({{_, _}, {"Value2", {[], infinity}}},
+                    leveled_codec:from_inkerkv(RKV1)),
     ok = leveled_cdb:cdb_deletepending(CDB),
     ok = leveled_cdb:cdb_destroy(CDB).
 
@@ -1001,7 +1003,8 @@ compact_singlefile_totwosmallfiles_testto() ->
                         LK = test_ledgerkey("Key" ++ integer_to_list(X)),
                         Value = leveled_rand:rand_bytes(1024),
                         {IK, IV} =
-                            leveled_codec:to_inkerkv(LK, X, Value, [],
+                            leveled_codec:to_inkerkv(LK, X, Value,
                                                         {[], infinity},
                                                         native, true),
                         ok = leveled_cdb:cdb_put(CDB1, IK, IV)
                     end,
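
All four hunks follow from journal_keychanges() now being {KeyChanges, TTL} rather than a bare list. A condensed sketch of the round-trip the amended assertions check (assumes eunit plus bound LK, SQN, Value and an open CDB, as in the tests above):

    KC = {[], infinity},    % journal_keychanges(): no key changes, no expiry
    {IK, IV} = leveled_codec:to_inkerkv(LK, SQN, Value, KC, native, true),
    ok = leveled_cdb:cdb_put(CDB, IK, IV),
    {{_SQN, _LK}, {_Value, {[], infinity}}} =
        leveled_codec:from_inkerkv(leveled_cdb:cdb_get(CDB, IK)).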

src/leveled_imanifest.erl

@@ -35,6 +35,7 @@
 %% The Entry should have a pid() as the third element, but a string() may be
 %% used in unit tests
 
+-export_type([manifest/0, manifest_entry/0]).
 
 %%%============================================================================
 %%% API
@@ -73,7 +74,8 @@ add_entry(Manifest, Entry, ToEnd) ->
             from_list(Man1)
     end.
 
--spec append_lastkey(manifest(), pid(), any()) -> manifest().
+-spec append_lastkey(manifest(), pid(), leveled_codec:journal_key())
+        -> manifest().
 %% @doc
 %% On discovery of the last key in the last journal entry, the manifest can
 %% be updated through this function to have the last key

src/leveled_inker.erl

@@ -127,6 +127,7 @@
 -define(PENDING_FILEX, "pnd").
 -define(LOADING_PAUSE, 1000).
 -define(LOADING_BATCH, 1000).
+-define(TEST_KC, {[], infinity}).
 
 -record(state, {manifest = [] :: list(),
                 manifest_sqn = 0 :: integer(),
@@ -171,18 +172,14 @@ ink_start(InkerOpts) ->
     gen_server:start(?MODULE, [InkerOpts], []).
 
 -spec ink_put(pid(),
-                {atom(), any(), any(), any()}|string(),
+                leveled_codec:ledger_key(),
                 any(),
-                {list(), integer()|infinity}) ->
+                leveled_codec:key_changes()) ->
                     {ok, integer(), integer()}.
 %% @doc
 %% PUT an object into the journal, returning the sequence number for the PUT
 %% as well as the size of the object (information required by the ledger).
 %%
-%% The primary key is expected to be a tuple of the form
-%% {Tag, Bucket, Key, null}, but unit tests support pure string Keys and so
-%% these types are also supported.
-%%
 %% KeyChanges is a tuple of {KeyChanges, TTL} where the TTL is an
 %% expiry time (or infinity).
 ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
@@ -199,7 +196,7 @@ ink_mput(Pid, PrimaryKey, ObjectChanges) ->
     gen_server:call(Pid, {mput, PrimaryKey, ObjectChanges}, infinity).
 
 -spec ink_get(pid(),
-                {atom(), any(), any(), any()}|string(),
+                leveled_codec:ledger_key(),
                 integer()) ->
                     {{integer(), any()}, {any(), any()}}.
 %% @doc
@@ -221,7 +218,7 @@ ink_fetch(Pid, PrimaryKey, SQN) ->
     gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity).
 
 -spec ink_keycheck(pid(),
-                    {atom(), any(), any(), any()}|string(),
+                    leveled_codec:ledger_key(),
                     integer()) ->
                         probably|missing.
 %% @doc
@@ -671,8 +668,11 @@ get_cdbopts(InkOpts)->
     CDBopts#cdb_options{waste_path = WasteFP}.
 
--spec put_object(tuple(), any(), list(), ink_state())
-        -> {ok|rolling, ink_state(), integer()}.
+-spec put_object(leveled_codec:ledger_key(),
+                    any(),
+                    leveled_codec:journal_keychanges(),
+                    ink_state())
+                        -> {ok|rolling, ink_state(), integer()}.
 %% @doc
 %% Add the object to the current journal if it fits. If it doesn't fit, a new
 %% journal must be started, and the old journal is set to "roll" into a read
@@ -725,7 +725,9 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
     end.
 
--spec get_object(tuple(), integer(), leveled_imanifest:manifest()) -> any().
+-spec get_object(leveled_codec:ledger_key(),
+                    integer(),
+                    leveled_imanifest:manifest()) -> any().
 %% @doc
 %% Find the SQN in the manifest and then fetch the object from the Journal,
 %% in the manifest. If the fetch is in response to a user GET request then
@@ -736,14 +738,14 @@ get_object(LedgerKey, SQN, Manifest) ->
 
 get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) ->
     JournalP = leveled_imanifest:find_entry(SQN, Manifest),
-    {InkerKey, _V, true} =
-        leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch),
+    InkerKey = leveled_codec:to_inkerkey(LedgerKey, SQN),
     Obj = leveled_cdb:cdb_get(JournalP, InkerKey),
     leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges).
 
--spec key_check(tuple(), integer(), leveled_imanifest:manifest())
-        -> missing|probably.
+-spec key_check(leveled_codec:ledger_key(),
+                integer(),
+                leveled_imanifest:manifest()) -> missing|probably.
 %% @doc
 %% Checks for the presence of the key at that SQN within the journal,
 %% avoiding the cost of actually reading the object from disk.
@@ -752,8 +754,7 @@ get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) ->
 %% the positive answer is 'probably' not 'true'
 key_check(LedgerKey, SQN, Manifest) ->
     JournalP = leveled_imanifest:find_entry(SQN, Manifest),
-    {InkerKey, _V, true} =
-        leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch),
+    InkerKey = leveled_codec:to_inkerkey(LedgerKey, SQN),
     leveled_cdb:cdb_keycheck(JournalP, InkerKey).
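
The simplification works because a journal key is just {SQN, Type, LedgerKey} (see journal_key() in leveled_codec above); on the fetch path only the key is needed, so the old {Key, null, true} triple from to_inkerkv/3 no longer has to be unpacked. Sketch of the new call (stnd being the atom behind ?INKT_STND, as the iclerk tests show):

    LK = {o, <<"B">>, <<"K">>, null},
    {5, stnd, LK} = leveled_codec:to_inkerkey(LK, 5).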
@@ -1013,12 +1014,12 @@ filepath(RootPath, journal_waste_dir) ->
 filepath(RootPath, NewSQN, new_journal) ->
     filename:join(filepath(RootPath, journal_dir),
                     integer_to_list(NewSQN) ++ "_"
-                        ++ leveled_codec:generate_uuid()
+                        ++ leveled_util:generate_uuid()
                         ++ "." ++ ?PENDING_FILEX);
 filepath(CompactFilePath, NewSQN, compact_journal) ->
     filename:join(CompactFilePath,
                     integer_to_list(NewSQN) ++ "_"
-                        ++ leveled_codec:generate_uuid()
+                        ++ leveled_util:generate_uuid()
                         ++ "." ++ ?PENDING_FILEX).
 
@@ -1037,9 +1038,11 @@ initiate_penciller_snapshot(Bookie) ->
 create_value_for_journal(Obj, Comp) ->
     leveled_codec:create_value_for_journal(Obj, Comp, native).
 
+key_converter(K) ->
+    {o, <<"B">>, K, null}.
+
 build_dummy_journal() ->
-    F = fun(X) -> X end,
-    build_dummy_journal(F).
+    build_dummy_journal(fun key_converter/1).
 
 build_dummy_journal(KeyConvertF) ->
     RootPath = "../test/journal",
@@ -1053,12 +1056,14 @@ build_dummy_journal(KeyConvertF) ->
     {ok, J1} = leveled_cdb:cdb_open_writer(F1),
     {K1, V1} = {KeyConvertF("Key1"), "TestValue1"},
     {K2, V2} = {KeyConvertF("Key2"), "TestValue2"},
-    ok = leveled_cdb:cdb_put(J1,
-                                {1, stnd, K1},
-                                create_value_for_journal({V1, []}, false)),
-    ok = leveled_cdb:cdb_put(J1,
-                                {2, stnd, K2},
-                                create_value_for_journal({V2, []}, false)),
+    ok =
+        leveled_cdb:cdb_put(J1,
+                            {1, stnd, K1},
+                            create_value_for_journal({V1, ?TEST_KC}, false)),
+    ok =
+        leveled_cdb:cdb_put(J1,
+                            {2, stnd, K2},
+                            create_value_for_journal({V2, ?TEST_KC}, false)),
     ok = leveled_cdb:cdb_roll(J1),
     LK1 = leveled_cdb:cdb_lastkey(J1),
     lists:foldl(fun(X, Closed) ->
@@ -1077,12 +1082,14 @@ build_dummy_journal(KeyConvertF) ->
     {ok, J2} = leveled_cdb:cdb_open_writer(F2),
     {K1, V3} = {KeyConvertF("Key1"), "TestValue3"},
     {K4, V4} = {KeyConvertF("Key4"), "TestValue4"},
-    ok = leveled_cdb:cdb_put(J2,
-                                {3, stnd, K1},
-                                create_value_for_journal({V3, []}, false)),
-    ok = leveled_cdb:cdb_put(J2,
-                                {4, stnd, K4},
-                                create_value_for_journal({V4, []}, false)),
+    ok =
+        leveled_cdb:cdb_put(J2,
+                            {3, stnd, K1},
+                            create_value_for_journal({V3, ?TEST_KC}, false)),
+    ok =
+        leveled_cdb:cdb_put(J2,
+                            {4, stnd, K4},
+                            create_value_for_journal({V4, ?TEST_KC}, false)),
     LK2 = leveled_cdb:cdb_lastkey(J2),
     ok = leveled_cdb:cdb_close(J2),
     Manifest = [{1, "../test/journal/journal_files/nursery_1", "pid1", LK1},
@@ -1120,12 +1127,12 @@ simple_inker_test() ->
                                     cdb_options=CDBopts,
                                     compression_method=native,
                                     compress_on_receipt=true}),
-    Obj1 = ink_get(Ink1, "Key1", 1),
-    ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
-    Obj3 = ink_get(Ink1, "Key1", 3),
-    ?assertMatch({{3, "Key1"}, {"TestValue3", []}}, Obj3),
-    Obj4 = ink_get(Ink1, "Key4", 4),
-    ?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj4),
+    Obj1 = ink_get(Ink1, key_converter("Key1"), 1),
+    ?assertMatch(Obj1, {{1, key_converter("Key1")}, {"TestValue1", ?TEST_KC}}),
+    Obj3 = ink_get(Ink1, key_converter("Key1"), 3),
+    ?assertMatch(Obj3, {{3, key_converter("Key1")}, {"TestValue3", ?TEST_KC}}),
+    Obj4 = ink_get(Ink1, key_converter("Key4"), 4),
+    ?assertMatch(Obj4, {{4, key_converter("Key4")}, {"TestValue4", ?TEST_KC}}),
     ink_close(Ink1),
     clean_testdir(RootPath).
 
@@ -1144,10 +1151,10 @@ simple_inker_completeactivejournal_test() ->
                                     cdb_options=CDBopts,
                                     compression_method=native,
                                     compress_on_receipt=true}),
-    Obj1 = ink_get(Ink1, "Key1", 1),
-    ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
-    Obj2 = ink_get(Ink1, "Key4", 4),
-    ?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj2),
+    Obj1 = ink_get(Ink1, key_converter("Key1"), 1),
+    ?assertMatch(Obj1, {{1, key_converter("Key1")}, {"TestValue1", ?TEST_KC}}),
+    Obj2 = ink_get(Ink1, key_converter("Key4"), 4),
+    ?assertMatch(Obj2, {{4, key_converter("Key4")}, {"TestValue4", ?TEST_KC}}),
     ink_close(Ink1),
     clean_testdir(RootPath).
 
@@ -1241,9 +1248,9 @@ empty_manifest_test() ->
                                     cdb_options=CDBopts,
                                     compression_method=native,
                                     compress_on_receipt=true}),
-    ?assertMatch(not_present, ink_fetch(Ink1, "Key1", 1)),
+    ?assertMatch(not_present, ink_fetch(Ink1, key_converter("Key1"), 1)),
 
-    CheckFun = fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
+    CheckFun = fun(L, K, SQN) -> lists:member({SQN, key_converter(K)}, L) end,
     ?assertMatch(false, CheckFun([], "key", 1)),
     ok = ink_compactjournal(Ink1,
                             [],
@@ -1263,11 +1270,12 @@ empty_manifest_test() ->
                                     cdb_options=CDBopts,
                                     compression_method=native,
                                     compress_on_receipt=false}),
-    ?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)),
-    {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}),
+    ?assertMatch(not_present, ink_fetch(Ink2, key_converter("Key1"), 1)),
+    {ok, SQN, Size} =
+        ink_put(Ink2, key_converter("Key1"), "Value1", {[], infinity}),
     ?assertMatch(2, SQN),
     ?assertMatch(true, Size > 0),
-    {ok, V} = ink_fetch(Ink2, "Key1", 2),
+    {ok, V} = ink_fetch(Ink2, key_converter("Key1"), 2),
     ?assertMatch("Value1", V),
     ink_close(Ink2),
     clean_testdir(RootPath).

src/leveled_pclerk.erl

@@ -266,7 +266,7 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) ->
     BNumber = string:right(integer_to_list(BucketLow + leveled_rand:uniform(BRange)),
                             4, $0),
     KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 4, $0),
-    K = {o, "Bucket" ++ BNumber, "Key" ++ KNumber},
+    K = {o, "Bucket" ++ BNumber, "Key" ++ KNumber, null},
     RandKey = {K, {Count + 1,
                     {active, infinity},
                     leveled_codec:segment_hash(K),
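
This is the same shape-correction made in the leveled_codec tests above: a ledger_key() is a 4-tuple with an explicit null sub-key when unused, which is the shape segment_hash/1 is now specced to accept:

    K = {o, "Bucket0001", "Key0001", null},              % ledger_key()
    {_SegmentID, _ExtraEntropy} = leveled_codec:segment_hash(K).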

src/leveled_penciller.erl

@@ -347,7 +347,8 @@ pcl_fetchlevelzero(Pid, Slot) ->
     % be stuck in L0 pending
     gen_server:call(Pid, {fetch_levelzero, Slot}, 60000).
 
--spec pcl_fetch(pid(), tuple()) -> {tuple(), tuple()}|not_present.
+-spec pcl_fetch(pid(), leveled_codec:ledger_key())
+        -> leveled_codec:ledger_kv()|not_present.
 %% @doc
 %% Fetch a key, return the first (highest SQN) occurrence of that Key along
 %% with the value.
@@ -364,8 +365,10 @@ pcl_fetch(Pid, Key) ->
             gen_server:call(Pid, {fetch, Key, Hash}, infinity)
     end.
 
--spec pcl_fetch(pid(), tuple(), {integer(), integer()}) ->
-        {tuple(), tuple()}|not_present.
+-spec pcl_fetch(pid(),
+                leveled_codec:ledger_key(),
+                leveled_codec:segment_hash())
+        -> leveled_codec:ledger_kv()|not_present.
 %% @doc
 %% Fetch a key, return the first (highest SQN) occurrence of that Key along
 %% with the value.
@@ -374,7 +377,10 @@ pcl_fetch(Pid, Key) ->
 pcl_fetch(Pid, Key, Hash) ->
     gen_server:call(Pid, {fetch, Key, Hash}, infinity).
 
--spec pcl_fetchkeys(pid(), tuple(), tuple(), fun(), any()) -> any().
+-spec pcl_fetchkeys(pid(),
+                    leveled_codec:ledger_key(),
+                    leveled_codec:ledger_key(),
+                    fun(), any()) -> any().
 %% @doc
 %% Run a range query between StartKey and EndKey (inclusive). This will cover
 %% all keys in the range - so must only be run against snapshots of the
@@ -392,8 +398,11 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
                         false, -1},
                     infinity).
 
--spec pcl_fetchkeysbysegment(pid(), tuple(), tuple(), fun(), any(),
-                                false|list(integer())) -> any().
+-spec pcl_fetchkeysbysegment(pid(),
+                                leveled_codec:ledger_key(),
+                                leveled_codec:ledger_key(),
+                                fun(), any(),
+                                leveled_codec:segment_list()) -> any().
 %% @doc
 %% Run a range query between StartKey and EndKey (inclusive). This will cover
 %% all keys in the range - so must only be run against snapshots of the
@@ -414,7 +423,10 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) ->
                         SegmentList, -1},
                     infinity).
 
--spec pcl_fetchnextkey(pid(), tuple(), tuple(), fun(), any()) -> any().
+-spec pcl_fetchnextkey(pid(),
+                        leveled_codec:ledger_key(),
+                        leveled_codec:ledger_key(),
+                        fun(), any()) -> any().
 %% @doc
 %% Run a range query between StartKey and EndKey (inclusive). This has the
 %% same constraints as pcl_fetchkeys/5, but will only return the first key
@@ -427,7 +439,9 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
                         false, 1},
                     infinity).
 
--spec pcl_checksequencenumber(pid(), tuple(), integer()) -> boolean().
+-spec pcl_checksequencenumber(pid(),
+                                leveled_codec:ledger_key(),
+                                integer()) -> boolean().
 %% @doc
 %% Check if the sequence number of the passed key is not replaced by a change
 %% after the passed sequence number. Will return true if the Key is present
@@ -450,14 +464,18 @@ pcl_checksequencenumber(Pid, Key, SQN) ->
 pcl_workforclerk(Pid) ->
     gen_server:cast(Pid, work_for_clerk).
 
--spec pcl_manifestchange(pid(), tuple()) -> ok.
+-spec pcl_manifestchange(pid(), leveled_pmanifest:manifest()) -> ok.
 %% @doc
 %% Provide a manifest record (i.e. the output of the leveled_pmanifest module)
 %% that is required to become the new manifest.
 pcl_manifestchange(Pid, Manifest) ->
     gen_server:cast(Pid, {manifest_change, Manifest}).
 
--spec pcl_confirml0complete(pid(), string(), tuple(), tuple(), binary()) -> ok.
+-spec pcl_confirml0complete(pid(),
+                            string(),
+                            leveled_codec:ledger_key(),
+                            leveled_codec:ledger_key(),
+                            binary()) -> ok.
 %% @doc
 %% Allows a SST writer that has written a L0 file to confirm that the file
 %% is now complete, so the filename and key ranges can be added to the
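
A usage sketch against the tightened fetch spec (assumes a started penciller, or a snapshot of one, bound to Penciller; shapes per pcl_fetch/2 above):

    LK = {o, <<"B">>, <<"K">>, null},
    case leveled_penciller:pcl_fetch(Penciller, LK) of
        not_present ->
            not_present;
        {LK, LedgerValue} ->    % together a leveled_codec:ledger_kv()
            leveled_codec:strip_to_seqonly({LK, LedgerValue})
    end.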

src/leveled_pmanifest.erl

@ -79,6 +79,8 @@
-type manifest() :: #manifest{}. -type manifest() :: #manifest{}.
-type manifest_entry() :: #manifest_entry{}. -type manifest_entry() :: #manifest_entry{}.
-export_type([manifest/0, manifest_entry/0]).
%%%============================================================================ %%%============================================================================
%%% API %%% API
%%%============================================================================ %%%============================================================================
@ -306,7 +308,8 @@ switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) ->
get_manifest_sqn(Manifest) -> get_manifest_sqn(Manifest) ->
Manifest#manifest.manifest_sqn. Manifest#manifest.manifest_sqn.
-spec key_lookup(manifest(), integer(), tuple()) -> false|manifest_entry(). -spec key_lookup(manifest(), integer(), leveled_codec:ledger_key())
-> false|manifest_entry().
%% @doc %% @doc
%% For a given key find which manifest entry covers that key at that level, %% For a given key find which manifest entry covers that key at that level,
%% returning false if there is no covering manifest entry at that level. %% returning false if there is no covering manifest entry at that level.
@ -320,7 +323,10 @@ key_lookup(Manifest, LevelIdx, Key) ->
Key) Key)
end. end.
-spec range_lookup(manifest(), integer(), tuple(), tuple()) -> list(). -spec range_lookup(manifest(),
integer(),
leveled_codec:ledger_key(),
leveled_codec:ledger_key()) -> list().
%% @doc %% @doc
%% Return a list of manifest_entry pointers at this level which cover the %% Return a list of manifest_entry pointers at this level which cover the
%% key query range. %% key query range.
@ -331,7 +337,10 @@ range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
end, end,
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
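By way of a hedged example with the same hypothetical manifest, a range query at a level returns next-pointers that a fold or merge can expand lazily:

    Pointers = leveled_pmanifest:range_lookup(Manifest, 3, StartKey, EndKey),
    %% Pointers is a list of {next, ManifestEntry, StartKey} tuples, one per
    %% level-3 file overlapping the query range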
-spec merge_lookup(manifest(), integer(), tuple(), tuple()) -> list(). -spec merge_lookup(manifest(),
integer(),
leveled_codec:ledger_key(),
leveled_codec:ledger_key()) -> list().
%% @doc %% @doc
%% Return a list of manifest_entry pointers at this level which cover the %% Return a list of manifest_entry pointers at this level which cover the
%% key query range; here all keys in the files should be included in the %% key query range; here all keys in the files should be included in the

View file

@ -50,7 +50,7 @@
%%% API %%% API
%%%============================================================================ %%%============================================================================
-spec prepare_for_index(index_array(), {integer(), integer()}|no_lookup) -spec prepare_for_index(index_array(), leveled_codec:segment_hash())
-> index_array(). -> index_array().
%% @doc %% @doc
%% Add the hash of a key to the index. This is 'prepared' in the sense that %% Add the hash of a key to the index. This is 'prepared' in the sense that

View file

@ -42,14 +42,17 @@
-define(CHECKJOURNAL_PROB, 0.2). -define(CHECKJOURNAL_PROB, 0.2).
-type key_range() :: {StartKey:: any(), EndKey :: any()}. -type key_range()
:: {leveled_codec:leveled_key(), leveled_codec:leveled_key()}.
-type fun_and_acc()
:: {fun(), any()}.
%%%============================================================================ %%%============================================================================
%%% External functions %%% External functions
%%%============================================================================ %%%============================================================================
-spec bucket_sizestats(fun(), any(), atom()) -> {async, fun()}. -spec bucket_sizestats(fun(), any(), leveled_codec:tag()) -> {async, fun()}.
%% @doc %% @doc
%% Fold over a bucket accumulating the count of objects and their total sizes %% Fold over a bucket accumulating the count of objects and their total sizes
bucket_sizestats(SnapFun, Bucket, Tag) -> bucket_sizestats(SnapFun, Bucket, Tag) ->
@ -69,13 +72,14 @@ bucket_sizestats(SnapFun, Bucket, Tag) ->
end, end,
{async, Runner}. {async, Runner}.
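The {async, fun()} pairs returned throughout this module defer the work until the caller runs the fold; a minimal sketch, assuming a SnapFun produced by the bookie and the standard object tag o:

    {async, Runner} = leveled_runner:bucket_sizestats(SnapFun, <<"Bucket1">>, o),
    {TotalSize, ObjectCount} = Runner().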
-spec binary_bucketlist(fun(), atom(), fun(), any()) -> {async, fun()}. -spec binary_bucketlist(fun(), leveled_codec:tag(), fun(), any())
-> {async, fun()}.
%% @doc %% @doc
%% List buckets for tag, assuming bucket names are all binary type %% List buckets for tag, assuming bucket names are all binary type
binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc) -> binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc) ->
binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc, -1). binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc, -1).
-spec binary_bucketlist(fun(), atom(), fun(), any(), integer()) -spec binary_bucketlist(fun(), leveled_codec:tag(), fun(), any(), integer())
-> {async, fun()}. -> {async, fun()}.
%% @doc %% @doc
%% set Max Buckets to -1 to list all buckets, otherwise will only return %% set Max Buckets to -1 to list all buckets, otherwise will only return
@ -94,7 +98,11 @@ binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) ->
end, end,
{async, Runner}. {async, Runner}.
-spec index_query(fun(), tuple(), tuple()) -> {async, fun()}. -spec index_query(fun(),
{leveled_codec:ledger_key(),
leveled_codec:ledger_key(),
{boolean(), undefined|re:mp()|iodata()}},
fun_and_acc()) -> {async, fun()}.
%% @doc %% @doc
%% Secondary index query %% Secondary index query
index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) -> index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
@ -121,10 +129,13 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
end, end,
{async, Runner}. {async, Runner}.
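A hedged sketch of a secondary index query over a hypothetical "idx1_bin" field, with ReturnTerms set to false and no term regex - the index tag is assumed to be the atom i, and the fold fun arity to follow the accumulator's (Bucket, Result, Acc) convention:

    StartQK = leveled_codec:to_ledgerkey(<<"B">>, null, i, "idx1_bin", "aa"),
    EndQK = leveled_codec:to_ledgerkey(<<"B">>, null, i, "idx1_bin", "zz"),
    FoldKeysFun = fun(_Bucket, Key, Acc) -> [Key|Acc] end,
    {async, R} = leveled_runner:index_query(SnapFun,
                                            {StartQK, EndQK, {false, undefined}},
                                            {FoldKeysFun, []}),
    Keys = R().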
-spec bucketkey_query(fun(), atom(), any(), key_range(), tuple()) -> {async, fun()}. -spec bucketkey_query(fun(), leveled_codec:tag(), any(),
key_range(), fun_and_acc()) -> {async, fun()}.
%% @doc %% @doc
%% Fold over all keys in `KeyRange' under tag (restricted to a given bucket) %% Fold over all keys in `KeyRange' under tag (restricted to a given bucket)
bucketkey_query(SnapFun, Tag, Bucket, {StartKey, EndKey}, {FoldKeysFun, InitAcc}) -> bucketkey_query(SnapFun, Tag, Bucket,
{StartKey, EndKey},
{FoldKeysFun, InitAcc}) ->
SK = leveled_codec:to_ledgerkey(Bucket, StartKey, Tag), SK = leveled_codec:to_ledgerkey(Bucket, StartKey, Tag),
EK = leveled_codec:to_ledgerkey(Bucket, EndKey, Tag), EK = leveled_codec:to_ledgerkey(Bucket, EndKey, Tag),
AccFun = accumulate_keys(FoldKeysFun), AccFun = accumulate_keys(FoldKeysFun),
@ -141,13 +152,14 @@ bucketkey_query(SnapFun, Tag, Bucket, {StartKey, EndKey}, {FoldKeysFun, InitAcc}
end, end,
{async, Runner}. {async, Runner}.
-spec bucketkey_query(fun(), atom(), any(), tuple()) -> {async, fun()}. -spec bucketkey_query(fun(), leveled_codec:tag(), any(), fun_and_acc())
-> {async, fun()}.
%% @doc %% @doc
%% Fold over all keys under tag (potentially restricted to a given bucket) %% Fold over all keys under tag (potentially restricted to a given bucket)
bucketkey_query(SnapFun, Tag, Bucket, FunAcc) -> bucketkey_query(SnapFun, Tag, Bucket, FunAcc) ->
bucketkey_query(SnapFun, Tag, Bucket, {null, null}, FunAcc). bucketkey_query(SnapFun, Tag, Bucket, {null, null}, FunAcc).
-spec hashlist_query(fun(), atom(), boolean()) -> {async, fun()}. -spec hashlist_query(fun(), leveled_codec:tag(), boolean()) -> {async, fun()}.
%% @doc %% @doc
%% Fold over the keys accumulating the hashes %% Fold over the keys accumulating the hashes
hashlist_query(SnapFun, Tag, JournalCheck) -> hashlist_query(SnapFun, Tag, JournalCheck) ->
@ -173,7 +185,9 @@ hashlist_query(SnapFun, Tag, JournalCheck) ->
end, end,
{async, Runner}. {async, Runner}.
-spec tictactree(fun(), {atom(), any(), tuple()}, boolean(), atom(), fun()) -spec tictactree(fun(),
{leveled_codec:tag(), any(), tuple()},
boolean(), atom(), fun())
-> {async, fun()}. -> {async, fun()}.
%% @doc %% @doc
%% Return a merkle tree from the fold, directly accessing hashes cached in the %% Return a merkle tree from the fold, directly accessing hashes cached in the
@ -233,7 +247,8 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
end, end,
{async, Runner}. {async, Runner}.
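A hedged sketch of anti-entropy built on these trees - assuming snapshots of two stores that should hold the same data, a key-range query shape of {StartKey, EndKey}, and a FilterFun as supplied by the bookie; find_dirtyleaves/2 compares two tictac trees:

    {async, TreeFunA} =
        leveled_runner:tictactree(SnapFunA, {o, <<"B">>, {null, null}},
                                  false, small, FilterFun),
    {async, TreeFunB} =
        leveled_runner:tictactree(SnapFunB, {o, <<"B">>, {null, null}},
                                  false, small, FilterFun),
    DirtySegments = leveled_tictac:find_dirtyleaves(TreeFunA(), TreeFunB()).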
-spec foldheads_allkeys(fun(), atom(), fun(), boolean(), false|list(integer())) -spec foldheads_allkeys(fun(), leveled_codec:tag(),
fun(), boolean(), false|list(integer()))
-> {async, fun()}. -> {async, fun()}.
%% @doc %% @doc
%% Fold over all heads in the store for a given tag - applying the passed %% Fold over all heads in the store for a given tag - applying the passed
@ -248,8 +263,8 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
{true, JournalCheck}, {true, JournalCheck},
SegmentList). SegmentList).
-spec foldobjects_allkeys(fun(), atom(), fun(), key_order|sqn_order) -spec foldobjects_allkeys(fun(), leveled_codec:tag(), fun(),
-> {async, fun()}. key_order|sqn_order) -> {async, fun()}.
%% @doc %% @doc
%% Fold over all objects for a given tag %% Fold over all objects for a given tag
foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) -> foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) ->
@ -345,8 +360,10 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
{async, Folder}. {async, Folder}.
-spec foldobjects_bybucket(fun(), atom(), list({any(), any()}), fun()) -> -spec foldobjects_bybucket(fun(),
{async, fun()}. leveled_codec:tag(),
list(key_range()),
fun()) -> {async, fun()}.
%% @doc %% @doc
%% Fold over all objects within a given key range in a bucket %% Fold over all objects within a given key range in a bucket
foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) -> foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) ->
@ -403,7 +420,7 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
get_nextbucket(_NextB, _NextK, _Tag, _LS, BKList, {Limit, Limit}) -> get_nextbucket(_NextB, _NextK, _Tag, _LS, BKList, {Limit, Limit}) ->
BKList; BKList;
get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) ->
Now = leveled_codec:integer_now(), Now = leveled_util:integer_now(),
StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag), StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
ExtractFun = ExtractFun =
@ -420,7 +437,7 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) ->
leveled_log:log("B0008",[]), leveled_log:log("B0008",[]),
BKList; BKList;
{{B, K}, V} when is_binary(B), is_binary(K) -> {{B, K}, V} when is_binary(B), is_binary(K) ->
case leveled_codec:is_active({B, K}, V, Now) of case leveled_codec:is_active({Tag, B, K, null}, V, Now) of
true -> true ->
leveled_log:log("B0009",[B]), leveled_log:log("B0009",[B]),
get_nextbucket(<<B/binary, 0>>, get_nextbucket(<<B/binary, 0>>,
@ -497,7 +514,7 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) ->
accumulate_size() -> accumulate_size() ->
Now = leveled_codec:integer_now(), Now = leveled_util:integer_now(),
AccFun = fun(Key, Value, {Size, Count}) -> AccFun = fun(Key, Value, {Size, Count}) ->
case leveled_codec:is_active(Key, Value, Now) of case leveled_codec:is_active(Key, Value, Now) of
true -> true ->
@ -533,7 +550,7 @@ accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) ->
AddKeyFun). AddKeyFun).
get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) -> get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
Now = leveled_codec:integer_now(), Now = leveled_util:integer_now(),
AccFun = AccFun =
fun(LK, V, Acc) -> fun(LK, V, Acc) ->
case leveled_codec:is_active(LK, V, Now) of case leveled_codec:is_active(LK, V, Now) of
@ -559,7 +576,7 @@ get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
Now = leveled_codec:integer_now(), Now = leveled_util:integer_now(),
AccFun = AccFun =
fun(LK, V, Acc) -> fun(LK, V, Acc) ->
% The function takes the Ledger Key and the value from the % The function takes the Ledger Key and the value from the
@ -642,7 +659,7 @@ check_presence(Key, Value, InkerClone) ->
end. end.
accumulate_keys(FoldKeysFun) -> accumulate_keys(FoldKeysFun) ->
Now = leveled_codec:integer_now(), Now = leveled_util:integer_now(),
AccFun = fun(Key, Value, Acc) -> AccFun = fun(Key, Value, Acc) ->
case leveled_codec:is_active(Key, Value, Now) of case leveled_codec:is_active(Key, Value, Now) of
true -> true ->
@ -661,7 +678,7 @@ add_terms(ObjKey, IdxValue) ->
{IdxValue, ObjKey}. {IdxValue, ObjKey}.
accumulate_index(TermRe, AddFun, FoldKeysFun) -> accumulate_index(TermRe, AddFun, FoldKeysFun) ->
Now = leveled_codec:integer_now(), Now = leveled_util:integer_now(),
case TermRe of case TermRe of
undefined -> undefined ->
fun(Key, Value, Acc) -> fun(Key, Value, Acc) ->

View file

@ -135,7 +135,17 @@
size :: integer(), size :: integer(),
max_sqn :: integer()}). max_sqn :: integer()}).
-type press_methods() :: lz4|native|none. -type press_methods()
:: lz4|native|none.
-type range_endpoint()
:: all|leveled_codec:leveled_key().
-type slot_pointer()
:: {pointer, pid(), integer(), range_endpoint(), range_endpoint()}.
-type sst_pointer()
:: {next,
leveled_pmanifest:manifest_entry(),
leveled_codec:ledger_key()|all}.
%% yield_blockquery is used to determine if the work necessary to process a %% yield_blockquery is used to determine if the work necessary to process a
%% range query beyond fetching the slot should be managed from within %% range query beyond fetching the slot should be managed from within
@ -185,8 +195,10 @@
%%% API %%% API
%%%============================================================================ %%%============================================================================
-spec sst_open(string(), string()) -> -spec sst_open(string(), string())
{ok, pid(), {tuple(), tuple()}, binary()}. -> {ok, pid(),
{leveled_codec:ledger_key(), leveled_codec:ledger_key()},
binary()}.
%% @doc %% @doc
%% Open an SST file at a given path and filename. The first and last keys %% Open an SST file at a given path and filename. The first and last keys
%% are returned in response to the request - so that those keys can be used %% are returned in response to the request - so that those keys can be used
@ -205,8 +217,11 @@ sst_open(RootPath, Filename) ->
end. end.
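A sketch of opening a file and using the returned key range - the path and filename here are hypothetical:

    {ok, SSTPid, {FirstKey, LastKey}, Bloom} =
        leveled_sst:sst_open("/tmp/ledger/", "level2_file1.sst"),
    %% {FirstKey, LastKey} can now seed the manifest entry for this file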
-spec sst_new(string(), string(), integer(), -spec sst_new(string(), string(), integer(),
list(), integer(), press_methods()) -> list(leveled_codec:ledger_kv()),
{ok, pid(), {tuple(), tuple()}, binary()}. integer(), press_methods())
-> {ok, pid(),
{leveled_codec:ledger_key(), leveled_codec:ledger_key()},
binary()}.
%% @doc %% @doc
%% Start a new SST file at the assigned level passing in a list of Key, Value %% Start a new SST file at the assigned level passing in a list of Key, Value
%% pairs. This should not be used for basement levels or unexpanded Key/Value %% pairs. This should not be used for basement levels or unexpanded Key/Value
@ -228,9 +243,17 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) ->
{ok, Pid, {SK, EK}, Bloom} {ok, Pid, {SK, EK}, Bloom}
end. end.
-spec sst_new(string(), string(), list(), list(), -spec sst_new(string(), string(),
boolean(), integer(), integer(), press_methods()) -> list(leveled_codec:ledger_kv()|sst_pointer()),
empty|{ok, pid(), {{list(), list()}, tuple(), tuple()}, binary()}. list(leveled_codec:ledger_kv()|sst_pointer()),
boolean(), integer(),
integer(), press_methods())
-> empty|{ok, pid(),
{{list(leveled_codec:ledger_kv()),
list(leveled_codec:ledger_kv())},
leveled_codec:ledger_key(),
leveled_codec:ledger_key()},
binary()}.
%% @doc %% @doc
%% Start a new SST file at the assigned level passing in a two lists of %% Start a new SST file at the assigned level passing in a two lists of
%% {Key, Value} pairs to be merged. The merge_lists function will use the %% {Key, Value} pairs to be merged. The merge_lists function will use the
@ -239,7 +262,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) ->
%% %%
%% The remainder of the lists is returned along with the StartKey and EndKey %% The remainder of the lists is returned along with the StartKey and EndKey
%% so that the remainder can be used in the next file in the merge. It might %% so that the remainder can be used in the next file in the merge. It might
%% be that the merge_lists returns nothin (for example when a basement file is %% be that the merge_lists returns nothing (for example when a basement file is
%% all tombstones) - and the atom empty is returned in this case so that the %% all tombstones) - and the atom empty is returned in this case so that the
%% file is not added to the manifest. %% file is not added to the manifest.
sst_new(RootPath, Filename, sst_new(RootPath, Filename,
@ -291,7 +314,8 @@ sst_newlevelzero(RootPath, Filename,
PressMethod0}), PressMethod0}),
{ok, Pid, noreply}. {ok, Pid, noreply}.
-spec sst_get(pid(), tuple()) -> tuple()|not_present. -spec sst_get(pid(), leveled_codec:ledger_key())
-> leveled_codec:ledger_kv()|not_present.
%% @doc %% @doc
%% Return a Key, Value pair matching a Key or not_present if the Key is not in %% Return a Key, Value pair matching a Key or not_present if the Key is not in
%% the store. The segment_hash function is used to accelerate the seeking of %% the store. The segment_hash function is used to accelerate the seeking of
@ -299,7 +323,8 @@ sst_newlevelzero(RootPath, Filename,
sst_get(Pid, LedgerKey) -> sst_get(Pid, LedgerKey) ->
sst_get(Pid, LedgerKey, leveled_codec:segment_hash(LedgerKey)). sst_get(Pid, LedgerKey, leveled_codec:segment_hash(LedgerKey)).
-spec sst_get(pid(), tuple(), {integer(), integer()}) -> tuple()|not_present. -spec sst_get(pid(), leveled_codec:ledger_key(), leveled_codec:segment_hash())
-> leveled_codec:ledger_kv()|not_present.
%% @doc %% @doc
%% Return a Key, Value pair matching a Key or not_present if the Key is not in %% Return a Key, Value pair matching a Key or not_present if the Key is not in
%% the store (with the magic hash precalculated). %% the store (with the magic hash precalculated).
@ -307,7 +332,11 @@ sst_get(Pid, LedgerKey, Hash) ->
gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity). gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
-spec sst_getkvrange(pid(), tuple()|all, tuple()|all, integer()) -> list(). -spec sst_getkvrange(pid(),
range_endpoint(),
range_endpoint(),
integer())
-> list(leveled_codec:ledger_kv()|slot_pointer()).
%% @doc %% @doc
%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey %% Get a range of {Key, Value} pairs as a list between StartKey and EndKey
%% (inclusive). The ScanWidth is the maximum size of the range, a pointer %% (inclusive). The ScanWidth is the maximum size of the range, a pointer
@ -317,8 +346,12 @@ sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, false). sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, false).
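As a hedged illustration, a narrow ScanWidth caps how many slots are expanded eagerly, so the tail of the result may still contain slot pointers to be expanded by a later call - SSTPid is assumed, and all/all selects the whole file:

    KVsOrPointers = leveled_sst:sst_getkvrange(SSTPid, all, all, 4),
    %% elements are ledger_kv() tuples, possibly ending in slot pointers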
-spec sst_getfilteredrange(pid(), tuple()|all, tuple()|all, integer(), -spec sst_getfilteredrange(pid(),
list()|false) -> list(). range_endpoint(),
range_endpoint(),
integer(),
leveled_codec:segment_list())
-> list(leveled_codec:ledger_kv()|slot_pointer()).
%% @doc %% @doc
%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey %% Get a range of {Key, Value} pairs as a list between StartKey and EndKey
%% (inclusive). The ScanWidth is the maximum size of the range, a pointer %% (inclusive). The ScanWidth is the maximum size of the range, a pointer
@ -348,7 +381,8 @@ sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) ->
Reply Reply
end. end.
-spec sst_getslots(pid(), list()) -> list(). -spec sst_getslots(pid(), list(slot_pointer()))
-> list(leveled_codec:ledger_kv()).
%% @doc %% @doc
%% Get a list of slots by their ID. The slot will be converted from the binary %% Get a list of slots by their ID. The slot will be converted from the binary
%% to term form outside of the FSM loop; this is to stop the copying of the %% to term form outside of the FSM loop; this is to stop the copying of the
@ -356,7 +390,10 @@ sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) ->
sst_getslots(Pid, SlotList) -> sst_getslots(Pid, SlotList) ->
sst_getfilteredslots(Pid, SlotList, false). sst_getfilteredslots(Pid, SlotList, false).
-spec sst_getfilteredslots(pid(), list(), false|list()) -> list(). -spec sst_getfilteredslots(pid(),
list(slot_pointer()),
leveled_codec:segment_list())
-> list(leveled_codec:ledger_kv()).
%% @doc %% @doc
%% Get a list of slots by their ID. The slot will be converted from the binary %% Get a list of slots by their ID. The slot will be converted from the binary
%% to term form outside of the FSM loop %% to term form outside of the FSM loop
@ -400,7 +437,9 @@ sst_clear(Pid) ->
sst_deleteconfirmed(Pid) -> sst_deleteconfirmed(Pid) ->
gen_fsm:send_event(Pid, close). gen_fsm:send_event(Pid, close).
-spec sst_checkready(pid()) -> {ok, string(), tuple(), tuple()}. -spec sst_checkready(pid()) -> {ok, string(),
leveled_codec:leveled_key(),
leveled_codec:leveled_key()}.
%% @doc %% @doc
%% If a file has been set to be built, check that it has been built. Returns %% If a file has been set to be built, check that it has been built. Returns
%% the filename and the {StartKey, EndKey} for the manifest. %% the filename and the {StartKey, EndKey} for the manifest.

91
src/leveled_util.erl Normal file
View file

@ -0,0 +1,91 @@
%% -------- Utility Functions ---------
%%
%% Generally helpful functions within leveled
%%
-module(leveled_util).
-include("include/leveled.hrl").
-include_lib("eunit/include/eunit.hrl").
-export([generate_uuid/0,
integer_now/0,
integer_time/1,
magic_hash/1]).
-spec generate_uuid() -> list().
%% @doc
%% Generate a new globally unique ID as a string.
%% Credit to
%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
generate_uuid() ->
<<A:32, B:16, C:16, D:16, E:48>> = leveled_rand:rand_bytes(16),
L = io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
[A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]),
binary_to_list(list_to_binary(L)).
-spec integer_now() -> non_neg_integer().
%% @doc
%% Return now in gregorian seconds
integer_now() ->
integer_time(os:timestamp()).
-spec integer_time(erlang:timestamp()) -> non_neg_integer().
%% @doc
%% Return a given time in gregorian seconds
integer_time(TS) ->
DT = calendar:now_to_universal_time(TS),
calendar:datetime_to_gregorian_seconds(DT).
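Since these helpers return gregorian seconds, a TTL for a temporary put is simple addition; a sketch with a hypothetical one-hour expiry:

    TTL = leveled_util:integer_now() + 3600,
    %% pass TTL as the expiry time when putting an object with a time to live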
-spec magic_hash(any()) -> integer().
%% @doc
%% Use the DJ Bernstein magic hash function. Note that this is more expensive than
%% phash2 but provides a much more balanced result.
%%
%% The hash function contains mysterious constants; some explanation as to
%% what they are is available here -
%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
magic_hash({binary, BinaryKey}) ->
H = 5381,
hash1(H, BinaryKey) band 16#FFFFFFFF;
magic_hash(AnyKey) ->
BK = term_to_binary(AnyKey),
magic_hash({binary, BK}).
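%% djb-style accumulation over each byte of the key: H' = (H * 33) bxor Byte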
hash1(H, <<>>) ->
H;
hash1(H, <<B:8/integer, Rest/bytes>>) ->
H1 = H * 33,
H2 = H1 bxor B,
hash1(H2, Rest).
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
magichashperf_test() ->
KeyFun =
fun(X) ->
K = {o, "Bucket", "Key" ++ integer_to_list(X), null},
{K, X}
end,
KL = lists:map(KeyFun, lists:seq(1, 1000)),
{TimeMH, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH]),
{TimePH, _Hl2} = timer:tc(lists, map, [fun(K) -> erlang:phash2(K) end, KL]),
io:format(user, "1000 keys phash2 hashed in ~w microseconds~n", [TimePH]),
{TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]).
-endif.

View file

@ -184,7 +184,7 @@ stdload(_Bookie, 0, Acc) ->
Acc; Acc;
stdload(Bookie, Count, Acc) -> stdload(Bookie, Count, Acc) ->
B = "Bucket", B = "Bucket",
K = leveled_codec:generate_uuid(), K = leveled_util:generate_uuid(),
V = get_compressiblevalue(), V = get_compressiblevalue(),
R = leveled_bookie:book_put(Bookie, B, K, V, [], ?STD_TAG), R = leveled_bookie:book_put(Bookie, B, K, V, [], ?STD_TAG),
case R of case R of
@ -377,7 +377,7 @@ generate_objects(0, _KeyNumber, ObjL, _Value, _IndexGen, _Bucket) ->
ObjL; ObjL;
generate_objects(Count, binary_uuid, ObjL, Value, IndexGen, Bucket) -> generate_objects(Count, binary_uuid, ObjL, Value, IndexGen, Bucket) ->
{Obj1, Spec1} = set_object(list_to_binary(Bucket), {Obj1, Spec1} = set_object(list_to_binary(Bucket),
list_to_binary(leveled_codec:generate_uuid()), list_to_binary(leveled_util:generate_uuid()),
Value, Value,
IndexGen), IndexGen),
generate_objects(Count - 1, generate_objects(Count - 1,
@ -388,7 +388,7 @@ generate_objects(Count, binary_uuid, ObjL, Value, IndexGen, Bucket) ->
Bucket); Bucket);
generate_objects(Count, uuid, ObjL, Value, IndexGen, Bucket) -> generate_objects(Count, uuid, ObjL, Value, IndexGen, Bucket) ->
{Obj1, Spec1} = set_object(Bucket, {Obj1, Spec1} = set_object(Bucket,
leveled_codec:generate_uuid(), leveled_util:generate_uuid(),
Value, Value,
IndexGen), IndexGen),
generate_objects(Count - 1, generate_objects(Count - 1,