From 3b05874b8afab2d507da1d557cc6e4d47f00e80b Mon Sep 17 00:00:00 2001 From: martinsumner Date: Mon, 31 Oct 2016 12:12:06 +0000 Subject: [PATCH] Add initial timestamp support Covered only by basic unit test at present. --- include/leveled.hrl | 2 +- src/leveled_bookie.erl | 188 ++++++++++++++++++++++++-------- src/leveled_codec.erl | 25 +++-- src/leveled_pclerk.erl | 2 +- test/end_to_end/basic_SUITE.erl | 2 +- 5 files changed, 163 insertions(+), 56 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 9b32415..3a85aa7 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -34,7 +34,7 @@ -record(level, {level :: integer(), is_basement = false :: boolean(), - timestamp :: erlang:timestamp()}). + timestamp :: integer()}). -record(manifest_entry, {start_key :: tuple(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 45a1884..2ace726 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -139,6 +139,8 @@ book_riakget/3, book_riakhead/3, book_put/5, + book_put/6, + book_tempput/7, book_delete/4, book_get/3, book_head/3, @@ -183,9 +185,17 @@ book_riakput(Pid, RiakObject, IndexSpecs) -> {Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject), book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG). +book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_integer(TTL) -> + book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL). + book_put(Pid, Bucket, Key, Object, IndexSpecs) -> book_put(Pid, Bucket, Key, Object, IndexSpecs, ?STD_TAG). +book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) -> + book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity). + + + %% TODO: %% It is not enough simply to change the value to delete, as the journal %% needs to know the key is a tombstone at compaction time, and currently at @@ -213,8 +223,10 @@ book_riakhead(Pid, Bucket, Key) -> book_head(Pid, Bucket, Key) -> book_head(Pid, Bucket, Key, ?STD_TAG). -book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) -> - gen_server:call(Pid, {put, Bucket, Key, Object, IndexSpecs, Tag}, infinity). +book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) -> + gen_server:call(Pid, + {put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, + infinity). book_get(Pid, Bucket, Key, Tag) -> gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity). @@ -278,18 +290,18 @@ init([Opts]) -> end. -handle_call({put, Bucket, Key, Object, IndexSpecs, Tag}, From, State) -> +handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, LedgerKey, Object, - IndexSpecs), + {IndexSpecs, TTL}), Changes = preparefor_ledgercache(no_type_assigned, LedgerKey, SQN, Object, ObjSize, - IndexSpecs), + {IndexSpecs, TTL}), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), gen_server:reply(From, ok), {ok, NewCache} = maybepush_ledgercache(State#state.cache_size, @@ -308,12 +320,14 @@ handle_call({get, Bucket, Key, Tag}, _From, State) -> case Status of tomb -> {reply, not_found, State}; - {active, _} -> - case fetch_value(LedgerKey, Seqn, State#state.inker) of - not_present -> - {reply, not_found, State}; - Object -> - {reply, {ok, Object}, State} + {active, TS} -> + Active = TS >= leveled_codec:integer_now(), + case {Active, + fetch_value(LedgerKey, Seqn, State#state.inker)} of + {true, Object} -> + {reply, {ok, Object}, State}; + _ -> + {reply, not_found, State} end end end; @@ -329,9 +343,14 @@ handle_call({head, Bucket, Key, Tag}, _From, State) -> case Status of tomb -> {reply, not_found, State}; - {active, _} -> - OMD = leveled_codec:build_metadata_object(LedgerKey, MD), - {reply, {ok, OMD}, State} + {active, TS} -> + case TS >= leveled_codec:integer_now() of + true -> + OMD = leveled_codec:build_metadata_object(LedgerKey, MD), + {reply, {ok, OMD}, State}; + false -> + {reply, not_found, State} + end end end; handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) -> @@ -359,6 +378,13 @@ handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) -> end; handle_call({return_folder, FolderType}, _From, State) -> case FolderType of + {bucket_stats, Bucket} -> + {reply, + bucket_stats(State#state.penciller, + State#state.ledger_cache, + Bucket, + ?STD_TAG), + State}; {riakbucket_stats, Bucket} -> {reply, bucket_stats(State#state.penciller, @@ -425,10 +451,11 @@ bucket_stats(Penciller, LedgerCache, Bucket, Tag) -> LedgerCache), StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), + AccFun = accumulate_size(), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, StartKey, EndKey, - fun accumulate_size/3, + AccFun, {0, 0}), ok = leveled_penciller:pcl_close(LedgerSnapshot), Acc @@ -479,10 +506,11 @@ allkey_query(Penciller, LedgerCache, Tag) -> LedgerCache), SK = leveled_codec:to_ledgerkey(null, null, Tag), EK = leveled_codec:to_ledgerkey(null, null, Tag), + AccFun = accumulate_keys(), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, SK, EK, - fun accumulate_keys/3, + AccFun, []), ok = leveled_penciller:pcl_close(LedgerSnapshot), lists:reverse(Acc) @@ -558,22 +586,31 @@ fetch_value(Key, SQN, Inker) -> not_present end. -accumulate_size(Key, Value, {Size, Count}) -> - case leveled_codec:is_active(Key, Value) of - true -> - {Size + leveled_codec:get_size(Key, Value), Count + 1}; - false -> - {Size, Count} - end. -accumulate_keys(Key, Value, KeyList) -> - case leveled_codec:is_active(Key, Value) of - true -> - [leveled_codec:from_ledgerkey(Key)|KeyList]; - false -> - KeyList - end. +accumulate_size() -> + Now = leveled_codec:integer_now(), + AccFun = fun(Key, Value, {Size, Count}) -> + case leveled_codec:is_active(Key, Value, Now) of + true -> + {Size + leveled_codec:get_size(Key, Value), + Count + 1}; + false -> + {Size, Count} + end + end, + AccFun. +accumulate_keys() -> + Now = leveled_codec:integer_now(), + AccFun = fun(Key, Value, KeyList) -> + case leveled_codec:is_active(Key, Value, Now) of + true -> + [leveled_codec:from_ledgerkey(Key)|KeyList]; + false -> + KeyList + end + end, + AccFun. add_keys(ObjKey, _IdxValue, Acc) -> Acc ++ [ObjKey]. @@ -582,10 +619,11 @@ add_terms(ObjKey, IdxValue, Acc) -> Acc ++ [{IdxValue, ObjKey}]. accumulate_index(TermRe, AddFun) -> + Now = leveled_codec:integer_now(), case TermRe of undefined -> fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value) of + case leveled_codec:is_active(Key, Value, Now) of true -> {_Bucket, ObjKey, @@ -596,7 +634,7 @@ accumulate_index(TermRe, AddFun) -> end end; TermRe -> fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value) of + case leveled_codec:is_active(Key, Value, Now) of true -> {_Bucket, ObjKey, @@ -613,16 +651,21 @@ accumulate_index(TermRe, AddFun) -> end. -preparefor_ledgercache(?INKT_KEYD, LedgerKey, SQN, _Obj, _Size, IndexSpecs) -> +preparefor_ledgercache(?INKT_KEYD, + LedgerKey, SQN, _Obj, _Size, {IndexSpecs, TTL}) -> {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey), - leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN); -preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, IndexSpecs) -> + leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL); +preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) -> {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, - Size), - ConvSpecs = leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN), - [PrimaryChange] ++ ConvSpecs. + Size, + TTL), + [PrimaryChange] ++ leveled_codec:convert_indexspecs(IndexSpecs, + Bucket, + Key, + SQN, + TTL). addto_ledgercache(Changes, Cache) -> @@ -700,14 +743,26 @@ reset_filestructure() -> RootPath. - - generate_multiple_objects(Count, KeyNumber) -> generate_multiple_objects(Count, KeyNumber, []). - + generate_multiple_objects(0, _KeyNumber, ObjL) -> ObjL; generate_multiple_objects(Count, KeyNumber, ObjL) -> + Key = "Key" ++ integer_to_list(KeyNumber), + Value = crypto:rand_bytes(256), + IndexSpec = [{add, "idx1_bin", "f" ++ integer_to_list(KeyNumber rem 10)}], + generate_multiple_objects(Count - 1, + KeyNumber + 1, + ObjL ++ [{Key, Value, IndexSpec}]). + + +generate_multiple_robjects(Count, KeyNumber) -> + generate_multiple_robjects(Count, KeyNumber, []). + +generate_multiple_robjects(0, _KeyNumber, ObjL) -> + ObjL; +generate_multiple_robjects(Count, KeyNumber, ObjL) -> Obj = {"Bucket", "Key" ++ integer_to_list(KeyNumber), crypto:rand_bytes(1024), @@ -717,7 +772,7 @@ generate_multiple_objects(Count, KeyNumber, ObjL) -> {B1, K1, V1, Spec1, MD} = Obj, Content = #r_content{metadata=MD, value=V1}, Obj1 = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, - generate_multiple_objects(Count - 1, KeyNumber + 1, ObjL ++ [{Obj1, Spec1}]). + generate_multiple_robjects(Count - 1, KeyNumber + 1, ObjL ++ [{Obj1, Spec1}]). single_key_test() -> @@ -758,7 +813,7 @@ multi_key_test() -> C2 = #r_content{metadata=MD2, value=V2}, Obj2 = #r_object{bucket=B2, key=K2, contents=[C2], vclock=[{'a',1}]}, ok = book_riakput(Bookie1, Obj1, Spec1), - ObjL1 = generate_multiple_objects(100, 3), + ObjL1 = generate_multiple_robjects(100, 3), SW1 = os:timestamp(), lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie1, O, S) end, ObjL1), io:format("PUT of 100 objects completed in ~w microseconds~n", @@ -768,7 +823,7 @@ multi_key_test() -> ?assertMatch(F1A, Obj1), {ok, F2A} = book_riakget(Bookie1, B2, K2), ?assertMatch(F2A, Obj2), - ObjL2 = generate_multiple_objects(100, 103), + ObjL2 = generate_multiple_robjects(100, 103), SW2 = os:timestamp(), lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie1, O, S) end, ObjL2), io:format("PUT of 100 objects completed in ~w microseconds~n", @@ -784,7 +839,7 @@ multi_key_test() -> ?assertMatch(F1C, Obj1), {ok, F2C} = book_riakget(Bookie2, B2, K2), ?assertMatch(F2C, Obj2), - ObjL3 = generate_multiple_objects(100, 203), + ObjL3 = generate_multiple_robjects(100, 203), SW3 = os:timestamp(), lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie2, O, S) end, ObjL3), io:format("PUT of 100 objects completed in ~w microseconds~n", @@ -796,4 +851,47 @@ multi_key_test() -> ok = book_close(Bookie2), reset_filestructure(). +ttl_test() -> + RootPath = reset_filestructure(), + {ok, Bookie1} = book_start(#bookie_options{root_path=RootPath}), + ObjL1 = generate_multiple_objects(100, 1), + % Put in all the objects with a TTL in the future + Future = leveled_codec:integer_now() + 300, + lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1, + "Bucket", K, V, S, + ?STD_TAG, + Future) end, + ObjL1), + lists:foreach(fun({K, V, _S}) -> + {ok, V} = book_get(Bookie1, "Bucket", K, ?STD_TAG) + end, + ObjL1), + lists:foreach(fun({K, _V, _S}) -> + {ok, _} = book_head(Bookie1, "Bucket", K, ?STD_TAG) + end, + ObjL1), + + ObjL2 = generate_multiple_objects(100, 101), + Past = leveled_codec:integer_now() - 300, + lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1, + "Bucket", K, V, S, + ?STD_TAG, + Past) end, + ObjL2), + lists:foreach(fun({K, _V, _S}) -> + not_found = book_get(Bookie1, "Bucket", K, ?STD_TAG) + end, + ObjL2), + lists:foreach(fun({K, _V, _S}) -> + not_found = book_head(Bookie1, "Bucket", K, ?STD_TAG) + end, + ObjL2), + + {async, BucketFolder} = book_returnfolder(Bookie1, + {bucket_stats, "Bucket"}), + {_Size, Count} = BucketFolder(), + ?assertMatch(100, Count), + ok = book_close(Bookie1), + reset_filestructure(). + -endif. \ No newline at end of file diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index afb6792..d212dcc 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -39,7 +39,7 @@ strip_to_statusonly/1, strip_to_keyseqstatusonly/1, striphead_to_details/1, - is_active/2, + is_active/3, endkey_passed/2, key_dominates/2, maybe_reap_expiredkey/2, @@ -58,9 +58,10 @@ generate_ledgerkv/4, generate_ledgerkv/5, get_size/2, - convert_indexspecs/4, + convert_indexspecs/5, riakto_keydetails/1, - generate_uuid/0]). + generate_uuid/0, + integer_now/0]). %% Credit to @@ -117,11 +118,15 @@ maybe_reap(tomb, {true, _CurrTS}) -> maybe_reap(_, _) -> false. -is_active(Key, Value) -> +is_active(Key, Value, Now) -> case strip_to_statusonly({Key, Value}) of {active, infinity} -> true; tomb -> + false; + {active, TS} when Now >= TS -> + true; + {active, _TS} -> false end. @@ -247,12 +252,11 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) -> endkey_passed(EndKey, CheckingKey) -> EndKey < CheckingKey. -convert_indexspecs(IndexSpecs, Bucket, Key, SQN) -> +convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) -> lists:map(fun({IndexOp, IdxField, IdxValue}) -> Status = case IndexOp of add -> - %% TODO: timestamp support - {active, infinity}; + {active, TTL}; remove -> %% TODO: timestamps for delayed reaping tomb @@ -279,7 +283,12 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> {PrimaryKey, {SQN, Status, extract_metadata(Obj, Size, Tag)}}}. +integer_now() -> + integer_time(os:timestamp()). +integer_time(TS) -> + DT = calendar:now_to_universal_time(TS), + calendar:datetime_to_gregorian_seconds(DT). extract_metadata(Obj, Size, ?RIAK_TAG) -> riak_extract_metadata(Obj, Size); @@ -353,7 +362,7 @@ indexspecs_test() -> IndexSpecs = [{add, "t1_int", 456}, {add, "t1_bin", "adbc123"}, {remove, "t1_bin", "abdc456"}], - Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1), + Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity), ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"}, {1, {active, infinity}, null}}, lists:nth(1, Changes)), ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"}, diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 635d661..34edf96 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -319,7 +319,7 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> true -> #level{level = SrcLevel + 1, is_basement = true, - timestamp = os:timestamp()}; + timestamp = leveled_codec:integer_now()}; false -> SrcLevel + 1 end, diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 442390c..612a448 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -46,7 +46,7 @@ simple_put_fetch_head_delete(_Config) -> ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", "Value2", [{add, "Index1", "Term1"}]), {ok, "Value2"} = leveled_bookie:book_get(Bookie2, "Bucket1", "Key2"), - {ok, {62888926, 43}} = leveled_bookie:book_head(Bookie2, + {ok, {62888926, 56}} = leveled_bookie:book_head(Bookie2, "Bucket1", "Key2"), testutil:check_formissingobject(Bookie2, "Bucket1", "Key2"),