Split out key codec
Aplit out key codec, and also saner approach to key comparison (although still awkward).
This commit is contained in:
parent
de54a28328
commit
bbdac65f8d
6 changed files with 181 additions and 155 deletions
|
@ -140,16 +140,7 @@
|
||||||
book_snapshotstore/3,
|
book_snapshotstore/3,
|
||||||
book_snapshotledger/3,
|
book_snapshotledger/3,
|
||||||
book_compactjournal/2,
|
book_compactjournal/2,
|
||||||
book_close/1,
|
book_close/1]).
|
||||||
strip_to_keyonly/1,
|
|
||||||
strip_to_keyseqonly/1,
|
|
||||||
strip_to_seqonly/1,
|
|
||||||
strip_to_statusonly/1,
|
|
||||||
strip_to_keyseqstatusonly/1,
|
|
||||||
striphead_to_details/1,
|
|
||||||
key_compare/3,
|
|
||||||
key_dominates/2,
|
|
||||||
print_key/1]).
|
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
@ -265,7 +256,7 @@ handle_call({get, Key}, _From, State) ->
|
||||||
not_present ->
|
not_present ->
|
||||||
{reply, not_found, State};
|
{reply, not_found, State};
|
||||||
Head ->
|
Head ->
|
||||||
{Seqn, Status, _MD} = striphead_to_details(Head),
|
{Seqn, Status, _MD} = leveled_codec:striphead_to_details(Head),
|
||||||
case Status of
|
case Status of
|
||||||
{tomb, _} ->
|
{tomb, _} ->
|
||||||
{reply, not_found, State};
|
{reply, not_found, State};
|
||||||
|
@ -283,12 +274,12 @@ handle_call({head, Key}, _From, State) ->
|
||||||
not_present ->
|
not_present ->
|
||||||
{reply, not_found, State};
|
{reply, not_found, State};
|
||||||
Head ->
|
Head ->
|
||||||
{_Seqn, Status, MD} = striphead_to_details(Head),
|
{_Seqn, Status, MD} = leveled_codec:striphead_to_details(Head),
|
||||||
case Status of
|
case Status of
|
||||||
{tomb, _} ->
|
{tomb, _} ->
|
||||||
{reply, not_found, State};
|
{reply, not_found, State};
|
||||||
{active, _} ->
|
{active, _} ->
|
||||||
OMD = build_metadata_object(Key, MD),
|
OMD = leveled_codec:build_metadata_object(Key, MD),
|
||||||
{reply, {ok, OMD}, State}
|
{reply, {ok, OMD}, State}
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
@ -442,116 +433,20 @@ fetch_value(Key, SQN, Inker) ->
|
||||||
not_present
|
not_present
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Format of a Key within the ledger is
|
|
||||||
%% {PrimaryKey, SQN, Metadata, Status}
|
|
||||||
|
|
||||||
strip_to_keyonly({keyonly, K}) -> K;
|
|
||||||
strip_to_keyonly({K, _V}) -> K.
|
|
||||||
|
|
||||||
strip_to_keyseqonly({K, {SeqN, _, _}}) -> {K, SeqN}.
|
|
||||||
|
|
||||||
strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}.
|
|
||||||
|
|
||||||
strip_to_statusonly({_, {_, St, _}}) -> St.
|
|
||||||
|
|
||||||
strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN.
|
|
||||||
|
|
||||||
striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}.
|
|
||||||
|
|
||||||
key_dominates(LeftKey, RightKey) ->
|
|
||||||
case {LeftKey, RightKey} of
|
|
||||||
{{LK, _LVAL}, {RK, _RVAL}} when LK < RK ->
|
|
||||||
left_hand_first;
|
|
||||||
{{LK, _LVAL}, {RK, _RVAL}} when RK < LK ->
|
|
||||||
right_hand_first;
|
|
||||||
{{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}}
|
|
||||||
when LK == RK, LSN >= RSN ->
|
|
||||||
left_hand_dominant;
|
|
||||||
{{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}}
|
|
||||||
when LK == RK, LSN < RSN ->
|
|
||||||
right_hand_dominant
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
get_metadatas(#r_object{contents=Contents}) ->
|
|
||||||
[Content#r_content.metadata || Content <- Contents].
|
|
||||||
|
|
||||||
set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
|
|
||||||
|
|
||||||
vclock(#r_object{vclock=VClock}) -> VClock.
|
|
||||||
|
|
||||||
to_binary(v0, Obj) ->
|
|
||||||
term_to_binary(Obj).
|
|
||||||
|
|
||||||
hash(Obj=#r_object{}) ->
|
|
||||||
Vclock = vclock(Obj),
|
|
||||||
UpdObj = set_vclock(Obj, lists:sort(Vclock)),
|
|
||||||
erlang:phash2(to_binary(v0, UpdObj)).
|
|
||||||
|
|
||||||
extract_metadata(Obj, Size) ->
|
|
||||||
{get_metadatas(Obj), vclock(Obj), hash(Obj), Size}.
|
|
||||||
|
|
||||||
accumulate_size(_Key, Value, {Size, Count}) ->
|
accumulate_size(_Key, Value, {Size, Count}) ->
|
||||||
{_, _, MD} = Value,
|
{_, _, MD} = Value,
|
||||||
{_, _, _, ObjSize} = MD,
|
{_, _, _, ObjSize} = MD,
|
||||||
{Size + ObjSize, Count + 1}.
|
{Size + ObjSize, Count + 1}.
|
||||||
|
|
||||||
build_metadata_object(PrimaryKey, Head) ->
|
|
||||||
{o, Bucket, Key, null} = PrimaryKey,
|
|
||||||
{MD, VC, _, _} = Head,
|
|
||||||
Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end,
|
|
||||||
[],
|
|
||||||
MD),
|
|
||||||
#r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}.
|
|
||||||
|
|
||||||
convert_indexspecs(IndexSpecs, SQN, PrimaryKey) ->
|
|
||||||
lists:map(fun({IndexOp, IndexField, IndexValue}) ->
|
|
||||||
Status = case IndexOp of
|
|
||||||
add ->
|
|
||||||
%% TODO: timestamp support
|
|
||||||
{active, infinity};
|
|
||||||
remove ->
|
|
||||||
%% TODO: timestamps for delayed reaping
|
|
||||||
{tomb, infinity}
|
|
||||||
end,
|
|
||||||
{o, B, K, _SK} = PrimaryKey,
|
|
||||||
{{i, B, {IndexField, IndexValue}, K},
|
|
||||||
{SQN, Status, null}}
|
|
||||||
end,
|
|
||||||
IndexSpecs).
|
|
||||||
|
|
||||||
% Return a tuple of string to ease the printing of keys to logs
|
|
||||||
print_key(Key) ->
|
|
||||||
case Key of
|
|
||||||
{o, B, K, _SK} ->
|
|
||||||
{"Object", B, K};
|
|
||||||
{i, B, {F, _V}, _K} ->
|
|
||||||
{"Index", B, F}
|
|
||||||
end.
|
|
||||||
|
|
||||||
% Compare a key against a query key, only comparing elements that are non-null
|
|
||||||
% in the Query key
|
|
||||||
key_compare(QueryKey, CheckingKey, gt) ->
|
|
||||||
key_compare(QueryKey, CheckingKey, fun(X,Y) -> X > Y end);
|
|
||||||
key_compare(QueryKey, CheckingKey, lt) ->
|
|
||||||
key_compare(QueryKey, CheckingKey, fun(X,Y) -> X < Y end);
|
|
||||||
key_compare({QK1, null, null, null}, {CK1, _, _, _}, CompareFun) ->
|
|
||||||
CompareFun(QK1, CK1);
|
|
||||||
key_compare({QK1, QK2, null, null}, {CK1, CK2, _, _}, CompareFun) ->
|
|
||||||
CompareFun({QK1, QK2}, {CK1, CK2});
|
|
||||||
key_compare({QK1, QK2, QK3, null}, {CK1, CK2, CK3, _}, CompareFun) ->
|
|
||||||
CompareFun({QK1, QK2, QK3}, {CK1, CK2, CK3});
|
|
||||||
key_compare(QueryKey, CheckingKey, CompareFun) ->
|
|
||||||
CompareFun(QueryKey, CheckingKey).
|
|
||||||
|
|
||||||
|
|
||||||
preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
|
preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
|
||||||
PrimaryChange = {PK,
|
PrimaryChange = {PK,
|
||||||
{SQN,
|
{SQN,
|
||||||
{active, infinity},
|
{active, infinity},
|
||||||
extract_metadata(Obj, Size)}},
|
leveled_codec:extract_metadata(Obj, Size)}},
|
||||||
SecChanges = convert_indexspecs(IndexSpecs, SQN, PK),
|
SecChanges = leveled_codec:convert_indexspecs(IndexSpecs, SQN, PK),
|
||||||
[PrimaryChange] ++ SecChanges.
|
[PrimaryChange] ++ SecChanges.
|
||||||
|
|
||||||
addto_ledgercache(Changes, Cache) ->
|
addto_ledgercache(Changes, Cache) ->
|
||||||
|
@ -709,17 +604,5 @@ multi_key_test() ->
|
||||||
?assertMatch(F2D, Obj2),
|
?assertMatch(F2D, Obj2),
|
||||||
ok = book_close(Bookie2),
|
ok = book_close(Bookie2),
|
||||||
reset_filestructure().
|
reset_filestructure().
|
||||||
|
|
||||||
indexspecs_test() ->
|
|
||||||
IndexSpecs = [{add, "t1_int", 456},
|
|
||||||
{add, "t1_bin", "adbc123"},
|
|
||||||
{remove, "t1_bin", "abdc456"}],
|
|
||||||
Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2", null}),
|
|
||||||
?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
|
|
||||||
{1, {active, infinity}, null}}, lists:nth(1, Changes)),
|
|
||||||
?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},
|
|
||||||
{1, {active, infinity}, null}}, lists:nth(2, Changes)),
|
|
||||||
?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"},
|
|
||||||
{1, {tomb, infinity}, null}}, lists:nth(3, Changes)).
|
|
||||||
|
|
||||||
-endif.
|
-endif.
|
|
@ -1098,12 +1098,18 @@ write_key_value_pairs(Handle, [HeadPair|TailList], Acc) ->
|
||||||
write_hash_tables(Handle, HashTree) ->
|
write_hash_tables(Handle, HashTree) ->
|
||||||
Seq = lists:seq(0, 255),
|
Seq = lists:seq(0, 255),
|
||||||
{ok, StartPos} = file:position(Handle, cur),
|
{ok, StartPos} = file:position(Handle, cur),
|
||||||
|
SWC = os:timestamp(),
|
||||||
{IndexList, HashTreeBin} = write_hash_tables(Seq,
|
{IndexList, HashTreeBin} = write_hash_tables(Seq,
|
||||||
HashTree,
|
HashTree,
|
||||||
StartPos,
|
StartPos,
|
||||||
[],
|
[],
|
||||||
<<>>),
|
<<>>),
|
||||||
|
io:format("HashTree computed in ~w microseconds~n",
|
||||||
|
[timer:now_diff(os:timestamp(), SWC)]),
|
||||||
|
SWW = os:timestamp(),
|
||||||
ok = file:write(Handle, HashTreeBin),
|
ok = file:write(Handle, HashTreeBin),
|
||||||
|
io:format("HashTree written in ~w microseconds~n",
|
||||||
|
[timer:now_diff(os:timestamp(), SWW)]),
|
||||||
{ok, EndPos} = file:position(Handle, cur),
|
{ok, EndPos} = file:position(Handle, cur),
|
||||||
ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need),
|
ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need),
|
||||||
IndexList.
|
IndexList.
|
||||||
|
|
142
src/leveled_codec.erl
Normal file
142
src/leveled_codec.erl
Normal file
|
@ -0,0 +1,142 @@
|
||||||
|
%% -------- Key Codec ---------
|
||||||
|
%%
|
||||||
|
%% Functions for manipulating keys and values within leveled. These are
|
||||||
|
%% currently static functions, they cannot be overridden in the store other
|
||||||
|
%% than by changing them here. The formats are focused on the problem of
|
||||||
|
%% supporting Riak KV
|
||||||
|
|
||||||
|
-module(leveled_codec).
|
||||||
|
|
||||||
|
-include("../include/leveled.hrl").
|
||||||
|
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-export([strip_to_keyonly/1,
|
||||||
|
strip_to_keyseqonly/1,
|
||||||
|
strip_to_seqonly/1,
|
||||||
|
strip_to_statusonly/1,
|
||||||
|
strip_to_keyseqstatusonly/1,
|
||||||
|
striphead_to_details/1,
|
||||||
|
endkey_passed/2,
|
||||||
|
key_dominates/2,
|
||||||
|
print_key/1,
|
||||||
|
extract_metadata/2,
|
||||||
|
build_metadata_object/2,
|
||||||
|
convert_indexspecs/3]).
|
||||||
|
|
||||||
|
|
||||||
|
strip_to_keyonly({keyonly, K}) -> K;
|
||||||
|
strip_to_keyonly({K, _V}) -> K.
|
||||||
|
|
||||||
|
strip_to_keyseqonly({K, {SeqN, _, _}}) -> {K, SeqN}.
|
||||||
|
|
||||||
|
strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}.
|
||||||
|
|
||||||
|
strip_to_statusonly({_, {_, St, _}}) -> St.
|
||||||
|
|
||||||
|
strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN.
|
||||||
|
|
||||||
|
striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}.
|
||||||
|
|
||||||
|
key_dominates(LeftKey, RightKey) ->
|
||||||
|
case {LeftKey, RightKey} of
|
||||||
|
{{LK, _LVAL}, {RK, _RVAL}} when LK < RK ->
|
||||||
|
left_hand_first;
|
||||||
|
{{LK, _LVAL}, {RK, _RVAL}} when RK < LK ->
|
||||||
|
right_hand_first;
|
||||||
|
{{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}}
|
||||||
|
when LK == RK, LSN >= RSN ->
|
||||||
|
left_hand_dominant;
|
||||||
|
{{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}}
|
||||||
|
when LK == RK, LSN < RSN ->
|
||||||
|
right_hand_dominant
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
get_metadatas(#r_object{contents=Contents}) ->
|
||||||
|
[Content#r_content.metadata || Content <- Contents].
|
||||||
|
|
||||||
|
set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
|
||||||
|
|
||||||
|
vclock(#r_object{vclock=VClock}) -> VClock.
|
||||||
|
|
||||||
|
to_binary(v0, Obj) ->
|
||||||
|
term_to_binary(Obj).
|
||||||
|
|
||||||
|
hash(Obj=#r_object{}) ->
|
||||||
|
Vclock = vclock(Obj),
|
||||||
|
UpdObj = set_vclock(Obj, lists:sort(Vclock)),
|
||||||
|
erlang:phash2(to_binary(v0, UpdObj)).
|
||||||
|
|
||||||
|
extract_metadata(Obj, Size) ->
|
||||||
|
{get_metadatas(Obj), vclock(Obj), hash(Obj), Size}.
|
||||||
|
|
||||||
|
|
||||||
|
build_metadata_object(PrimaryKey, Head) ->
|
||||||
|
{o, Bucket, Key, null} = PrimaryKey,
|
||||||
|
{MD, VC, _, _} = Head,
|
||||||
|
Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end,
|
||||||
|
[],
|
||||||
|
MD),
|
||||||
|
#r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}.
|
||||||
|
|
||||||
|
convert_indexspecs(IndexSpecs, SQN, PrimaryKey) ->
|
||||||
|
lists:map(fun({IndexOp, IndexField, IndexValue}) ->
|
||||||
|
Status = case IndexOp of
|
||||||
|
add ->
|
||||||
|
%% TODO: timestamp support
|
||||||
|
{active, infinity};
|
||||||
|
remove ->
|
||||||
|
%% TODO: timestamps for delayed reaping
|
||||||
|
{tomb, infinity}
|
||||||
|
end,
|
||||||
|
{o, B, K, _SK} = PrimaryKey,
|
||||||
|
{{i, B, {IndexField, IndexValue}, K},
|
||||||
|
{SQN, Status, null}}
|
||||||
|
end,
|
||||||
|
IndexSpecs).
|
||||||
|
|
||||||
|
% Return a tuple of string to ease the printing of keys to logs
|
||||||
|
print_key(Key) ->
|
||||||
|
case Key of
|
||||||
|
{o, B, K, _SK} ->
|
||||||
|
{"Object", B, K};
|
||||||
|
{i, B, {F, _V}, _K} ->
|
||||||
|
{"Index", B, F}
|
||||||
|
end.
|
||||||
|
|
||||||
|
% Compare a key against a query key, only comparing elements that are non-null
|
||||||
|
% in the Query key. This is used for comparing against end keys in queries.
|
||||||
|
endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) ->
|
||||||
|
EK1 < CK1;
|
||||||
|
endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) ->
|
||||||
|
{EK1, EK2} < {CK1, CK2};
|
||||||
|
endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
|
||||||
|
{EK1, EK2, EK3} < {CK1, CK2, CK3};
|
||||||
|
endkey_passed(EndKey, CheckingKey) ->
|
||||||
|
EndKey < CheckingKey.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
%%%============================================================================
|
||||||
|
%%% Test
|
||||||
|
%%%============================================================================
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
|
||||||
|
indexspecs_test() ->
|
||||||
|
IndexSpecs = [{add, "t1_int", 456},
|
||||||
|
{add, "t1_bin", "adbc123"},
|
||||||
|
{remove, "t1_bin", "abdc456"}],
|
||||||
|
Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2", null}),
|
||||||
|
?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
|
||||||
|
{1, {active, infinity}, null}}, lists:nth(1, Changes)),
|
||||||
|
?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},
|
||||||
|
{1, {active, infinity}, null}}, lists:nth(2, Changes)),
|
||||||
|
?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"},
|
||||||
|
{1, {tomb, infinity}, null}}, lists:nth(3, Changes)).
|
||||||
|
|
||||||
|
-endif.
|
|
@ -370,7 +370,7 @@ find_randomkeys(_FList, 0, _Source) ->
|
||||||
ok;
|
ok;
|
||||||
find_randomkeys(FList, Count, Source) ->
|
find_randomkeys(FList, Count, Source) ->
|
||||||
KV1 = lists:nth(random:uniform(length(Source)), Source),
|
KV1 = lists:nth(random:uniform(length(Source)), Source),
|
||||||
K1 = leveled_bookie:strip_to_keyonly(KV1),
|
K1 = leveled_codec:strip_to_keyonly(KV1),
|
||||||
P1 = choose_pid_toquery(FList, K1),
|
P1 = choose_pid_toquery(FList, K1),
|
||||||
FoundKV = leveled_sft:sft_get(P1, K1),
|
FoundKV = leveled_sft:sft_get(P1, K1),
|
||||||
Check = case FoundKV of
|
Check = case FoundKV of
|
||||||
|
@ -378,7 +378,7 @@ find_randomkeys(FList, Count, Source) ->
|
||||||
io:format("Failed to find ~w in ~w~n", [K1, P1]),
|
io:format("Failed to find ~w in ~w~n", [K1, P1]),
|
||||||
error;
|
error;
|
||||||
_ ->
|
_ ->
|
||||||
Found = leveled_bookie:strip_to_keyonly(FoundKV),
|
Found = leveled_codec:strip_to_keyonly(FoundKV),
|
||||||
io:format("success finding ~w in ~w~n", [K1, P1]),
|
io:format("success finding ~w in ~w~n", [K1, P1]),
|
||||||
?assertMatch(K1, Found),
|
?assertMatch(K1, Found),
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -843,7 +843,7 @@ compare_to_sqn(Obj, SQN) ->
|
||||||
not_present ->
|
not_present ->
|
||||||
false;
|
false;
|
||||||
Obj ->
|
Obj ->
|
||||||
SQNToCompare = leveled_bookie:strip_to_seqonly(Obj),
|
SQNToCompare = leveled_codec:strip_to_seqonly(Obj),
|
||||||
if
|
if
|
||||||
SQNToCompare > SQN ->
|
SQNToCompare > SQN ->
|
||||||
false;
|
false;
|
||||||
|
@ -936,7 +936,7 @@ roll_new_tree(Tree, [], HighSQN) ->
|
||||||
roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN ->
|
roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN ->
|
||||||
R = lists:foldl(fun({Kx, Vx}, {TreeAcc, MaxSQN}) ->
|
R = lists:foldl(fun({Kx, Vx}, {TreeAcc, MaxSQN}) ->
|
||||||
UpdTree = gb_trees:enter(Kx, Vx, TreeAcc),
|
UpdTree = gb_trees:enter(Kx, Vx, TreeAcc),
|
||||||
SQNx = leveled_bookie:strip_to_seqonly({Kx, Vx}),
|
SQNx = leveled_codec:strip_to_seqonly({Kx, Vx}),
|
||||||
{UpdTree, max(SQNx, MaxSQN)}
|
{UpdTree, max(SQNx, MaxSQN)}
|
||||||
end,
|
end,
|
||||||
{Tree, HighSQN},
|
{Tree, HighSQN},
|
||||||
|
@ -1025,8 +1025,8 @@ print_manifest(Manifest) ->
|
||||||
lists:seq(0, ?MAX_LEVELS - 1)).
|
lists:seq(0, ?MAX_LEVELS - 1)).
|
||||||
|
|
||||||
print_manifest_entry(Entry) ->
|
print_manifest_entry(Entry) ->
|
||||||
{S1, S2, S3} = leveled_bookie:print_key(Entry#manifest_entry.start_key),
|
{S1, S2, S3} = leveled_codec:print_key(Entry#manifest_entry.start_key),
|
||||||
{E1, E2, E3} = leveled_bookie:print_key(Entry#manifest_entry.end_key),
|
{E1, E2, E3} = leveled_codec:print_key(Entry#manifest_entry.end_key),
|
||||||
io:format("Manifest entry of " ++
|
io:format("Manifest entry of " ++
|
||||||
"startkey ~s ~s ~s " ++
|
"startkey ~s ~s ~s " ++
|
||||||
"endkey ~s ~s ~s " ++
|
"endkey ~s ~s ~s " ++
|
||||||
|
@ -1036,12 +1036,9 @@ print_manifest_entry(Entry) ->
|
||||||
|
|
||||||
initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) ->
|
initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) ->
|
||||||
CompareFun = fun(M) ->
|
CompareFun = fun(M) ->
|
||||||
C1 = leveled_bookie:key_compare(StartKey,
|
C1 = StartKey > M#manifest_entry.end_key,
|
||||||
M#manifest_entry.end_key,
|
C2 = leveled_codec:endkey_passed(EndKey,
|
||||||
gt),
|
M#manifest_entry.start_key),
|
||||||
C2 = leveled_bookie:key_compare(EndKey,
|
|
||||||
M#manifest_entry.start_key,
|
|
||||||
lt),
|
|
||||||
not (C1 or C2) end,
|
not (C1 or C2) end,
|
||||||
lists:foldl(fun(L, AccL) ->
|
lists:foldl(fun(L, AccL) ->
|
||||||
Level = get_item(L, Manifest, []),
|
Level = get_item(L, Manifest, []),
|
||||||
|
@ -1143,8 +1140,8 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
|
||||||
{LCnt, {Key, Val}},
|
{LCnt, {Key, Val}},
|
||||||
QueryFunT);
|
QueryFunT);
|
||||||
{{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey ->
|
{{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey ->
|
||||||
SQN = leveled_bookie:strip_to_seqonly({Key, Val}),
|
SQN = leveled_codec:strip_to_seqonly({Key, Val}),
|
||||||
BestSQN = leveled_bookie:strip_to_seqonly({BestKey, BestVal}),
|
BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}),
|
||||||
if
|
if
|
||||||
SQN =< BestSQN ->
|
SQN =< BestSQN ->
|
||||||
% This is a dominated key, so we need to skip over it
|
% This is a dominated key, so we need to skip over it
|
||||||
|
@ -1193,7 +1190,7 @@ keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) ->
|
||||||
% iterate only over the remaining keys in the SFT iterator
|
% iterate only over the remaining keys in the SFT iterator
|
||||||
keyfolder(null, SFTiterator, StartKey, EndKey, {AccFun, Acc});
|
keyfolder(null, SFTiterator, StartKey, EndKey, {AccFun, Acc});
|
||||||
{IMMKey, IMMVal, NxtIMMiterator} ->
|
{IMMKey, IMMVal, NxtIMMiterator} ->
|
||||||
case leveled_bookie:key_compare(EndKey, IMMKey, lt) of
|
case leveled_codec:endkey_passed(EndKey, IMMKey) of
|
||||||
true ->
|
true ->
|
||||||
% There are no more keys in-range in the in-memory
|
% There are no more keys in-range in the in-memory
|
||||||
% iterator, so take action as if this iterator is empty
|
% iterator, so take action as if this iterator is empty
|
||||||
|
@ -1212,7 +1209,7 @@ keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) ->
|
||||||
% There is a next key, so need to know which is the
|
% There is a next key, so need to know which is the
|
||||||
% next key between the two (and handle two keys
|
% next key between the two (and handle two keys
|
||||||
% with different sequence numbers).
|
% with different sequence numbers).
|
||||||
case leveled_bookie:key_dominates({IMMKey,
|
case leveled_codec:key_dominates({IMMKey,
|
||||||
IMMVal},
|
IMMVal},
|
||||||
{SFTKey,
|
{SFTKey,
|
||||||
SFTVal}) of
|
SFTVal}) of
|
||||||
|
@ -1377,7 +1374,7 @@ assess_sqn(DumpList) ->
|
||||||
assess_sqn([], MinSQN, MaxSQN) ->
|
assess_sqn([], MinSQN, MaxSQN) ->
|
||||||
{MinSQN, MaxSQN};
|
{MinSQN, MaxSQN};
|
||||||
assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) ->
|
assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) ->
|
||||||
{_K, SQN} = leveled_bookie:strip_to_keyseqonly(HeadKey),
|
{_K, SQN} = leveled_codec:strip_to_keyseqonly(HeadKey),
|
||||||
assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)).
|
assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)).
|
||||||
|
|
||||||
|
|
||||||
|
@ -1763,7 +1760,7 @@ foldwithimm_simple_test() ->
|
||||||
{9, {active, infinity}, null},
|
{9, {active, infinity}, null},
|
||||||
IMM1),
|
IMM1),
|
||||||
IMMiter = gb_trees:iterator_from({o, "Bucket1", "Key1"}, IMM2),
|
IMMiter = gb_trees:iterator_from({o, "Bucket1", "Key1"}, IMM2),
|
||||||
AccFun = fun(K, V, Acc) -> SQN= leveled_bookie:strip_to_seqonly({K, V}),
|
AccFun = fun(K, V, Acc) -> SQN = leveled_codec:strip_to_seqonly({K, V}),
|
||||||
Acc ++ [{K, SQN}] end,
|
Acc ++ [{K, SQN}] end,
|
||||||
Acc = keyfolder(IMMiter,
|
Acc = keyfolder(IMMiter,
|
||||||
QueryArray,
|
QueryArray,
|
||||||
|
|
|
@ -563,7 +563,7 @@ acc_list_keysonly(null, empty) ->
|
||||||
acc_list_keysonly(null, RList) ->
|
acc_list_keysonly(null, RList) ->
|
||||||
RList;
|
RList;
|
||||||
acc_list_keysonly(R, RList) ->
|
acc_list_keysonly(R, RList) ->
|
||||||
lists:append(RList, [leveled_bookie:strip_to_keyseqstatusonly(R)]).
|
lists:append(RList, [leveled_codec:strip_to_keyseqstatusonly(R)]).
|
||||||
|
|
||||||
acc_list_kv(null, empty) ->
|
acc_list_kv(null, empty) ->
|
||||||
[];
|
[];
|
||||||
|
@ -652,10 +652,8 @@ fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList,
|
||||||
scan_block([], StartKey, _EndKey, _FunList, _AccFun, Acc) ->
|
scan_block([], StartKey, _EndKey, _FunList, _AccFun, Acc) ->
|
||||||
{partial, Acc, StartKey};
|
{partial, Acc, StartKey};
|
||||||
scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) ->
|
scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) ->
|
||||||
K = leveled_bookie:strip_to_keyonly(HeadKV),
|
K = leveled_codec:strip_to_keyonly(HeadKV),
|
||||||
Pre = leveled_bookie:key_compare(StartKey, K, gt),
|
case {StartKey > K, leveled_codec:endkey_passed(EndKey, K)} of
|
||||||
Post = leveled_bookie:key_compare(EndKey, K, lt),
|
|
||||||
case {Pre, Post} of
|
|
||||||
{true, _} when StartKey /= all ->
|
{true, _} when StartKey /= all ->
|
||||||
scan_block(T, StartKey, EndKey, FunList, AccFun, Acc);
|
scan_block(T, StartKey, EndKey, FunList, AccFun, Acc);
|
||||||
{_, true} when EndKey /= all ->
|
{_, true} when EndKey /= all ->
|
||||||
|
@ -988,15 +986,15 @@ create_slot(KL1, KL2, Level, BlockCount, SegLists, SerialisedSlot, LengthList,
|
||||||
TrackingMetadata = case LowKey of
|
TrackingMetadata = case LowKey of
|
||||||
null ->
|
null ->
|
||||||
[NewLowKeyV|_] = BlockKeyList,
|
[NewLowKeyV|_] = BlockKeyList,
|
||||||
{leveled_bookie:strip_to_keyonly(NewLowKeyV),
|
{leveled_codec:strip_to_keyonly(NewLowKeyV),
|
||||||
min(LSN, LSNb), max(HSN, HSNb),
|
min(LSN, LSNb), max(HSN, HSNb),
|
||||||
leveled_bookie:strip_to_keyonly(last(BlockKeyList,
|
leveled_codec:strip_to_keyonly(last(BlockKeyList,
|
||||||
{last, LastKey})),
|
{last, LastKey})),
|
||||||
Status};
|
Status};
|
||||||
_ ->
|
_ ->
|
||||||
{LowKey,
|
{LowKey,
|
||||||
min(LSN, LSNb), max(HSN, HSNb),
|
min(LSN, LSNb), max(HSN, HSNb),
|
||||||
leveled_bookie:strip_to_keyonly(last(BlockKeyList,
|
leveled_codec:strip_to_keyonly(last(BlockKeyList,
|
||||||
{last, LastKey})),
|
{last, LastKey})),
|
||||||
Status}
|
Status}
|
||||||
end,
|
end,
|
||||||
|
@ -1035,7 +1033,7 @@ key_dominates(KL1, KL2, Level) ->
|
||||||
Level).
|
Level).
|
||||||
|
|
||||||
key_dominates_expanded([H1|T1], [], Level) ->
|
key_dominates_expanded([H1|T1], [], Level) ->
|
||||||
St1 = leveled_bookie:strip_to_statusonly(H1),
|
St1 = leveled_codec:strip_to_statusonly(H1),
|
||||||
case maybe_reap_expiredkey(St1, Level) of
|
case maybe_reap_expiredkey(St1, Level) of
|
||||||
true ->
|
true ->
|
||||||
{skipped_key, maybe_expand_pointer(T1), []};
|
{skipped_key, maybe_expand_pointer(T1), []};
|
||||||
|
@ -1043,7 +1041,7 @@ key_dominates_expanded([H1|T1], [], Level) ->
|
||||||
{{next_key, H1}, maybe_expand_pointer(T1), []}
|
{{next_key, H1}, maybe_expand_pointer(T1), []}
|
||||||
end;
|
end;
|
||||||
key_dominates_expanded([], [H2|T2], Level) ->
|
key_dominates_expanded([], [H2|T2], Level) ->
|
||||||
St2 = leveled_bookie:strip_to_statusonly(H2),
|
St2 = leveled_codec:strip_to_statusonly(H2),
|
||||||
case maybe_reap_expiredkey(St2, Level) of
|
case maybe_reap_expiredkey(St2, Level) of
|
||||||
true ->
|
true ->
|
||||||
{skipped_key, [], maybe_expand_pointer(T2)};
|
{skipped_key, [], maybe_expand_pointer(T2)};
|
||||||
|
@ -1052,8 +1050,8 @@ key_dominates_expanded([], [H2|T2], Level) ->
|
||||||
end;
|
end;
|
||||||
key_dominates_expanded([H1|T1], [H2|T2], Level) ->
|
key_dominates_expanded([H1|T1], [H2|T2], Level) ->
|
||||||
{{K1, V1}, {K2, V2}} = {H1, H2},
|
{{K1, V1}, {K2, V2}} = {H1, H2},
|
||||||
{Sq1, St1, _MD1} = leveled_bookie:striphead_to_details(V1),
|
{Sq1, St1, _MD1} = leveled_codec:striphead_to_details(V1),
|
||||||
{Sq2, St2, _MD2} = leveled_bookie:striphead_to_details(V2),
|
{Sq2, St2, _MD2} = leveled_codec:striphead_to_details(V2),
|
||||||
case K1 of
|
case K1 of
|
||||||
K2 ->
|
K2 ->
|
||||||
case Sq1 > Sq2 of
|
case Sq1 > Sq2 of
|
||||||
|
@ -1116,7 +1114,7 @@ pointer_append_queryresults(Results, QueryPid) ->
|
||||||
|
|
||||||
%% Update the sequence numbers
|
%% Update the sequence numbers
|
||||||
update_sequencenumbers(Item, LSN, HSN) when is_tuple(Item) ->
|
update_sequencenumbers(Item, LSN, HSN) when is_tuple(Item) ->
|
||||||
update_sequencenumbers(leveled_bookie:strip_to_seqonly(Item), LSN, HSN);
|
update_sequencenumbers(leveled_codec:strip_to_seqonly(Item), LSN, HSN);
|
||||||
update_sequencenumbers(SN, 0, 0) ->
|
update_sequencenumbers(SN, 0, 0) ->
|
||||||
{SN, SN};
|
{SN, SN};
|
||||||
update_sequencenumbers(SN, LSN, HSN) when SN < LSN ->
|
update_sequencenumbers(SN, LSN, HSN) when SN < LSN ->
|
||||||
|
@ -1227,7 +1225,7 @@ merge_seglists({SegList1, SegList2, SegList3, SegList4}) ->
|
||||||
lists:sort(Stage4).
|
lists:sort(Stage4).
|
||||||
|
|
||||||
hash_for_segmentid(KV) ->
|
hash_for_segmentid(KV) ->
|
||||||
erlang:phash2(leveled_bookie:strip_to_keyonly(KV), ?MAX_SEG_HASH).
|
erlang:phash2(leveled_codec:strip_to_keyonly(KV), ?MAX_SEG_HASH).
|
||||||
|
|
||||||
|
|
||||||
%% Check for a given list of segments in the filter, returning in normal
|
%% Check for a given list of segments in the filter, returning in normal
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue