From 8e687ee7c8ae5a0779151f78d0df06ca23c9e2eb Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 6 Dec 2018 21:00:59 +0000 Subject: [PATCH] Add user-defined functions To allow for extraction of metadata, and building of head responses - it should eb possible to dynamically and user-defined tags, and functions to treat them. If no function is defined, revert to the behaviour of the ?STD tag. --- src/leveled_bookie.erl | 15 +- src/leveled_codec.erl | 41 +++++- src/leveled_head.erl | 211 +++++++++++++++++---------- src/leveled_runner.erl | 6 +- test/end_to_end/appdefined_SUITE.erl | 121 +++++++++++++++ 5 files changed, 307 insertions(+), 87 deletions(-) create mode 100644 test/end_to_end/appdefined_SUITE.erl diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 1a1e626..30d2caf 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -130,7 +130,8 @@ {compression_method, ?COMPRESSION_METHOD}, {compression_point, ?COMPRESSION_POINT}, {log_level, ?LOG_LEVEL}, - {forced_logs, []}]). + {forced_logs, []}, + {override_functions, []}]). -record(ledger_cache, {mem :: ets:tab(), loader = leveled_tree:empty(?CACHE_TYPE) @@ -314,7 +315,7 @@ % moving to higher log levels will at present make the operator % blind to sample performance statistics of leveled sub-components % etc - {forced_logs, list(string())} + {forced_logs, list(string())} | % Forced logs allow for specific info level logs, such as those % logging stats to be logged even when the default log level has % been set to a higher log level. Using: @@ -323,6 +324,9 @@ % "P0032", "SST12", "CDB19", "SST13", "I0019"]} % Will log all timing points even when log_level is not set to % support info + {override_functions, list(leveled_head:appdefinable_function_tuple())} + % Provide a list of override functions that will be used for + % user-defined tags ]. @@ -1065,6 +1069,13 @@ init([Opts]) -> ForcedLogs = proplists:get_value(forced_logs, Opts), ok = application:set_env(leveled, forced_logs, ForcedLogs), + OverrideFunctions = proplists:get_value(override_functions, Opts), + SetFun = + fun({FuncName, Func}) -> + application:set_env(leveled, FuncName, Func) + end, + lists:foreach(SetFun, OverrideFunctions), + ConfiguredCacheSize = max(proplists:get_value(cache_size, Opts), ?MIN_CACHE_SIZE), CacheJitter = diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 9a58074..6e0ce63 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -45,7 +45,8 @@ obj_objectspecs/3, segment_hash/1, to_lookup/1, - next_key/1]). + next_key/1, + return_proxy/4]). -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). -define(NRT_IDX, "$aae."). @@ -113,6 +114,17 @@ % first element must be re_pattern, but tuple may change legnth with % versions +-type value_fetcher() :: + {fun((pid(), leveled_codec:journal_key()) -> any()), + pid(), leveled_codec:journal_key()}. + % A 2-arity function, which when passed the other two elements of the tuple + % will return the value +-type proxy_object() :: + {proxy_object, leveled_head:head(), non_neg_integer(), value_fetcher()}. + % Returns the head, size and a tuple for accessing the value +-type proxy_objectbin() :: + binary(). + % using term_to_binary(proxy_object()) -type segment_list() @@ -138,7 +150,9 @@ maybe_lookup/0, last_moddate/0, lastmod_range/0, - regular_expression/0]). + regular_expression/0, + value_fetcher/0, + proxy_object/0]). %%%============================================================================ @@ -353,7 +367,7 @@ compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) -> -spec get_tagstrategy(ledger_key(), compaction_strategy()) -> skip|retain|recalc. %% @doc -%% Work out the compaction startegy for the key +%% Work out the compaction strategy for the key get_tagstrategy({Tag, _, _, _}, Strategy) -> case lists:keyfind(Tag, 1, Strategy) of {Tag, TagStrat} -> @@ -579,6 +593,27 @@ gen_headspec({IdxOp, Bucket, Key, SubKey, Value}, SQN, TTL) -> {K, {SQN, Status, segment_hash(K), Value, undefined}}. +-spec return_proxy(leveled_head:object_tag()|leveled_head:headonly_tag(), + leveled_head:object_metadata(), + pid(), journal_ref()) + -> proxy_objectbin()|leveled_head:object_metadata(). +%% @doc +%% If the object has a value, return the metadata and a proxy through which +%% the applictaion or runner can access the value. If it is a ?HEAD_TAG +%% then it has no value, so just return the metadata +return_proxy(?HEAD_TAG, ObjectMetadata, _InkerClone, _JR) -> + % Object has no value - so proxy object makese no sense, just return the + % metadata as is + ObjectMetadata; +return_proxy(Tag, ObjMetadata, InkerClone, JournalRef) -> + Size = leveled_head:get_size(Tag, ObjMetadata), + HeadBin = leveled_head:build_head(Tag, ObjMetadata), + term_to_binary({proxy_object, + HeadBin, + Size, + {fun leveled_bookie:fetch_value/2, + InkerClone, + JournalRef}}). set_status(add, TTL) -> {active, TTL}; diff --git a/src/leveled_head.erl b/src/leveled_head.erl index f089a36..93d0330 100644 --- a/src/leveled_head.erl +++ b/src/leveled_head.erl @@ -6,6 +6,13 @@ %% For the ?RIAK tag this is pre-defined. For the ?STD_TAG there is minimal %% definition. For best use of Riak define a new tag and use pattern matching %% to extend these exported functions. +%% +%% Dynamic user-defined tags are allowed, and to support these user-defined +%% shadow versions of the functions: +%% - key_to_canonicalbinary/1 -> binary(), +%% - build_head/2 -> head(), +%% - extract_metadata/3 -> {std_metadata(), list(erlang:timestamp()} +%% That support all the user-defined tags that are to be used -module(leveled_head). @@ -13,14 +20,17 @@ -include_lib("eunit/include/eunit.hrl"). --export([build_head/2, - maybe_build_proxy/4, - extract_metadata/3, - get_size/2, +-export([key_to_canonicalbinary/1, + build_head/2, + extract_metadata/3 + ]). + +-export([get_size/2, get_hash/2, - default_reload_strategy/1, defined_objecttags/0, - key_to_canonicalbinary/1]). + default_reload_strategy/1, + standard_hash/1 + ]). %% Exported for testing purposes -export([riak_metadata_to_binary/2, @@ -30,55 +40,73 @@ -define(MAGIC, 53). % riak_kv -> riak_object -define(V1_VERS, 1). --type riak_metadata() :: {binary()|delete, % Sibling Metadata - binary()|null, % Vclock Metadata - non_neg_integer()|null, % Hash of vclock - non-exportable - non_neg_integer()}. % Size in bytes of real object --type std_metadata() :: {non_neg_integer()|null, % Hash of value - non_neg_integer(), % Size in bytes of real object - list(tuple())|undefined}. - - -type object_tag() :: ?STD_TAG|?RIAK_TAG. % tags assigned to objects % (not other special entities such as ?HEAD or ?IDX) -type headonly_tag() :: ?HEAD_TAG. % Tag assigned to head_only objects. Behaviour cannot be changed --type object_metadata() :: riak_metadata()|std_metadata(). --type head_bin() :: +-type riak_metadata() :: {binary()|delete, + % Sibling Metadata + binary()|null, + % Vclock Metadata + non_neg_integer()|null, + % Hash of vclock - non-exportable + non_neg_integer() + % Size in bytes of real object + }. +-type std_metadata() :: {non_neg_integer()|null, + % Hash of value + non_neg_integer(), + % Size in bytes of real object + list(tuple())|undefined + % User-define metadata + }. +-type head_metadata() :: {non_neg_integer()|null, + % Hash of value + non_neg_integer() + % Size in bytes of real object + }. + +-type object_metadata() :: riak_metadata()|std_metadata()|head_metadata(). + +-type appdefinable_function() :: + key_to_canonicalbinary | build_head | extract_metadata. + % Functions for which default behaviour can be over-written for the + % application's own tags +-type appdefinable_function_tuple() :: + {appdefinable_function(), fun()}. + +-type head() :: binary()|tuple(). % TODO: % This is currently not always a binary. Wish is to migrate this so that % it is predictably a binary --type value_fetcher() :: - {fun((pid(), leveled_codec:journal_key()) -> any()), - pid(), leveled_codec:journal_key()}. - % A 2-arity function, which when passed the other two elements of the tuple - % will return the value --type proxy_object() :: - {proxy_object, head_bin(), non_neg_integer(), value_fetcher()}. - % Returns the head, size and a tuple for accessing the value --type proxy_objectbin() :: - binary(). - % using term_to_binary(proxy_object()) -export_type([object_tag/0, - proxy_object/0, - value_fetcher/0, - head_bin/0, - proxy_objectbin/0]). + head/0, + object_metadata/0, + appdefinable_function_tuple/0]). %%%============================================================================ -%%% External Functions +%%% Mutable External Functions %%%============================================================================ --spec defined_objecttags() -> list(object_tag()). +-spec get_appdefined_function(appdefinable_function(), + fun(), + non_neg_integer()) -> fun(). %% @doc -%% Return the list of object tags -defined_objecttags() -> - [?STD_TAG, ?RIAK_TAG]. +%% If a keylist of [{function_name, fun()}] has been set as an environment +%% variable for a tag, then this FunctionName can be used instead of the +%% default +get_appdefined_function(FunctionName, DefaultFun, RequiredArity) -> + case application:get_env(leveled, FunctionName) of + undefined -> + DefaultFun; + {ok, Fun} when is_function(Fun, RequiredArity) -> + Fun + end. -spec key_to_canonicalbinary(tuple()) -> binary(). @@ -97,7 +125,7 @@ key_to_canonicalbinary({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey}) key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK}) when is_binary(Bucket), is_binary(Key), is_binary(SubK) -> <>; -key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, _SubK}) +key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null}) when is_binary(Bucket), is_binary(Key) -> <>; key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) @@ -106,42 +134,45 @@ key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) <>, Key, SubKey}); +key_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG -> + % In unit tests head specs can have non-binary keys, so handle + % this through hashing the whole key + default_key_to_canonicalbinary(Key); +key_to_canonicalbinary(Key) when element(1, Key) == ?STD_TAG -> + default_key_to_canonicalbinary(Key); key_to_canonicalbinary(Key) -> + OverrideFun = + get_appdefined_function(key_to_canonicalbinary, + fun default_key_to_canonicalbinary/1, + 1), + OverrideFun(Key). + +default_key_to_canonicalbinary(Key) -> term_to_binary(Key). --spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head_bin(). + +-spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head(). %% @doc %% Return the object metadata as a binary to be the "head" of the object build_head(?RIAK_TAG, Metadata) -> {SibData, Vclock, _Hash, _Size} = Metadata, riak_metadata_to_binary(Vclock, SibData); -build_head(_Tag, Metadata) -> +build_head(?HEAD_TAG, Metadata) -> % term_to_binary(Metadata). + default_build_head(?HEAD_TAG, Metadata); +build_head(?STD_TAG, Metadata) -> + default_build_head(?STD_TAG, Metadata); +build_head(Tag, Metadata) -> + OverrideFun = + get_appdefined_function(build_head, + fun default_build_head/2, + 2), + OverrideFun(Tag, Metadata). + +default_build_head(_Tag, Metadata) -> Metadata. --spec maybe_build_proxy(object_tag()|headonly_tag(), object_metadata(), - pid(), leveled_codec:journal_ref()) - -> proxy_objectbin()|object_metadata(). -%% @doc -%% Return a proxyObject (e.g. form a head fold, so that the potential fetching -%% of an object can be deferred (e.g. it can be make dependent on the -%% applictaion making a decision on the contents of the object_metadata -maybe_build_proxy(?HEAD_TAG, ObjectMetadata, _InkerClone, _JR) -> - % Object has no value - so proxy object makese no sense, just return the - % metadata as is - ObjectMetadata; -maybe_build_proxy(Tag, ObjMetadata, InkerClone, JournalRef) -> - Size = get_size(Tag, ObjMetadata), - HeadBin = build_head(Tag, ObjMetadata), - term_to_binary({proxy_object, - HeadBin, - Size, - {fun leveled_bookie:fetch_value/2, - InkerClone, - JournalRef}}). - - -spec extract_metadata(object_tag()|headonly_tag(), non_neg_integer(), any()) -> {object_metadata(), list(erlang:timestamp())}. %% @doc @@ -158,10 +189,43 @@ maybe_build_proxy(Tag, ObjMetadata, InkerClone, JournalRef) -> %% view of size is required within the header extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) -> riak_extract_metadata(RiakObj, SizeAsStoredInJournal); -extract_metadata(_Tag, SizeAsStoredInJournal, Obj) -> +extract_metadata(?HEAD_TAG, SizeAsStoredInJournal, Obj) -> + {{standard_hash(Obj), SizeAsStoredInJournal}, []}; +extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj) -> + default_extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj); +extract_metadata(Tag, SizeAsStoredInJournal, Obj) -> + OverrideFun = + get_appdefined_function(extract_metadata, + fun default_extract_metadata/3, + 3), + OverrideFun(Tag, SizeAsStoredInJournal, Obj). + +default_extract_metadata(_Tag, SizeAsStoredInJournal, Obj) -> {{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}. +%%%============================================================================ +%%% Standard External Functions +%%%============================================================================ + +-spec defined_objecttags() -> list(object_tag()). +%% @doc +%% Return the list of object tags +defined_objecttags() -> + [?STD_TAG, ?RIAK_TAG]. + + +-spec default_reload_strategy(object_tag()) + -> {object_tag(), + leveled_codec:compaction_method()}. +%% @doc +%% State the compaction_method to be used when reloading the Ledger from the +%% journal for each object tag. Note, no compaction startegy required for +%% head_only tag +default_reload_strategy(Tag) -> + {Tag, retain}. + + -spec get_size(object_tag()|headonly_tag(), object_metadata()) -> non_neg_integer(). %% @doc @@ -181,25 +245,16 @@ get_hash(?RIAK_TAG, RiakObjectMetadata) -> get_hash(_Tag, ObjectMetadata) -> element(1, ObjectMetadata). - --spec default_reload_strategy(object_tag()) - -> {object_tag(), - leveled_codec:compaction_method()}. +-spec standard_hash(any()) -> non_neg_integer(). %% @doc -%% State the compaction_method to be used when reloading the Ledger from the -%% journal for each object tag. Note, no compaction startegy required for -%% head_only tag -default_reload_strategy(Tag) -> - {Tag, retain}. - - -%%%============================================================================ -%%% Tag-specific Functions -%%%============================================================================ - +%% Hash the whole object standard_hash(Obj) -> erlang:phash2(term_to_binary(Obj)). +%%%============================================================================ +%%% Tag-specific Internal Functions +%%%============================================================================ + -spec riak_extract_metadata(binary()|delete, non_neg_integer()) -> {riak_metadata(), list()}. diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 3f582d6..a6d4019 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -680,10 +680,8 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> case DeferredFetch of {true, JournalCheck} -> ProxyObj = - leveled_head:maybe_build_proxy(Tag, - MD, - InkerClone, - JK), + leveled_codec:return_proxy(Tag, MD, + InkerClone, JK), case JournalCheck of true -> InJournal = diff --git a/test/end_to_end/appdefined_SUITE.erl b/test/end_to_end/appdefined_SUITE.erl new file mode 100644 index 0000000..79301de --- /dev/null +++ b/test/end_to_end/appdefined_SUITE.erl @@ -0,0 +1,121 @@ +-module(appdefined_SUITE). +-include_lib("common_test/include/ct.hrl"). +-include("include/leveled.hrl"). +-export([all/0]). +-export([application_defined_tag/1 + ]). + +all() -> [ + application_defined_tag + ]. + + + +application_defined_tag(_Config) -> + T1 = os:timestamp(), + application_defined_tag_tester(40000, ?STD_TAG, [], false), + io:format("Completed with std tag in ~w ms~n", + [timer:now_diff(os:timestamp(), T1)/1000]), + + T2 = os:timestamp(), + application_defined_tag_tester(40000, bespoke_tag1, [], false), + io:format("Completed with app tag but not function in ~w ms~n", + [timer:now_diff(os:timestamp(), T2)/1000]), + + ExtractMDFun = + fun(Tag, Size, Obj) -> + [{hash, Hash}, {shard, Shard}, {random, Random}, {value, _V}] + = Obj, + case Tag of + bespoke_tag1 -> + {{Hash, Size, [{shard, Shard}, {random, Random}]}, []}; + bespoke_tag2 -> + {{Hash, Size, [{shard, Shard}]}, [os:timestamp()]} + end + end, + + T3 = os:timestamp(), + application_defined_tag_tester(40000, ?STD_TAG, + [{extract_metadata, ExtractMDFun}], + false), + io:format("Completed with std tag and override function in ~w ms~n", + [timer:now_diff(os:timestamp(), T3)/1000]), + + T4 = os:timestamp(), + application_defined_tag_tester(40000, bespoke_tag1, + [{extract_metadata, ExtractMDFun}], + true), + io:format("Completed with app tag and override function in ~w ms~n", + [timer:now_diff(os:timestamp(), T4)/1000]), + + + T5 = os:timestamp(), + application_defined_tag_tester(40000, bespoke_tag2, + [{extract_metadata, ExtractMDFun}], + true), + io:format("Completed with app tag and override function in ~w ms~n", + [timer:now_diff(os:timestamp(), T5)/1000]). + + +application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) -> + RootPath = testutil:reset_filestructure(), + StartOpts1 = [{root_path, RootPath}, + {sync_strategy, testutil:sync_strategy()}, + {log_level, warn}, + {override_functions, Functions}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + Value = leveled_rand:rand_bytes(512), + MapFun = + fun(C) -> + {C, object_generator(C, Value)} + end, + CBKVL = lists:map(MapFun, lists:seq(1, KeyCount)), + + PutFun = + fun({_C, {B, K, O}}) -> + R = leveled_bookie:book_put(Bookie1, B, K, O, [], Tag), + case R of + ok -> ok; + pause -> timer:sleep(100) + end + end, + lists:foreach(PutFun, CBKVL), + + CheckFun = + fun(Book) -> + fun({C, {B, K, O}}) -> + {ok, O} = leveled_bookie:book_get(Book, B, K, Tag), + {ok, H} = leveled_bookie:book_head(Book, B, K, Tag), + MD = element(3, H), + case ExpectMD of + true -> + true = + {shard, C rem 10} == lists:keyfind(shard, 1, MD); + false -> + true = + undefined == MD + end + end + end, + + lists:foreach(CheckFun(Bookie1), CBKVL), + + ok = leveled_bookie:book_close(Bookie1), + {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), + + lists:foreach(CheckFun(Bookie2), CBKVL), + + ok = leveled_bookie:book_close(Bookie2). + + + + +object_generator(Count, V) -> + Hash = erlang:phash2({count, V}), + Random = leveled_rand:uniform(1000), + Key = list_to_binary(leveled_util:generate_uuid()), + Bucket = <<"B">>, + {Bucket, + Key, + [{hash, Hash}, {shard, Count rem 10}, + {random, Random}, {value, V}]}. \ No newline at end of file