Switch to binary format Riak object
Initial change to try and test assuming that leveled received the binary format of Riak objects (and parses that for metadata).
This commit is contained in:
parent
b0a515553f
commit
e8c1d39df9
6 changed files with 212 additions and 177 deletions
|
@ -124,7 +124,7 @@ journal_compaction(_Config) ->
|
|||
"Key1",
|
||||
"Value1",
|
||||
[],
|
||||
{"MDK1", "MDV1"}},
|
||||
[{"MDK1", "MDV1"}]},
|
||||
{TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2,
|
||||
V2, Spec2, MD),
|
||||
ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2),
|
||||
|
@ -428,7 +428,8 @@ load_and_count_withdelete(_Config) ->
|
|||
0,
|
||||
lists:seq(1, 20)),
|
||||
testutil:check_forobject(Bookie1, TestObject),
|
||||
{BucketD, KeyD} = leveled_codec:riakto_keydetails(TestObject),
|
||||
{BucketD, KeyD} = {TestObject#r_object.bucket,
|
||||
TestObject#r_object.key},
|
||||
{_, 1} = testutil:check_bucket_stats(Bookie1, BucketD),
|
||||
ok = testutil:book_riakdelete(Bookie1, BucketD, KeyD, []),
|
||||
not_found = testutil:book_riakget(Bookie1, BucketD, KeyD),
|
||||
|
@ -490,8 +491,8 @@ space_clear_ondelete(_Config) ->
|
|||
[length(FNsA_J), length(FNsA_L)]),
|
||||
|
||||
% Get an iterator to lock the inker during compaction
|
||||
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc]
|
||||
end,
|
||||
FoldObjectsFun = fun(B, K, ObjBin, Acc) ->
|
||||
[{B, K, erlang:phash2(ObjBin)}|Acc] end,
|
||||
{async, HTreeF1} = leveled_bookie:book_returnfolder(Book1,
|
||||
{foldobjects_allkeys,
|
||||
?RIAK_TAG,
|
||||
|
|
|
@ -66,12 +66,12 @@ small_load_with2i(_Config) ->
|
|||
DSpc = lists:map(fun({add, F, T}) -> {remove, F, T}
|
||||
end,
|
||||
Spc),
|
||||
{B, K} = leveled_codec:riakto_keydetails(Obj),
|
||||
{B, K} = {Obj#r_object.bucket, Obj#r_object.key},
|
||||
testutil:book_riakdelete(Bookie1, B, K, DSpc)
|
||||
end,
|
||||
ChkList1),
|
||||
%% Get the Buckets Keys and Hashes for the whole bucket
|
||||
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc]
|
||||
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc]
|
||||
end,
|
||||
{async, HTreeF1} = leveled_bookie:book_returnfolder(Bookie1,
|
||||
{foldobjects_allkeys,
|
||||
|
@ -96,9 +96,8 @@ small_load_with2i(_Config) ->
|
|||
true = 9900 == length(KeyHashList2),
|
||||
true = 9900 == length(KeyHashList3),
|
||||
|
||||
SumIntFun = fun(_B, _K, V, Acc) ->
|
||||
[C] = V#r_object.contents,
|
||||
{I, _Bin} = C#r_content.value,
|
||||
SumIntFun = fun(_B, _K, Obj, Acc) ->
|
||||
{I, _Bin} = testutil:get_value(Obj),
|
||||
Acc + I
|
||||
end,
|
||||
BucketObjQ = {foldobjects_bybucket, ?RIAK_TAG, "Bucket", {SumIntFun, 0}},
|
||||
|
@ -138,7 +137,7 @@ query_count(_Config) ->
|
|||
"Key1",
|
||||
"Value1",
|
||||
[],
|
||||
{"MDK1", "MDV1"}),
|
||||
[{"MDK1", "MDV1"}]),
|
||||
ok = testutil:book_riakput(Book1, TestObject, TestSpec),
|
||||
testutil:check_forobject(Book1, TestObject),
|
||||
testutil:check_formissingobject(Book1, "Bucket1", "Key2"),
|
||||
|
|
|
@ -67,10 +67,12 @@ recovr_strategy(_Config) ->
|
|||
|
||||
lists:foreach(fun({K, _SpcL}) ->
|
||||
{ok, OH} = testutil:book_riakhead(Book1, "Bucket6", K),
|
||||
K = OH#r_object.key,
|
||||
VCH = testutil:get_vclock(OH),
|
||||
{ok, OG} = testutil:book_riakget(Book1, "Bucket6", K),
|
||||
V = testutil:get_value(OG),
|
||||
true = V == V4
|
||||
VCG = testutil:get_vclock(OG),
|
||||
true = V == V4,
|
||||
true = VCH == VCG
|
||||
end,
|
||||
lists:nthtail(6400, AllSpcL)),
|
||||
Q = fun(RT) -> {index_query,
|
||||
|
@ -154,7 +156,7 @@ aae_bustedjournal(_Config) ->
|
|||
% Will need to remove the file or corrupt the hashtree to get presence to
|
||||
% fail
|
||||
|
||||
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc]
|
||||
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc]
|
||||
end,
|
||||
SW = os:timestamp(),
|
||||
{async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2,
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
set_object/5,
|
||||
get_key/1,
|
||||
get_value/1,
|
||||
get_vclock/1,
|
||||
get_compressiblevalue/0,
|
||||
get_compressiblevalue_andinteger/0,
|
||||
get_randomindexes_generator/1,
|
||||
|
@ -39,15 +40,91 @@
|
|||
restore_file/2,
|
||||
restore_topending/2,
|
||||
find_journals/1,
|
||||
riak_hash/1,
|
||||
wait_for_compaction/1,
|
||||
foldkeysfun/3,
|
||||
sync_strategy/0]).
|
||||
|
||||
-define(RETURN_TERMS, {true, undefined}).
|
||||
-define(SLOWOFFER_DELAY, 5).
|
||||
-define(V1_VERS, 1).
|
||||
-define(MAGIC, 53). % riak_kv -> riak_object
|
||||
-define(MD_VTAG, <<"X-Riak-VTag">>).
|
||||
-define(MD_LASTMOD, <<"X-Riak-Last-Modified">>).
|
||||
-define(MD_DELETED, <<"X-Riak-Deleted">>).
|
||||
-define(EMPTY_VTAG_BIN, <<"e">>).
|
||||
|
||||
%% =================================================
|
||||
%% From riak_object
|
||||
|
||||
to_binary(v1, #r_object{contents=Contents, vclock=VClock}) ->
|
||||
new_v1(VClock, Contents).
|
||||
|
||||
new_v1(Vclock, Siblings) ->
|
||||
VclockBin = term_to_binary(Vclock),
|
||||
VclockLen = byte_size(VclockBin),
|
||||
SibCount = length(Siblings),
|
||||
SibsBin = bin_contents(Siblings),
|
||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
|
||||
|
||||
bin_content(#r_content{metadata=Meta, value=Val}) ->
|
||||
ValBin = encode_maybe_binary(Val),
|
||||
ValLen = byte_size(ValBin),
|
||||
MetaBin = meta_bin(Meta),
|
||||
MetaLen = byte_size(MetaBin),
|
||||
<<ValLen:32/integer, ValBin:ValLen/binary,
|
||||
MetaLen:32/integer, MetaBin:MetaLen/binary>>.
|
||||
|
||||
bin_contents(Contents) ->
|
||||
F = fun(Content, Acc) ->
|
||||
<<Acc/binary, (bin_content(Content))/binary>>
|
||||
end,
|
||||
lists:foldl(F, <<>>, Contents).
|
||||
|
||||
meta_bin(MD) ->
|
||||
{{VTagVal, Deleted, LastModVal}, RestBin} =
|
||||
dict:fold(fun fold_meta_to_bin/3,
|
||||
{{undefined, <<0>>, undefined}, <<>>},
|
||||
MD),
|
||||
VTagBin = case VTagVal of
|
||||
undefined -> ?EMPTY_VTAG_BIN;
|
||||
_ -> list_to_binary(VTagVal)
|
||||
end,
|
||||
VTagLen = byte_size(VTagBin),
|
||||
LastModBin = case LastModVal of
|
||||
undefined ->
|
||||
<<0:32/integer, 0:32/integer, 0:32/integer>>;
|
||||
{Mega,Secs,Micro} ->
|
||||
<<Mega:32/integer, Secs:32/integer, Micro:32/integer>>
|
||||
end,
|
||||
<<LastModBin/binary, VTagLen:8/integer, VTagBin:VTagLen/binary,
|
||||
Deleted:1/binary-unit:8, RestBin/binary>>.
|
||||
|
||||
fold_meta_to_bin(?MD_VTAG, Value, {{_Vt,Del,Lm},RestBin}) ->
|
||||
{{Value, Del, Lm}, RestBin};
|
||||
fold_meta_to_bin(?MD_LASTMOD, Value, {{Vt,Del,_Lm},RestBin}) ->
|
||||
{{Vt, Del, Value}, RestBin};
|
||||
fold_meta_to_bin(?MD_DELETED, true, {{Vt,_Del,Lm},RestBin})->
|
||||
{{Vt, <<1>>, Lm}, RestBin};
|
||||
fold_meta_to_bin(?MD_DELETED, "true", Acc) ->
|
||||
fold_meta_to_bin(?MD_DELETED, true, Acc);
|
||||
fold_meta_to_bin(?MD_DELETED, _, {{Vt,_Del,Lm},RestBin}) ->
|
||||
{{Vt, <<0>>, Lm}, RestBin};
|
||||
fold_meta_to_bin(Key, Value, {{_Vt,_Del,_Lm}=Elems,RestBin}) ->
|
||||
ValueBin = encode_maybe_binary(Value),
|
||||
ValueLen = byte_size(ValueBin),
|
||||
KeyBin = encode_maybe_binary(Key),
|
||||
KeyLen = byte_size(KeyBin),
|
||||
MetaBin = <<KeyLen:32/integer, KeyBin/binary,
|
||||
ValueLen:32/integer, ValueBin/binary>>,
|
||||
{Elems, <<RestBin/binary, MetaBin/binary>>}.
|
||||
|
||||
encode_maybe_binary(Bin) when is_binary(Bin) ->
|
||||
<<1, Bin/binary>>;
|
||||
encode_maybe_binary(Bin) ->
|
||||
<<0, (term_to_binary(Bin))/binary>>.
|
||||
|
||||
%% =================================================
|
||||
|
||||
sync_strategy() ->
|
||||
case erlang:system_info(otp_release) of
|
||||
|
@ -61,9 +138,14 @@ sync_strategy() ->
|
|||
none
|
||||
end.
|
||||
|
||||
|
||||
book_riakput(Pid, RiakObject, IndexSpecs) ->
|
||||
{Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
|
||||
leveled_bookie:book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG).
|
||||
leveled_bookie:book_put(Pid,
|
||||
RiakObject#r_object.bucket,
|
||||
RiakObject#r_object.key,
|
||||
to_binary(v1, RiakObject),
|
||||
IndexSpecs,
|
||||
?RIAK_TAG).
|
||||
|
||||
book_riakdelete(Pid, Bucket, Key, IndexSpecs) ->
|
||||
leveled_bookie:book_put(Pid, Bucket, Key, delete, IndexSpecs, ?RIAK_TAG).
|
||||
|
@ -144,9 +226,9 @@ check_forlist(Bookie, ChkList, Log) ->
|
|||
R = book_riakget(Bookie,
|
||||
Obj#r_object.bucket,
|
||||
Obj#r_object.key),
|
||||
ok = case R of
|
||||
{ok, Obj} ->
|
||||
ok;
|
||||
true = case R of
|
||||
{ok, Val} ->
|
||||
to_binary(v1, Obj) == Val;
|
||||
not_found ->
|
||||
io:format("Object not found for key ~s~n",
|
||||
[Obj#r_object.key]),
|
||||
|
@ -169,20 +251,18 @@ check_formissinglist(Bookie, ChkList) ->
|
|||
[timer:now_diff(os:timestamp(), SW), length(ChkList)]).
|
||||
|
||||
check_forobject(Bookie, TestObject) ->
|
||||
{ok, TestObject} = book_riakget(Bookie,
|
||||
TestBinary = to_binary(v1, TestObject),
|
||||
{ok, TestBinary} = book_riakget(Bookie,
|
||||
TestObject#r_object.bucket,
|
||||
TestObject#r_object.key),
|
||||
{ok, HeadObject} = book_riakhead(Bookie,
|
||||
{ok, HeadBinary} = book_riakhead(Bookie,
|
||||
TestObject#r_object.bucket,
|
||||
TestObject#r_object.key),
|
||||
ok = case {HeadObject#r_object.bucket,
|
||||
HeadObject#r_object.key,
|
||||
HeadObject#r_object.vclock} of
|
||||
{B1, K1, VC1} when B1 == TestObject#r_object.bucket,
|
||||
K1 == TestObject#r_object.key,
|
||||
VC1 == TestObject#r_object.vclock ->
|
||||
ok
|
||||
end.
|
||||
{_SibMetaBinList,
|
||||
Vclock,
|
||||
_Hash,
|
||||
size} = leveled_codec:riak_extract_metadata(HeadBinary, size),
|
||||
true = Vclock == TestObject#r_object.vclock.
|
||||
|
||||
check_formissingobject(Bookie, Bucket, Key) ->
|
||||
not_found = book_riakget(Bookie, Bucket, Key),
|
||||
|
@ -194,11 +274,11 @@ generate_testobject() ->
|
|||
"Key1",
|
||||
"Value1",
|
||||
[],
|
||||
{"MDK1", "MDV1"}},
|
||||
[{"MDK1", "MDV1"}]},
|
||||
generate_testobject(B1, K1, V1, Spec1, MD).
|
||||
|
||||
generate_testobject(B, K, V, Spec, MD) ->
|
||||
Content = #r_content{metadata=MD, value=V},
|
||||
Content = #r_content{metadata=dict:from_list(MD), value=V},
|
||||
{#r_object{bucket=B, key=K, contents=[Content], vclock=[{'a',1}]},
|
||||
Spec}.
|
||||
|
||||
|
@ -282,6 +362,7 @@ set_object(Bucket, Key, Value, IndexGen) ->
|
|||
set_object(Bucket, Key, Value, IndexGen, []).
|
||||
|
||||
set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
|
||||
|
||||
Obj = {Bucket,
|
||||
Key,
|
||||
Value,
|
||||
|
@ -291,16 +372,52 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
|
|||
[{"MDK", "MDV" ++ Key},
|
||||
{"MDK2", "MDV" ++ Key}]},
|
||||
{B1, K1, V1, Spec1, MD} = Obj,
|
||||
Content = #r_content{metadata=MD, value=V1},
|
||||
{#r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
|
||||
Content = #r_content{metadata=dict:from_list(MD), value=V1},
|
||||
{#r_object{bucket=B1,
|
||||
key=K1,
|
||||
contents=[Content],
|
||||
vclock=generate_vclock()},
|
||||
Spec1}.
|
||||
|
||||
|
||||
generate_vclock() ->
|
||||
lists:map(fun(X) ->
|
||||
{_, Actor} = lists:keyfind(random:uniform(10),
|
||||
1,
|
||||
actor_list()),
|
||||
{Actor, X} end,
|
||||
lists:seq(1, random:uniform(8))).
|
||||
|
||||
|
||||
actor_list() ->
|
||||
[{1, albert}, {2, bertie}, {3, clara}, {4, dave}, {5, elton},
|
||||
{6, fred}, {7, george}, {8, harry}, {9, isaac}, {10, leila}].
|
||||
|
||||
get_key(Object) ->
|
||||
Object#r_object.key.
|
||||
|
||||
get_value(Object) ->
|
||||
[Content] = Object#r_object.contents,
|
||||
Content#r_content.value.
|
||||
get_value(ObjectBin) ->
|
||||
<<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer,
|
||||
Rest1/binary>> = ObjectBin,
|
||||
<<_VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest1,
|
||||
case SibCount of
|
||||
1 ->
|
||||
<<SibLength:32/integer, Rest2/binary>> = SibsBin,
|
||||
<<ContentBin:SibLength/binary, _MetaBin/binary>> = Rest2,
|
||||
case ContentBin of
|
||||
<<0, ContentBin0/binary>> ->
|
||||
binary_to_term(ContentBin0)
|
||||
end;
|
||||
N ->
|
||||
io:format("SibCount of ~w with ObjectBin ~w~n", [N, ObjectBin]),
|
||||
error
|
||||
end.
|
||||
|
||||
get_vclock(ObjectBin) ->
|
||||
<<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer,
|
||||
Rest1/binary>> = ObjectBin,
|
||||
<<VclockBin:VclockLen/binary, _Bin/binary>> = Rest1,
|
||||
binary_to_term(VclockBin).
|
||||
|
||||
load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
|
||||
lists:map(fun(KN) ->
|
||||
|
@ -467,6 +584,10 @@ corrupt_journal(RootPath, FileName, Corruptions, BasePosition, GapSize) ->
|
|||
OriginalPath = RootPath ++ "/journal/journal_files/" ++ FileName,
|
||||
BackupPath = RootPath ++ "/journal/journal_files/" ++
|
||||
filename:basename(FileName, ".cdb") ++ ".bak",
|
||||
io:format("Corruption attempt to be made to filename ~s ~w ~w~n",
|
||||
[FileName,
|
||||
filelib:is_file(OriginalPath),
|
||||
filelib:is_file(BackupPath)]),
|
||||
{ok, _BytesCopied} = file:copy(OriginalPath, BackupPath),
|
||||
{ok, Handle} = file:open(OriginalPath, [binary, raw, read, write]),
|
||||
lists:foreach(fun(X) ->
|
||||
|
@ -504,11 +625,3 @@ find_journals(RootPath) ->
|
|||
FNsA_J),
|
||||
CDBFiles.
|
||||
|
||||
|
||||
riak_hash(Obj=#r_object{}) ->
|
||||
Vclock = vclock(Obj),
|
||||
UpdObj = set_vclock(Obj, lists:sort(Vclock)),
|
||||
erlang:phash2(term_to_binary(UpdObj)).
|
||||
|
||||
set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
|
||||
vclock(#r_object{vclock=VClock}) -> VClock.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue