diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 8be3b0b..45f3a78 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -968,112 +968,6 @@ generate_multiple_objects(Count, KeyNumber, ObjL) -> KeyNumber + 1, ObjL ++ [{Key, Value, IndexSpec}]). - -generate_multiple_robjects(Count, KeyNumber) -> - generate_multiple_robjects(Count, KeyNumber, []). - -generate_multiple_robjects(0, _KeyNumber, ObjL) -> - ObjL; -generate_multiple_robjects(Count, KeyNumber, ObjL) -> - Obj = {"Bucket", - "Key" ++ integer_to_list(KeyNumber), - crypto:rand_bytes(1024), - [], - [{"MDK", "MDV" ++ integer_to_list(KeyNumber)}, - {"MDK2", "MDV" ++ integer_to_list(KeyNumber)}]}, - {B1, K1, V1, Spec1, MD} = Obj, - Content = #r_content{metadata=MD, value=V1}, - Obj1 = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, - generate_multiple_robjects(Count - 1, KeyNumber + 1, ObjL ++ [{Obj1, Spec1}]). - - -single_key_test() -> - RootPath = reset_filestructure(), - {ok, Bookie1} = book_start([{root_path, RootPath}]), - {B1, K1, V1, Spec1, MD} = {"Bucket1", - "Key1", - "Value1", - [], - {"MDK1", "MDV1"}}, - Content = #r_content{metadata=MD, value=V1}, - Object = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, - ok = book_put(Bookie1, B1, K1, Object, Spec1, ?RIAK_TAG), - {ok, F1} = book_get(Bookie1, B1, K1, ?RIAK_TAG), - ?assertMatch(F1, Object), - ok = book_close(Bookie1), - {ok, Bookie2} = book_start([{root_path, RootPath}]), - {ok, F2} = book_get(Bookie2, B1, K1, ?RIAK_TAG), - ?assertMatch(F2, Object), - ok = book_close(Bookie2), - reset_filestructure(). - -multi_key_test() -> - RootPath = reset_filestructure(), - {ok, Bookie1} = book_start([{root_path, RootPath}]), - {B1, K1, V1, Spec1, MD1} = {"Bucket", - "Key1", - "Value1", - [], - {"MDK1", "MDV1"}}, - C1 = #r_content{metadata=MD1, value=V1}, - Obj1 = #r_object{bucket=B1, key=K1, contents=[C1], vclock=[{'a',1}]}, - {B2, K2, V2, Spec2, MD2} = {"Bucket", - "Key2", - "Value2", - [], - {"MDK2", "MDV2"}}, - C2 = #r_content{metadata=MD2, value=V2}, - Obj2 = #r_object{bucket=B2, key=K2, contents=[C2], vclock=[{'a',1}]}, - ok = book_put(Bookie1, B1, K1, Obj1, Spec1, ?RIAK_TAG), - ObjL1 = generate_multiple_robjects(20, 3), - SW1 = os:timestamp(), - lists:foreach(fun({O, S}) -> - {B, K} = leveled_codec:riakto_keydetails(O), - ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG) - end, - ObjL1), - io:format("PUT of 20 objects completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW1)]), - ok = book_put(Bookie1, B2, K2, Obj2, Spec2, ?RIAK_TAG), - {ok, F1A} = book_get(Bookie1, B1, K1, ?RIAK_TAG), - ?assertMatch(F1A, Obj1), - {ok, F2A} = book_get(Bookie1, B2, K2, ?RIAK_TAG), - ?assertMatch(F2A, Obj2), - ObjL2 = generate_multiple_robjects(20, 23), - SW2 = os:timestamp(), - lists:foreach(fun({O, S}) -> - {B, K} = leveled_codec:riakto_keydetails(O), - ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG) - end, - ObjL2), - io:format("PUT of 20 objects completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW2)]), - {ok, F1B} = book_get(Bookie1, B1, K1, ?RIAK_TAG), - ?assertMatch(F1B, Obj1), - {ok, F2B} = book_get(Bookie1, B2, K2, ?RIAK_TAG), - ?assertMatch(F2B, Obj2), - ok = book_close(Bookie1), - % Now reopen the file, and confirm that a fetch is still possible - {ok, Bookie2} = book_start([{root_path, RootPath}]), - {ok, F1C} = book_get(Bookie2, B1, K1, ?RIAK_TAG), - ?assertMatch(F1C, Obj1), - {ok, F2C} = book_get(Bookie2, B2, K2, ?RIAK_TAG), - ?assertMatch(F2C, Obj2), - ObjL3 = generate_multiple_robjects(20, 43), - SW3 = os:timestamp(), - lists:foreach(fun({O, S}) -> - {B, K} = leveled_codec:riakto_keydetails(O), - ok = book_put(Bookie2, B, K, O, S, ?RIAK_TAG) - end, - ObjL3), - io:format("PUT of 20 objects completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW3)]), - {ok, F1D} = book_get(Bookie2, B1, K1, ?RIAK_TAG), - ?assertMatch(F1D, Obj1), - {ok, F2D} = book_get(Bookie2, B2, K2, ?RIAK_TAG), - ?assertMatch(F2D, Obj2), - ok = book_close(Bookie2), - reset_filestructure(). ttl_test() -> RootPath = reset_filestructure(), diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 817ef04..8903198 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -60,9 +60,12 @@ get_size/2, get_keyandhash/2, convert_indexspecs/5, - riakto_keydetails/1, generate_uuid/0, - integer_now/0]). + integer_now/0, + riak_extract_metadata/2]). + +-define(V1_VERS, 1). +-define(MAGIC, 53). % riak_kv -> riak_object %% Credit to @@ -325,44 +328,67 @@ get_keyandhash(LK, Value) -> build_metadata_object(PrimaryKey, MD) -> - {Tag, Bucket, Key, null} = PrimaryKey, + {Tag, _Bucket, _Key, null} = PrimaryKey, case Tag of ?RIAK_TAG -> - riak_metadata_object(Bucket, Key, MD); + {SibMetaBinList, Vclock, _Hash, _Size} = MD, + riak_metadata_to_binary(Vclock, SibMetaBinList); ?STD_TAG -> MD end. - - -riak_metadata_object(Bucket, Key, MD) -> - {RMD, VC, _Hash, _Size} = MD, - Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end, - [], - RMD), - #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}. - riak_extract_metadata(delete, Size) -> {delete, null, null, Size}; -riak_extract_metadata(Obj, Size) -> - {get_metadatas(Obj), vclock(Obj), riak_hash(Obj), Size}. +riak_extract_metadata(ObjBin, Size) -> + {Vclock, SibMetaBinList} = riak_metadata_from_binary(ObjBin), + {SibMetaBinList, Vclock, erlang:phash2(ObjBin), Size}. -riak_hash(Obj=#r_object{}) -> - Vclock = vclock(Obj), - UpdObj = set_vclock(Obj, lists:sort(Vclock)), - erlang:phash2(term_to_binary(UpdObj)). +%% <>. -riakto_keydetails(Object) -> - {Object#r_object.bucket, Object#r_object.key}. +riak_metadata_to_binary(Vclock, SibMetaBinList) -> + VclockBin = term_to_binary(Vclock), + VclockLen = byte_size(VclockBin), + SibCount = length(SibMetaBinList), + SibsBin = slimbin_contents(SibMetaBinList), + <>. + +% Fixes the value length for each sibling to be zero, and so includes no value +slimbin_content(MetaBin) -> + MetaLen = byte_size(MetaBin), + <<0:32/integer, MetaLen:32/integer, MetaBin:MetaLen/binary>>. -get_metadatas(#r_object{contents=Contents}) -> - [Content#r_content.metadata || Content <- Contents]. - -set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. - -vclock(#r_object{vclock=VClock}) -> VClock. +slimbin_contents(SibMetaBinList) -> + F = fun(MetaBin, Acc) -> + <> + end, + lists:foldl(F, <<>>, SibMetaBinList). +riak_metadata_from_binary(V1Binary) -> + <> = V1Binary, + <> = Rest, + SibMetaBinList = + case SibCount of + 0 -> + []; + SC when is_integer(SC) -> + get_metadata_from_siblings(SibsBin, SibCount, []) + end, + {binary_to_term(VclockBin), SibMetaBinList}. + +get_metadata_from_siblings(<<>>, 0, SibMetaBinList) -> + SibMetaBinList; +get_metadata_from_siblings(<>, + SibCount, + SibMetaBinList) -> + <<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0, + <> = Rest1, + get_metadata_from_siblings(Rest2, + SibCount - 1, + [MetaBin|SibMetaBinList]). diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index c36246d..4a7486a 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -124,7 +124,7 @@ journal_compaction(_Config) -> "Key1", "Value1", [], - {"MDK1", "MDV1"}}, + [{"MDK1", "MDV1"}]}, {TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2, V2, Spec2, MD), ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2), @@ -428,7 +428,8 @@ load_and_count_withdelete(_Config) -> 0, lists:seq(1, 20)), testutil:check_forobject(Bookie1, TestObject), - {BucketD, KeyD} = leveled_codec:riakto_keydetails(TestObject), + {BucketD, KeyD} = {TestObject#r_object.bucket, + TestObject#r_object.key}, {_, 1} = testutil:check_bucket_stats(Bookie1, BucketD), ok = testutil:book_riakdelete(Bookie1, BucketD, KeyD, []), not_found = testutil:book_riakget(Bookie1, BucketD, KeyD), @@ -490,8 +491,8 @@ space_clear_ondelete(_Config) -> [length(FNsA_J), length(FNsA_L)]), % Get an iterator to lock the inker during compaction - FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc] - end, + FoldObjectsFun = fun(B, K, ObjBin, Acc) -> + [{B, K, erlang:phash2(ObjBin)}|Acc] end, {async, HTreeF1} = leveled_bookie:book_returnfolder(Book1, {foldobjects_allkeys, ?RIAK_TAG, diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index 3b71496..3d6e50f 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -66,12 +66,12 @@ small_load_with2i(_Config) -> DSpc = lists:map(fun({add, F, T}) -> {remove, F, T} end, Spc), - {B, K} = leveled_codec:riakto_keydetails(Obj), + {B, K} = {Obj#r_object.bucket, Obj#r_object.key}, testutil:book_riakdelete(Bookie1, B, K, DSpc) end, ChkList1), %% Get the Buckets Keys and Hashes for the whole bucket - FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc] + FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc] end, {async, HTreeF1} = leveled_bookie:book_returnfolder(Bookie1, {foldobjects_allkeys, @@ -96,9 +96,8 @@ small_load_with2i(_Config) -> true = 9900 == length(KeyHashList2), true = 9900 == length(KeyHashList3), - SumIntFun = fun(_B, _K, V, Acc) -> - [C] = V#r_object.contents, - {I, _Bin} = C#r_content.value, + SumIntFun = fun(_B, _K, Obj, Acc) -> + {I, _Bin} = testutil:get_value(Obj), Acc + I end, BucketObjQ = {foldobjects_bybucket, ?RIAK_TAG, "Bucket", {SumIntFun, 0}}, @@ -138,7 +137,7 @@ query_count(_Config) -> "Key1", "Value1", [], - {"MDK1", "MDV1"}), + [{"MDK1", "MDV1"}]), ok = testutil:book_riakput(Book1, TestObject, TestSpec), testutil:check_forobject(Book1, TestObject), testutil:check_formissingobject(Book1, "Bucket1", "Key2"), diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 6e59a2e..3a0df7b 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -67,10 +67,12 @@ recovr_strategy(_Config) -> lists:foreach(fun({K, _SpcL}) -> {ok, OH} = testutil:book_riakhead(Book1, "Bucket6", K), - K = OH#r_object.key, + VCH = testutil:get_vclock(OH), {ok, OG} = testutil:book_riakget(Book1, "Bucket6", K), V = testutil:get_value(OG), - true = V == V4 + VCG = testutil:get_vclock(OG), + true = V == V4, + true = VCH == VCG end, lists:nthtail(6400, AllSpcL)), Q = fun(RT) -> {index_query, @@ -154,7 +156,7 @@ aae_bustedjournal(_Config) -> % Will need to remove the file or corrupt the hashtree to get presence to % fail - FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc] + FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc] end, SW = os:timestamp(), {async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2, diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 596c442..e53eb4f 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -25,6 +25,7 @@ set_object/5, get_key/1, get_value/1, + get_vclock/1, get_compressiblevalue/0, get_compressiblevalue_andinteger/0, get_randomindexes_generator/1, @@ -39,15 +40,91 @@ restore_file/2, restore_topending/2, find_journals/1, - riak_hash/1, wait_for_compaction/1, foldkeysfun/3, sync_strategy/0]). -define(RETURN_TERMS, {true, undefined}). -define(SLOWOFFER_DELAY, 5). +-define(V1_VERS, 1). +-define(MAGIC, 53). % riak_kv -> riak_object +-define(MD_VTAG, <<"X-Riak-VTag">>). +-define(MD_LASTMOD, <<"X-Riak-Last-Modified">>). +-define(MD_DELETED, <<"X-Riak-Deleted">>). +-define(EMPTY_VTAG_BIN, <<"e">>). +%% ================================================= +%% From riak_object +to_binary(v1, #r_object{contents=Contents, vclock=VClock}) -> + new_v1(VClock, Contents). + +new_v1(Vclock, Siblings) -> + VclockBin = term_to_binary(Vclock), + VclockLen = byte_size(VclockBin), + SibCount = length(Siblings), + SibsBin = bin_contents(Siblings), + <>. + +bin_content(#r_content{metadata=Meta, value=Val}) -> + ValBin = encode_maybe_binary(Val), + ValLen = byte_size(ValBin), + MetaBin = meta_bin(Meta), + MetaLen = byte_size(MetaBin), + <>. + +bin_contents(Contents) -> + F = fun(Content, Acc) -> + <> + end, + lists:foldl(F, <<>>, Contents). + +meta_bin(MD) -> + {{VTagVal, Deleted, LastModVal}, RestBin} = + dict:fold(fun fold_meta_to_bin/3, + {{undefined, <<0>>, undefined}, <<>>}, + MD), + VTagBin = case VTagVal of + undefined -> ?EMPTY_VTAG_BIN; + _ -> list_to_binary(VTagVal) + end, + VTagLen = byte_size(VTagBin), + LastModBin = case LastModVal of + undefined -> + <<0:32/integer, 0:32/integer, 0:32/integer>>; + {Mega,Secs,Micro} -> + <> + end, + <>. + +fold_meta_to_bin(?MD_VTAG, Value, {{_Vt,Del,Lm},RestBin}) -> + {{Value, Del, Lm}, RestBin}; +fold_meta_to_bin(?MD_LASTMOD, Value, {{Vt,Del,_Lm},RestBin}) -> + {{Vt, Del, Value}, RestBin}; +fold_meta_to_bin(?MD_DELETED, true, {{Vt,_Del,Lm},RestBin})-> + {{Vt, <<1>>, Lm}, RestBin}; +fold_meta_to_bin(?MD_DELETED, "true", Acc) -> + fold_meta_to_bin(?MD_DELETED, true, Acc); +fold_meta_to_bin(?MD_DELETED, _, {{Vt,_Del,Lm},RestBin}) -> + {{Vt, <<0>>, Lm}, RestBin}; +fold_meta_to_bin(Key, Value, {{_Vt,_Del,_Lm}=Elems,RestBin}) -> + ValueBin = encode_maybe_binary(Value), + ValueLen = byte_size(ValueBin), + KeyBin = encode_maybe_binary(Key), + KeyLen = byte_size(KeyBin), + MetaBin = <>, + {Elems, <>}. + +encode_maybe_binary(Bin) when is_binary(Bin) -> + <<1, Bin/binary>>; +encode_maybe_binary(Bin) -> + <<0, (term_to_binary(Bin))/binary>>. + +%% ================================================= sync_strategy() -> case erlang:system_info(otp_release) of @@ -61,9 +138,14 @@ sync_strategy() -> none end. + book_riakput(Pid, RiakObject, IndexSpecs) -> - {Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject), - leveled_bookie:book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG). + leveled_bookie:book_put(Pid, + RiakObject#r_object.bucket, + RiakObject#r_object.key, + to_binary(v1, RiakObject), + IndexSpecs, + ?RIAK_TAG). book_riakdelete(Pid, Bucket, Key, IndexSpecs) -> leveled_bookie:book_put(Pid, Bucket, Key, delete, IndexSpecs, ?RIAK_TAG). @@ -144,9 +226,9 @@ check_forlist(Bookie, ChkList, Log) -> R = book_riakget(Bookie, Obj#r_object.bucket, Obj#r_object.key), - ok = case R of - {ok, Obj} -> - ok; + true = case R of + {ok, Val} -> + to_binary(v1, Obj) == Val; not_found -> io:format("Object not found for key ~s~n", [Obj#r_object.key]), @@ -169,20 +251,18 @@ check_formissinglist(Bookie, ChkList) -> [timer:now_diff(os:timestamp(), SW), length(ChkList)]). check_forobject(Bookie, TestObject) -> - {ok, TestObject} = book_riakget(Bookie, + TestBinary = to_binary(v1, TestObject), + {ok, TestBinary} = book_riakget(Bookie, TestObject#r_object.bucket, TestObject#r_object.key), - {ok, HeadObject} = book_riakhead(Bookie, + {ok, HeadBinary} = book_riakhead(Bookie, TestObject#r_object.bucket, TestObject#r_object.key), - ok = case {HeadObject#r_object.bucket, - HeadObject#r_object.key, - HeadObject#r_object.vclock} of - {B1, K1, VC1} when B1 == TestObject#r_object.bucket, - K1 == TestObject#r_object.key, - VC1 == TestObject#r_object.vclock -> - ok - end. + {_SibMetaBinList, + Vclock, + _Hash, + size} = leveled_codec:riak_extract_metadata(HeadBinary, size), + true = Vclock == TestObject#r_object.vclock. check_formissingobject(Bookie, Bucket, Key) -> not_found = book_riakget(Bookie, Bucket, Key), @@ -194,11 +274,11 @@ generate_testobject() -> "Key1", "Value1", [], - {"MDK1", "MDV1"}}, + [{"MDK1", "MDV1"}]}, generate_testobject(B1, K1, V1, Spec1, MD). generate_testobject(B, K, V, Spec, MD) -> - Content = #r_content{metadata=MD, value=V}, + Content = #r_content{metadata=dict:from_list(MD), value=V}, {#r_object{bucket=B, key=K, contents=[Content], vclock=[{'a',1}]}, Spec}. @@ -282,6 +362,7 @@ set_object(Bucket, Key, Value, IndexGen) -> set_object(Bucket, Key, Value, IndexGen, []). set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) -> + Obj = {Bucket, Key, Value, @@ -291,16 +372,52 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) -> [{"MDK", "MDV" ++ Key}, {"MDK2", "MDV" ++ Key}]}, {B1, K1, V1, Spec1, MD} = Obj, - Content = #r_content{metadata=MD, value=V1}, - {#r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, + Content = #r_content{metadata=dict:from_list(MD), value=V1}, + {#r_object{bucket=B1, + key=K1, + contents=[Content], + vclock=generate_vclock()}, Spec1}. + +generate_vclock() -> + lists:map(fun(X) -> + {_, Actor} = lists:keyfind(random:uniform(10), + 1, + actor_list()), + {Actor, X} end, + lists:seq(1, random:uniform(8))). + + +actor_list() -> + [{1, albert}, {2, bertie}, {3, clara}, {4, dave}, {5, elton}, + {6, fred}, {7, george}, {8, harry}, {9, isaac}, {10, leila}]. + get_key(Object) -> Object#r_object.key. -get_value(Object) -> - [Content] = Object#r_object.contents, - Content#r_content.value. +get_value(ObjectBin) -> + <<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer, + Rest1/binary>> = ObjectBin, + <<_VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest1, + case SibCount of + 1 -> + <> = SibsBin, + <> = Rest2, + case ContentBin of + <<0, ContentBin0/binary>> -> + binary_to_term(ContentBin0) + end; + N -> + io:format("SibCount of ~w with ObjectBin ~w~n", [N, ObjectBin]), + error + end. + +get_vclock(ObjectBin) -> + <<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer, + Rest1/binary>> = ObjectBin, + <> = Rest1, + binary_to_term(VclockBin). load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) -> lists:map(fun(KN) -> @@ -467,6 +584,10 @@ corrupt_journal(RootPath, FileName, Corruptions, BasePosition, GapSize) -> OriginalPath = RootPath ++ "/journal/journal_files/" ++ FileName, BackupPath = RootPath ++ "/journal/journal_files/" ++ filename:basename(FileName, ".cdb") ++ ".bak", + io:format("Corruption attempt to be made to filename ~s ~w ~w~n", + [FileName, + filelib:is_file(OriginalPath), + filelib:is_file(BackupPath)]), {ok, _BytesCopied} = file:copy(OriginalPath, BackupPath), {ok, Handle} = file:open(OriginalPath, [binary, raw, read, write]), lists:foreach(fun(X) -> @@ -504,11 +625,3 @@ find_journals(RootPath) -> FNsA_J), CDBFiles. - -riak_hash(Obj=#r_object{}) -> - Vclock = vclock(Obj), - UpdObj = set_vclock(Obj, lists:sort(Vclock)), - erlang:phash2(term_to_binary(UpdObj)). - -set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. -vclock(#r_object{vclock=VClock}) -> VClock.