Supporting Tags - Improving abstraction between Riak and non-Riak workloads

The object tag "o" which was taken from eleveldb has been an extended to
allow for specific functions to be triggered for different object types,
in particular when extracting metadata for stroing in the Ledger.

There is now a riak tag (o_rkv@v1), and in theory other tags can be
added and used, as long as their is an appropriate set of functions in
the leveled_codec.
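
As a hedged sketch only (the o_xyz@v1 tag and the metadata it keeps are
hypothetical, not part of this commit), supporting a further tag would mean
extending the functions in leveled_codec that dispatch on Tag, for example:

extract_metadata(Obj, Size, o_xyz@v1) ->
    %% The summary to be held against the key in the Ledger
    %% for this object type
    {erlang:phash2(term_to_binary(Obj)), Size};

%% ...plus matching o_xyz@v1 branches in build_metadata_object/2 (to answer
%% HEAD requests from Ledger metadata alone) and in print_key/1.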
martinsumner 2016-10-14 18:43:16 +01:00
parent 9be0f96406
commit 7eb5a16899
6 changed files with 191 additions and 95 deletions

@@ -51,8 +51,6 @@
max_run_length :: integer(),
cdb_options :: #cdb_options{}}).
%% Temp location for records related to riak
-record(r_content, {
metadata,
value :: term()

@@ -71,11 +71,11 @@
%% The Bookie should generate a series of ledger key changes from this
%% information, using a function passed in at startup. For Riak this will be
%% of the form:
%% {{o, Bucket, Key, SubKey|null},
%% {{o_rkv@v1, Bucket, Key, SubKey|null},
%% SQN,
%% {Hash, Size, {Riak_Metadata}},
%% {active, TS}|{tomb, TS}} or
%% {{i, Bucket, IndexTerm, IndexField, Key},
%% {{i, Bucket, {IndexField, IndexTerm}, Key},
%% SQN,
%% null,
%% {active, TS}|{tomb, TS}}
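
To make the forms above concrete, an illustrative pair of changes for a
Riak PUT at SQN 10 with a single index entry (values invented, field and
term borrowed from the indexspecs_test further down) might be:

{{o_rkv@v1, <<"Bucket1">>, <<"Key1">>, null},
 10,
 {Hash, Size, {Riak_Metadata}},
 {active, infinity}}

{{i, <<"Bucket1">>, {"t1_int", 456}, <<"Key1">>},
 10,
 null,
 {active, infinity}}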
@@ -136,6 +136,9 @@
book_riakput/3,
book_riakget/3,
book_riakhead/3,
book_put/5,
book_get/3,
book_head/3,
book_returnfolder/2,
book_snapshotstore/3,
book_snapshotledger/3,
@@ -167,17 +170,33 @@
book_start(Opts) ->
gen_server:start(?MODULE, [Opts], []).
book_riakput(Pid, Object, IndexSpecs) ->
PrimaryKey = {o, Object#r_object.bucket, Object#r_object.key, null},
gen_server:call(Pid, {put, PrimaryKey, Object, IndexSpecs}, infinity).
book_riakput(Pid, RiakObject, IndexSpecs) ->
{Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, o_rkv@v1).
book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, o).
book_riakget(Pid, Bucket, Key) ->
PrimaryKey = {o, Bucket, Key, null},
gen_server:call(Pid, {get, PrimaryKey}, infinity).
book_get(Pid, Bucket, Key, o_rkv@v1).
book_get(Pid, Bucket, Key) ->
book_get(Pid, Bucket, Key, o).
book_riakhead(Pid, Bucket, Key) ->
PrimaryKey = {o, Bucket, Key, null},
gen_server:call(Pid, {head, PrimaryKey}, infinity).
book_head(Pid, Bucket, Key, o_rkv@v1).
book_head(Pid, Bucket, Key) ->
book_head(Pid, Bucket, Key, o).
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
gen_server:call(Pid, {put, Bucket, Key, Object, IndexSpecs, Tag}, infinity).
book_get(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).
book_head(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity).
book_returnfolder(Pid, FolderType) ->
gen_server:call(Pid, {return_folder, FolderType}, infinity).
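
By way of illustration (variable names invented, and an ok reply assumed;
the put path may instead signal back-pressure), the Riak-specific wrappers
and the generic tagged API can now sit side by side:

{ok, Bookie} = leveled_bookie:book_start(Opts),
%% Riak objects route through the o_rkv@v1 tag
ok = leveled_bookie:book_riakput(Bookie, RiakObject, IndexSpecs),
%% Any other object defaults to the generic o tag
ok = leveled_bookie:book_put(Bookie, <<"B1">>, <<"K1">>, Object, []),
{ok, Object} = leveled_bookie:book_get(Bookie, <<"B1">>, <<"K1">>).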
@@ -231,12 +250,13 @@ init([Opts]) ->
end.
handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) ->
handle_call({put, Bucket, Key, Object, IndexSpecs, Tag}, From, State) ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
{ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
PrimaryKey,
LedgerKey,
Object,
IndexSpecs),
Changes = preparefor_ledgercache(PrimaryKey,
Changes = preparefor_ledgercache(LedgerKey,
SQN,
Object,
ObjSize,
@@ -251,8 +271,11 @@ handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) ->
{pause, NewCache} ->
{noreply, State#state{ledger_cache=NewCache, back_pressure=true}}
end;
handle_call({get, Key}, _From, State) ->
case fetch_head(Key, State#state.penciller, State#state.ledger_cache) of
handle_call({get, Bucket, Key, Tag}, _From, State) ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
case fetch_head(LedgerKey,
State#state.penciller,
State#state.ledger_cache) of
not_present ->
{reply, not_found, State};
Head ->
@@ -261,7 +284,7 @@ handle_call({get, Key}, _From, State) ->
{tomb, _} ->
{reply, not_found, State};
{active, _} ->
case fetch_value(Key, Seqn, State#state.inker) of
case fetch_value(LedgerKey, Seqn, State#state.inker) of
not_present ->
{reply, not_found, State};
Object ->
@@ -269,8 +292,11 @@ handle_call({get, Key}, _From, State) ->
end
end
end;
handle_call({head, Key}, _From, State) ->
case fetch_head(Key, State#state.penciller, State#state.ledger_cache) of
handle_call({head, Bucket, Key, Tag}, _From, State) ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
case fetch_head(LedgerKey,
State#state.penciller,
State#state.ledger_cache) of
not_present ->
{reply, not_found, State};
Head ->
@@ -279,7 +305,7 @@ handle_call({head, Key}, _From, State) ->
{tomb, _} ->
{reply, not_found, State};
{active, _} ->
OMD = leveled_codec:build_metadata_object(Key, MD),
OMD = leveled_codec:build_metadata_object(LedgerKey, MD),
{reply, {ok, OMD}, State}
end
end;
@@ -308,11 +334,12 @@ handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
end;
handle_call({return_folder, FolderType}, _From, State) ->
case FolderType of
{bucket_stats, Bucket} ->
{riakbucket_stats, Bucket} ->
{reply,
bucket_stats(State#state.penciller,
State#state.ledger_cache,
Bucket),
Bucket,
o_rkv@v1),
State}
end;
handle_call({compact_journal, Timeout}, _From, State) ->
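
For instance (mirroring the test change at the end of this commit), the
renamed folder is requested as:

{async, Folder1} = leveled_bookie:book_returnfolder(Bookie,
                                                    {riakbucket_stats,
                                                        Bucket}),
{B1Size, B1Count} = Folder1().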
@@ -351,7 +378,7 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%============================================================================
bucket_stats(Penciller, LedgerCache, Bucket) ->
bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
PCLopts = #penciller_options{start_snapshot=true,
source_penciller=Penciller},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
@@ -361,8 +388,8 @@ bucket_stats(Penciller, LedgerCache, Bucket) ->
[length(Increment)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
{infinity, Increment}),
StartKey = {o, Bucket, null, null},
EndKey = {o, Bucket, null, null},
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
StartKey,
EndKey,
@@ -433,21 +460,16 @@ fetch_value(Key, SQN, Inker) ->
not_present
end.
accumulate_size(_Key, Value, {Size, Count}) ->
{_, _, MD} = Value,
{_, _, _, ObjSize} = MD,
{Size + ObjSize, Count + 1}.
accumulate_size(Key, Value, {Size, Count}) ->
{Size + leveled_codec:get_size(Key, Value), Count + 1}.
preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
PrimaryChange = {PK,
{SQN,
{active, infinity},
leveled_codec:extract_metadata(Obj, Size)}},
SecChanges = leveled_codec:convert_indexspecs(IndexSpecs, SQN, PK),
[PrimaryChange] ++ SecChanges.
{Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK,
SQN,
Obj,
Size),
ConvSpecs = leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN),
[PrimaryChange] ++ ConvSpecs.
addto_ledgercache(Changes, Cache) ->
lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end,
@@ -511,6 +533,9 @@ reset_filestructure() ->
leveled_penciller:clean_testdir(RootPath ++ "/" ++ ?LEDGER_FP),
RootPath.
generate_multiple_objects(Count, KeyNumber) ->
generate_multiple_objects(Count, KeyNumber, []).

@@ -1655,7 +1655,10 @@ find_lastkey_test() ->
{ok, F2} = cdb_complete(P2),
{ok, P3} = cdb_open_reader(F2),
?assertMatch("Key2", cdb_lastkey(P3)),
ok = cdb_close(P3),
{ok, _FN} = cdb_complete(P3),
{ok, P4} = cdb_open_reader(F2),
?assertMatch("Key2", cdb_lastkey(P4)),
ok = cdb_close(P4),
ok = file:delete("../test/lastkey.cdb").
get_keys_byposition_simple_test() ->

@@ -1,15 +1,35 @@
%% -------- Key Codec ---------
%%
%% Functions for manipulating keys and values within leveled. These are
%% currently static functions, they cannot be overridden in the store other
%% than by changing them here. The formats are focused on the problem of
%% supporting Riak KV
%% Functions for manipulating keys and values within leveled.
%%
%%
%% Within the LEDGER:
%% Keys are of the form -
%% {Tag, Bucket, Key, SubKey|null}
%% Values are of the form
%% {SQN, Status, MD}
%%
%% Within the JOURNAL:
%% Keys are of the form -
%% {SQN, LedgerKey}
%% Values are of the form
%% {Object, IndexSpecs} (as a binary)
%%
%% IndexSpecs are of the form of a Ledger Key/Value
%%
%% Tags need to be set during PUT operations and each Tag used must be
%% supported in an extract_metadata and a build_metadata_object function clause
%%
%% Currently the only tags supported are:
%% - o (standard objects)
%% - o_rkv@v1 (riak objects)
%% - i (index entries)
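%%
%% As an illustration (values hypothetical, not taken from this commit), a
%% Riak object PUT at SQN 10 might therefore produce:
%% Ledger KV -
%% {{o_rkv@v1, <<"B1">>, <<"K1">>, null},
%%      {10, {active, infinity}, {MetaDatas, VClock, Hash, Size}}}
%% Journal KV -
%% {{10, {o_rkv@v1, <<"B1">>, <<"K1">>, null}},
%%      term_to_binary({Object, IndexSpecs})}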
-module(leveled_codec).
-include("../include/leveled.hrl").
-include_lib("eunit/include/eunit.hrl").
-export([strip_to_keyonly/1,
@@ -21,15 +41,20 @@
endkey_passed/2,
key_dominates/2,
print_key/1,
extract_metadata/2,
to_ledgerkey/3,
build_metadata_object/2,
convert_indexspecs/3]).
generate_ledgerkv/4,
generate_ledgerkv/5,
get_size/2,
convert_indexspecs/4,
riakto_keydetails/1]).
strip_to_keyonly({keyonly, K}) -> K;
strip_to_keyonly({K, _V}) -> K.
strip_to_keyseqonly({K, {SeqN, _, _}}) -> {K, SeqN}.
strip_to_keyseqonly({K, {SeqN, _, _ }}) -> {K, SeqN}.
strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}.
@@ -53,56 +78,19 @@ key_dominates(LeftKey, RightKey) ->
right_hand_dominant
end.
to_ledgerkey(Bucket, Key, Tag) ->
{Tag, Bucket, Key, null}.
get_metadatas(#r_object{contents=Contents}) ->
[Content#r_content.metadata || Content <- Contents].
set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
vclock(#r_object{vclock=VClock}) -> VClock.
to_binary(v0, Obj) ->
term_to_binary(Obj).
hash(Obj=#r_object{}) ->
Vclock = vclock(Obj),
UpdObj = set_vclock(Obj, lists:sort(Vclock)),
erlang:phash2(to_binary(v0, UpdObj)).
extract_metadata(Obj, Size) ->
{get_metadatas(Obj), vclock(Obj), hash(Obj), Size}.
build_metadata_object(PrimaryKey, Head) ->
{o, Bucket, Key, null} = PrimaryKey,
{MD, VC, _, _} = Head,
Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end,
[],
MD),
#r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}.
convert_indexspecs(IndexSpecs, SQN, PrimaryKey) ->
lists:map(fun({IndexOp, IndexField, IndexValue}) ->
Status = case IndexOp of
add ->
%% TODO: timestamp support
{active, infinity};
remove ->
%% TODO: timestamps for delayed reaping
{tomb, infinity}
end,
{o, B, K, _SK} = PrimaryKey,
{{i, B, {IndexField, IndexValue}, K},
{SQN, Status, null}}
end,
IndexSpecs).
hash(Obj) ->
erlang:phash2(term_to_binary(Obj)).
% Return a tuple of string to ease the printing of keys to logs
print_key(Key) ->
case Key of
{o, B, K, _SK} ->
{"Object", B, K};
{o_rkv@v1, B, K, _SK} ->
{"RiakObject", B, K};
{i, B, {F, _V}, _K} ->
{"Index", B, F}
end.
@@ -118,6 +106,88 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
endkey_passed(EndKey, CheckingKey) ->
EndKey < CheckingKey.
convert_indexspecs(IndexSpecs, Bucket, Key, SQN) ->
lists:map(fun({IndexOp, IndexField, IndexValue}) ->
Status = case IndexOp of
add ->
%% TODO: timestamp support
{active, infinity};
remove ->
%% TODO: timestamps for delayed reaping
{tomb, infinity}
end,
{{i, Bucket, {IndexField, IndexValue}, Key},
{SQN, Status, null}}
end,
IndexSpecs).
generate_ledgerkv(PrimaryKey, SQN, Obj, Size) ->
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, infinity).
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
{Tag, Bucket, Key, _} = PrimaryKey,
{Bucket,
Key,
{PrimaryKey, {SQN, {active, TS}, extract_metadata(Obj, Size, Tag)}}}.
extract_metadata(Obj, Size, o_rkv@v1) ->
riak_extract_metadata(Obj, Size);
extract_metadata(Obj, Size, o) ->
{hash(Obj), Size}.
get_size(PK, Value) ->
{Tag, _Bucket, _Key, _} = PK,
{_, _, MD} = Value,
case Tag of
o_rkv@v1 ->
{_RMD, _VC, _Hash, Size} = MD,
Size;
o ->
{_Hash, Size} = MD,
Size
end.
build_metadata_object(PrimaryKey, MD) ->
{Tag, Bucket, Key, null} = PrimaryKey,
case Tag of
o_rkv@v1 ->
riak_metadata_object(Bucket, Key, MD);
o ->
MD
end.
riak_metadata_object(Bucket, Key, MD) ->
{RMD, VC, _Hash, _Size} = MD,
Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end,
[],
RMD),
#r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}.
riak_extract_metadata(Obj, Size) ->
{get_metadatas(Obj), vclock(Obj), riak_hash(Obj), Size}.
riak_hash(Obj=#r_object{}) ->
Vclock = vclock(Obj),
UpdObj = set_vclock(Obj, lists:sort(Vclock)),
erlang:phash2(term_to_binary(UpdObj)).
riakto_keydetails(Object) ->
{Object#r_object.bucket, Object#r_object.key}.
get_metadatas(#r_object{contents=Contents}) ->
[Content#r_content.metadata || Content <- Contents].
set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
vclock(#r_object{vclock=VClock}) -> VClock.
%%%============================================================================
@@ -131,7 +201,7 @@ indexspecs_test() ->
IndexSpecs = [{add, "t1_int", 456},
{add, "t1_bin", "adbc123"},
{remove, "t1_bin", "abdc456"}],
Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2", null}),
Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1),
?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
{1, {active, infinity}, null}}, lists:nth(1, Changes)),
?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},

@@ -14,7 +14,7 @@
%%
%% All keys are not equal in sft files, keys are only expected in a specific
%% series of formats
%% - {o, Bucket, Key, SubKey|null} - Object Keys
%% - {Tag, Bucket, Key, SubKey|null} - Object Keys
%% - {i, Bucket, {IndexName, IndexTerm}, Key} - Postings
%% The {Bucket, Key} part of all types of keys are hashed for segment filters.
%% For Postings the {Bucket, IndexName, IndexTerm} is also hashed. This

@@ -300,7 +300,7 @@ check_bucket_stats(Bookie, Bucket) ->
FoldSW1 = os:timestamp(),
io:format("Checking bucket size~n"),
{async, Folder1} = leveled_bookie:book_returnfolder(Bookie,
{bucket_stats,
{riakbucket_stats,
Bucket}),
{B1Size, B1Count} = Folder1(),
io:format("Bucket fold completed in ~w microseconds~n",