From 7eb5a16899498fee926b4ebf2b16ff9dcedb410e Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Fri, 14 Oct 2016 18:43:16 +0100
Subject: [PATCH] Supporting Tags - Improving abstraction between Riak and
 non-Riak workloads

The object tag "o", which was taken from eleveldb, has been extended to
allow specific functions to be triggered for different object types, in
particular when extracting metadata for storing in the Ledger.

There is now a riak tag (o_rkv@v1), and in theory other tags can be added
and used, as long as there is an appropriate set of functions in
leveled_codec.
---
 include/leveled.hrl             |   4 +-
 src/leveled_bookie.erl          |  97 +++++++++++-------
 src/leveled_cdb.erl             |   5 +-
 src/leveled_codec.erl           | 176 ++++++++++++++++++++++----------
 src/leveled_sft.erl             |   2 +-
 test/end_to_end/basic_SUITE.erl |   2 +-
 6 files changed, 191 insertions(+), 95 deletions(-)

diff --git a/include/leveled.hrl b/include/leveled.hrl
index cd82d2a..e421500 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -51,8 +51,6 @@
                         max_run_length :: integer(),
                         cdb_options :: #cdb_options{}}).
 
-%% Temp location for records related to riak
-
 -record(r_content, {
           metadata,
           value :: term()
@@ -65,4 +63,4 @@
           vclock,
           updatemetadata=dict:store(clean, true, dict:new()),
           updatevalue :: term()}).
-    
\ No newline at end of file
+    
\ No newline at end of file
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index ecf5f81..28b628d 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -71,11 +71,11 @@
 %% The Bookie should generate a series of ledger key changes from this
 %% information, using a function passed in at startup. For Riak this will be
 %% of the form:
-%% {{o, Bucket, Key, SubKey|null},
+%% {{o_rkv@v1, Bucket, Key, SubKey|null},
 %%      SQN,
 %%      {Hash, Size, {Riak_Metadata}},
 %%      {active, TS}|{tomb, TS}} or
-%% {{i, Bucket, IndexTerm, IndexField, Key},
+%% {{i, Bucket, {IndexField, IndexTerm}, Key},
 %%      SQN,
 %%      null,
 %%      {active, TS}|{tomb, TS}}
@@ -136,6 +136,9 @@
         book_riakput/3,
         book_riakget/3,
         book_riakhead/3,
+        book_put/5,
+        book_get/3,
+        book_head/3,
         book_returnfolder/2,
         book_snapshotstore/3,
         book_snapshotledger/3,
@@ -167,17 +170,33 @@
 book_start(Opts) ->
     gen_server:start(?MODULE, [Opts], []).
 
-book_riakput(Pid, Object, IndexSpecs) ->
-    PrimaryKey = {o, Object#r_object.bucket, Object#r_object.key, null},
-    gen_server:call(Pid, {put, PrimaryKey, Object, IndexSpecs}, infinity).
+book_riakput(Pid, RiakObject, IndexSpecs) ->
+    {Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
+    book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, o_rkv@v1).
+
+book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
+    book_put(Pid, Bucket, Key, Object, IndexSpecs, o).
 
 book_riakget(Pid, Bucket, Key) ->
-    PrimaryKey = {o, Bucket, Key, null},
-    gen_server:call(Pid, {get, PrimaryKey}, infinity).
+    book_get(Pid, Bucket, Key, o_rkv@v1).
+
+book_get(Pid, Bucket, Key) ->
+    book_get(Pid, Bucket, Key, o).
 
 book_riakhead(Pid, Bucket, Key) ->
-    PrimaryKey = {o, Bucket, Key, null},
-    gen_server:call(Pid, {head, PrimaryKey}, infinity).
+    book_head(Pid, Bucket, Key, o_rkv@v1).
+
+book_head(Pid, Bucket, Key) ->
+    book_head(Pid, Bucket, Key, o).
+
+book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
+    gen_server:call(Pid, {put, Bucket, Key, Object, IndexSpecs, Tag}, infinity).
+
+book_get(Pid, Bucket, Key, Tag) ->
+    gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).
+
+book_head(Pid, Bucket, Key, Tag) ->
+    gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity).
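For orientation, a minimal sketch of how the reworked API above might be called. This is a sketch only: Opts, the bucket/key literals, RiakObject and IndexSpecs are placeholders, and the return values are read off the handlers further down (get and head reply {ok, Result} or not_found).

    %% Assumes a Bookie started via book_start/1, as in the tests below.
    {ok, Bookie} = leveled_bookie:book_start(Opts),
    %% book_put/5 defaults to the generic o tag ...
    ok = leveled_bookie:book_put(Bookie, "Bucket1", "Key1", <<"Value1">>, []),
    {ok, <<"Value1">>} = leveled_bookie:book_get(Bookie, "Bucket1", "Key1"),
    %% ... while the riak wrappers route through the o_rkv@v1 tag.
    ok = leveled_bookie:book_riakput(Bookie, RiakObject, IndexSpecs),
    {ok, HeadObj} = leveled_bookie:book_riakhead(Bookie, Bucket, Key).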
book_returnfolder(Pid, FolderType) -> gen_server:call(Pid, {return_folder, FolderType}, infinity). @@ -231,12 +250,13 @@ init([Opts]) -> end. -handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) -> +handle_call({put, Bucket, Key, Object, IndexSpecs, Tag}, From, State) -> + LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, - PrimaryKey, + LedgerKey, Object, IndexSpecs), - Changes = preparefor_ledgercache(PrimaryKey, + Changes = preparefor_ledgercache(LedgerKey, SQN, Object, ObjSize, @@ -251,8 +271,11 @@ handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) -> {pause, NewCache} -> {noreply, State#state{ledger_cache=NewCache, back_pressure=true}} end; -handle_call({get, Key}, _From, State) -> - case fetch_head(Key, State#state.penciller, State#state.ledger_cache) of +handle_call({get, Bucket, Key, Tag}, _From, State) -> + LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), + case fetch_head(LedgerKey, + State#state.penciller, + State#state.ledger_cache) of not_present -> {reply, not_found, State}; Head -> @@ -261,7 +284,7 @@ handle_call({get, Key}, _From, State) -> {tomb, _} -> {reply, not_found, State}; {active, _} -> - case fetch_value(Key, Seqn, State#state.inker) of + case fetch_value(LedgerKey, Seqn, State#state.inker) of not_present -> {reply, not_found, State}; Object -> @@ -269,8 +292,11 @@ handle_call({get, Key}, _From, State) -> end end end; -handle_call({head, Key}, _From, State) -> - case fetch_head(Key, State#state.penciller, State#state.ledger_cache) of +handle_call({head, Bucket, Key, Tag}, _From, State) -> + LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), + case fetch_head(LedgerKey, + State#state.penciller, + State#state.ledger_cache) of not_present -> {reply, not_found, State}; Head -> @@ -279,7 +305,7 @@ handle_call({head, Key}, _From, State) -> {tomb, _} -> {reply, not_found, State}; {active, _} -> - OMD = leveled_codec:build_metadata_object(Key, MD), + OMD = leveled_codec:build_metadata_object(LedgerKey, MD), {reply, {ok, OMD}, State} end end; @@ -308,11 +334,12 @@ handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) -> end; handle_call({return_folder, FolderType}, _From, State) -> case FolderType of - {bucket_stats, Bucket} -> + {riakbucket_stats, Bucket} -> {reply, bucket_stats(State#state.penciller, State#state.ledger_cache, - Bucket), + Bucket, + o_rkv@v1), State} end; handle_call({compact_journal, Timeout}, _From, State) -> @@ -351,7 +378,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -bucket_stats(Penciller, LedgerCache, Bucket) -> +bucket_stats(Penciller, LedgerCache, Bucket, Tag) -> PCLopts = #penciller_options{start_snapshot=true, source_penciller=Penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), @@ -361,8 +388,8 @@ bucket_stats(Penciller, LedgerCache, Bucket) -> [length(Increment)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, {infinity, Increment}), - StartKey = {o, Bucket, null, null}, - EndKey = {o, Bucket, null, null}, + StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), + EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, StartKey, EndKey, @@ -433,21 +460,16 @@ fetch_value(Key, SQN, Inker) -> not_present end. - -accumulate_size(_Key, Value, {Size, Count}) -> - {_, _, MD} = Value, - {_, _, _, ObjSize} = MD, - {Size + ObjSize, Count + 1}. 
- - +accumulate_size(Key, Value, {Size, Count}) -> + {Size + leveled_codec:get_size(Key, Value), Count + 1}. preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) -> - PrimaryChange = {PK, - {SQN, - {active, infinity}, - leveled_codec:extract_metadata(Obj, Size)}}, - SecChanges = leveled_codec:convert_indexspecs(IndexSpecs, SQN, PK), - [PrimaryChange] ++ SecChanges. + {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK, + SQN, + Obj, + Size), + ConvSpecs = leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN), + [PrimaryChange] ++ ConvSpecs. addto_ledgercache(Changes, Cache) -> lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end, @@ -511,6 +533,9 @@ reset_filestructure() -> leveled_penciller:clean_testdir(RootPath ++ "/" ++ ?LEDGER_FP), RootPath. + + + generate_multiple_objects(Count, KeyNumber) -> generate_multiple_objects(Count, KeyNumber, []). diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index e49138a..d368896 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -1655,7 +1655,10 @@ find_lastkey_test() -> {ok, F2} = cdb_complete(P2), {ok, P3} = cdb_open_reader(F2), ?assertMatch("Key2", cdb_lastkey(P3)), - ok = cdb_close(P3), + {ok, _FN} = cdb_complete(P3), + {ok, P4} = cdb_open_reader(F2), + ?assertMatch("Key2", cdb_lastkey(P4)), + ok = cdb_close(P4), ok = file:delete("../test/lastkey.cdb"). get_keys_byposition_simple_test() -> diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 6ae0f6a..5a384e8 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -1,15 +1,35 @@ %% -------- Key Codec --------- %% -%% Functions for manipulating keys and values within leveled. These are -%% currently static functions, they cannot be overridden in the store other -%% than by changing them here. The formats are focused on the problem of -%% supporting Riak KV +%% Functions for manipulating keys and values within leveled. +%% +%% +%% Within the LEDGER: +%% Keys are of the form - +%% {Tag, Bucket, Key, SubKey|null} +%% Values are of the form +%% {SQN, Status, MD} +%% +%% Within the JOURNAL: +%% Keys are of the form - +%% {SQN, LedgerKey} +%% Values are of the form +%% {Object, IndexSpecs} (as a binary) +%% +%% IndexSpecs are of the form of a Ledger Key/Value +%% +%% Tags need to be set during PUT operations and each Tag used must be +%% supported in an extract_metadata and a build_metadata_object function clause +%% +%% Currently the only tags supported are: +%% - o (standard objects) +%% - o_rkv@v1 (riak objects) +%% - i (index entries) + -module(leveled_codec). -include("../include/leveled.hrl"). - -include_lib("eunit/include/eunit.hrl"). -export([strip_to_keyonly/1, @@ -21,15 +41,20 @@ endkey_passed/2, key_dominates/2, print_key/1, - extract_metadata/2, + to_ledgerkey/3, build_metadata_object/2, - convert_indexspecs/3]). - + generate_ledgerkv/4, + generate_ledgerkv/5, + get_size/2, + convert_indexspecs/4, + riakto_keydetails/1]). + + strip_to_keyonly({keyonly, K}) -> K; strip_to_keyonly({K, _V}) -> K. -strip_to_keyseqonly({K, {SeqN, _, _}}) -> {K, SeqN}. +strip_to_keyseqonly({K, {SeqN, _, _ }}) -> {K, SeqN}. strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}. @@ -53,56 +78,19 @@ key_dominates(LeftKey, RightKey) -> right_hand_dominant end. +to_ledgerkey(Bucket, Key, Tag) -> + {Tag, Bucket, Key, null}. - -get_metadatas(#r_object{contents=Contents}) -> - [Content#r_content.metadata || Content <- Contents]. - -set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. 
- -vclock(#r_object{vclock=VClock}) -> VClock. - -to_binary(v0, Obj) -> - term_to_binary(Obj). - -hash(Obj=#r_object{}) -> - Vclock = vclock(Obj), - UpdObj = set_vclock(Obj, lists:sort(Vclock)), - erlang:phash2(to_binary(v0, UpdObj)). - -extract_metadata(Obj, Size) -> - {get_metadatas(Obj), vclock(Obj), hash(Obj), Size}. - - -build_metadata_object(PrimaryKey, Head) -> - {o, Bucket, Key, null} = PrimaryKey, - {MD, VC, _, _} = Head, - Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end, - [], - MD), - #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}. - -convert_indexspecs(IndexSpecs, SQN, PrimaryKey) -> - lists:map(fun({IndexOp, IndexField, IndexValue}) -> - Status = case IndexOp of - add -> - %% TODO: timestamp support - {active, infinity}; - remove -> - %% TODO: timestamps for delayed reaping - {tomb, infinity} - end, - {o, B, K, _SK} = PrimaryKey, - {{i, B, {IndexField, IndexValue}, K}, - {SQN, Status, null}} - end, - IndexSpecs). +hash(Obj) -> + erlang:phash2(term_to_binary(Obj)). % Return a tuple of string to ease the printing of keys to logs print_key(Key) -> case Key of {o, B, K, _SK} -> {"Object", B, K}; + {o_rkv@v1, B, K, _SK} -> + {"RiakObject", B, K}; {i, B, {F, _V}, _K} -> {"Index", B, F} end. @@ -118,6 +106,88 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) -> endkey_passed(EndKey, CheckingKey) -> EndKey < CheckingKey. +convert_indexspecs(IndexSpecs, Bucket, Key, SQN) -> + lists:map(fun({IndexOp, IndexField, IndexValue}) -> + Status = case IndexOp of + add -> + %% TODO: timestamp support + {active, infinity}; + remove -> + %% TODO: timestamps for delayed reaping + {tomb, infinity} + end, + {{i, Bucket, {IndexField, IndexValue}, Key}, + {SQN, Status, null}} + end, + IndexSpecs). + +generate_ledgerkv(PrimaryKey, SQN, Obj, Size) -> + generate_ledgerkv(PrimaryKey, SQN, Obj, Size, infinity). + +generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> + {Tag, Bucket, Key, _} = PrimaryKey, + {Bucket, + Key, + {PrimaryKey, {SQN, {active, TS}, extract_metadata(Obj, Size, Tag)}}}. + + +extract_metadata(Obj, Size, o_rkv@v1) -> + riak_extract_metadata(Obj, Size); +extract_metadata(Obj, Size, o) -> + {hash(Obj), Size}. + +get_size(PK, Value) -> + {Tag, _Bucket, _Key, _} = PK, + {_, _, MD} = Value, + case Tag of + o_rkv@v1 -> + {_RMD, _VC, _Hash, Size} = MD, + Size; + o -> + {_Hash, Size} = MD, + Size + end. + + +build_metadata_object(PrimaryKey, MD) -> + {Tag, Bucket, Key, null} = PrimaryKey, + case Tag of + o_rkv@v1 -> + riak_metadata_object(Bucket, Key, MD); + o -> + MD + end. + + + + +riak_metadata_object(Bucket, Key, MD) -> + {RMD, VC, _Hash, _Size} = MD, + Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end, + [], + RMD), + #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}. + +riak_extract_metadata(Obj, Size) -> + {get_metadatas(Obj), vclock(Obj), riak_hash(Obj), Size}. + +riak_hash(Obj=#r_object{}) -> + Vclock = vclock(Obj), + UpdObj = set_vclock(Obj, lists:sort(Vclock)), + erlang:phash2(term_to_binary(UpdObj)). + +riakto_keydetails(Object) -> + {Object#r_object.bucket, Object#r_object.key}. + +get_metadatas(#r_object{contents=Contents}) -> + [Content#r_content.metadata || Content <- Contents]. + +set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. + +vclock(#r_object{vclock=VClock}) -> VClock. 
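Tracing the new flow end to end: the Bookie's preparefor_ledgercache (earlier in this patch) now delegates to generate_ledgerkv/4 and convert_indexspecs/4 above. A worked example of the values these clauses produce for a generic o-tagged object, with illustrative literals:

    %% With PK = {o, "B1", "K1", null}, SQN = 1, Obj = <<"V1">> and Size = 3:
    %% generate_ledgerkv(PK, 1, Obj, 3) ->
    %%     {"B1", "K1",
    %%      {{o, "B1", "K1", null}, {1, {active, infinity}, {Hash, 3}}}}
    %% where Hash = erlang:phash2(term_to_binary(Obj)).
    %%
    %% convert_indexspecs([{add, "idx1_bin", <<"t1">>}], "B1", "K1", 1) ->
    %%     [{{i, "B1", {"idx1_bin", <<"t1">>}, "K1"},
    %%       {1, {active, infinity}, null}}]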
+ + + %%%============================================================================ @@ -131,7 +201,7 @@ indexspecs_test() -> IndexSpecs = [{add, "t1_int", 456}, {add, "t1_bin", "adbc123"}, {remove, "t1_bin", "abdc456"}], - Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2", null}), + Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1), ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"}, {1, {active, infinity}, null}}, lists:nth(1, Changes)), ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"}, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 6cead92..95a3f63 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -14,7 +14,7 @@ %% %% All keys are not equal in sft files, keys are only expected in a specific %% series of formats -%% - {o, Bucket, Key, SubKey|null} - Object Keys +%% - {Tag, Bucket, Key, SubKey|null} - Object Keys %% - {i, Bucket, {IndexName, IndexTerm}, Key} - Postings %% The {Bucket, Key} part of all types of keys are hashed for segment filters. %% For Postings the {Bucket, IndexName, IndexTerm} is also hashed. This diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 2f30975..40ad320 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -300,7 +300,7 @@ check_bucket_stats(Bookie, Bucket) -> FoldSW1 = os:timestamp(), io:format("Checking bucket size~n"), {async, Folder1} = leveled_bookie:book_returnfolder(Bookie, - {bucket_stats, + {riakbucket_stats, Bucket}), {B1Size, B1Count} = Folder1(), io:format("Bucket fold completed in ~w microseconds~n",
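Finally, to make the extension point in the commit message concrete: a further tag - say a hypothetical o_xyz@v1, a name invented purely for illustration - would need supporting clauses or case branches in leveled_codec before it could be used, roughly:

    %% Hypothetical extension - not part of this patch.
    extract_metadata(Obj, Size, o_xyz@v1) ->
        {hash(Obj), Size}.

    %% ...plus matching branches in the existing case expressions:
    %% in get_size/2:              o_xyz@v1 -> {_Hash, Size} = MD, Size;
    %% in build_metadata_object/2: o_xyz@v1 -> MD;
    %% in print_key/1:             {o_xyz@v1, B, K, _SK} -> {"XyzObject", B, K}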