diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index ebd6dc2..ecf5f81 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -140,16 +140,7 @@ book_snapshotstore/3, book_snapshotledger/3, book_compactjournal/2, - book_close/1, - strip_to_keyonly/1, - strip_to_keyseqonly/1, - strip_to_seqonly/1, - strip_to_statusonly/1, - strip_to_keyseqstatusonly/1, - striphead_to_details/1, - key_compare/3, - key_dominates/2, - print_key/1]). + book_close/1]). -include_lib("eunit/include/eunit.hrl"). @@ -265,7 +256,7 @@ handle_call({get, Key}, _From, State) -> not_present -> {reply, not_found, State}; Head -> - {Seqn, Status, _MD} = striphead_to_details(Head), + {Seqn, Status, _MD} = leveled_codec:striphead_to_details(Head), case Status of {tomb, _} -> {reply, not_found, State}; @@ -283,12 +274,12 @@ handle_call({head, Key}, _From, State) -> not_present -> {reply, not_found, State}; Head -> - {_Seqn, Status, MD} = striphead_to_details(Head), + {_Seqn, Status, MD} = leveled_codec:striphead_to_details(Head), case Status of {tomb, _} -> {reply, not_found, State}; {active, _} -> - OMD = build_metadata_object(Key, MD), + OMD = leveled_codec:build_metadata_object(Key, MD), {reply, {ok, OMD}, State} end end; @@ -442,116 +433,20 @@ fetch_value(Key, SQN, Inker) -> not_present end. -%% Format of a Key within the ledger is -%% {PrimaryKey, SQN, Metadata, Status} - -strip_to_keyonly({keyonly, K}) -> K; -strip_to_keyonly({K, _V}) -> K. - -strip_to_keyseqonly({K, {SeqN, _, _}}) -> {K, SeqN}. - -strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}. - -strip_to_statusonly({_, {_, St, _}}) -> St. - -strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN. - -striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}. - -key_dominates(LeftKey, RightKey) -> - case {LeftKey, RightKey} of - {{LK, _LVAL}, {RK, _RVAL}} when LK < RK -> - left_hand_first; - {{LK, _LVAL}, {RK, _RVAL}} when RK < LK -> - right_hand_first; - {{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}} - when LK == RK, LSN >= RSN -> - left_hand_dominant; - {{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}} - when LK == RK, LSN < RSN -> - right_hand_dominant - end. - - - -get_metadatas(#r_object{contents=Contents}) -> - [Content#r_content.metadata || Content <- Contents]. - -set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. - -vclock(#r_object{vclock=VClock}) -> VClock. - -to_binary(v0, Obj) -> - term_to_binary(Obj). - -hash(Obj=#r_object{}) -> - Vclock = vclock(Obj), - UpdObj = set_vclock(Obj, lists:sort(Vclock)), - erlang:phash2(to_binary(v0, UpdObj)). - -extract_metadata(Obj, Size) -> - {get_metadatas(Obj), vclock(Obj), hash(Obj), Size}. accumulate_size(_Key, Value, {Size, Count}) -> {_, _, MD} = Value, {_, _, _, ObjSize} = MD, {Size + ObjSize, Count + 1}. -build_metadata_object(PrimaryKey, Head) -> - {o, Bucket, Key, null} = PrimaryKey, - {MD, VC, _, _} = Head, - Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end, - [], - MD), - #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}. - -convert_indexspecs(IndexSpecs, SQN, PrimaryKey) -> - lists:map(fun({IndexOp, IndexField, IndexValue}) -> - Status = case IndexOp of - add -> - %% TODO: timestamp support - {active, infinity}; - remove -> - %% TODO: timestamps for delayed reaping - {tomb, infinity} - end, - {o, B, K, _SK} = PrimaryKey, - {{i, B, {IndexField, IndexValue}, K}, - {SQN, Status, null}} - end, - IndexSpecs). - -% Return a tuple of string to ease the printing of keys to logs -print_key(Key) -> - case Key of - {o, B, K, _SK} -> - {"Object", B, K}; - {i, B, {F, _V}, _K} -> - {"Index", B, F} - end. - -% Compare a key against a query key, only comparing elements that are non-null -% in the Query key -key_compare(QueryKey, CheckingKey, gt) -> - key_compare(QueryKey, CheckingKey, fun(X,Y) -> X > Y end); -key_compare(QueryKey, CheckingKey, lt) -> - key_compare(QueryKey, CheckingKey, fun(X,Y) -> X < Y end); -key_compare({QK1, null, null, null}, {CK1, _, _, _}, CompareFun) -> - CompareFun(QK1, CK1); -key_compare({QK1, QK2, null, null}, {CK1, CK2, _, _}, CompareFun) -> - CompareFun({QK1, QK2}, {CK1, CK2}); -key_compare({QK1, QK2, QK3, null}, {CK1, CK2, CK3, _}, CompareFun) -> - CompareFun({QK1, QK2, QK3}, {CK1, CK2, CK3}); -key_compare(QueryKey, CheckingKey, CompareFun) -> - CompareFun(QueryKey, CheckingKey). preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) -> PrimaryChange = {PK, {SQN, {active, infinity}, - extract_metadata(Obj, Size)}}, - SecChanges = convert_indexspecs(IndexSpecs, SQN, PK), + leveled_codec:extract_metadata(Obj, Size)}}, + SecChanges = leveled_codec:convert_indexspecs(IndexSpecs, SQN, PK), [PrimaryChange] ++ SecChanges. addto_ledgercache(Changes, Cache) -> @@ -709,17 +604,5 @@ multi_key_test() -> ?assertMatch(F2D, Obj2), ok = book_close(Bookie2), reset_filestructure(). - -indexspecs_test() -> - IndexSpecs = [{add, "t1_int", 456}, - {add, "t1_bin", "adbc123"}, - {remove, "t1_bin", "abdc456"}], - Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2", null}), - ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"}, - {1, {active, infinity}, null}}, lists:nth(1, Changes)), - ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"}, - {1, {active, infinity}, null}}, lists:nth(2, Changes)), - ?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"}, - {1, {tomb, infinity}, null}}, lists:nth(3, Changes)). -endif. \ No newline at end of file diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 45f2a6a..9b9af5a 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -1098,12 +1098,18 @@ write_key_value_pairs(Handle, [HeadPair|TailList], Acc) -> write_hash_tables(Handle, HashTree) -> Seq = lists:seq(0, 255), {ok, StartPos} = file:position(Handle, cur), + SWC = os:timestamp(), {IndexList, HashTreeBin} = write_hash_tables(Seq, HashTree, StartPos, [], <<>>), + io:format("HashTree computed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWC)]), + SWW = os:timestamp(), ok = file:write(Handle, HashTreeBin), + io:format("HashTree written in ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWW)]), {ok, EndPos} = file:position(Handle, cur), ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need), IndexList. diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl new file mode 100644 index 0000000..6ae0f6a --- /dev/null +++ b/src/leveled_codec.erl @@ -0,0 +1,142 @@ +%% -------- Key Codec --------- +%% +%% Functions for manipulating keys and values within leveled. These are +%% currently static functions, they cannot be overridden in the store other +%% than by changing them here. The formats are focused on the problem of +%% supporting Riak KV + +-module(leveled_codec). + +-include("../include/leveled.hrl"). + + +-include_lib("eunit/include/eunit.hrl"). + +-export([strip_to_keyonly/1, + strip_to_keyseqonly/1, + strip_to_seqonly/1, + strip_to_statusonly/1, + strip_to_keyseqstatusonly/1, + striphead_to_details/1, + endkey_passed/2, + key_dominates/2, + print_key/1, + extract_metadata/2, + build_metadata_object/2, + convert_indexspecs/3]). + + +strip_to_keyonly({keyonly, K}) -> K; +strip_to_keyonly({K, _V}) -> K. + +strip_to_keyseqonly({K, {SeqN, _, _}}) -> {K, SeqN}. + +strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}. + +strip_to_statusonly({_, {_, St, _}}) -> St. + +strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN. + +striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}. + +key_dominates(LeftKey, RightKey) -> + case {LeftKey, RightKey} of + {{LK, _LVAL}, {RK, _RVAL}} when LK < RK -> + left_hand_first; + {{LK, _LVAL}, {RK, _RVAL}} when RK < LK -> + right_hand_first; + {{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}} + when LK == RK, LSN >= RSN -> + left_hand_dominant; + {{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}} + when LK == RK, LSN < RSN -> + right_hand_dominant + end. + + + +get_metadatas(#r_object{contents=Contents}) -> + [Content#r_content.metadata || Content <- Contents]. + +set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. + +vclock(#r_object{vclock=VClock}) -> VClock. + +to_binary(v0, Obj) -> + term_to_binary(Obj). + +hash(Obj=#r_object{}) -> + Vclock = vclock(Obj), + UpdObj = set_vclock(Obj, lists:sort(Vclock)), + erlang:phash2(to_binary(v0, UpdObj)). + +extract_metadata(Obj, Size) -> + {get_metadatas(Obj), vclock(Obj), hash(Obj), Size}. + + +build_metadata_object(PrimaryKey, Head) -> + {o, Bucket, Key, null} = PrimaryKey, + {MD, VC, _, _} = Head, + Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end, + [], + MD), + #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}. + +convert_indexspecs(IndexSpecs, SQN, PrimaryKey) -> + lists:map(fun({IndexOp, IndexField, IndexValue}) -> + Status = case IndexOp of + add -> + %% TODO: timestamp support + {active, infinity}; + remove -> + %% TODO: timestamps for delayed reaping + {tomb, infinity} + end, + {o, B, K, _SK} = PrimaryKey, + {{i, B, {IndexField, IndexValue}, K}, + {SQN, Status, null}} + end, + IndexSpecs). + +% Return a tuple of string to ease the printing of keys to logs +print_key(Key) -> + case Key of + {o, B, K, _SK} -> + {"Object", B, K}; + {i, B, {F, _V}, _K} -> + {"Index", B, F} + end. + +% Compare a key against a query key, only comparing elements that are non-null +% in the Query key. This is used for comparing against end keys in queries. +endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) -> + EK1 < CK1; +endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) -> + {EK1, EK2} < {CK1, CK2}; +endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) -> + {EK1, EK2, EK3} < {CK1, CK2, CK3}; +endkey_passed(EndKey, CheckingKey) -> + EndKey < CheckingKey. + + + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + + +indexspecs_test() -> + IndexSpecs = [{add, "t1_int", 456}, + {add, "t1_bin", "adbc123"}, + {remove, "t1_bin", "abdc456"}], + Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2", null}), + ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"}, + {1, {active, infinity}, null}}, lists:nth(1, Changes)), + ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"}, + {1, {active, infinity}, null}}, lists:nth(2, Changes)), + ?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"}, + {1, {tomb, infinity}, null}}, lists:nth(3, Changes)). + +-endif. \ No newline at end of file diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 413205b..32d97ee 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -370,7 +370,7 @@ find_randomkeys(_FList, 0, _Source) -> ok; find_randomkeys(FList, Count, Source) -> KV1 = lists:nth(random:uniform(length(Source)), Source), - K1 = leveled_bookie:strip_to_keyonly(KV1), + K1 = leveled_codec:strip_to_keyonly(KV1), P1 = choose_pid_toquery(FList, K1), FoundKV = leveled_sft:sft_get(P1, K1), Check = case FoundKV of @@ -378,7 +378,7 @@ find_randomkeys(FList, Count, Source) -> io:format("Failed to find ~w in ~w~n", [K1, P1]), error; _ -> - Found = leveled_bookie:strip_to_keyonly(FoundKV), + Found = leveled_codec:strip_to_keyonly(FoundKV), io:format("success finding ~w in ~w~n", [K1, P1]), ?assertMatch(K1, Found), ok diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index b785802..b1399ce 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -843,7 +843,7 @@ compare_to_sqn(Obj, SQN) -> not_present -> false; Obj -> - SQNToCompare = leveled_bookie:strip_to_seqonly(Obj), + SQNToCompare = leveled_codec:strip_to_seqonly(Obj), if SQNToCompare > SQN -> false; @@ -936,7 +936,7 @@ roll_new_tree(Tree, [], HighSQN) -> roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> R = lists:foldl(fun({Kx, Vx}, {TreeAcc, MaxSQN}) -> UpdTree = gb_trees:enter(Kx, Vx, TreeAcc), - SQNx = leveled_bookie:strip_to_seqonly({Kx, Vx}), + SQNx = leveled_codec:strip_to_seqonly({Kx, Vx}), {UpdTree, max(SQNx, MaxSQN)} end, {Tree, HighSQN}, @@ -1025,8 +1025,8 @@ print_manifest(Manifest) -> lists:seq(0, ?MAX_LEVELS - 1)). print_manifest_entry(Entry) -> - {S1, S2, S3} = leveled_bookie:print_key(Entry#manifest_entry.start_key), - {E1, E2, E3} = leveled_bookie:print_key(Entry#manifest_entry.end_key), + {S1, S2, S3} = leveled_codec:print_key(Entry#manifest_entry.start_key), + {E1, E2, E3} = leveled_codec:print_key(Entry#manifest_entry.end_key), io:format("Manifest entry of " ++ "startkey ~s ~s ~s " ++ "endkey ~s ~s ~s " ++ @@ -1036,12 +1036,9 @@ print_manifest_entry(Entry) -> initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) -> CompareFun = fun(M) -> - C1 = leveled_bookie:key_compare(StartKey, - M#manifest_entry.end_key, - gt), - C2 = leveled_bookie:key_compare(EndKey, - M#manifest_entry.start_key, - lt), + C1 = StartKey > M#manifest_entry.end_key, + C2 = leveled_codec:endkey_passed(EndKey, + M#manifest_entry.start_key), not (C1 or C2) end, lists:foldl(fun(L, AccL) -> Level = get_item(L, Manifest, []), @@ -1143,8 +1140,8 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) -> {LCnt, {Key, Val}}, QueryFunT); {{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey -> - SQN = leveled_bookie:strip_to_seqonly({Key, Val}), - BestSQN = leveled_bookie:strip_to_seqonly({BestKey, BestVal}), + SQN = leveled_codec:strip_to_seqonly({Key, Val}), + BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}), if SQN =< BestSQN -> % This is a dominated key, so we need to skip over it @@ -1193,7 +1190,7 @@ keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) -> % iterate only over the remaining keys in the SFT iterator keyfolder(null, SFTiterator, StartKey, EndKey, {AccFun, Acc}); {IMMKey, IMMVal, NxtIMMiterator} -> - case leveled_bookie:key_compare(EndKey, IMMKey, lt) of + case leveled_codec:endkey_passed(EndKey, IMMKey) of true -> % There are no more keys in-range in the in-memory % iterator, so take action as if this iterator is empty @@ -1212,7 +1209,7 @@ keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) -> % There is a next key, so need to know which is the % next key between the two (and handle two keys % with different sequence numbers). - case leveled_bookie:key_dominates({IMMKey, + case leveled_codec:key_dominates({IMMKey, IMMVal}, {SFTKey, SFTVal}) of @@ -1377,7 +1374,7 @@ assess_sqn(DumpList) -> assess_sqn([], MinSQN, MaxSQN) -> {MinSQN, MaxSQN}; assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) -> - {_K, SQN} = leveled_bookie:strip_to_keyseqonly(HeadKey), + {_K, SQN} = leveled_codec:strip_to_keyseqonly(HeadKey), assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)). @@ -1763,7 +1760,7 @@ foldwithimm_simple_test() -> {9, {active, infinity}, null}, IMM1), IMMiter = gb_trees:iterator_from({o, "Bucket1", "Key1"}, IMM2), - AccFun = fun(K, V, Acc) -> SQN= leveled_bookie:strip_to_seqonly({K, V}), + AccFun = fun(K, V, Acc) -> SQN = leveled_codec:strip_to_seqonly({K, V}), Acc ++ [{K, SQN}] end, Acc = keyfolder(IMMiter, QueryArray, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index dda8607..6cead92 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -563,7 +563,7 @@ acc_list_keysonly(null, empty) -> acc_list_keysonly(null, RList) -> RList; acc_list_keysonly(R, RList) -> - lists:append(RList, [leveled_bookie:strip_to_keyseqstatusonly(R)]). + lists:append(RList, [leveled_codec:strip_to_keyseqstatusonly(R)]). acc_list_kv(null, empty) -> []; @@ -652,10 +652,8 @@ fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, scan_block([], StartKey, _EndKey, _FunList, _AccFun, Acc) -> {partial, Acc, StartKey}; scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) -> - K = leveled_bookie:strip_to_keyonly(HeadKV), - Pre = leveled_bookie:key_compare(StartKey, K, gt), - Post = leveled_bookie:key_compare(EndKey, K, lt), - case {Pre, Post} of + K = leveled_codec:strip_to_keyonly(HeadKV), + case {StartKey > K, leveled_codec:endkey_passed(EndKey, K)} of {true, _} when StartKey /= all -> scan_block(T, StartKey, EndKey, FunList, AccFun, Acc); {_, true} when EndKey /= all -> @@ -988,15 +986,15 @@ create_slot(KL1, KL2, Level, BlockCount, SegLists, SerialisedSlot, LengthList, TrackingMetadata = case LowKey of null -> [NewLowKeyV|_] = BlockKeyList, - {leveled_bookie:strip_to_keyonly(NewLowKeyV), + {leveled_codec:strip_to_keyonly(NewLowKeyV), min(LSN, LSNb), max(HSN, HSNb), - leveled_bookie:strip_to_keyonly(last(BlockKeyList, + leveled_codec:strip_to_keyonly(last(BlockKeyList, {last, LastKey})), Status}; _ -> {LowKey, min(LSN, LSNb), max(HSN, HSNb), - leveled_bookie:strip_to_keyonly(last(BlockKeyList, + leveled_codec:strip_to_keyonly(last(BlockKeyList, {last, LastKey})), Status} end, @@ -1035,7 +1033,7 @@ key_dominates(KL1, KL2, Level) -> Level). key_dominates_expanded([H1|T1], [], Level) -> - St1 = leveled_bookie:strip_to_statusonly(H1), + St1 = leveled_codec:strip_to_statusonly(H1), case maybe_reap_expiredkey(St1, Level) of true -> {skipped_key, maybe_expand_pointer(T1), []}; @@ -1043,7 +1041,7 @@ key_dominates_expanded([H1|T1], [], Level) -> {{next_key, H1}, maybe_expand_pointer(T1), []} end; key_dominates_expanded([], [H2|T2], Level) -> - St2 = leveled_bookie:strip_to_statusonly(H2), + St2 = leveled_codec:strip_to_statusonly(H2), case maybe_reap_expiredkey(St2, Level) of true -> {skipped_key, [], maybe_expand_pointer(T2)}; @@ -1052,8 +1050,8 @@ key_dominates_expanded([], [H2|T2], Level) -> end; key_dominates_expanded([H1|T1], [H2|T2], Level) -> {{K1, V1}, {K2, V2}} = {H1, H2}, - {Sq1, St1, _MD1} = leveled_bookie:striphead_to_details(V1), - {Sq2, St2, _MD2} = leveled_bookie:striphead_to_details(V2), + {Sq1, St1, _MD1} = leveled_codec:striphead_to_details(V1), + {Sq2, St2, _MD2} = leveled_codec:striphead_to_details(V2), case K1 of K2 -> case Sq1 > Sq2 of @@ -1116,7 +1114,7 @@ pointer_append_queryresults(Results, QueryPid) -> %% Update the sequence numbers update_sequencenumbers(Item, LSN, HSN) when is_tuple(Item) -> - update_sequencenumbers(leveled_bookie:strip_to_seqonly(Item), LSN, HSN); + update_sequencenumbers(leveled_codec:strip_to_seqonly(Item), LSN, HSN); update_sequencenumbers(SN, 0, 0) -> {SN, SN}; update_sequencenumbers(SN, LSN, HSN) when SN < LSN -> @@ -1227,7 +1225,7 @@ merge_seglists({SegList1, SegList2, SegList3, SegList4}) -> lists:sort(Stage4). hash_for_segmentid(KV) -> - erlang:phash2(leveled_bookie:strip_to_keyonly(KV), ?MAX_SEG_HASH). + erlang:phash2(leveled_codec:strip_to_keyonly(KV), ?MAX_SEG_HASH). %% Check for a given list of segments in the filter, returning in normal