From 881b93229b2634de236ede73ee6a7ee04a9efed2 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 6 Dec 2018 15:31:11 +0000 Subject: [PATCH 1/7] Isolate better the changes needed to support changes to metadata extraction More obvious how to extend the code as it is all in one module. Also add a new field to the standard object metadata tuple that may in the future hold other object metadata based on user-defined functions. --- src/leveled_bookie.erl | 6 +- src/leveled_codec.erl | 194 +++------------------- src/leveled_head.erl | 274 ++++++++++++++++++++++++++++++++ src/leveled_runner.erl | 51 +++--- src/leveled_sst.erl | 2 +- test/end_to_end/basic_SUITE.erl | 6 +- test/end_to_end/testutil.erl | 2 +- 7 files changed, 322 insertions(+), 213 deletions(-) create mode 100644 src/leveled_head.erl diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 3b1b56c..1a1e626 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1270,7 +1270,7 @@ handle_call({head, Bucket, Key, Tag}, _From, State) not_found -> not_found; _ -> - {ok, leveled_codec:build_metadata_object(LK, LedgerMD)} + {ok, leveled_head:build_head(Tag, LedgerMD)} end, {_SW, UpdTimingsR} = update_timings(SWr, {head, rsp}, UpdTimingsP), @@ -1437,7 +1437,7 @@ snapshot_store(State, SnapType, Query, LongRunning) -> Query, LongRunning). --spec fetch_value(pid(), {any(), integer()}) -> not_present|any(). +-spec fetch_value(pid(), leveled_codec:journal_ref()) -> not_present|any(). %% @doc %% Fetch a value from the Journal fetch_value(Inker, {Key, SQN}) -> @@ -2517,7 +2517,7 @@ foldobjects_vs_hashtree_testto() -> MD, _Size, _Fetcher} = binary_to_term(ProxyV), - {Hash, _Size} = MD, + {Hash, _Size, _UserDefinedMD} = MD, [{B, K, Hash}|Acc] end, diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 665b687..9a58074 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -2,28 +2,8 @@ %% %% Functions for manipulating keys and values within leveled. %% -%% -%% Within the LEDGER: -%% Keys are of the form - -%% {Tag, Bucket, Key, SubKey|null} -%% Values are of the form -%% {SQN, Status, MD} -%% -%% Within the JOURNAL: -%% Keys are of the form - -%% {SQN, LedgerKey} -%% Values are of the form -%% {Object, IndexSpecs} (as a binary) -%% -%% IndexSpecs are of the form of a Ledger Key/Value -%% -%% Tags need to be set during PUT operations and each Tag used must be -%% supported in an extract_metadata and a build_metadata_object function clause -%% -%% Currently the only tags supported are: -%% - o (standard objects) -%% - o_rkv (riak objects) -%% - i (index entries) +%% Anything specific to handling of a given tag should be encapsulated +%% within the leveled_head module -module(leveled_codec). @@ -58,30 +38,20 @@ check_forinkertype/2, maybe_compress/2, create_value_for_journal/3, - build_metadata_object/2, generate_ledgerkv/5, get_size/2, get_keyandobjhash/2, idx_indexspecs/5, obj_objectspecs/3, - riak_extract_metadata/2, segment_hash/1, to_lookup/1, - riak_metadata_to_binary/2, next_key/1]). --define(V1_VERS, 1). --define(MAGIC, 53). % riak_kv -> riak_object -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). -define(NRT_IDX, "$aae."). --type riak_metadata() :: {binary()|delete, % Sibling Metadata - binary()|null, % Vclock Metadata - integer()|null, % Hash of vclock - non-exportable - integer()}. % Size in bytes of real object -type tag() :: - ?STD_TAG|?RIAK_TAG|?IDX_TAG|?HEAD_TAG. + leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG. -type key() :: binary()|string()|{binary(), binary()}.
% Keys SHOULD be binary() @@ -113,12 +83,16 @@ {sqn(), ledger_status(), segment_hash(), metadata(), last_moddate()}. -type ledger_kv() :: {ledger_key(), ledger_value()}. +-type compaction_method() :: + retain|skip|recalc. -type compaction_strategy() :: - list({tag(), retain|skip|recalc}). + list({tag(), compaction_method()}). -type journal_key_tag() :: ?INKT_STND|?INKT_TOMB|?INKT_MPUT|?INKT_KEYD. -type journal_key() :: - {integer(), journal_key_tag(), ledger_key()}. + {sqn(), journal_key_tag(), ledger_key()}. +-type journal_ref() :: + {ledger_key(), sqn()}. -type object_spec_v0() :: {add|remove, key(), key(), key()|null, any()}. -type object_spec_v1() :: @@ -153,8 +127,10 @@ ledger_value/0, ledger_kv/0, compaction_strategy/0, + compaction_method/0, journal_key_tag/0, journal_key/0, + journal_ref/0, compression_method/0, journal_keychanges/0, index_specs/0, @@ -179,29 +155,8 @@ segment_hash(Key) when is_binary(Key) -> {segment_hash, SegmentID, ExtraHash, _AltHash} = leveled_tictac:keyto_segment48(Key), {SegmentID, ExtraHash}; -segment_hash({?RIAK_TAG, Bucket, Key, null}) - when is_binary(Bucket), is_binary(Key) -> - segment_hash(<>); -segment_hash({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey}) - when is_binary(BucketType), is_binary(Bucket) -> - segment_hash({?RIAK_TAG, - <>, - Key, - SubKey}); -segment_hash({?HEAD_TAG, Bucket, Key, SubK}) - when is_binary(Bucket), is_binary(Key), is_binary(SubK) -> - segment_hash(<>); -segment_hash({?HEAD_TAG, Bucket, Key, _SubK}) - when is_binary(Bucket), is_binary(Key) -> - segment_hash(<>); -segment_hash({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) - when is_binary(BucketType), is_binary(Bucket) -> - segment_hash({?HEAD_TAG, - <>, - Key, - SubKey}); -segment_hash(Key) -> - segment_hash(term_to_binary(Key)). +segment_hash(KeyTuple) when is_tuple(KeyTuple) -> + segment_hash(leveled_head:key_to_canonicalbinary(KeyTuple)). -spec to_lookup(ledger_key()) -> maybe_lookup(). @@ -355,7 +310,9 @@ endkey_passed(EndKey, CheckingKey) -> %% Take the default startegy for compaction, and override the approach for any %% tags passed in inker_reload_strategy(AltList) -> - ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}], + ReloadStrategy0 = + lists:map(fun leveled_head:default_reload_strategy/1, + leveled_head:defined_objecttags()), lists:foldl(fun({X, Y}, SList) -> lists:keyreplace(X, 1, SList, {X, Y}) end, @@ -571,8 +528,6 @@ check_forinkertype(_LedgerKey, head_only) -> check_forinkertype(_LedgerKey, _Object) -> ?INKT_STND. -hash(Obj) -> - erlang:phash2(term_to_binary(Obj)). @@ -657,8 +612,8 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> {active, TS} end, Hash = segment_hash(PrimaryKey), - {MD, LastMods} = extract_metadata(Obj, Size, Tag), - ObjHash = get_objhash(Tag, MD), + {MD, LastMods} = leveled_head:extract_metadata(Tag, Size, Obj), + ObjHash = leveled_head:get_hash(Tag, MD), Value = {SQN, Status, Hash, @@ -679,23 +634,10 @@ get_last_lastmodification(LastMods) -> {Mega, Sec, _Micro} = lists:max(LastMods), Mega * 1000000 + Sec. - -extract_metadata(Obj, Size, ?RIAK_TAG) -> - riak_extract_metadata(Obj, Size); -extract_metadata(Obj, Size, ?STD_TAG) -> - {{hash(Obj), Size}, []}. - get_size(PK, Value) -> {Tag, _Bucket, _Key, _} = PK, MD = element(4, Value), - case Tag of - ?RIAK_TAG -> - {_RMD, _VC, _Hash, Size} = MD, - Size; - ?STD_TAG -> - {_Hash, Size} = MD, - Size - end. + leveled_head:get_size(Tag, MD). -spec get_keyandobjhash(tuple(), tuple()) -> tuple(). 
%% @doc @@ -709,105 +651,9 @@ get_keyandobjhash(LK, Value) -> ?IDX_TAG -> from_ledgerkey(LK); % returns {Bucket, Key, IdxValue} _ -> - {Bucket, Key, get_objhash(Tag, MD)} + {Bucket, Key, leveled_head:get_hash(Tag, MD)} end. -get_objhash(Tag, ObjMetaData) -> - case Tag of - ?RIAK_TAG -> - {_RMD, _VC, Hash, _Size} = ObjMetaData, - Hash; - ?STD_TAG -> - {Hash, _Size} = ObjMetaData, - Hash - end. - - -build_metadata_object(PrimaryKey, MD) -> - {Tag, _Bucket, _Key, _SubKey} = PrimaryKey, - case Tag of - ?RIAK_TAG -> - {SibData, Vclock, _Hash, _Size} = MD, - riak_metadata_to_binary(Vclock, SibData); - ?STD_TAG -> - MD; - ?HEAD_TAG -> - MD - end. - - --spec riak_extract_metadata(binary()|delete, non_neg_integer()) -> - {riak_metadata(), list()}. -%% @doc -%% Riak extract metadata should extract a metadata object which is a -%% five-tuple of: -%% - Binary of sibling Metadata -%% - Binary of vector clock metadata -%% - Non-exportable hash of the vector clock metadata -%% - The largest last modified date of the object -%% - Size of the object -%% -%% The metadata object should be returned with the full list of last -%% modified dates (which will be used for recent anti-entropy index creation) -riak_extract_metadata(delete, Size) -> - {{delete, null, null, Size}, []}; -riak_extract_metadata(ObjBin, Size) -> - {VclockBin, SibBin, LastMods} = riak_metadata_from_binary(ObjBin), - {{SibBin, - VclockBin, - erlang:phash2(lists:sort(binary_to_term(VclockBin))), - Size}, - LastMods}. - -%% <>. - -riak_metadata_to_binary(VclockBin, SibMetaBin) -> - VclockLen = byte_size(VclockBin), - <>. - -riak_metadata_from_binary(V1Binary) -> - <> = V1Binary, - <> = Rest, - {SibMetaBin, LastMods} = - case SibCount of - SC when is_integer(SC) -> - get_metadata_from_siblings(SibsBin, - SibCount, - <>, - []) - end, - {VclockBin, SibMetaBin, LastMods}. - -get_metadata_from_siblings(<<>>, 0, SibMetaBin, LastMods) -> - {SibMetaBin, LastMods}; -get_metadata_from_siblings(<>, - SibCount, - SibMetaBin, - LastMods) -> - <<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0, - <> = Rest1, - LastMod = - case MetaBin of - <> -> - {MegaSec, Sec, MicroSec}; - _ -> - {0, 0, 0} - end, - get_metadata_from_siblings(Rest2, - SibCount - 1, - <>, - [LastMod|LastMods]). - -spec next_key(key()) -> key(). %% @doc %% Get the next key to iterate from a given point diff --git a/src/leveled_head.erl b/src/leveled_head.erl new file mode 100644 index 0000000..f089a36 --- /dev/null +++ b/src/leveled_head.erl @@ -0,0 +1,274 @@ +%% -------- Metadata Seperation - Head and Body --------- +%% +%% The definition of the part of the object that belongs to the HEAD, and +%% the part which belongs to the body. +%% +%% For the ?RIAK tag this is pre-defined. For the ?STD_TAG there is minimal +%% definition. For best use of Riak define a new tag and use pattern matching +%% to extend these exported functions. + +-module(leveled_head). + +-include("include/leveled.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-export([build_head/2, + maybe_build_proxy/4, + extract_metadata/3, + get_size/2, + get_hash/2, + default_reload_strategy/1, + defined_objecttags/0, + key_to_canonicalbinary/1]). + +%% Exported for testing purposes +-export([riak_metadata_to_binary/2, + riak_extract_metadata/2]). + + +-define(MAGIC, 53). % riak_kv -> riak_object +-define(V1_VERS, 1). + +-type riak_metadata() :: {binary()|delete, % Sibling Metadata + binary()|null, % Vclock Metadata + non_neg_integer()|null, % Hash of vclock - non-exportable + non_neg_integer()}. 
% Size in bytes of real object +-type std_metadata() :: {non_neg_integer()|null, % Hash of value + non_neg_integer(), % Size in bytes of real object + list(tuple())|undefined}. + + +-type object_tag() :: ?STD_TAG|?RIAK_TAG. + % tags assigned to objects + % (not other special entities such as ?HEAD or ?IDX) +-type headonly_tag() :: ?HEAD_TAG. + % Tag assigned to head_only objects. Behaviour cannot be changed + +-type object_metadata() :: riak_metadata()|std_metadata(). +-type head_bin() :: + binary()|tuple(). + % TODO: + % This is currently not always a binary. Wish is to migrate this so that + % it is predictably a binary +-type value_fetcher() :: + {fun((pid(), leveled_codec:journal_key()) -> any()), + pid(), leveled_codec:journal_key()}. + % A 2-arity function, which when passed the other two elements of the tuple + % will return the value +-type proxy_object() :: + {proxy_object, head_bin(), non_neg_integer(), value_fetcher()}. + % Returns the head, size and a tuple for accessing the value +-type proxy_objectbin() :: + binary(). + % using term_to_binary(proxy_object()) + + +-export_type([object_tag/0, + proxy_object/0, + value_fetcher/0, + head_bin/0, + proxy_objectbin/0]). + +%%%============================================================================ +%%% External Functions +%%%============================================================================ + +-spec defined_objecttags() -> list(object_tag()). +%% @doc +%% Return the list of object tags +defined_objecttags() -> + [?STD_TAG, ?RIAK_TAG]. + + +-spec key_to_canonicalbinary(tuple()) -> binary(). +%% @doc +%% Convert a key to a binary in a consistent way for the tag. The binary will +%% then be used to create the hash +key_to_canonicalbinary({?RIAK_TAG, Bucket, Key, null}) + when is_binary(Bucket), is_binary(Key) -> + <>; +key_to_canonicalbinary({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey}) + when is_binary(BucketType), is_binary(Bucket) -> + key_to_canonicalbinary({?RIAK_TAG, + <>, + Key, + SubKey}); +key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK}) + when is_binary(Bucket), is_binary(Key), is_binary(SubK) -> + <>; +key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, _SubK}) + when is_binary(Bucket), is_binary(Key) -> + <>; +key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) + when is_binary(BucketType), is_binary(Bucket) -> + key_to_canonicalbinary({?HEAD_TAG, + <>, + Key, + SubKey}); +key_to_canonicalbinary(Key) -> + term_to_binary(Key). + +-spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head_bin(). +%% @doc +%% Return the object metadata as a binary to be the "head" of the object +build_head(?RIAK_TAG, Metadata) -> + {SibData, Vclock, _Hash, _Size} = Metadata, + riak_metadata_to_binary(Vclock, SibData); +build_head(_Tag, Metadata) -> + % term_to_binary(Metadata). + Metadata. + + +-spec maybe_build_proxy(object_tag()|headonly_tag(), object_metadata(), + pid(), leveled_codec:journal_ref()) + -> proxy_objectbin()|object_metadata(). +%% @doc +%% Return a proxyObject (e.g. form a head fold, so that the potential fetching +%% of an object can be deferred (e.g. 
it can be made dependent on the +%% application making a decision on the contents of the object_metadata +maybe_build_proxy(?HEAD_TAG, ObjectMetadata, _InkerClone, _JR) -> + % Object has no value - so proxy object makes no sense, just return the + % metadata as is + ObjectMetadata; +maybe_build_proxy(Tag, ObjMetadata, InkerClone, JournalRef) -> + Size = get_size(Tag, ObjMetadata), + HeadBin = build_head(Tag, ObjMetadata), + term_to_binary({proxy_object, + HeadBin, + Size, + {fun leveled_bookie:fetch_value/2, + InkerClone, + JournalRef}}). + + +-spec extract_metadata(object_tag()|headonly_tag(), non_neg_integer(), any()) + -> {object_metadata(), list(erlang:timestamp())}. +%% @doc +%% Take the inbound object and extract from it the metadata to be stored within +%% the ledger (and ultimately returned from a leveled_bookie:book_head/4 +%% request (after conversion using build_head/2). +%% +%% As part of the response also return a list of last_modification_dates +%% associated with the object - with those dates being expressed as erlang +%% timestamps. +%% +%% The Object Size passed in to this function is as calculated when writing +%% the object to the Journal. It may be recalculated here, if an alternative +%% view of size is required within the header +extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) -> + riak_extract_metadata(RiakObj, SizeAsStoredInJournal); +extract_metadata(_Tag, SizeAsStoredInJournal, Obj) -> + {{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}. + + +-spec get_size(object_tag()|headonly_tag(), object_metadata()) + -> non_neg_integer(). +%% @doc +%% Fetch the size from the metadata +get_size(?RIAK_TAG, RiakObjectMetadata) -> + element(4, RiakObjectMetadata); +get_size(_Tag, ObjectMetadata) -> + element(2, ObjectMetadata). + + +-spec get_hash(object_tag()|headonly_tag(), object_metadata()) + -> non_neg_integer(). +%% @doc +%% Fetch the hash from the metadata +get_hash(?RIAK_TAG, RiakObjectMetadata) -> + element(3, RiakObjectMetadata); +get_hash(_Tag, ObjectMetadata) -> + element(1, ObjectMetadata). + + +-spec default_reload_strategy(object_tag()) + -> {object_tag(), + leveled_codec:compaction_method()}. +%% @doc +%% State the compaction_method to be used when reloading the Ledger from the +%% journal for each object tag. Note, no compaction strategy required for +%% head_only tag +default_reload_strategy(Tag) -> + {Tag, retain}. + + +%%%============================================================================ +%%% Tag-specific Functions +%%%============================================================================ + +standard_hash(Obj) -> + erlang:phash2(term_to_binary(Obj)). + + +-spec riak_extract_metadata(binary()|delete, non_neg_integer()) -> + {riak_metadata(), list()}. +%% @doc +%% Riak extract metadata should extract a metadata object which is a +%% five-tuple of: +%% - Binary of sibling Metadata +%% - Binary of vector clock metadata +%% - Non-exportable hash of the vector clock metadata +%% - The largest last modified date of the object +%% - Size of the object +%% +%% The metadata object should be returned with the full list of last +%% modified dates (which will be used for recent anti-entropy index creation) +riak_extract_metadata(delete, Size) -> + {{delete, null, null, Size}, []}; +riak_extract_metadata(ObjBin, Size) -> + {VclockBin, SibBin, LastMods} = riak_metadata_from_binary(ObjBin), + {{SibBin, + VclockBin, + erlang:phash2(lists:sort(binary_to_term(VclockBin))), + Size}, + LastMods}. + +%% <>.
+ +riak_metadata_to_binary(VclockBin, SibMetaBin) -> + VclockLen = byte_size(VclockBin), + <>. + +riak_metadata_from_binary(V1Binary) -> + <> = V1Binary, + <> = Rest, + {SibMetaBin, LastMods} = + case SibCount of + SC when is_integer(SC) -> + get_metadata_from_siblings(SibsBin, + SibCount, + <>, + []) + end, + {VclockBin, SibMetaBin, LastMods}. + +get_metadata_from_siblings(<<>>, 0, SibMetaBin, LastMods) -> + {SibMetaBin, LastMods}; +get_metadata_from_siblings(<>, + SibCount, + SibMetaBin, + LastMods) -> + <<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0, + <> = Rest1, + LastMod = + case MetaBin of + <> -> + {MegaSec, Sec, MicroSec}; + _ -> + {0, 0, 0} + end, + get_metadata_from_siblings(Rest2, + SibCount - 1, + <>, + [LastMod|LastMods]). diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 45e3518..3f582d6 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -678,27 +678,27 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> end, JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN}, case DeferredFetch of - {true, true} -> - InJournal = - leveled_inker:ink_keycheck(InkerClone, - LK, - SQN), - case InJournal of - probably -> - ProxyObj = - make_proxy_object(Tag, - LK, JK, MD, V, - InkerClone), - FoldObjectsFun(B, K, ProxyObj, Acc); - missing -> - Acc - end; - {true, false} -> + {true, JournalCheck} -> ProxyObj = - make_proxy_object(Tag, - LK, JK, MD, V, - InkerClone), - FoldObjectsFun(B, K, ProxyObj, Acc); + leveled_head:maybe_build_proxy(Tag, + MD, + InkerClone, + JK), + case JournalCheck of + true -> + InJournal = + leveled_inker:ink_keycheck(InkerClone, + LK, + SQN), + case InJournal of + probably -> + FoldObjectsFun(B, K, ProxyObj, Acc); + missing -> + Acc + end; + false -> + FoldObjectsFun(B, K, ProxyObj, Acc) + end; false -> R = leveled_bookie:fetch_value(InkerClone, JK), case R of @@ -706,7 +706,6 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> Acc; Value -> FoldObjectsFun(B, K, Value, Acc) - end end; false -> @@ -716,16 +715,6 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> AccFun. -make_proxy_object(?HEAD_TAG, _LK, _JK, MD, _V, _InkerClone) -> - MD; -make_proxy_object(_Tag, LK, JK, MD, V, InkerClone) -> - Size = leveled_codec:get_size(LK, V), - MDBin = leveled_codec:build_metadata_object(LK, MD), - term_to_binary({proxy_object, - MDBin, - Size, - {fun leveled_bookie:fetch_value/2, InkerClone, JK}}). 
- check_presence(Key, Value, InkerClone) -> {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}), case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index e2f226b..6eb0ddc 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -3259,7 +3259,7 @@ nonsense_coverage_test() -> hashmatching_bytreesize_test() -> B = <<"Bucket">>, - V = leveled_codec:riak_metadata_to_binary(term_to_binary([{"actor1", 1}]), + V = leveled_head:riak_metadata_to_binary(term_to_binary([{"actor1", 1}]), <<1:32/integer, 0:32/integer, 0:32/integer>>), diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 2317510..fdde0c4 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -69,9 +69,9 @@ simple_test_withlog(LogLevel, ForcedLogs) -> ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", "Value2", [{add, "Index1", "Term1"}]), {ok, "Value2"} = leveled_bookie:book_get(Bookie2, "Bucket1", "Key2"), - {ok, {62888926, 60}} = leveled_bookie:book_head(Bookie2, - "Bucket1", - "Key2"), + {ok, {62888926, 60, undefined}} = leveled_bookie:book_head(Bookie2, + "Bucket1", + "Key2"), testutil:check_formissingobject(Bookie2, "Bucket1", "Key2"), ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", <<"Value2">>, [{remove, "Index1", "Term1"}, diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index d4c9e00..98ea9af 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -363,7 +363,7 @@ check_forobject(Bookie, TestObject) -> TestObject#r_object.bucket, TestObject#r_object.key), {{_SibMetaBin, Vclock, _Hash, size}, _LMS} - = leveled_codec:riak_extract_metadata(HeadBinary, size), + = leveled_head:riak_extract_metadata(HeadBinary, size), true = binary_to_term(Vclock) == TestObject#r_object.vclock. check_formissingobject(Bookie, Bucket, Key) -> From 8e687ee7c8ae5a0779151f78d0df06ca23c9e2eb Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 6 Dec 2018 21:00:59 +0000 Subject: [PATCH 2/7] Add user-defined functions To allow for extraction of metadata, and building of head responses - it should be possible to dynamically add user-defined tags, and functions to treat them. If no function is defined, revert to the behaviour of the ?STD tag. --- src/leveled_bookie.erl | 15 +- src/leveled_codec.erl | 41 +++++- src/leveled_head.erl | 211 +++++++++++++++++---------- src/leveled_runner.erl | 6 +- test/end_to_end/appdefined_SUITE.erl | 121 +++++++++++++++ 5 files changed, 307 insertions(+), 87 deletions(-) create mode 100644 test/end_to_end/appdefined_SUITE.erl diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 1a1e626..30d2caf 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -130,7 +130,8 @@ {compression_method, ?COMPRESSION_METHOD}, {compression_point, ?COMPRESSION_POINT}, {log_level, ?LOG_LEVEL}, - {forced_logs, []}]). + {forced_logs, []}, + {override_functions, []}]). -record(ledger_cache, {mem :: ets:tab(), loader = leveled_tree:empty(?CACHE_TYPE) @@ -314,7 +315,7 @@ % moving to higher log levels will at present make the operator % blind to sample performance statistics of leveled sub-components % etc - {forced_logs, list(string())} + {forced_logs, list(string())} | % Forced logs allow for specific info level logs, such as those % logging stats to be logged even when the default log level has % been set to a higher log level.
Using: @@ -323,6 +324,9 @@ % "P0032", "SST12", "CDB19", "SST13", "I0019"]} % Will log all timing points even when log_level is not set to % support info + {override_functions, list(leveled_head:appdefinable_function_tuple())} + % Provide a list of override functions that will be used for + % user-defined tags ]. @@ -1065,6 +1069,13 @@ init([Opts]) -> ForcedLogs = proplists:get_value(forced_logs, Opts), ok = application:set_env(leveled, forced_logs, ForcedLogs), + OverrideFunctions = proplists:get_value(override_functions, Opts), + SetFun = + fun({FuncName, Func}) -> + application:set_env(leveled, FuncName, Func) + end, + lists:foreach(SetFun, OverrideFunctions), + ConfiguredCacheSize = max(proplists:get_value(cache_size, Opts), ?MIN_CACHE_SIZE), CacheJitter = diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 9a58074..6e0ce63 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -45,7 +45,8 @@ obj_objectspecs/3, segment_hash/1, to_lookup/1, - next_key/1]). + next_key/1, + return_proxy/4]). -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). -define(NRT_IDX, "$aae."). @@ -113,6 +114,17 @@ % first element must be re_pattern, but tuple may change legnth with % versions +-type value_fetcher() :: + {fun((pid(), leveled_codec:journal_key()) -> any()), + pid(), leveled_codec:journal_key()}. + % A 2-arity function, which when passed the other two elements of the tuple + % will return the value +-type proxy_object() :: + {proxy_object, leveled_head:head(), non_neg_integer(), value_fetcher()}. + % Returns the head, size and a tuple for accessing the value +-type proxy_objectbin() :: + binary(). + % using term_to_binary(proxy_object()) -type segment_list() @@ -138,7 +150,9 @@ maybe_lookup/0, last_moddate/0, lastmod_range/0, - regular_expression/0]). + regular_expression/0, + value_fetcher/0, + proxy_object/0]). %%%============================================================================ @@ -353,7 +367,7 @@ compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) -> -spec get_tagstrategy(ledger_key(), compaction_strategy()) -> skip|retain|recalc. %% @doc -%% Work out the compaction startegy for the key +%% Work out the compaction strategy for the key get_tagstrategy({Tag, _, _, _}, Strategy) -> case lists:keyfind(Tag, 1, Strategy) of {Tag, TagStrat} -> @@ -579,6 +593,27 @@ gen_headspec({IdxOp, Bucket, Key, SubKey, Value}, SQN, TTL) -> {K, {SQN, Status, segment_hash(K), Value, undefined}}. +-spec return_proxy(leveled_head:object_tag()|leveled_head:headonly_tag(), + leveled_head:object_metadata(), + pid(), journal_ref()) + -> proxy_objectbin()|leveled_head:object_metadata(). +%% @doc +%% If the object has a value, return the metadata and a proxy through which +%% the applictaion or runner can access the value. If it is a ?HEAD_TAG +%% then it has no value, so just return the metadata +return_proxy(?HEAD_TAG, ObjectMetadata, _InkerClone, _JR) -> + % Object has no value - so proxy object makese no sense, just return the + % metadata as is + ObjectMetadata; +return_proxy(Tag, ObjMetadata, InkerClone, JournalRef) -> + Size = leveled_head:get_size(Tag, ObjMetadata), + HeadBin = leveled_head:build_head(Tag, ObjMetadata), + term_to_binary({proxy_object, + HeadBin, + Size, + {fun leveled_bookie:fetch_value/2, + InkerClone, + JournalRef}}). 
set_status(add, TTL) -> {active, TTL}; diff --git a/src/leveled_head.erl b/src/leveled_head.erl index f089a36..93d0330 100644 --- a/src/leveled_head.erl +++ b/src/leveled_head.erl @@ -6,6 +6,13 @@ %% For the ?RIAK tag this is pre-defined. For the ?STD_TAG there is minimal %% definition. For best use of Riak define a new tag and use pattern matching %% to extend these exported functions. +%% +%% Dynamic user-defined tags are allowed, and to support these user-defined +%% shadow versions of the functions: +%% - key_to_canonicalbinary/1 -> binary(), +%% - build_head/2 -> head(), +%% - extract_metadata/3 -> {std_metadata(), list(erlang:timestamp()} +%% That support all the user-defined tags that are to be used -module(leveled_head). @@ -13,14 +20,17 @@ -include_lib("eunit/include/eunit.hrl"). --export([build_head/2, - maybe_build_proxy/4, - extract_metadata/3, - get_size/2, +-export([key_to_canonicalbinary/1, + build_head/2, + extract_metadata/3 + ]). + +-export([get_size/2, get_hash/2, - default_reload_strategy/1, defined_objecttags/0, - key_to_canonicalbinary/1]). + default_reload_strategy/1, + standard_hash/1 + ]). %% Exported for testing purposes -export([riak_metadata_to_binary/2, @@ -30,55 +40,73 @@ -define(MAGIC, 53). % riak_kv -> riak_object -define(V1_VERS, 1). --type riak_metadata() :: {binary()|delete, % Sibling Metadata - binary()|null, % Vclock Metadata - non_neg_integer()|null, % Hash of vclock - non-exportable - non_neg_integer()}. % Size in bytes of real object --type std_metadata() :: {non_neg_integer()|null, % Hash of value - non_neg_integer(), % Size in bytes of real object - list(tuple())|undefined}. - - -type object_tag() :: ?STD_TAG|?RIAK_TAG. % tags assigned to objects % (not other special entities such as ?HEAD or ?IDX) -type headonly_tag() :: ?HEAD_TAG. % Tag assigned to head_only objects. Behaviour cannot be changed --type object_metadata() :: riak_metadata()|std_metadata(). --type head_bin() :: +-type riak_metadata() :: {binary()|delete, + % Sibling Metadata + binary()|null, + % Vclock Metadata + non_neg_integer()|null, + % Hash of vclock - non-exportable + non_neg_integer() + % Size in bytes of real object + }. +-type std_metadata() :: {non_neg_integer()|null, + % Hash of value + non_neg_integer(), + % Size in bytes of real object + list(tuple())|undefined + % User-define metadata + }. +-type head_metadata() :: {non_neg_integer()|null, + % Hash of value + non_neg_integer() + % Size in bytes of real object + }. + +-type object_metadata() :: riak_metadata()|std_metadata()|head_metadata(). + +-type appdefinable_function() :: + key_to_canonicalbinary | build_head | extract_metadata. + % Functions for which default behaviour can be over-written for the + % application's own tags +-type appdefinable_function_tuple() :: + {appdefinable_function(), fun()}. + +-type head() :: binary()|tuple(). % TODO: % This is currently not always a binary. Wish is to migrate this so that % it is predictably a binary --type value_fetcher() :: - {fun((pid(), leveled_codec:journal_key()) -> any()), - pid(), leveled_codec:journal_key()}. - % A 2-arity function, which when passed the other two elements of the tuple - % will return the value --type proxy_object() :: - {proxy_object, head_bin(), non_neg_integer(), value_fetcher()}. - % Returns the head, size and a tuple for accessing the value --type proxy_objectbin() :: - binary(). - % using term_to_binary(proxy_object()) -export_type([object_tag/0, - proxy_object/0, - value_fetcher/0, - head_bin/0, - proxy_objectbin/0]). 
+ head/0, + object_metadata/0, + appdefinable_function_tuple/0]). %%%============================================================================ -%%% External Functions +%%% Mutable External Functions %%%============================================================================ --spec defined_objecttags() -> list(object_tag()). +-spec get_appdefined_function(appdefinable_function(), + fun(), + non_neg_integer()) -> fun(). %% @doc -%% Return the list of object tags -defined_objecttags() -> - [?STD_TAG, ?RIAK_TAG]. +%% If a keylist of [{function_name, fun()}] has been set as an environment +%% variable for a tag, then this FunctionName can be used instead of the +%% default +get_appdefined_function(FunctionName, DefaultFun, RequiredArity) -> + case application:get_env(leveled, FunctionName) of + undefined -> + DefaultFun; + {ok, Fun} when is_function(Fun, RequiredArity) -> + Fun + end. -spec key_to_canonicalbinary(tuple()) -> binary(). @@ -97,7 +125,7 @@ key_to_canonicalbinary({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey}) key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK}) when is_binary(Bucket), is_binary(Key), is_binary(SubK) -> <>; -key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, _SubK}) +key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null}) when is_binary(Bucket), is_binary(Key) -> <>; key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) @@ -106,42 +134,45 @@ key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) <>, Key, SubKey}); +key_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG -> + % In unit tests head specs can have non-binary keys, so handle + % this through hashing the whole key + default_key_to_canonicalbinary(Key); +key_to_canonicalbinary(Key) when element(1, Key) == ?STD_TAG -> + default_key_to_canonicalbinary(Key); key_to_canonicalbinary(Key) -> + OverrideFun = + get_appdefined_function(key_to_canonicalbinary, + fun default_key_to_canonicalbinary/1, + 1), + OverrideFun(Key). + +default_key_to_canonicalbinary(Key) -> term_to_binary(Key). --spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head_bin(). + +-spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head(). %% @doc %% Return the object metadata as a binary to be the "head" of the object build_head(?RIAK_TAG, Metadata) -> {SibData, Vclock, _Hash, _Size} = Metadata, riak_metadata_to_binary(Vclock, SibData); -build_head(_Tag, Metadata) -> +build_head(?HEAD_TAG, Metadata) -> % term_to_binary(Metadata). + default_build_head(?HEAD_TAG, Metadata); +build_head(?STD_TAG, Metadata) -> + default_build_head(?STD_TAG, Metadata); +build_head(Tag, Metadata) -> + OverrideFun = + get_appdefined_function(build_head, + fun default_build_head/2, + 2), + OverrideFun(Tag, Metadata). + +default_build_head(_Tag, Metadata) -> Metadata. --spec maybe_build_proxy(object_tag()|headonly_tag(), object_metadata(), - pid(), leveled_codec:journal_ref()) - -> proxy_objectbin()|object_metadata(). -%% @doc -%% Return a proxyObject (e.g. form a head fold, so that the potential fetching -%% of an object can be deferred (e.g. 
it can be make dependent on the -%% applictaion making a decision on the contents of the object_metadata -maybe_build_proxy(?HEAD_TAG, ObjectMetadata, _InkerClone, _JR) -> - % Object has no value - so proxy object makese no sense, just return the - % metadata as is - ObjectMetadata; -maybe_build_proxy(Tag, ObjMetadata, InkerClone, JournalRef) -> - Size = get_size(Tag, ObjMetadata), - HeadBin = build_head(Tag, ObjMetadata), - term_to_binary({proxy_object, - HeadBin, - Size, - {fun leveled_bookie:fetch_value/2, - InkerClone, - JournalRef}}). - - -spec extract_metadata(object_tag()|headonly_tag(), non_neg_integer(), any()) -> {object_metadata(), list(erlang:timestamp())}. %% @doc @@ -158,10 +189,43 @@ maybe_build_proxy(Tag, ObjMetadata, InkerClone, JournalRef) -> %% view of size is required within the header extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) -> riak_extract_metadata(RiakObj, SizeAsStoredInJournal); -extract_metadata(_Tag, SizeAsStoredInJournal, Obj) -> +extract_metadata(?HEAD_TAG, SizeAsStoredInJournal, Obj) -> + {{standard_hash(Obj), SizeAsStoredInJournal}, []}; +extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj) -> + default_extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj); +extract_metadata(Tag, SizeAsStoredInJournal, Obj) -> + OverrideFun = + get_appdefined_function(extract_metadata, + fun default_extract_metadata/3, + 3), + OverrideFun(Tag, SizeAsStoredInJournal, Obj). + +default_extract_metadata(_Tag, SizeAsStoredInJournal, Obj) -> {{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}. +%%%============================================================================ +%%% Standard External Functions +%%%============================================================================ + +-spec defined_objecttags() -> list(object_tag()). +%% @doc +%% Return the list of object tags +defined_objecttags() -> + [?STD_TAG, ?RIAK_TAG]. + + +-spec default_reload_strategy(object_tag()) + -> {object_tag(), + leveled_codec:compaction_method()}. +%% @doc +%% State the compaction_method to be used when reloading the Ledger from the +%% journal for each object tag. Note, no compaction startegy required for +%% head_only tag +default_reload_strategy(Tag) -> + {Tag, retain}. + + -spec get_size(object_tag()|headonly_tag(), object_metadata()) -> non_neg_integer(). %% @doc @@ -181,25 +245,16 @@ get_hash(?RIAK_TAG, RiakObjectMetadata) -> get_hash(_Tag, ObjectMetadata) -> element(1, ObjectMetadata). - --spec default_reload_strategy(object_tag()) - -> {object_tag(), - leveled_codec:compaction_method()}. +-spec standard_hash(any()) -> non_neg_integer(). %% @doc -%% State the compaction_method to be used when reloading the Ledger from the -%% journal for each object tag. Note, no compaction startegy required for -%% head_only tag -default_reload_strategy(Tag) -> - {Tag, retain}. - - -%%%============================================================================ -%%% Tag-specific Functions -%%%============================================================================ - +%% Hash the whole object standard_hash(Obj) -> erlang:phash2(term_to_binary(Obj)). +%%%============================================================================ +%%% Tag-specific Internal Functions +%%%============================================================================ + -spec riak_extract_metadata(binary()|delete, non_neg_integer()) -> {riak_metadata(), list()}. 
diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 3f582d6..a6d4019 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -680,10 +680,8 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> case DeferredFetch of {true, JournalCheck} -> ProxyObj = - leveled_head:maybe_build_proxy(Tag, - MD, - InkerClone, - JK), + leveled_codec:return_proxy(Tag, MD, + InkerClone, JK), case JournalCheck of true -> InJournal = diff --git a/test/end_to_end/appdefined_SUITE.erl b/test/end_to_end/appdefined_SUITE.erl new file mode 100644 index 0000000..79301de --- /dev/null +++ b/test/end_to_end/appdefined_SUITE.erl @@ -0,0 +1,121 @@ +-module(appdefined_SUITE). +-include_lib("common_test/include/ct.hrl"). +-include("include/leveled.hrl"). +-export([all/0]). +-export([application_defined_tag/1 + ]). + +all() -> [ + application_defined_tag + ]. + + + +application_defined_tag(_Config) -> + T1 = os:timestamp(), + application_defined_tag_tester(40000, ?STD_TAG, [], false), + io:format("Completed with std tag in ~w ms~n", + [timer:now_diff(os:timestamp(), T1)/1000]), + + T2 = os:timestamp(), + application_defined_tag_tester(40000, bespoke_tag1, [], false), + io:format("Completed with app tag but not function in ~w ms~n", + [timer:now_diff(os:timestamp(), T2)/1000]), + + ExtractMDFun = + fun(Tag, Size, Obj) -> + [{hash, Hash}, {shard, Shard}, {random, Random}, {value, _V}] + = Obj, + case Tag of + bespoke_tag1 -> + {{Hash, Size, [{shard, Shard}, {random, Random}]}, []}; + bespoke_tag2 -> + {{Hash, Size, [{shard, Shard}]}, [os:timestamp()]} + end + end, + + T3 = os:timestamp(), + application_defined_tag_tester(40000, ?STD_TAG, + [{extract_metadata, ExtractMDFun}], + false), + io:format("Completed with std tag and override function in ~w ms~n", + [timer:now_diff(os:timestamp(), T3)/1000]), + + T4 = os:timestamp(), + application_defined_tag_tester(40000, bespoke_tag1, + [{extract_metadata, ExtractMDFun}], + true), + io:format("Completed with app tag and override function in ~w ms~n", + [timer:now_diff(os:timestamp(), T4)/1000]), + + + T5 = os:timestamp(), + application_defined_tag_tester(40000, bespoke_tag2, + [{extract_metadata, ExtractMDFun}], + true), + io:format("Completed with app tag and override function in ~w ms~n", + [timer:now_diff(os:timestamp(), T5)/1000]). 
+ + +application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) -> + RootPath = testutil:reset_filestructure(), + StartOpts1 = [{root_path, RootPath}, + {sync_strategy, testutil:sync_strategy()}, + {log_level, warn}, + {override_functions, Functions}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + Value = leveled_rand:rand_bytes(512), + MapFun = + fun(C) -> + {C, object_generator(C, Value)} + end, + CBKVL = lists:map(MapFun, lists:seq(1, KeyCount)), + + PutFun = + fun({_C, {B, K, O}}) -> + R = leveled_bookie:book_put(Bookie1, B, K, O, [], Tag), + case R of + ok -> ok; + pause -> timer:sleep(100) + end + end, + lists:foreach(PutFun, CBKVL), + + CheckFun = + fun(Book) -> + fun({C, {B, K, O}}) -> + {ok, O} = leveled_bookie:book_get(Book, B, K, Tag), + {ok, H} = leveled_bookie:book_head(Book, B, K, Tag), + MD = element(3, H), + case ExpectMD of + true -> + true = + {shard, C rem 10} == lists:keyfind(shard, 1, MD); + false -> + true = + undefined == MD + end + end + end, + + lists:foreach(CheckFun(Bookie1), CBKVL), + + ok = leveled_bookie:book_close(Bookie1), + {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), + + lists:foreach(CheckFun(Bookie2), CBKVL), + + ok = leveled_bookie:book_close(Bookie2). + + + + +object_generator(Count, V) -> + Hash = erlang:phash2({count, V}), + Random = leveled_rand:uniform(1000), + Key = list_to_binary(leveled_util:generate_uuid()), + Bucket = <<"B">>, + {Bucket, + Key, + [{hash, Hash}, {shard, Count rem 10}, + {random, Random}, {value, V}]}. \ No newline at end of file From e0352414f200be8ccbd91121f116dbd9669a5c28 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 6 Dec 2018 22:45:05 +0000 Subject: [PATCH 3/7] iClerk refactor the skip/retain/recalc handling was confusing. This removes the switcheroo between leveled_codec and leveled_iclerk when making the decision. Also now the building of the accumulator is handled efficiently (not using ++ on the list). Tried to remove as much of ?HEAD tag handling from leveled_head - as we want leveled_head to be only concerned with the head manipulation for object tags (?STD, ?RIAK and user-defined). --- src/leveled_bookie.erl | 6 +-- src/leveled_codec.erl | 109 +++++++++++++++-------------------------- src/leveled_head.erl | 65 ++++++++++-------------- src/leveled_iclerk.erl | 69 ++++++++++++++++++-------- 4 files changed, 116 insertions(+), 133 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 30d2caf..1333eed 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1033,10 +1033,8 @@ book_destroy(Pid) -> %% The function will be 1-arity, and can be passed the absolute folder name %% to store the backup. %% -%% Backup files are hard-linked. Does not work in head_only mode -%% -%% TODO: Can extend to head_only mode, and also support another parameter -%% which would backup persisted part of ledger (to make restart faster) +%% Backup files are hard-linked. Does not work in head_only mode, or if +%% index changes are used with a `skip` compaction/reload strategy book_hotbackup(Pid) -> gen_server:call(Pid, hot_backup, infinity).
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 6e0ce63..977bcaa 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -33,9 +33,10 @@ from_inkerkv/1, from_inkerkv/2, from_journalkey/1, - compact_inkerkvc/2, + revert_to_keydeltas/2, split_inkvalue/1, check_forinkertype/2, + get_tagstrategy/2, maybe_compress/2, create_value_for_journal/3, generate_ledgerkv/5, @@ -170,7 +171,32 @@ segment_hash(Key) when is_binary(Key) -> = leveled_tictac:keyto_segment48(Key), {SegmentID, ExtraHash}; segment_hash(KeyTuple) when is_tuple(KeyTuple) -> - segment_hash(leveled_head:key_to_canonicalbinary(KeyTuple)). + BinKey = + case element(1, Keytuple) of + ?HEAD_TAG -> + headkey_to_canonicalbinary(KeyTuple); + _ -> + leveled_head:key_to_canonicalbinary(KeyTuple) + end, + segment_hash(BinKey). + + +headkey_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK}) + when is_binary(Bucket), is_binary(Key), is_binary(SubK) -> + <>; +headkey_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null}) + when is_binary(Bucket), is_binary(Key) -> + <>; +headkey_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) + when is_binary(BucketType), is_binary(Bucket) -> + headkey_to_canonicalbinary({?HEAD_TAG, + <>, + Key, + SubKey}); +headkey_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG -> + % In unit tests head specs can have non-binary keys, so handle + % this through hashing the whole key + term_to_binary(Key). -spec to_lookup(ledger_key()) -> maybe_lookup(). @@ -334,36 +360,6 @@ inker_reload_strategy(AltList) -> AltList). --spec compact_inkerkvc({journal_key(), any(), boolean()}, - compaction_strategy()) -> - skip|{retain, any()}|{recalc, null}. -%% @doc -%% Decide whether a superceded object should be replicated in the compacted -%% file and in what format. -compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) -> - skip; -compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) -> - skip; -compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) -> - case get_tagstrategy(LK, Strategy) of - skip -> - skip; - retain -> - {retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}}; - TagStrat -> - {TagStrat, null} - end; -compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) -> - case get_tagstrategy(LK, Strategy) of - skip -> - skip; - retain -> - {_V, KeyDeltas} = revert_value_from_journal(V), - {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}}; - TagStrat -> - {TagStrat, null} - end. - -spec get_tagstrategy(ledger_key(), compaction_strategy()) -> skip|retain|recalc. %% @doc @@ -398,6 +394,17 @@ to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) -> create_value_for_journal({Object, KeyChanges}, Compress, PressMethod), {{SQN, InkerType, LedgerKey}, Value}. +-spec revert_to_keydeltas(journal_key(), any()) -> {journal_key(), any()}. +%% @doc +%% If we wish to retain key deltas when an object in the Journal has been +%% replaced - then this converts a Journal Key and Value into one which has no +%% object body just the key deltas. +revert_to_keydeltas({SQN, ?INKT_STND, LedgerKey}, InkerV) -> + {_V, KeyDeltas} = revert_value_from_journal(InkerV), + {{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}}; +revert_to_keydeltas(JournalKey, InkerV) -> + {JournalKey, InkerV}. + %% Used when fetching objects, so only handles standard, hashable entries from_inkerkv(Object) -> from_inkerkv(Object, false). @@ -730,44 +737,6 @@ endkey_passed_test() -> ?assertMatch(true, endkey_passed(TestKey, K2)). 
-general_skip_strategy_test() -> - % Confirm that we will skip if the strategy says so - TagStrat1 = compact_inkerkvc({{1, - ?INKT_STND, - {?STD_TAG, "B1", "K1andSK", null}}, - {}, - true}, - [{?STD_TAG, skip}]), - ?assertMatch(skip, TagStrat1), - TagStrat2 = compact_inkerkvc({{1, - ?INKT_KEYD, - {?STD_TAG, "B1", "K1andSK", null}}, - {}, - true}, - [{?STD_TAG, skip}]), - ?assertMatch(skip, TagStrat2), - TagStrat3 = compact_inkerkvc({{1, - ?INKT_KEYD, - {?IDX_TAG, "B1", "K1", "SK"}}, - {}, - true}, - [{?STD_TAG, skip}]), - ?assertMatch(skip, TagStrat3), - TagStrat4 = compact_inkerkvc({{1, - ?INKT_KEYD, - {?IDX_TAG, "B1", "K1", "SK"}}, - {}, - true}, - [{?STD_TAG, skip}, {?IDX_TAG, recalc}]), - ?assertMatch({recalc, null}, TagStrat4), - TagStrat5 = compact_inkerkvc({{1, - ?INKT_TOMB, - {?IDX_TAG, "B1", "K1", "SK"}}, - {}, - true}, - [{?STD_TAG, skip}, {?IDX_TAG, recalc}]), - ?assertMatch(skip, TagStrat5). - %% Test below proved that the overhead of performing hashes was trivial %% Maybe 5 microseconds per hash diff --git a/src/leveled_head.erl b/src/leveled_head.erl index 93d0330..d6ffd4f 100644 --- a/src/leveled_head.erl +++ b/src/leveled_head.erl @@ -93,22 +93,6 @@ %%% Mutable External Functions %%%============================================================================ --spec get_appdefined_function(appdefinable_function(), - fun(), - non_neg_integer()) -> fun(). -%% @doc -%% If a keylist of [{function_name, fun()}] has been set as an environment -%% variable for a tag, then this FunctionName can be used instead of the -%% default -get_appdefined_function(FunctionName, DefaultFun, RequiredArity) -> - case application:get_env(leveled, FunctionName) of - undefined -> - DefaultFun; - {ok, Fun} when is_function(Fun, RequiredArity) -> - Fun - end. - - -spec key_to_canonicalbinary(tuple()) -> binary(). %% @doc %% Convert a key to a binary in a consistent way for the tag. The binary will @@ -122,22 +106,6 @@ key_to_canonicalbinary({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey}) <>, Key, SubKey}); -key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK}) - when is_binary(Bucket), is_binary(Key), is_binary(SubK) -> - <>; -key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null}) - when is_binary(Bucket), is_binary(Key) -> - <>; -key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) - when is_binary(BucketType), is_binary(Bucket) -> - key_to_canonicalbinary({?HEAD_TAG, - <>, - Key, - SubKey}); -key_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG -> - % In unit tests head specs can have non-binary keys, so handle - % this through hashing the whole key - default_key_to_canonicalbinary(Key); key_to_canonicalbinary(Key) when element(1, Key) == ?STD_TAG -> default_key_to_canonicalbinary(Key); key_to_canonicalbinary(Key) -> @@ -154,12 +122,13 @@ default_key_to_canonicalbinary(Key) -> -spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head(). %% @doc %% Return the object metadata as a binary to be the "head" of the object +build_head(?HEAD_TAG, Value) -> + % Metadata is not extracted with head objects, the head response is + % just the unfiltered value that was input. + default_build_head(?HEAD_TAG, Value); build_head(?RIAK_TAG, Metadata) -> {SibData, Vclock, _Hash, _Size} = Metadata, riak_metadata_to_binary(Vclock, SibData); -build_head(?HEAD_TAG, Metadata) -> - % term_to_binary(Metadata). 
- default_build_head(?HEAD_TAG, Metadata); build_head(?STD_TAG, Metadata) -> default_build_head(?STD_TAG, Metadata); build_head(Tag, Metadata) -> @@ -173,7 +142,7 @@ default_build_head(_Tag, Metadata) -> Metadata. --spec extract_metadata(object_tag()|headonly_tag(), non_neg_integer(), any()) +-spec extract_metadata(object_tag(), non_neg_integer(), any()) -> {object_metadata(), list(erlang:timestamp())}. %% @doc %% Take the inbound object and extract from it the metadata to be stored within @@ -187,10 +156,10 @@ default_build_head(_Tag, Metadata) -> %% The Object Size passed in to this function is as calculated when writing %% the object to the Journal. It may be recalculated here, if an alternative %% view of size is required within the header +%% +%% Note objects with a ?HEAD_TAG should never be passed, as there is no extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) -> riak_extract_metadata(RiakObj, SizeAsStoredInJournal); -extract_metadata(?HEAD_TAG, SizeAsStoredInJournal, Obj) -> - {{standard_hash(Obj), SizeAsStoredInJournal}, []}; extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj) -> default_extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj); extract_metadata(Tag, SizeAsStoredInJournal, Obj) -> @@ -251,6 +220,26 @@ get_hash(_Tag, ObjectMetadata) -> standard_hash(Obj) -> erlang:phash2(term_to_binary(Obj)). + +%%%============================================================================ +%%% Handling Override Functions +%%%============================================================================ + +-spec get_appdefined_function(appdefinable_function(), + fun(), + non_neg_integer()) -> fun(). +%% @doc +%% If a keylist of [{function_name, fun()}] has been set as an environment +%% variable for a tag, then this FunctionName can be used instead of the +%% default +get_appdefined_function(FunctionName, DefaultFun, RequiredArity) -> + case application:get_env(leveled, FunctionName) of + undefined -> + DefaultFun; + {ok, Fun} when is_function(Fun, RequiredArity) -> + Fun + end. + %%%============================================================================ %%% Tag-specific Internal Functions %%%============================================================================ diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 11ca1ba..957f3cb 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -654,29 +654,56 @@ split_positions_into_batches(Positions, Journal, Batches) -> Batches ++ [{Journal, ThisBatch}]). +%% @doc +%% For the Keys and values taken from the Journal file, which are required +%% in the compacted journal file. To be required, they must still be active +%% (i.e. be the current SQN for that LedgerKey in the Ledger). However, if +%% it is not active, we still need to retain some information if for this +%% object tag we want to be able to rebuild the KeyStore by relaoding the +%% KeyDeltas (the retain reload strategy) +%% +%% If the reload strategy is recalc, we assume that we can reload by +%% recalculating the KeyChanges by looking at the object when we reload. So +%% old objects can be discarded. +%% +%% If the strategy is skip, we don't care about KeyDeltas. Note though, that +%% if the ledger is deleted it may not be possible to safely rebuild a KeyStore +%% if it contains index entries. The hot_backup approach is also not safe with +%% a `skip` strategy. 
filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> - lists:foldl(fun(KVC0, Acc) -> - R = leveled_codec:compact_inkerkvc(KVC0, ReloadStrategy), - case R of - skip -> - Acc; - {TStrat, KVC1} -> - {K, _V, CrcCheck} = KVC0, - {SQN, LedgerKey} = leveled_codec:from_journalkey(K), - KeyValid = FilterFun(FilterServer, LedgerKey, SQN), - case {KeyValid, CrcCheck, SQN > MaxSQN, TStrat} of - {false, true, false, retain} -> - Acc ++ [KVC1]; - {false, true, false, _} -> - Acc; - _ -> - Acc ++ [KVC0] - end + FoldFun = + fun(KVC0, Acc) -> + case KVC0 of + {_InkKey, crc_wonky, false} -> + % Bad entry, disregard, don't check + Acc; + {JK, JV, _Check} -> + {SQN, LK} = + leveled_codec:from_journalkey(JK), + CompactStrategy = + leveled_codec:get_tagstrategy(LK, ReloadStrategy), + KeyValid = FilterFun(FilterServer, LK, SQN), + IsInMemory = SQN > MaxSQN, + case {KeyValid or IsInMemory, CompactStrategy} of + {true, _} -> + % This entry may still be required regardless of + % strategy + [KVC0|Acc]; + {false, retain} -> + % If we have a retain startegy, it can't be + % discarded - but the value part is no longer + % required as this version has been replaced + {JK0, JV0} = + leveled_codec:revert_to_keydeltas(JK, JV), + [{JK0, JV0, null}|Acc]; + {false, _} -> + % This is out of date and not retained - discard + Acc end - end, - [], - KVCs). - + end + end, + lists:reverse(lists:foldl(FoldFun, [], KVCs)). + write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) -> {Journal0, ManSlice0}; From 3ff51c000c504c6ab34528b14c2b71116f7947c8 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 6 Dec 2018 22:55:00 +0000 Subject: [PATCH 4/7] Typo --- src/leveled_codec.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 977bcaa..f39e38b 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -172,7 +172,7 @@ segment_hash(Key) when is_binary(Key) -> {SegmentID, ExtraHash}; segment_hash(KeyTuple) when is_tuple(KeyTuple) -> BinKey = - case element(1, Keytuple) of + case element(1, KeyTuple) of ?HEAD_TAG -> headkey_to_canonicalbinary(KeyTuple); _ -> From cee5a60cebe8856d5f704cdac627b8304b90a0d3 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 7 Dec 2018 00:48:42 +0000 Subject: [PATCH 5/7] Protect against bad journal keys in Ledger --- src/leveled_penciller.erl | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index ca94734..fe30e71 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -300,6 +300,7 @@ :: {pos_integer(), list(leveled_codec:ledger_kv()|leveled_sst:expandable_pointer())}. -type iterator() :: list(iterator_entry()). +-type bad_ledgerkey() :: list(). %%%============================================================================ %%% API @@ -472,7 +473,7 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> infinity). -spec pcl_checksequencenumber(pid(), - leveled_codec:ledger_key(), + leveled_codec:ledger_key()|bad_ledgerkey(), integer()) -> boolean(). %% @doc %% Check if the sequence number of the passed key is not replaced by a change @@ -482,10 +483,18 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> %% If the key is not present, it will be assumed that a higher sequence number %% tombstone once existed, and false will be returned. 
pcl_checksequencenumber(Pid, Key, SQN) -> - Hash = leveled_codec:segment_hash(Key), - if - Hash /= no_lookup -> - gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity) + try + Hash = leveled_codec:segment_hash(Key), + if + Hash /= no_lookup -> + gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity) + end + catch + % Can't let this crash here, as when journal files are corrupted, + % corrupted input might be received by the penciller for this check. + % Want to be able to compact away this corruption - not end up with + % perpetually failing compaction jobs + _Type:_Error -> false end. -spec pcl_workforclerk(pid()) -> ok. @@ -2003,6 +2012,17 @@ simple_server_test() -> "Key0004", null}, 3004)), + + % Try a busted key - and get false, as the exception should be handled + % Mimics a bad ledger key being discovered in the Journal, want to get + % false rather than just crashing. + ?assertMatch(false, pcl_checksequencenumber(PclSnap, + [o, + "Bucket0004", + "Key0004", + null], + 3004)), + % Add some more keys and confirm that check sequence number still % sees the old version in the previous snapshot, but will see the new % version in a new snapshot @@ -2299,6 +2319,7 @@ handle_down_test() -> pcl_close(PCLr), clean_testdir(RootPath). + %% the fake bookie. Some calls to leveled_bookie (like the two below) %% do not go via the gen_server (but it looks like they expect to be %% called by the gen_server, internally!) they use "self()" to From 714e128df85fd9c557a448719a1ed64a747f1d35 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 7 Dec 2018 09:07:22 +0000 Subject: [PATCH 6/7] Tidy up protecting against corrupt Keys this was previously not an issue as leveled_codec:segment_hash/1 would handle anything that could be hashed. This now has to be a tuple, and one with a first element - so corrupted tuples are failing. Add a guard checking for a corrupted tuple, but we only need this when doing journal compaction. Change user_defined keys to be `retain` as a tag strategy --- src/leveled_bookie.erl | 2 +- src/leveled_codec.erl | 22 ++++++++++++++++++++-- src/leveled_inker.erl | 27 ++++++++++++++++++++++++++- src/leveled_penciller.erl | 26 ++++---------------------- 4 files changed, 51 insertions(+), 26 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 1333eed..0550436 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -460,7 +460,7 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) -> leveled_codec:index_specs(), leveled_codec:tag(), infinity|integer()) -> ok|pause. -book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) -> +book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_atom(Tag) -> gen_server:call(Pid, {put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, infinity). diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index f39e38b..b749302 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -28,6 +28,7 @@ to_ledgerkey/5, from_ledgerkey/1, from_ledgerkey/2, + isvalid_ledgerkey/1, to_inkerkey/2, to_inkerkv/6, from_inkerkv/1, @@ -53,7 +54,7 @@ -define(NRT_IDX, "$aae."). -type tag() :: - leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG. + leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG|atom(). -type key() :: binary()|string()|{binary(), binary()}. % Keys SHOULD be binary() @@ -325,6 +326,15 @@ to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG) -> to_ledgerkey(Bucket, Key, Tag) -> {Tag, Bucket, Key, null}.
+%% No spec - due to tests +%% @doc +%% Check that the ledgerkey is a valid format, to handle un-checksummed keys +%% that may be returned corrupted (such as from the Journal) +isvalid_ledgerkey({Tag, _B, _K, _SK}) -> + is_atom(Tag); +isvalid_ledgerkey(_LK) -> + false. + -spec endkey_passed(ledger_key(), ledger_key()) -> boolean(). %% @oc %% Compare a key against a query key, only comparing elements that are non-null @@ -370,7 +380,7 @@ get_tagstrategy({Tag, _, _, _}, Strategy) -> TagStrat; false -> leveled_log:log("IC012", [Tag, Strategy]), - skip + retain end. %%%============================================================================ @@ -713,6 +723,14 @@ next_key({Type, Bucket}) when is_binary(Type), is_binary(Bucket) -> -ifdef(TEST). +valid_ledgerkey_test() -> + UserDefTag = {user_defined, <<"B">>, <<"K">>, null}, + ?assertMatch(true, isvalid_ledgerkey(UserDefTag)), + KeyNotTuple = [?STD_TAG, <<"B">>, <<"K">>, null], + ?assertMatch(false, isvalid_ledgerkey(KeyNotTuple)), + TagNotAtom = {"tag", <<"B">>, <<"K">>, null}, + ?assertMatch(false, isvalid_ledgerkey(TagNotAtom)), + ?assertMatch(retain, get_tagstrategy(UserDefTag, inker_reload_strategy([]))). indexspecs_test() -> IndexSpecs = [{add, "t1_int", 456}, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 4b693c9..0a758cf 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -357,7 +357,8 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> ink_compactjournal(Pid, Bookie, Timeout) -> CheckerInitiateFun = fun initiate_penciller_snapshot/1, CheckerCloseFun = fun leveled_penciller:pcl_close/1, - CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3, + CheckerFilterFun = + wrap_checkfilterfun(fun leveled_penciller:pcl_checksequencenumber/3), gen_server:call(Pid, {compact, Bookie, @@ -1185,6 +1186,20 @@ initiate_penciller_snapshot(Bookie) -> MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. + +-spec wrap_checkfilterfun(fun()) -> fun(). +%% @doc +%% Make a check of the validity of the key being passed into the CheckFilterFun +wrap_checkfilterfun(CheckFilterFun) -> + fun(Pcl, LK, SQN) -> + case leveled_codec:isvalid_ledgerkey(LK) of + true -> + CheckFilterFun(Pcl, LK, SQN); + false -> + false + end + end. + %%%============================================================================ %%% Test %%%============================================================================ @@ -1438,6 +1453,16 @@ empty_manifest_test() -> ink_close(Ink2), clean_testdir(RootPath). + +wrapper_test() -> + KeyNotTuple = [?STD_TAG, <<"B">>, <<"K">>, null], + TagNotAtom = {"tag", <<"B">>, <<"K">>, null}, + CheckFilterFun = fun(_Pcl, _LK, _SQN) -> true end, + WrappedFun = wrap_checkfilterfun(CheckFilterFun), + ?assertMatch(false, WrappedFun(null, KeyNotTuple, 1)), + ?assertMatch(false, WrappedFun(null, TagNotAtom, 1)). + + coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index fe30e71..076b198 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -483,18 +483,10 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> %% If the key is not present, it will be assumed that a higher sequence number %% tombstone once existed, and false will be returned. 
pcl_checksequencenumber(Pid, Key, SQN) -> - try - Hash = leveled_codec:segment_hash(Key), - if - Hash /= no_lookup -> - gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity) - end - catch - % Can't let this crash here, as when journal files are corrupted, - % corrupted input might be received by the penciller for this check. - % Want to be able to compact away this corruption - not end up with - % perpetually failing compaction jobs - _Type:_Error -> false + Hash = leveled_codec:segment_hash(Key), + if + Hash /= no_lookup -> + gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity) end. -spec pcl_workforclerk(pid()) -> ok. @@ -2012,16 +2004,6 @@ simple_server_test() -> "Key0004", null}, 3004)), - - % Try a busted key - and get false, as the exception should be handled - % Mimics a bad ledger key being discovered in the Journal, want to get - % false rather than just crashing. - ?assertMatch(false, pcl_checksequencenumber(PclSnap, - [o, - "Bucket0004", - "Key0004", - null], - 3004)), % Add some more keys and confirm that check sequence number still % sees the old version in the previous snapshot, but will see the new From fe177f306d54770e9fa871ddac3680a5b9065afc Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 7 Dec 2018 11:42:59 +0000 Subject: [PATCH 7/7] Update docs --- README.md | 61 ++++++++++------------------------------- docs/DESIGN.md | 4 ++- docs/STARTUP_OPTIONS.md | 20 +++++++++++--- 3 files changed, 34 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index 0a0adda..3d54494 100644 --- a/README.md +++ b/README.md @@ -2,17 +2,19 @@ ## Introduction -Leveled is a work-in-progress prototype of a simple Key-Value store based on the concept of Log-Structured Merge Trees, with the following characteristics: +Leveled is a simple Key-Value store based on the concept of Log-Structured Merge Trees, with the following characteristics: - Optimised for workloads with larger values (e.g. > 4KB). -- Explicitly supports HEAD requests in addition to GET requests. - - Splits the storage of value between keys/metadata and body, +- Explicitly supports HEAD requests in addition to GET requests: + - Splits the storage of value between keys/metadata and body (assuming some definition of metadata is provided); + - Allows for the application to define what constitutes object metadata and what constitutes the body (value-part) of the object - and assign tags to objects to manage multiple object-types with different extract rules; - Stores keys/metadata in a merge tree and the full object in a journal of [CDB files](https://en.wikipedia.org/wiki/Cdb_(software)) - - allowing for HEAD requests which have lower overheads than GET requests, and - - queries which traverse keys/metadatas to be supported with fewer side effects on the page cache. + - allowing for HEAD requests which have lower overheads than GET requests; and + - queries which traverse keys/metadatas to be supported with fewer side effects on the page cache than folds over keys/objects. - Support for tagging of object types and the implementation of alternative store behaviour based on type. + - Allows for changes to extract specific information as metadata to be returned from HEAD requests; - Potentially usable for objects with special retention or merge properties. - Support for low-cost clones without locking to provide for scanning queries (e.g. secondary indexes). 
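The bullet points above describe the HEAD/GET split at a high level. As a concrete illustration - a minimal sketch, not part of this patch, using the public `leveled_bookie` API with the default ?STD_TAG; the root path, bucket and key are illustrative only:

```
%% Start a store, write one object, then read it back twice
{ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/leveled_demo"}]),
%% PUT writes the whole object to the Journal, and the extracted metadata
%% to the Ledger (for ?STD_TAG the metadata is a hash and size)
_ = leveled_bookie:book_put(Bookie, <<"Bucket">>, <<"Key">>, <<"Value">>, []),
%% HEAD is answered from the Ledger (keys/metadata) alone
{ok, _Head} = leveled_bookie:book_head(Bookie, <<"Bucket">>, <<"Key">>),
%% GET additionally fetches the object body from the Journal
{ok, _Value} = leveled_bookie:book_get(Bookie, <<"Bucket">>, <<"Key">>),
ok = leveled_bookie:book_close(Bookie).
```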
@@ -24,6 +26,10 @@ The store has been developed with a focus on being a potential backend to a R An optimised version of Riak KV has been produced in parallel which will exploit the availability of HEAD requests (to access object metadata including version vectors), where a full GET is not required. This, along with reduced write amplification when compared to leveldb, is expected to offer significant improvement in the volume and predictability of throughput for workloads with larger (> 4KB) object sizes, as well as reduced tail latency. +There may be more general uses of Leveled, with the following caveats: + - Leveled should be extended to define new tags that specify what metadata is to be extracted for the inserted objects (or to override the behaviour for the ?STD_TAG). Without this, there will be limited scope to take advantage of the relative efficiency of HEAD and FOLD_HEAD requests. + - If objects are small, the [`head_only` mode](docs/STARTUP_OPTIONS.md#head-only) may be used, which will cease separation of object body from header and use the Key/Metadata store as the only long-term persisted store. In this mode all of the object is treated as Metadata, and the behaviour is closer to that of the leveldb LSM-tree, although with higher median latency. + ## More Details For more details on the store: @@ -71,28 +77,6 @@ More information can be found in the [volume testing section](docs/VOLUME.md). As a general rule though, the most interesting thing is the potential to enable [new features](docs/FUTURE.md). The tagging of different object types, with an ability to set different rules for both compaction and metadata creation by tag, is a potential enabler for further change. Further, having a separate key/metadata store which can be scanned without breaking the page cache or working against mitigation for write amplifications, is also potentially an enabler to offer features to both the developer and the operator. -## Next Steps - -Further volume test scenarios are the immediate priority, in particular volume test scenarios with: - -- Significant use of secondary indexes; - -- Use of newly available [EC2 hardware](https://aws.amazon.com/about-aws/whats-new/2017/02/now-available-amazon-ec2-i3-instances-next-generation-storage-optimized-high-i-o-instances/) which potentially is a significant changes to assumptions about hardware efficiency and cost. - -- Create riak_test tests for new Riak features enabled by leveled. - -However a number of other changes are planned in the next month to (my branch of) riak_kv to better use leveled: - -- Support for rapid rebuild of hashtrees - -- Fixes to [priority issues](https://github.com/martinsumner/leveled/issues) - -- Experiments with flexible sync on write settings - -- A cleaner and easier build of Riak with leveled included, including cuttlefish configuration support - -More information can be found in the [future section](docs/FUTURE.md). - ## Feedback Please create an issue if you have any suggestions. You can ping me @masleeds if you wish @@ -104,28 +88,14 @@ Unit and current tests in leveled should run with rebar3. Leveled has been test A new database can be started by running ``` -{ok, Bookie} = leveled_bookie:book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) +{ok, Bookie} = leveled_bookie:book_start(StartupOptions) ``` -This will start a new Bookie. It will start and look for existing data files, under the RootPath, and start empty if none exist. 
A LedgerCacheSize of `2000`, a JournalSize of `500000000` (500MB) and a SyncStrategy of `none` should work OK. Further information on startup options can be found [here](docs/STARTUP_OPTIONS.md).
+This will start a new Bookie. It will start and look for existing data files, under the RootPath, and start empty if none exist. Further information on startup options can be found [here](docs/STARTUP_OPTIONS.md).

 The book_start method should respond once startup is complete. The [leveled_bookie module](src/leveled_bookie.erl) includes the full API for external use of the store.

-It should run anywhere that OTP will run - it has been tested on Ubuntu 14, MAC OS X and Windows 10.
-
-Running in Riak requires one of the branches of riak_kv referenced [here](docs/FUTURE.md). There is a [Riak branch](https://github.com/martinsumner/riak/tree/mas-leveleddb) intended to support the automatic build of this, and the configuration via cuttlefish. However, the auto-build fails due to other dependencies (e.g. riak_search) bringing in an alternative version of riak_kv, and the configuration via cuttlefish is broken for reasons unknown.
-
-Building this from source as part of Riak will require a bit of fiddling around.
-
-- clone and build [riak](https://github.com/martinsumner/riak/tree/mas-leveleddb)
-- cd deps
-- rm -rf riak_kv
-- git clone -b mas-leveled-putfsm --single-branch https://github.com/martinsumner/riak_kv.git
-- cd ..
-- make rel
-- remember to set the storage backend to leveled in riak.conf
-
-To help with the breakdown of cuttlefish, leveled parameters can be set via riak_kv/include/riak_kv_leveled.hrl - although a new make will be required for these changes to take effect.
+Running in Riak requires Riak 2.9 or beyond, which is available from January 2019.

 ### Contributing

 ct with 100% coverage. To have rebar3 execute the full set of tests, run:

-    rebar3 as test do cover --reset, eunit --cover, ct --cover, cover --verbose
-
+    `rebar3 as test do cover --reset, eunit --cover, ct --cover, cover --verbose`
diff --git a/docs/DESIGN.md b/docs/DESIGN.md
index 4440dd6..8e8c142 100644
--- a/docs/DESIGN.md
+++ b/docs/DESIGN.md
@@ -82,6 +82,8 @@ Three types are initially supported:

 All Ledger Keys created for any type must be 4-tuples starting with the tag. Abstraction with regards to types is currently imperfect, but the expectation is that these types will make support for application specific behaviours easier to achieve, such as behaviours which may be required to support different [CRDTs](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type).

+Currently user-defined tags are supported as an experimental feature along with the ability to override the function which controls how metadata is split from the object value. Good choice of metadata is important to ensure that the improved efficiency of folds over heads (as opposed to folds over objects), and of HEAD requests (as opposed to GET requests), can be exploited by applications using leveled.
+
 ## GET/PUT Paths

 The PUT path for new objects and object changes depends on the Bookie interacting with the Inker to ensure that the change has been persisted with the Journal, the Ledger is updated in batches after the PUT has been completed.
@@ -128,7 +130,7 @@ Backups are taken of the Journal only, as the Ledger can be recreated on startu
 The backup uses hard-links, so at the point the backup is taken, there will be a minimal change to the on-disk footprint of the store.
However, as journal compaction is run, the hard-links will prevent space from getting released by the dropping of replaced journal files - so backups will cause the size of the store to grow faster than it would otherwise do. It is an operator responsibility to garbage collect old backups, to prevent this growth from being an issue.

-As backups depend on hard-links, they cannot be taken with a `BackupPath` on a different file system to the standard data path. The move a backup across to a different file system, standard tools should be used such as rsync. The leveled backups should be relatively friendly for rsync-like delta-based backup approaches due to significantly lower write amplification when compared to other LSM stores (e.g. leveldb).
+As backups depend on hard-links, they cannot be taken with a `BackupPath` on a different file system to the standard data path. To move a backup across to a different file system, standard tools such as rsync should be used. The leveled backups should be relatively friendly for rsync-like delta-based backup approaches due to significantly lower write amplification when compared to other LSM stores (e.g. leveldb).

 ## Head only

diff --git a/docs/STARTUP_OPTIONS.md b/docs/STARTUP_OPTIONS.md
index a8d7998..b946385 100644
--- a/docs/STARTUP_OPTIONS.md
+++ b/docs/STARTUP_OPTIONS.md
@@ -22,6 +22,18 @@ There is no stats facility within leveled, the stats are only available from the

 The `forced_logs` option will force a particular log reference to be logged regardless of the log level that has been set. This can be used to log at a higher level than `info`, whilst allowing for specific logs to still be logged out, such as logs providing sample performance statistics.

+## User-Defined Tags
+
+There are 2 primary object tags - ?STD_TAG (o) which is the default, and ?RIAK_TAG (o_rkv). Objects PUT into the store with different tags may have different behaviours in leveled.
+
+The differences between tags are encapsulated within the `leveled_head` module. The primary difference of interest is the alternative handling within the function `extract_metadata/3`. Significant efficiency can be gained in leveled (as opposed to other LSM-stores) through using book_head requests when book_get would otherwise be necessary. If 80% of the requests are interested in less than 20% of the information within an object, then having that 20% in the object metadata and switching fetch requests to the book_head API will improve efficiency. Also folds over heads are much more efficient than folds over objects, so significant improvements can also be made within folds by having the right information within the metadata.
+
+To make use of this efficiency, metadata needs to be extracted on PUT, and made into leveled object metadata. For the ?RIAK_TAG this work is within the `leveled_head` module. If an application wants to control this behaviour for its application, then a tag can be created, and the `leveled_head` module updated. However, it is also possible to have more dynamic definitions for handling of application-defined tags, by passing in alternative versions of one or more of the functions `extract_metadata/3`, `build_head/1` and `key_to_canonicalbinary/1` on start-up. These functions will be applied to user-defined tags (but will not override the behaviour for pre-defined tags).
+
+The startup option `override_functions` can be used to manage this override. [This test](../test/end_to_end/appdefined_SUITE.erl) provides a simple example of using override_functions.
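To make the override mechanism easier to picture, a rough sketch follows - not taken from the patch, and the exact option format should be confirmed against `leveled_head` and the appdefined_SUITE test referenced above. It assumes `override_functions` takes `{FunctionName, Fun}` pairs, and that an `extract_metadata` override receives the same arguments as `leveled_head:extract_metadata/3` and returns `{Metadata, LastModDates}`; the tag name and root path are hypothetical:

```
%% Hypothetical user-defined tag (my_tag) whose metadata is simply the
%% first 32 bytes of the object binary
ExtractMDFun =
    fun(my_tag, SizeAsStoredInJournal, ObjBin) ->
        <<MDBin:32/binary, _Rest/binary>> = ObjBin,
        {{MDBin, SizeAsStoredInJournal}, []}
    end,
StartOpts = [{root_path, "/tmp/leveled_appdefined"},
             {override_functions, [{extract_metadata, ExtractMDFun}]}],
{ok, Bookie} = leveled_bookie:book_start(StartOpts),
%% PUT with the user-defined tag - only the 32-byte prefix would then be
%% held in the Ledger and returned by book_head
_ = leveled_bookie:book_put(Bookie, <<"B">>, <<"K">>, <<0:(64*8)>>, [], my_tag).
```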
+This option is currently experimental. Issues such as versioning, and handling a failure to consistently start a store with the same override_functions, should be handled by the application.
+
 ## Max Journal Size

 The maximum size of an individual Journal file can be set using `{max_journalsize, integer()}`, which sets the size in bytes. The default value is 1,000,000,000 (~1GB). The maximum size, which cannot be exceeded, is `2^32`. It is not expected that the Journal Size should normally be set to lower than 100 MB, it should be sized to hold many thousands of objects at least.

@@ -61,13 +73,13 @@ The purpose of the reload strategy is to define the behaviour at compaction of t

 By default nothing is compacted from the Journal if the SQN of the Journal entry is greater than the largest sequence number which has been persisted in the Ledger. So when an object is compacted in the Journal (as it has been replaced), it should not need to be replayed from the Journal into the Ledger in the future - as it, and all its related key changes, have already been persisted to the Ledger.

-However, what if the Ledger had been erased? This could happen due to some corruption, or perhaps because only the Journal is to be backed up. As the object has been replaced, the value is not required - however KeyChanges ay be required (such as indexes which are built incrementally across a series of object changes). So to revert the indexes to their previous state the Key Changes would need to be retained in this case, so the indexes in the Ledger would be correctly rebuilt.
+However, what if the Ledger had been erased? This could happen due to some corruption, or perhaps because only the Journal is to be backed up. As the object has been replaced, the value is not required - however KeyChanges may be required (such as indexes which are built incrementally across a series of object changes). So to revert the indexes to their previous state the Key Changes would need to be retained in this case, so the indexes in the Ledger would be correctly rebuilt.

 There are three potential strategies:

-`skip` - don't worry about this scenario, require the Ledger to be backed up;
-`retain` - discard the object itself on compaction but keep the key changes;
-`recalc` - recalculate the indexes on reload by comparing the information on the object with the current state of the Ledger (as would be required by the PUT process when comparing IndexSpecs at PUT time).
+ - `skip` - don't worry about this scenario, require the Ledger to be backed up;
+ - `retain` - discard the object itself on compaction but keep the key changes;
+ - `recalc` - recalculate the indexes on reload by comparing the information on the object with the current state of the Ledger (as would be required by the PUT process when comparing IndexSpecs at PUT time).

 There is no code for `recalc` at present - it is simply a logical possibility. So to set a reload strategy there should be an entry like `{reload_strategy, [{TagName, skip|retain}]}`. By default tags are pre-set to `retain`. If there is no need to handle a corrupted Ledger, then all tags could be set to `skip`.
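As a closing illustration - a minimal sketch, not part of the patch - a store could be started with an explicit reload strategy entry of the `{TagName, skip|retain}` form described above; the root path is illustrative only:

```
%% Allow ?STD_TAG (o) objects to be fully compacted from the Journal,
%% accepting that the Ledger must then be backed up; tags not listed
%% keep the default `retain` behaviour
StartOpts = [{root_path, "/var/db/leveled"},
             {reload_strategy, [{o, skip}]}],
{ok, Bookie} = leveled_bookie:book_start(StartOpts).
```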