Add user-defined functions

To allow for extraction of metadata, and building of head responses - it should eb possible to dynamically and user-defined tags, and functions to treat them.

If no function is defined, revert to the behaviour of the ?STD tag.
This commit is contained in:
Martin Sumner 2018-12-06 21:00:59 +00:00
parent 881b93229b
commit 8e687ee7c8
5 changed files with 307 additions and 87 deletions

View file

@ -130,7 +130,8 @@
{compression_method, ?COMPRESSION_METHOD},
{compression_point, ?COMPRESSION_POINT},
{log_level, ?LOG_LEVEL},
{forced_logs, []}]).
{forced_logs, []},
{override_functions, []}]).
-record(ledger_cache, {mem :: ets:tab(),
loader = leveled_tree:empty(?CACHE_TYPE)
@ -314,7 +315,7 @@
% moving to higher log levels will at present make the operator
% blind to sample performance statistics of leveled sub-components
% etc
{forced_logs, list(string())}
{forced_logs, list(string())} |
% Forced logs allow for specific info level logs, such as those
% logging stats to be logged even when the default log level has
% been set to a higher log level. Using:
@ -323,6 +324,9 @@
% "P0032", "SST12", "CDB19", "SST13", "I0019"]}
% Will log all timing points even when log_level is not set to
% support info
{override_functions, list(leveled_head:appdefinable_function_tuple())}
% Provide a list of override functions that will be used for
% user-defined tags
].
@ -1065,6 +1069,13 @@ init([Opts]) ->
ForcedLogs = proplists:get_value(forced_logs, Opts),
ok = application:set_env(leveled, forced_logs, ForcedLogs),
OverrideFunctions = proplists:get_value(override_functions, Opts),
SetFun =
fun({FuncName, Func}) ->
application:set_env(leveled, FuncName, Func)
end,
lists:foreach(SetFun, OverrideFunctions),
ConfiguredCacheSize =
max(proplists:get_value(cache_size, Opts), ?MIN_CACHE_SIZE),
CacheJitter =

View file

@ -45,7 +45,8 @@
obj_objectspecs/3,
segment_hash/1,
to_lookup/1,
next_key/1]).
next_key/1,
return_proxy/4]).
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-define(NRT_IDX, "$aae.").
@ -113,6 +114,17 @@
% first element must be re_pattern, but tuple may change legnth with
% versions
-type value_fetcher() ::
{fun((pid(), leveled_codec:journal_key()) -> any()),
pid(), leveled_codec:journal_key()}.
% A 2-arity function, which when passed the other two elements of the tuple
% will return the value
-type proxy_object() ::
{proxy_object, leveled_head:head(), non_neg_integer(), value_fetcher()}.
% Returns the head, size and a tuple for accessing the value
-type proxy_objectbin() ::
binary().
% using term_to_binary(proxy_object())
-type segment_list()
@ -138,7 +150,9 @@
maybe_lookup/0,
last_moddate/0,
lastmod_range/0,
regular_expression/0]).
regular_expression/0,
value_fetcher/0,
proxy_object/0]).
%%%============================================================================
@ -353,7 +367,7 @@ compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
-spec get_tagstrategy(ledger_key(), compaction_strategy())
-> skip|retain|recalc.
%% @doc
%% Work out the compaction startegy for the key
%% Work out the compaction strategy for the key
get_tagstrategy({Tag, _, _, _}, Strategy) ->
case lists:keyfind(Tag, 1, Strategy) of
{Tag, TagStrat} ->
@ -579,6 +593,27 @@ gen_headspec({IdxOp, Bucket, Key, SubKey, Value}, SQN, TTL) ->
{K, {SQN, Status, segment_hash(K), Value, undefined}}.
-spec return_proxy(leveled_head:object_tag()|leveled_head:headonly_tag(),
leveled_head:object_metadata(),
pid(), journal_ref())
-> proxy_objectbin()|leveled_head:object_metadata().
%% @doc
%% If the object has a value, return the metadata and a proxy through which
%% the applictaion or runner can access the value. If it is a ?HEAD_TAG
%% then it has no value, so just return the metadata
return_proxy(?HEAD_TAG, ObjectMetadata, _InkerClone, _JR) ->
% Object has no value - so proxy object makese no sense, just return the
% metadata as is
ObjectMetadata;
return_proxy(Tag, ObjMetadata, InkerClone, JournalRef) ->
Size = leveled_head:get_size(Tag, ObjMetadata),
HeadBin = leveled_head:build_head(Tag, ObjMetadata),
term_to_binary({proxy_object,
HeadBin,
Size,
{fun leveled_bookie:fetch_value/2,
InkerClone,
JournalRef}}).
set_status(add, TTL) ->
{active, TTL};

View file

@ -6,6 +6,13 @@
%% For the ?RIAK tag this is pre-defined. For the ?STD_TAG there is minimal
%% definition. For best use of Riak define a new tag and use pattern matching
%% to extend these exported functions.
%%
%% Dynamic user-defined tags are allowed, and to support these user-defined
%% shadow versions of the functions:
%% - key_to_canonicalbinary/1 -> binary(),
%% - build_head/2 -> head(),
%% - extract_metadata/3 -> {std_metadata(), list(erlang:timestamp()}
%% That support all the user-defined tags that are to be used
-module(leveled_head).
@ -13,14 +20,17 @@
-include_lib("eunit/include/eunit.hrl").
-export([build_head/2,
maybe_build_proxy/4,
extract_metadata/3,
get_size/2,
-export([key_to_canonicalbinary/1,
build_head/2,
extract_metadata/3
]).
-export([get_size/2,
get_hash/2,
default_reload_strategy/1,
defined_objecttags/0,
key_to_canonicalbinary/1]).
default_reload_strategy/1,
standard_hash/1
]).
%% Exported for testing purposes
-export([riak_metadata_to_binary/2,
@ -30,55 +40,73 @@
-define(MAGIC, 53). % riak_kv -> riak_object
-define(V1_VERS, 1).
-type riak_metadata() :: {binary()|delete, % Sibling Metadata
binary()|null, % Vclock Metadata
non_neg_integer()|null, % Hash of vclock - non-exportable
non_neg_integer()}. % Size in bytes of real object
-type std_metadata() :: {non_neg_integer()|null, % Hash of value
non_neg_integer(), % Size in bytes of real object
list(tuple())|undefined}.
-type object_tag() :: ?STD_TAG|?RIAK_TAG.
% tags assigned to objects
% (not other special entities such as ?HEAD or ?IDX)
-type headonly_tag() :: ?HEAD_TAG.
% Tag assigned to head_only objects. Behaviour cannot be changed
-type object_metadata() :: riak_metadata()|std_metadata().
-type head_bin() ::
-type riak_metadata() :: {binary()|delete,
% Sibling Metadata
binary()|null,
% Vclock Metadata
non_neg_integer()|null,
% Hash of vclock - non-exportable
non_neg_integer()
% Size in bytes of real object
}.
-type std_metadata() :: {non_neg_integer()|null,
% Hash of value
non_neg_integer(),
% Size in bytes of real object
list(tuple())|undefined
% User-define metadata
}.
-type head_metadata() :: {non_neg_integer()|null,
% Hash of value
non_neg_integer()
% Size in bytes of real object
}.
-type object_metadata() :: riak_metadata()|std_metadata()|head_metadata().
-type appdefinable_function() ::
key_to_canonicalbinary | build_head | extract_metadata.
% Functions for which default behaviour can be over-written for the
% application's own tags
-type appdefinable_function_tuple() ::
{appdefinable_function(), fun()}.
-type head() ::
binary()|tuple().
% TODO:
% This is currently not always a binary. Wish is to migrate this so that
% it is predictably a binary
-type value_fetcher() ::
{fun((pid(), leveled_codec:journal_key()) -> any()),
pid(), leveled_codec:journal_key()}.
% A 2-arity function, which when passed the other two elements of the tuple
% will return the value
-type proxy_object() ::
{proxy_object, head_bin(), non_neg_integer(), value_fetcher()}.
% Returns the head, size and a tuple for accessing the value
-type proxy_objectbin() ::
binary().
% using term_to_binary(proxy_object())
-export_type([object_tag/0,
proxy_object/0,
value_fetcher/0,
head_bin/0,
proxy_objectbin/0]).
head/0,
object_metadata/0,
appdefinable_function_tuple/0]).
%%%============================================================================
%%% External Functions
%%% Mutable External Functions
%%%============================================================================
-spec defined_objecttags() -> list(object_tag()).
-spec get_appdefined_function(appdefinable_function(),
fun(),
non_neg_integer()) -> fun().
%% @doc
%% Return the list of object tags
defined_objecttags() ->
[?STD_TAG, ?RIAK_TAG].
%% If a keylist of [{function_name, fun()}] has been set as an environment
%% variable for a tag, then this FunctionName can be used instead of the
%% default
get_appdefined_function(FunctionName, DefaultFun, RequiredArity) ->
case application:get_env(leveled, FunctionName) of
undefined ->
DefaultFun;
{ok, Fun} when is_function(Fun, RequiredArity) ->
Fun
end.
-spec key_to_canonicalbinary(tuple()) -> binary().
@ -97,7 +125,7 @@ key_to_canonicalbinary({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey})
key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK})
when is_binary(Bucket), is_binary(Key), is_binary(SubK) ->
<<Bucket/binary, Key/binary, SubK/binary>>;
key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, _SubK})
key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null})
when is_binary(Bucket), is_binary(Key) ->
<<Bucket/binary, Key/binary>>;
key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey})
@ -106,42 +134,45 @@ key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey})
<<BucketType/binary, Bucket/binary>>,
Key,
SubKey});
key_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG ->
% In unit tests head specs can have non-binary keys, so handle
% this through hashing the whole key
default_key_to_canonicalbinary(Key);
key_to_canonicalbinary(Key) when element(1, Key) == ?STD_TAG ->
default_key_to_canonicalbinary(Key);
key_to_canonicalbinary(Key) ->
OverrideFun =
get_appdefined_function(key_to_canonicalbinary,
fun default_key_to_canonicalbinary/1,
1),
OverrideFun(Key).
default_key_to_canonicalbinary(Key) ->
term_to_binary(Key).
-spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head_bin().
-spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head().
%% @doc
%% Return the object metadata as a binary to be the "head" of the object
build_head(?RIAK_TAG, Metadata) ->
{SibData, Vclock, _Hash, _Size} = Metadata,
riak_metadata_to_binary(Vclock, SibData);
build_head(_Tag, Metadata) ->
build_head(?HEAD_TAG, Metadata) ->
% term_to_binary(Metadata).
default_build_head(?HEAD_TAG, Metadata);
build_head(?STD_TAG, Metadata) ->
default_build_head(?STD_TAG, Metadata);
build_head(Tag, Metadata) ->
OverrideFun =
get_appdefined_function(build_head,
fun default_build_head/2,
2),
OverrideFun(Tag, Metadata).
default_build_head(_Tag, Metadata) ->
Metadata.
-spec maybe_build_proxy(object_tag()|headonly_tag(), object_metadata(),
pid(), leveled_codec:journal_ref())
-> proxy_objectbin()|object_metadata().
%% @doc
%% Return a proxyObject (e.g. form a head fold, so that the potential fetching
%% of an object can be deferred (e.g. it can be make dependent on the
%% applictaion making a decision on the contents of the object_metadata
maybe_build_proxy(?HEAD_TAG, ObjectMetadata, _InkerClone, _JR) ->
% Object has no value - so proxy object makese no sense, just return the
% metadata as is
ObjectMetadata;
maybe_build_proxy(Tag, ObjMetadata, InkerClone, JournalRef) ->
Size = get_size(Tag, ObjMetadata),
HeadBin = build_head(Tag, ObjMetadata),
term_to_binary({proxy_object,
HeadBin,
Size,
{fun leveled_bookie:fetch_value/2,
InkerClone,
JournalRef}}).
-spec extract_metadata(object_tag()|headonly_tag(), non_neg_integer(), any())
-> {object_metadata(), list(erlang:timestamp())}.
%% @doc
@ -158,10 +189,43 @@ maybe_build_proxy(Tag, ObjMetadata, InkerClone, JournalRef) ->
%% view of size is required within the header
extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) ->
riak_extract_metadata(RiakObj, SizeAsStoredInJournal);
extract_metadata(_Tag, SizeAsStoredInJournal, Obj) ->
extract_metadata(?HEAD_TAG, SizeAsStoredInJournal, Obj) ->
{{standard_hash(Obj), SizeAsStoredInJournal}, []};
extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj) ->
default_extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj);
extract_metadata(Tag, SizeAsStoredInJournal, Obj) ->
OverrideFun =
get_appdefined_function(extract_metadata,
fun default_extract_metadata/3,
3),
OverrideFun(Tag, SizeAsStoredInJournal, Obj).
default_extract_metadata(_Tag, SizeAsStoredInJournal, Obj) ->
{{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}.
%%%============================================================================
%%% Standard External Functions
%%%============================================================================
-spec defined_objecttags() -> list(object_tag()).
%% @doc
%% Return the list of object tags
defined_objecttags() ->
[?STD_TAG, ?RIAK_TAG].
-spec default_reload_strategy(object_tag())
-> {object_tag(),
leveled_codec:compaction_method()}.
%% @doc
%% State the compaction_method to be used when reloading the Ledger from the
%% journal for each object tag. Note, no compaction startegy required for
%% head_only tag
default_reload_strategy(Tag) ->
{Tag, retain}.
-spec get_size(object_tag()|headonly_tag(), object_metadata())
-> non_neg_integer().
%% @doc
@ -181,25 +245,16 @@ get_hash(?RIAK_TAG, RiakObjectMetadata) ->
get_hash(_Tag, ObjectMetadata) ->
element(1, ObjectMetadata).
-spec default_reload_strategy(object_tag())
-> {object_tag(),
leveled_codec:compaction_method()}.
-spec standard_hash(any()) -> non_neg_integer().
%% @doc
%% State the compaction_method to be used when reloading the Ledger from the
%% journal for each object tag. Note, no compaction startegy required for
%% head_only tag
default_reload_strategy(Tag) ->
{Tag, retain}.
%%%============================================================================
%%% Tag-specific Functions
%%%============================================================================
%% Hash the whole object
standard_hash(Obj) ->
erlang:phash2(term_to_binary(Obj)).
%%%============================================================================
%%% Tag-specific Internal Functions
%%%============================================================================
-spec riak_extract_metadata(binary()|delete, non_neg_integer()) ->
{riak_metadata(), list()}.

View file

@ -680,10 +680,8 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
case DeferredFetch of
{true, JournalCheck} ->
ProxyObj =
leveled_head:maybe_build_proxy(Tag,
MD,
InkerClone,
JK),
leveled_codec:return_proxy(Tag, MD,
InkerClone, JK),
case JournalCheck of
true ->
InJournal =

View file

@ -0,0 +1,121 @@
-module(appdefined_SUITE).
-include_lib("common_test/include/ct.hrl").
-include("include/leveled.hrl").
-export([all/0]).
-export([application_defined_tag/1
]).
all() -> [
application_defined_tag
].
application_defined_tag(_Config) ->
T1 = os:timestamp(),
application_defined_tag_tester(40000, ?STD_TAG, [], false),
io:format("Completed with std tag in ~w ms~n",
[timer:now_diff(os:timestamp(), T1)/1000]),
T2 = os:timestamp(),
application_defined_tag_tester(40000, bespoke_tag1, [], false),
io:format("Completed with app tag but not function in ~w ms~n",
[timer:now_diff(os:timestamp(), T2)/1000]),
ExtractMDFun =
fun(Tag, Size, Obj) ->
[{hash, Hash}, {shard, Shard}, {random, Random}, {value, _V}]
= Obj,
case Tag of
bespoke_tag1 ->
{{Hash, Size, [{shard, Shard}, {random, Random}]}, []};
bespoke_tag2 ->
{{Hash, Size, [{shard, Shard}]}, [os:timestamp()]}
end
end,
T3 = os:timestamp(),
application_defined_tag_tester(40000, ?STD_TAG,
[{extract_metadata, ExtractMDFun}],
false),
io:format("Completed with std tag and override function in ~w ms~n",
[timer:now_diff(os:timestamp(), T3)/1000]),
T4 = os:timestamp(),
application_defined_tag_tester(40000, bespoke_tag1,
[{extract_metadata, ExtractMDFun}],
true),
io:format("Completed with app tag and override function in ~w ms~n",
[timer:now_diff(os:timestamp(), T4)/1000]),
T5 = os:timestamp(),
application_defined_tag_tester(40000, bespoke_tag2,
[{extract_metadata, ExtractMDFun}],
true),
io:format("Completed with app tag and override function in ~w ms~n",
[timer:now_diff(os:timestamp(), T5)/1000]).
application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{sync_strategy, testutil:sync_strategy()},
{log_level, warn},
{override_functions, Functions}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
Value = leveled_rand:rand_bytes(512),
MapFun =
fun(C) ->
{C, object_generator(C, Value)}
end,
CBKVL = lists:map(MapFun, lists:seq(1, KeyCount)),
PutFun =
fun({_C, {B, K, O}}) ->
R = leveled_bookie:book_put(Bookie1, B, K, O, [], Tag),
case R of
ok -> ok;
pause -> timer:sleep(100)
end
end,
lists:foreach(PutFun, CBKVL),
CheckFun =
fun(Book) ->
fun({C, {B, K, O}}) ->
{ok, O} = leveled_bookie:book_get(Book, B, K, Tag),
{ok, H} = leveled_bookie:book_head(Book, B, K, Tag),
MD = element(3, H),
case ExpectMD of
true ->
true =
{shard, C rem 10} == lists:keyfind(shard, 1, MD);
false ->
true =
undefined == MD
end
end
end,
lists:foreach(CheckFun(Bookie1), CBKVL),
ok = leveled_bookie:book_close(Bookie1),
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
lists:foreach(CheckFun(Bookie2), CBKVL),
ok = leveled_bookie:book_close(Bookie2).
object_generator(Count, V) ->
Hash = erlang:phash2({count, V}),
Random = leveled_rand:uniform(1000),
Key = list_to_binary(leveled_util:generate_uuid()),
Bucket = <<"B">>,
{Bucket,
Key,
[{hash, Hash}, {shard, Count rem 10},
{random, Random}, {value, V}]}.