Add initial timestamp support

Covered only by a basic unit test at present.
martinsumner 2016-10-31 12:12:06 +00:00
parent 4cffecf2ca
commit 3b05874b8a
5 changed files with 163 additions and 56 deletions
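
The headline addition is book_tempput/7: a put that carries an expiry time. The TTL is an absolute timestamp in Gregorian seconds (as produced by the new leveled_codec:integer_now/0), not a duration, and the plain book_put variants default it to infinity. A minimal usage sketch, mirroring the ttl_test added below (Bookie, the bucket/key names and the ?STD_TAG reference are illustrative):

    %% Expire roughly five minutes from now: the TTL is an absolute
    %% time in Gregorian seconds, not a relative duration.
    Future = leveled_codec:integer_now() + 300,
    ok = leveled_bookie:book_tempput(Bookie, "Bucket", "Key1", <<"value">>,
                                     [], ?STD_TAG, Future),
    %% A put without a TTL behaves as before (TTL = infinity).
    ok = leveled_bookie:book_put(Bookie, "Bucket", "Key2", <<"value">>, []).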


@@ -34,7 +34,7 @@
-record(level,
{level :: integer(),
is_basement = false :: boolean(),
timestamp :: erlang:timestamp()}).
timestamp :: integer()}).
-record(manifest_entry,
{start_key :: tuple(),


@@ -139,6 +139,8 @@
book_riakget/3,
book_riakhead/3,
book_put/5,
book_put/6,
book_tempput/7,
book_delete/4,
book_get/3,
book_head/3,
@@ -183,9 +185,17 @@ book_riakput(Pid, RiakObject, IndexSpecs) ->
{Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG).
book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_integer(TTL) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL).
book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, ?STD_TAG).
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).
%% TODO:
%% It is not enough simply to change the value to delete, as the journal
%% needs to know the key is a tombstone at compaction time, and currently at
@@ -213,8 +223,10 @@ book_riakhead(Pid, Bucket, Key) ->
book_head(Pid, Bucket, Key) ->
book_head(Pid, Bucket, Key, ?STD_TAG).
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
gen_server:call(Pid, {put, Bucket, Key, Object, IndexSpecs, Tag}, infinity).
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
gen_server:call(Pid,
{put, Bucket, Key, Object, IndexSpecs, Tag, TTL},
infinity).
book_get(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).
@@ -278,18 +290,18 @@ init([Opts]) ->
end.
handle_call({put, Bucket, Key, Object, IndexSpecs, Tag}, From, State) ->
handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
{ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
LedgerKey,
Object,
IndexSpecs),
{IndexSpecs, TTL}),
Changes = preparefor_ledgercache(no_type_assigned,
LedgerKey,
SQN,
Object,
ObjSize,
IndexSpecs),
{IndexSpecs, TTL}),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
gen_server:reply(From, ok),
{ok, NewCache} = maybepush_ledgercache(State#state.cache_size,
@@ -308,12 +320,14 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
case Status of
tomb ->
{reply, not_found, State};
{active, _} ->
case fetch_value(LedgerKey, Seqn, State#state.inker) of
not_present ->
{reply, not_found, State};
Object ->
{reply, {ok, Object}, State}
{active, TS} ->
Active = TS >= leveled_codec:integer_now(),
case {Active,
fetch_value(LedgerKey, Seqn, State#state.inker)} of
{true, Object} ->
{reply, {ok, Object}, State};
_ ->
{reply, not_found, State}
end
end
end;
@@ -329,9 +343,14 @@ handle_call({head, Bucket, Key, Tag}, _From, State) ->
case Status of
tomb ->
{reply, not_found, State};
{active, _} ->
{active, TS} ->
case TS >= leveled_codec:integer_now() of
true ->
OMD = leveled_codec:build_metadata_object(LedgerKey, MD),
{reply, {ok, OMD}, State}
{reply, {ok, OMD}, State};
false ->
{reply, not_found, State}
end
end
end;
handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
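
Note that the GET and HEAD paths above apply the same visibility test: an object with status {active, TS} is only returned while TS has not yet passed. A standalone sketch of that predicate (is_unexpired is a hypothetical name, for illustration only):

    %% Visible while the expiry time (Gregorian seconds) is still in
    %% the future, or when no TTL was ever set.
    is_unexpired(infinity) ->
        true;
    is_unexpired(TS) when is_integer(TS) ->
        TS >= leveled_codec:integer_now().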
@@ -359,6 +378,13 @@ handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
end;
handle_call({return_folder, FolderType}, _From, State) ->
case FolderType of
{bucket_stats, Bucket} ->
{reply,
bucket_stats(State#state.penciller,
State#state.ledger_cache,
Bucket,
?STD_TAG),
State};
{riakbucket_stats, Bucket} ->
{reply,
bucket_stats(State#state.penciller,
@@ -425,10 +451,11 @@ bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
LedgerCache),
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
AccFun = accumulate_size(),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
StartKey,
EndKey,
fun accumulate_size/3,
AccFun,
{0, 0}),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
Acc
@@ -479,10 +506,11 @@ allkey_query(Penciller, LedgerCache, Tag) ->
LedgerCache),
SK = leveled_codec:to_ledgerkey(null, null, Tag),
EK = leveled_codec:to_ledgerkey(null, null, Tag),
AccFun = accumulate_keys(),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
SK,
EK,
fun accumulate_keys/3,
AccFun,
[]),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
lists:reverse(Acc)
@@ -558,22 +586,31 @@ fetch_value(Key, SQN, Inker) ->
not_present
end.
accumulate_size(Key, Value, {Size, Count}) ->
case leveled_codec:is_active(Key, Value) of
accumulate_size() ->
Now = leveled_codec:integer_now(),
AccFun = fun(Key, Value, {Size, Count}) ->
case leveled_codec:is_active(Key, Value, Now) of
true ->
{Size + leveled_codec:get_size(Key, Value), Count + 1};
{Size + leveled_codec:get_size(Key, Value),
Count + 1};
false ->
{Size, Count}
end.
end
end,
AccFun.
accumulate_keys(Key, Value, KeyList) ->
case leveled_codec:is_active(Key, Value) of
accumulate_keys() ->
Now = leveled_codec:integer_now(),
AccFun = fun(Key, Value, KeyList) ->
case leveled_codec:is_active(Key, Value, Now) of
true ->
[leveled_codec:from_ledgerkey(Key)|KeyList];
false ->
KeyList
end.
end
end,
AccFun.
add_keys(ObjKey, _IdxValue, Acc) ->
Acc ++ [ObjKey].
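
accumulate_size and accumulate_keys are now builders that capture Now once, so every key visited by a fold is judged against the same cut-off rather than re-reading the clock per key. The same pattern generalises to any per-key accumulator; a hedged sketch (active_filter is illustrative, not part of this commit):

    %% Capture Now once per fold; wrap any accumulator so expired keys
    %% are skipped against a single, consistent cut-off.
    active_filter(InnerFun) ->
        Now = leveled_codec:integer_now(),
        fun(Key, Value, Acc) ->
                case leveled_codec:is_active(Key, Value, Now) of
                    true -> InnerFun(Key, Value, Acc);
                    false -> Acc
                end
        end.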
@@ -582,10 +619,11 @@ add_terms(ObjKey, IdxValue, Acc) ->
Acc ++ [{IdxValue, ObjKey}].
accumulate_index(TermRe, AddFun) ->
Now = leveled_codec:integer_now(),
case TermRe of
undefined ->
fun(Key, Value, Acc) ->
case leveled_codec:is_active(Key, Value) of
case leveled_codec:is_active(Key, Value, Now) of
true ->
{_Bucket,
ObjKey,
@@ -596,7 +634,7 @@ accumulate_index(TermRe, AddFun) ->
end end;
TermRe ->
fun(Key, Value, Acc) ->
case leveled_codec:is_active(Key, Value) of
case leveled_codec:is_active(Key, Value, Now) of
true ->
{_Bucket,
ObjKey,
@@ -613,16 +651,21 @@ accumulate_index(TermRe, AddFun) ->
end.
preparefor_ledgercache(?INKT_KEYD, LedgerKey, SQN, _Obj, _Size, IndexSpecs) ->
preparefor_ledgercache(?INKT_KEYD,
LedgerKey, SQN, _Obj, _Size, {IndexSpecs, TTL}) ->
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN);
preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, IndexSpecs) ->
leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL);
preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
{Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(LedgerKey,
SQN,
Obj,
Size),
ConvSpecs = leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN),
[PrimaryChange] ++ ConvSpecs.
Size,
TTL),
[PrimaryChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
Bucket,
Key,
SQN,
TTL).
addto_ledgercache(Changes, Cache) ->
@@ -700,14 +743,26 @@ reset_filestructure() ->
RootPath.
generate_multiple_objects(Count, KeyNumber) ->
generate_multiple_objects(Count, KeyNumber, []).
generate_multiple_objects(0, _KeyNumber, ObjL) ->
ObjL;
generate_multiple_objects(Count, KeyNumber, ObjL) ->
Key = "Key" ++ integer_to_list(KeyNumber),
Value = crypto:rand_bytes(256),
IndexSpec = [{add, "idx1_bin", "f" ++ integer_to_list(KeyNumber rem 10)}],
generate_multiple_objects(Count - 1,
KeyNumber + 1,
ObjL ++ [{Key, Value, IndexSpec}]).
generate_multiple_robjects(Count, KeyNumber) ->
generate_multiple_robjects(Count, KeyNumber, []).
generate_multiple_robjects(0, _KeyNumber, ObjL) ->
ObjL;
generate_multiple_robjects(Count, KeyNumber, ObjL) ->
Obj = {"Bucket",
"Key" ++ integer_to_list(KeyNumber),
crypto:rand_bytes(1024),
@@ -717,7 +772,7 @@ generate_multiple_objects(Count, KeyNumber, ObjL) ->
{B1, K1, V1, Spec1, MD} = Obj,
Content = #r_content{metadata=MD, value=V1},
Obj1 = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
generate_multiple_objects(Count - 1, KeyNumber + 1, ObjL ++ [{Obj1, Spec1}]).
generate_multiple_robjects(Count - 1, KeyNumber + 1, ObjL ++ [{Obj1, Spec1}]).
single_key_test() ->
@@ -758,7 +813,7 @@ multi_key_test() ->
C2 = #r_content{metadata=MD2, value=V2},
Obj2 = #r_object{bucket=B2, key=K2, contents=[C2], vclock=[{'a',1}]},
ok = book_riakput(Bookie1, Obj1, Spec1),
ObjL1 = generate_multiple_objects(100, 3),
ObjL1 = generate_multiple_robjects(100, 3),
SW1 = os:timestamp(),
lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie1, O, S) end, ObjL1),
io:format("PUT of 100 objects completed in ~w microseconds~n",
@@ -768,7 +823,7 @@ multi_key_test() ->
?assertMatch(F1A, Obj1),
{ok, F2A} = book_riakget(Bookie1, B2, K2),
?assertMatch(F2A, Obj2),
ObjL2 = generate_multiple_objects(100, 103),
ObjL2 = generate_multiple_robjects(100, 103),
SW2 = os:timestamp(),
lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie1, O, S) end, ObjL2),
io:format("PUT of 100 objects completed in ~w microseconds~n",
@@ -784,7 +839,7 @@ multi_key_test() ->
?assertMatch(F1C, Obj1),
{ok, F2C} = book_riakget(Bookie2, B2, K2),
?assertMatch(F2C, Obj2),
ObjL3 = generate_multiple_objects(100, 203),
ObjL3 = generate_multiple_robjects(100, 203),
SW3 = os:timestamp(),
lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie2, O, S) end, ObjL3),
io:format("PUT of 100 objects completed in ~w microseconds~n",
@@ -796,4 +851,47 @@ multi_key_test() ->
ok = book_close(Bookie2),
reset_filestructure().
ttl_test() ->
RootPath = reset_filestructure(),
{ok, Bookie1} = book_start(#bookie_options{root_path=RootPath}),
ObjL1 = generate_multiple_objects(100, 1),
% Put in all the objects with a TTL in the future
Future = leveled_codec:integer_now() + 300,
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
"Bucket", K, V, S,
?STD_TAG,
Future) end,
ObjL1),
lists:foreach(fun({K, V, _S}) ->
{ok, V} = book_get(Bookie1, "Bucket", K, ?STD_TAG)
end,
ObjL1),
lists:foreach(fun({K, _V, _S}) ->
{ok, _} = book_head(Bookie1, "Bucket", K, ?STD_TAG)
end,
ObjL1),
ObjL2 = generate_multiple_objects(100, 101),
Past = leveled_codec:integer_now() - 300,
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
"Bucket", K, V, S,
?STD_TAG,
Past) end,
ObjL2),
lists:foreach(fun({K, _V, _S}) ->
not_found = book_get(Bookie1, "Bucket", K, ?STD_TAG)
end,
ObjL2),
lists:foreach(fun({K, _V, _S}) ->
not_found = book_head(Bookie1, "Bucket", K, ?STD_TAG)
end,
ObjL2),
{async, BucketFolder} = book_returnfolder(Bookie1,
{bucket_stats, "Bucket"}),
{_Size, Count} = BucketFolder(),
?assertMatch(100, Count),
ok = book_close(Bookie1),
reset_filestructure().
-endif.
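
The ttl_test above covers both directions: 100 objects with a future TTL remain readable via GET and HEAD, 100 with a past TTL return not_found, and bucket_stats counts only the 100 live objects. To run it in isolation (assuming the module was compiled with TEST defined, so eunit auto-exports the *_test functions):

    %% From an Erlang shell:
    eunit:test({leveled_bookie, ttl_test}).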


@@ -39,7 +39,7 @@
strip_to_statusonly/1,
strip_to_keyseqstatusonly/1,
striphead_to_details/1,
is_active/2,
is_active/3,
endkey_passed/2,
key_dominates/2,
maybe_reap_expiredkey/2,
@@ -58,9 +58,10 @@
generate_ledgerkv/4,
generate_ledgerkv/5,
get_size/2,
convert_indexspecs/4,
convert_indexspecs/5,
riakto_keydetails/1,
generate_uuid/0]).
generate_uuid/0,
integer_now/0]).
%% Credit to
@@ -117,11 +118,15 @@ maybe_reap(tomb, {true, _CurrTS}) ->
maybe_reap(_, _) ->
false.
is_active(Key, Value) ->
is_active(Key, Value, Now) ->
case strip_to_statusonly({Key, Value}) of
{active, infinity} ->
true;
tomb ->
false;
{active, TS} when TS >= Now ->
true;
{active, _TS} ->
false
end.
@@ -247,12 +252,11 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
endkey_passed(EndKey, CheckingKey) ->
EndKey < CheckingKey.
convert_indexspecs(IndexSpecs, Bucket, Key, SQN) ->
convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
lists:map(fun({IndexOp, IdxField, IdxValue}) ->
Status = case IndexOp of
add ->
%% TODO: timestamp support
{active, infinity};
{active, TTL};
remove ->
%% TODO: timestamps for delayed reaping
tomb
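
convert_indexspecs/5 threads the object's TTL through to its index entries, so an add spec now yields {active, TTL} rather than an unconditional {active, infinity}, and index entries expire with the object they point at. An illustrative input/output pair, using a made-up TTL (compare the updated indexspecs_test below):

    %% One add spec, SQN 1, TTL in Gregorian seconds:
    [{{i, "B", {"t1_bin", "abc"}, "K"}, {1, {active, 63645135126}, null}}] =
        leveled_codec:convert_indexspecs([{add, "t1_bin", "abc"}],
                                         "B", "K", 1, 63645135126).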
@@ -279,7 +283,12 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
{PrimaryKey, {SQN, Status, extract_metadata(Obj, Size, Tag)}}}.
integer_now() ->
integer_time(os:timestamp()).
integer_time(TS) ->
DT = calendar:now_to_universal_time(TS),
calendar:datetime_to_gregorian_seconds(DT).
extract_metadata(Obj, Size, ?RIAK_TAG) ->
riak_extract_metadata(Obj, Size);
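
integer_now/0 returns UTC as Gregorian seconds (seconds since year 0 in the proleptic Gregorian calendar), the unit used by the new timestamp :: integer() field and by TTLs. A worked example of the conversion, using this commit's own timestamp (the call is shown unqualified, as from inside the module):

    %% os:timestamp() triple for 2016-10-31 12:12:06 UTC:
    63645135126 = integer_time({1477, 915926, 0}).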
@@ -353,7 +362,7 @@ indexspecs_test() ->
IndexSpecs = [{add, "t1_int", 456},
{add, "t1_bin", "adbc123"},
{remove, "t1_bin", "abdc456"}],
Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1),
Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
{1, {active, infinity}, null}}, lists:nth(1, Changes)),
?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},


@@ -319,7 +319,7 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) ->
true ->
#level{level = SrcLevel + 1,
is_basement = true,
timestamp = os:timestamp()};
timestamp = leveled_codec:integer_now()};
false ->
SrcLevel + 1
end,
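
With the basement level stamped by integer_now/0 rather than os:timestamp/0, the level's timestamp and object TTLs share one integer clock, so an expiry decision at reap time reduces to a plain comparison. A minimal sketch under that assumption (is_reapable is an illustrative name; see maybe_reap_expiredkey/2 in leveled_codec for the real path):

    %% An object whose TTL predates the basement level's stamp is dead.
    is_reapable(infinity, _LevelTS) ->
        false;
    is_reapable(TTL, LevelTS) when is_integer(TTL), is_integer(LevelTS) ->
        LevelTS > TTL.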


@@ -46,7 +46,7 @@ simple_put_fetch_head_delete(_Config) ->
ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", "Value2",
[{add, "Index1", "Term1"}]),
{ok, "Value2"} = leveled_bookie:book_get(Bookie2, "Bucket1", "Key2"),
{ok, {62888926, 43}} = leveled_bookie:book_head(Bookie2,
{ok, {62888926, 56}} = leveled_bookie:book_head(Bookie2,
"Bucket1",
"Key2"),
testutil:check_formissingobject(Bookie2, "Bucket1", "Key2"),