Add access to SQN

Use book_sqn/3 or book_sqn/4 to get the SQN of an object in the store.
This commit is contained in:
Martin Sumner 2019-03-13 16:21:03 +00:00
parent 055854c049
commit 01f0dadbb3
3 changed files with 49 additions and 16 deletions

View file

@ -59,6 +59,8 @@
book_get/4,
book_head/3,
book_head/4,
book_sqn/3,
book_sqn/4,
book_headonly/4,
book_snapshot/4,
book_compactjournal/2,
@ -535,6 +537,11 @@ book_delete(Pid, Bucket, Key, IndexSpecs) ->
-spec book_head(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:tag())
-> {ok, any()}|not_found.
-spec book_sqn(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:tag())
-> {ok, non_neg_integer()}|not_found.
-spec book_headonly(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:key())
-> {ok, any()}|not_found.
@ -557,7 +564,7 @@ book_get(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).
book_head(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity).
gen_server:call(Pid, {head, Bucket, Key, Tag, false}, infinity).
book_get(Pid, Bucket, Key) ->
book_get(Pid, Bucket, Key, ?STD_TAG).
@ -566,9 +573,17 @@ book_head(Pid, Bucket, Key) ->
book_head(Pid, Bucket, Key, ?STD_TAG).
book_headonly(Pid, Bucket, Key, SubKey) ->
gen_server:call(Pid, {head, Bucket, {Key, SubKey}, ?HEAD_TAG}, infinity).
gen_server:call(Pid,
{head, Bucket, {Key, SubKey}, ?HEAD_TAG, false},
infinity).
book_sqn(Pid, Bucket, Key) ->
book_sqn(Pid, Bucket, Key, ?STD_TAG).
book_sqn(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {head, Bucket, Key, Tag, true}, infinity).
-spec book_returnfolder(pid(), tuple()) -> {async, fun()}.
%% @doc Folds over store - deprecated
@ -1298,7 +1313,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
update_statetimings(get, Timings2, State#state.get_countdown),
{reply, Reply, State#state{get_timings = Timings,
get_countdown = CountDown}};
handle_call({head, Bucket, Key, Tag}, _From, State)
handle_call({head, Bucket, Key, Tag, SQNOnly}, _From, State)
when State#state.head_lookup == true ->
SWp = os:timestamp(),
LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
@ -1308,14 +1323,14 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
State#state.head_only),
{SWr, UpdTimingsP} =
update_timings(SWp, {head, pcl}, State#state.head_timings),
{LedgerMD, JournalCheckFrequency} =
{LedgerMD, SQN, JournalCheckFrequency} =
case Head of
not_present ->
{not_found, State#state.ink_checking};
{not_found, null, State#state.ink_checking};
Head ->
case leveled_codec:striphead_to_v1details(Head) of
{_SeqN, tomb, _MH, _MD} ->
{not_found, State#state.ink_checking};
{not_found, null, State#state.ink_checking};
{SeqN, {active, TS}, _MH, MD} ->
case TS >= leveled_util:integer_now() of
true ->
@ -1331,21 +1346,23 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
LK,
SeqN) of
{true, UppedFrequency} ->
{not_found, UppedFrequency};
{not_found, null, UppedFrequency};
{false, ReducedFrequency} ->
{MD, ReducedFrequency}
{MD, SeqN, ReducedFrequency}
end;
false ->
{not_found, State#state.ink_checking}
{not_found, null, State#state.ink_checking}
end
end
end,
Reply =
case LedgerMD of
not_found ->
case {LedgerMD, SQNOnly} of
{not_found, _} ->
not_found;
_ ->
{ok, leveled_head:build_head(Tag, LedgerMD)}
{_, false} ->
{ok, leveled_head:build_head(Tag, LedgerMD)};
{_, true} ->
{ok, SQN}
end,
{_SW, UpdTimingsR} =
update_timings(SWr, {head, rsp}, UpdTimingsP),

View file

@ -1060,7 +1060,7 @@ close_allmanifest([H|ManifestT]) ->
open_all_manifest([], RootPath, CDBOpts) ->
leveled_log:log("I0011", []),
leveled_imanifest:add_entry([],
start_new_activejournal(1, RootPath, CDBOpts),
start_new_activejournal(0, RootPath, CDBOpts),
true);
open_all_manifest(Man0, RootPath, CDBOpts) ->
Man1 = leveled_imanifest:to_list(Man0),
@ -1512,9 +1512,9 @@ empty_manifest_test() ->
?assertMatch(not_present, ink_fetch(Ink2, key_converter("Key1"), 1)),
{ok, SQN, Size} =
ink_put(Ink2, key_converter("Key1"), "Value1", {[], infinity}),
?assertMatch(2, SQN),
?assertMatch(1, SQN), % This is the first key - so should have SQN of 1
?assertMatch(true, Size > 0),
{ok, V} = ink_fetch(Ink2, key_converter("Key1"), 2),
{ok, V} = ink_fetch(Ink2, key_converter("Key1"), 1),
?assertMatch("Value1", V),
ink_close(Ink2),
clean_testdir(RootPath).

View file

@ -103,6 +103,10 @@ many_put_fetch_head(_Config) ->
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
testutil:check_forobject(Bookie1, TestObject),
{ok, 1} = leveled_bookie:book_sqn(Bookie1,
testutil:get_bucket(TestObject),
testutil:get_key(TestObject),
?RIAK_TAG),
ok = leveled_bookie:book_close(Bookie1),
StartOpts2 = [{root_path, RootPath},
{max_journalsize, 50000000},
@ -113,6 +117,10 @@ many_put_fetch_head(_Config) ->
ok = leveled_bookie:book_loglevel(Bookie2, error),
ok = leveled_bookie:book_addlogs(Bookie2, ["B0015"]),
testutil:check_forobject(Bookie2, TestObject),
{ok, 1} = leveled_bookie:book_sqn(Bookie2,
testutil:get_bucket(TestObject),
testutil:get_key(TestObject),
?RIAK_TAG),
GenList = [2, 20002, 40002, 60002, 80002,
100002, 120002, 140002, 160002, 180002],
CLs = testutil:load_objects(20000, GenList, Bookie2, TestObject,
@ -137,6 +145,14 @@ many_put_fetch_head(_Config) ->
{ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
testutil:check_forlist(Bookie3, ChkList2A),
testutil:check_forobject(Bookie3, TestObject),
{ok, 1} = leveled_bookie:book_sqn(Bookie3,
testutil:get_bucket(TestObject),
testutil:get_key(TestObject),
?RIAK_TAG),
not_found = leveled_bookie:book_sqn(Bookie3,
testutil:get_bucket(TestObject),
testutil:get_key(TestObject),
?STD_TAG),
testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"),
ok = leveled_bookie:book_destroy(Bookie3).