diff --git a/include/leveled.hrl b/include/leveled.hrl index cd82d2a..e421500 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -51,8 +51,6 @@ max_run_length :: integer(), cdb_options :: #cdb_options{}}). -%% Temp location for records related to riak - -record(r_content, { metadata, value :: term() @@ -65,4 +63,4 @@ vclock, updatemetadata=dict:store(clean, true, dict:new()), updatevalue :: term()}). - \ No newline at end of file + \ No newline at end of file diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index ecf5f81..28b628d 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -71,11 +71,11 @@ %% The Bookie should generate a series of ledger key changes from this %% information, using a function passed in at startup. For Riak this will be %% of the form: -%% {{o, Bucket, Key, SubKey|null}, +%% {{o_rkv@v1, Bucket, Key, SubKey|null}, %% SQN, %% {Hash, Size, {Riak_Metadata}}, %% {active, TS}|{tomb, TS}} or -%% {{i, Bucket, IndexTerm, IndexField, Key}, +%% {{i, Bucket, {IndexTerm, IndexField}, Key}, %% SQN, %% null, %% {active, TS}|{tomb, TS}} @@ -136,6 +136,9 @@ book_riakput/3, book_riakget/3, book_riakhead/3, + book_put/5, + book_get/3, + book_head/3, book_returnfolder/2, book_snapshotstore/3, book_snapshotledger/3, @@ -167,17 +170,33 @@ book_start(Opts) -> gen_server:start(?MODULE, [Opts], []). -book_riakput(Pid, Object, IndexSpecs) -> - PrimaryKey = {o, Object#r_object.bucket, Object#r_object.key, null}, - gen_server:call(Pid, {put, PrimaryKey, Object, IndexSpecs}, infinity). +book_riakput(Pid, RiakObject, IndexSpecs) -> + {Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject), + book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, o_rkv@v1). + +book_put(Pid, Bucket, Key, Object, IndexSpecs) -> + book_put(Pid, Bucket, Key, Object, IndexSpecs, o). book_riakget(Pid, Bucket, Key) -> - PrimaryKey = {o, Bucket, Key, null}, - gen_server:call(Pid, {get, PrimaryKey}, infinity). + book_get(Pid, Bucket, Key, o_rkv@v1). + +book_get(Pid, Bucket, Key) -> + book_get(Pid, Bucket, Key, o). book_riakhead(Pid, Bucket, Key) -> - PrimaryKey = {o, Bucket, Key, null}, - gen_server:call(Pid, {head, PrimaryKey}, infinity). + book_head(Pid, Bucket, Key, o_rkv@v1). + +book_head(Pid, Bucket, Key) -> + book_head(Pid, Bucket, Key, o). + +book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) -> + gen_server:call(Pid, {put, Bucket, Key, Object, IndexSpecs, Tag}, infinity). + +book_get(Pid, Bucket, Key, Tag) -> + gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity). + +book_head(Pid, Bucket, Key, Tag) -> + gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity). book_returnfolder(Pid, FolderType) -> gen_server:call(Pid, {return_folder, FolderType}, infinity). @@ -231,12 +250,13 @@ init([Opts]) -> end. -handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) -> +handle_call({put, Bucket, Key, Object, IndexSpecs, Tag}, From, State) -> + LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, - PrimaryKey, + LedgerKey, Object, IndexSpecs), - Changes = preparefor_ledgercache(PrimaryKey, + Changes = preparefor_ledgercache(LedgerKey, SQN, Object, ObjSize, @@ -251,8 +271,11 @@ handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) -> {pause, NewCache} -> {noreply, State#state{ledger_cache=NewCache, back_pressure=true}} end; -handle_call({get, Key}, _From, State) -> - case fetch_head(Key, State#state.penciller, State#state.ledger_cache) of +handle_call({get, Bucket, Key, Tag}, _From, State) -> + LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), + case fetch_head(LedgerKey, + State#state.penciller, + State#state.ledger_cache) of not_present -> {reply, not_found, State}; Head -> @@ -261,7 +284,7 @@ handle_call({get, Key}, _From, State) -> {tomb, _} -> {reply, not_found, State}; {active, _} -> - case fetch_value(Key, Seqn, State#state.inker) of + case fetch_value(LedgerKey, Seqn, State#state.inker) of not_present -> {reply, not_found, State}; Object -> @@ -269,8 +292,11 @@ handle_call({get, Key}, _From, State) -> end end end; -handle_call({head, Key}, _From, State) -> - case fetch_head(Key, State#state.penciller, State#state.ledger_cache) of +handle_call({head, Bucket, Key, Tag}, _From, State) -> + LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), + case fetch_head(LedgerKey, + State#state.penciller, + State#state.ledger_cache) of not_present -> {reply, not_found, State}; Head -> @@ -279,7 +305,7 @@ handle_call({head, Key}, _From, State) -> {tomb, _} -> {reply, not_found, State}; {active, _} -> - OMD = leveled_codec:build_metadata_object(Key, MD), + OMD = leveled_codec:build_metadata_object(LedgerKey, MD), {reply, {ok, OMD}, State} end end; @@ -308,11 +334,12 @@ handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) -> end; handle_call({return_folder, FolderType}, _From, State) -> case FolderType of - {bucket_stats, Bucket} -> + {riakbucket_stats, Bucket} -> {reply, bucket_stats(State#state.penciller, State#state.ledger_cache, - Bucket), + Bucket, + o_rkv@v1), State} end; handle_call({compact_journal, Timeout}, _From, State) -> @@ -351,7 +378,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -bucket_stats(Penciller, LedgerCache, Bucket) -> +bucket_stats(Penciller, LedgerCache, Bucket, Tag) -> PCLopts = #penciller_options{start_snapshot=true, source_penciller=Penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), @@ -361,8 +388,8 @@ bucket_stats(Penciller, LedgerCache, Bucket) -> [length(Increment)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, {infinity, Increment}), - StartKey = {o, Bucket, null, null}, - EndKey = {o, Bucket, null, null}, + StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), + EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, StartKey, EndKey, @@ -433,21 +460,16 @@ fetch_value(Key, SQN, Inker) -> not_present end. - -accumulate_size(_Key, Value, {Size, Count}) -> - {_, _, MD} = Value, - {_, _, _, ObjSize} = MD, - {Size + ObjSize, Count + 1}. - - +accumulate_size(Key, Value, {Size, Count}) -> + {Size + leveled_codec:get_size(Key, Value), Count + 1}. preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) -> - PrimaryChange = {PK, - {SQN, - {active, infinity}, - leveled_codec:extract_metadata(Obj, Size)}}, - SecChanges = leveled_codec:convert_indexspecs(IndexSpecs, SQN, PK), - [PrimaryChange] ++ SecChanges. + {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK, + SQN, + Obj, + Size), + ConvSpecs = leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN), + [PrimaryChange] ++ ConvSpecs. addto_ledgercache(Changes, Cache) -> lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end, @@ -511,6 +533,9 @@ reset_filestructure() -> leveled_penciller:clean_testdir(RootPath ++ "/" ++ ?LEDGER_FP), RootPath. + + + generate_multiple_objects(Count, KeyNumber) -> generate_multiple_objects(Count, KeyNumber, []). diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index e49138a..d368896 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -1655,7 +1655,10 @@ find_lastkey_test() -> {ok, F2} = cdb_complete(P2), {ok, P3} = cdb_open_reader(F2), ?assertMatch("Key2", cdb_lastkey(P3)), - ok = cdb_close(P3), + {ok, _FN} = cdb_complete(P3), + {ok, P4} = cdb_open_reader(F2), + ?assertMatch("Key2", cdb_lastkey(P4)), + ok = cdb_close(P4), ok = file:delete("../test/lastkey.cdb"). get_keys_byposition_simple_test() -> diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 6ae0f6a..5a384e8 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -1,15 +1,35 @@ %% -------- Key Codec --------- %% -%% Functions for manipulating keys and values within leveled. These are -%% currently static functions, they cannot be overridden in the store other -%% than by changing them here. The formats are focused on the problem of -%% supporting Riak KV +%% Functions for manipulating keys and values within leveled. +%% +%% +%% Within the LEDGER: +%% Keys are of the form - +%% {Tag, Bucket, Key, SubKey|null} +%% Values are of the form +%% {SQN, Status, MD} +%% +%% Within the JOURNAL: +%% Keys are of the form - +%% {SQN, LedgerKey} +%% Values are of the form +%% {Object, IndexSpecs} (as a binary) +%% +%% IndexSpecs are of the form of a Ledger Key/Value +%% +%% Tags need to be set during PUT operations and each Tag used must be +%% supported in an extract_metadata and a build_metadata_object function clause +%% +%% Currently the only tags supported are: +%% - o (standard objects) +%% - o_rkv@v1 (riak objects) +%% - i (index entries) + -module(leveled_codec). -include("../include/leveled.hrl"). - -include_lib("eunit/include/eunit.hrl"). -export([strip_to_keyonly/1, @@ -21,15 +41,20 @@ endkey_passed/2, key_dominates/2, print_key/1, - extract_metadata/2, + to_ledgerkey/3, build_metadata_object/2, - convert_indexspecs/3]). - + generate_ledgerkv/4, + generate_ledgerkv/5, + get_size/2, + convert_indexspecs/4, + riakto_keydetails/1]). + + strip_to_keyonly({keyonly, K}) -> K; strip_to_keyonly({K, _V}) -> K. -strip_to_keyseqonly({K, {SeqN, _, _}}) -> {K, SeqN}. +strip_to_keyseqonly({K, {SeqN, _, _ }}) -> {K, SeqN}. strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}. @@ -53,56 +78,19 @@ key_dominates(LeftKey, RightKey) -> right_hand_dominant end. +to_ledgerkey(Bucket, Key, Tag) -> + {Tag, Bucket, Key, null}. - -get_metadatas(#r_object{contents=Contents}) -> - [Content#r_content.metadata || Content <- Contents]. - -set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. - -vclock(#r_object{vclock=VClock}) -> VClock. - -to_binary(v0, Obj) -> - term_to_binary(Obj). - -hash(Obj=#r_object{}) -> - Vclock = vclock(Obj), - UpdObj = set_vclock(Obj, lists:sort(Vclock)), - erlang:phash2(to_binary(v0, UpdObj)). - -extract_metadata(Obj, Size) -> - {get_metadatas(Obj), vclock(Obj), hash(Obj), Size}. - - -build_metadata_object(PrimaryKey, Head) -> - {o, Bucket, Key, null} = PrimaryKey, - {MD, VC, _, _} = Head, - Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end, - [], - MD), - #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}. - -convert_indexspecs(IndexSpecs, SQN, PrimaryKey) -> - lists:map(fun({IndexOp, IndexField, IndexValue}) -> - Status = case IndexOp of - add -> - %% TODO: timestamp support - {active, infinity}; - remove -> - %% TODO: timestamps for delayed reaping - {tomb, infinity} - end, - {o, B, K, _SK} = PrimaryKey, - {{i, B, {IndexField, IndexValue}, K}, - {SQN, Status, null}} - end, - IndexSpecs). +hash(Obj) -> + erlang:phash2(term_to_binary(Obj)). % Return a tuple of string to ease the printing of keys to logs print_key(Key) -> case Key of {o, B, K, _SK} -> {"Object", B, K}; + {o_rkv@v1, B, K, _SK} -> + {"RiakObject", B, K}; {i, B, {F, _V}, _K} -> {"Index", B, F} end. @@ -118,6 +106,88 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) -> endkey_passed(EndKey, CheckingKey) -> EndKey < CheckingKey. +convert_indexspecs(IndexSpecs, Bucket, Key, SQN) -> + lists:map(fun({IndexOp, IndexField, IndexValue}) -> + Status = case IndexOp of + add -> + %% TODO: timestamp support + {active, infinity}; + remove -> + %% TODO: timestamps for delayed reaping + {tomb, infinity} + end, + {{i, Bucket, {IndexField, IndexValue}, Key}, + {SQN, Status, null}} + end, + IndexSpecs). + +generate_ledgerkv(PrimaryKey, SQN, Obj, Size) -> + generate_ledgerkv(PrimaryKey, SQN, Obj, Size, infinity). + +generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> + {Tag, Bucket, Key, _} = PrimaryKey, + {Bucket, + Key, + {PrimaryKey, {SQN, {active, TS}, extract_metadata(Obj, Size, Tag)}}}. + + +extract_metadata(Obj, Size, o_rkv@v1) -> + riak_extract_metadata(Obj, Size); +extract_metadata(Obj, Size, o) -> + {hash(Obj), Size}. + +get_size(PK, Value) -> + {Tag, _Bucket, _Key, _} = PK, + {_, _, MD} = Value, + case Tag of + o_rkv@v1 -> + {_RMD, _VC, _Hash, Size} = MD, + Size; + o -> + {_Hash, Size} = MD, + Size + end. + + +build_metadata_object(PrimaryKey, MD) -> + {Tag, Bucket, Key, null} = PrimaryKey, + case Tag of + o_rkv@v1 -> + riak_metadata_object(Bucket, Key, MD); + o -> + MD + end. + + + + +riak_metadata_object(Bucket, Key, MD) -> + {RMD, VC, _Hash, _Size} = MD, + Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end, + [], + RMD), + #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}. + +riak_extract_metadata(Obj, Size) -> + {get_metadatas(Obj), vclock(Obj), riak_hash(Obj), Size}. + +riak_hash(Obj=#r_object{}) -> + Vclock = vclock(Obj), + UpdObj = set_vclock(Obj, lists:sort(Vclock)), + erlang:phash2(term_to_binary(UpdObj)). + +riakto_keydetails(Object) -> + {Object#r_object.bucket, Object#r_object.key}. + +get_metadatas(#r_object{contents=Contents}) -> + [Content#r_content.metadata || Content <- Contents]. + +set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. + +vclock(#r_object{vclock=VClock}) -> VClock. + + + %%%============================================================================ @@ -131,7 +201,7 @@ indexspecs_test() -> IndexSpecs = [{add, "t1_int", 456}, {add, "t1_bin", "adbc123"}, {remove, "t1_bin", "abdc456"}], - Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2", null}), + Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1), ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"}, {1, {active, infinity}, null}}, lists:nth(1, Changes)), ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"}, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 6cead92..95a3f63 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -14,7 +14,7 @@ %% %% All keys are not equal in sft files, keys are only expected in a specific %% series of formats -%% - {o, Bucket, Key, SubKey|null} - Object Keys +%% - {Tag, Bucket, Key, SubKey|null} - Object Keys %% - {i, Bucket, {IndexName, IndexTerm}, Key} - Postings %% The {Bucket, Key} part of all types of keys are hashed for segment filters. %% For Postings the {Bucket, IndexName, IndexTerm} is also hashed. This diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 2f30975..40ad320 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -300,7 +300,7 @@ check_bucket_stats(Bookie, Bucket) -> FoldSW1 = os:timestamp(), io:format("Checking bucket size~n"), {async, Folder1} = leveled_bookie:book_returnfolder(Bookie, - {bucket_stats, + {riakbucket_stats, Bucket}), {B1Size, B1Count} = Folder1(), io:format("Bucket fold completed in ~w microseconds~n",