Merge pull request #271 from martinsumner/mas-i255-getsqn

Mas i255 getsqn
This commit is contained in:
Martin Sumner 2019-03-14 08:20:02 +00:00 committed by GitHub
commit 1c5bc3f6d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 17 deletions

View file

@ -59,6 +59,8 @@
book_get/4, book_get/4,
book_head/3, book_head/3,
book_head/4, book_head/4,
book_sqn/3,
book_sqn/4,
book_headonly/4, book_headonly/4,
book_snapshot/4, book_snapshot/4,
book_compactjournal/2, book_compactjournal/2,
@ -535,6 +537,11 @@ book_delete(Pid, Bucket, Key, IndexSpecs) ->
-spec book_head(pid(), -spec book_head(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:tag()) leveled_codec:key(), leveled_codec:key(), leveled_codec:tag())
-> {ok, any()}|not_found. -> {ok, any()}|not_found.
-spec book_sqn(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:tag())
-> {ok, non_neg_integer()}|not_found.
-spec book_headonly(pid(), -spec book_headonly(pid(),
leveled_codec:key(), leveled_codec:key(), leveled_codec:key()) leveled_codec:key(), leveled_codec:key(), leveled_codec:key())
-> {ok, any()}|not_found. -> {ok, any()}|not_found.
@ -557,7 +564,7 @@ book_get(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity). gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).
book_head(Pid, Bucket, Key, Tag) -> book_head(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity). gen_server:call(Pid, {head, Bucket, Key, Tag, false}, infinity).
book_get(Pid, Bucket, Key) -> book_get(Pid, Bucket, Key) ->
book_get(Pid, Bucket, Key, ?STD_TAG). book_get(Pid, Bucket, Key, ?STD_TAG).
@ -566,9 +573,17 @@ book_head(Pid, Bucket, Key) ->
book_head(Pid, Bucket, Key, ?STD_TAG). book_head(Pid, Bucket, Key, ?STD_TAG).
book_headonly(Pid, Bucket, Key, SubKey) -> book_headonly(Pid, Bucket, Key, SubKey) ->
gen_server:call(Pid, {head, Bucket, {Key, SubKey}, ?HEAD_TAG}, infinity). gen_server:call(Pid,
{head, Bucket, {Key, SubKey}, ?HEAD_TAG, false},
infinity).
book_sqn(Pid, Bucket, Key) ->
book_sqn(Pid, Bucket, Key, ?STD_TAG).
book_sqn(Pid, Bucket, Key, Tag) ->
gen_server:call(Pid, {head, Bucket, Key, Tag, true}, infinity).
-spec book_returnfolder(pid(), tuple()) -> {async, fun()}. -spec book_returnfolder(pid(), tuple()) -> {async, fun()}.
%% @doc Folds over store - deprecated %% @doc Folds over store - deprecated
@ -1298,7 +1313,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
update_statetimings(get, Timings2, State#state.get_countdown), update_statetimings(get, Timings2, State#state.get_countdown),
{reply, Reply, State#state{get_timings = Timings, {reply, Reply, State#state{get_timings = Timings,
get_countdown = CountDown}}; get_countdown = CountDown}};
handle_call({head, Bucket, Key, Tag}, _From, State) handle_call({head, Bucket, Key, Tag, SQNOnly}, _From, State)
when State#state.head_lookup == true -> when State#state.head_lookup == true ->
SWp = os:timestamp(), SWp = os:timestamp(),
LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag), LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
@ -1308,14 +1323,14 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
State#state.head_only), State#state.head_only),
{SWr, UpdTimingsP} = {SWr, UpdTimingsP} =
update_timings(SWp, {head, pcl}, State#state.head_timings), update_timings(SWp, {head, pcl}, State#state.head_timings),
{LedgerMD, JournalCheckFrequency} = {LedgerMD, SQN, JournalCheckFrequency} =
case Head of case Head of
not_present -> not_present ->
{not_found, State#state.ink_checking}; {not_found, null, State#state.ink_checking};
Head -> Head ->
case leveled_codec:striphead_to_v1details(Head) of case leveled_codec:striphead_to_v1details(Head) of
{_SeqN, tomb, _MH, _MD} -> {_SeqN, tomb, _MH, _MD} ->
{not_found, State#state.ink_checking}; {not_found, null, State#state.ink_checking};
{SeqN, {active, TS}, _MH, MD} -> {SeqN, {active, TS}, _MH, MD} ->
case TS >= leveled_util:integer_now() of case TS >= leveled_util:integer_now() of
true -> true ->
@ -1331,21 +1346,23 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
LK, LK,
SeqN) of SeqN) of
{true, UppedFrequency} -> {true, UppedFrequency} ->
{not_found, UppedFrequency}; {not_found, null, UppedFrequency};
{false, ReducedFrequency} -> {false, ReducedFrequency} ->
{MD, ReducedFrequency} {MD, SeqN, ReducedFrequency}
end; end;
false -> false ->
{not_found, State#state.ink_checking} {not_found, null, State#state.ink_checking}
end end
end end
end, end,
Reply = Reply =
case LedgerMD of case {LedgerMD, SQNOnly} of
not_found -> {not_found, _} ->
not_found; not_found;
_ -> {_, false} ->
{ok, leveled_head:build_head(Tag, LedgerMD)} {ok, leveled_head:build_head(Tag, LedgerMD)};
{_, true} ->
{ok, SQN}
end, end,
{_SW, UpdTimingsR} = {_SW, UpdTimingsR} =
update_timings(SWr, {head, rsp}, UpdTimingsP), update_timings(SWr, {head, rsp}, UpdTimingsP),

View file

@ -1060,7 +1060,7 @@ close_allmanifest([H|ManifestT]) ->
open_all_manifest([], RootPath, CDBOpts) -> open_all_manifest([], RootPath, CDBOpts) ->
leveled_log:log("I0011", []), leveled_log:log("I0011", []),
leveled_imanifest:add_entry([], leveled_imanifest:add_entry([],
start_new_activejournal(1, RootPath, CDBOpts), start_new_activejournal(0, RootPath, CDBOpts),
true); true);
open_all_manifest(Man0, RootPath, CDBOpts) -> open_all_manifest(Man0, RootPath, CDBOpts) ->
Man1 = leveled_imanifest:to_list(Man0), Man1 = leveled_imanifest:to_list(Man0),
@ -1512,9 +1512,9 @@ empty_manifest_test() ->
?assertMatch(not_present, ink_fetch(Ink2, key_converter("Key1"), 1)), ?assertMatch(not_present, ink_fetch(Ink2, key_converter("Key1"), 1)),
{ok, SQN, Size} = {ok, SQN, Size} =
ink_put(Ink2, key_converter("Key1"), "Value1", {[], infinity}), ink_put(Ink2, key_converter("Key1"), "Value1", {[], infinity}),
?assertMatch(2, SQN), ?assertMatch(1, SQN), % This is the first key - so should have SQN of 1
?assertMatch(true, Size > 0), ?assertMatch(true, Size > 0),
{ok, V} = ink_fetch(Ink2, key_converter("Key1"), 2), {ok, V} = ink_fetch(Ink2, key_converter("Key1"), 1),
?assertMatch("Value1", V), ?assertMatch("Value1", V),
ink_close(Ink2), ink_close(Ink2),
clean_testdir(RootPath). clean_testdir(RootPath).

View file

@ -1210,7 +1210,7 @@ shutdown_manifest(Manifest, L0Constructor) ->
end end
end, end,
ok = ok =
case is_pid(Owner) of case check_alive(Owner) of
true -> true ->
leveled_sst:sst_close(Owner); leveled_sst:sst_close(Owner);
false -> false ->
@ -1221,6 +1221,15 @@ shutdown_manifest(Manifest, L0Constructor) ->
EntryCloseFun(L0Constructor). EntryCloseFun(L0Constructor).
-spec check_alive(pid()|undefined) -> boolean().
%% @doc
%% Double-check a processis active before attempting to terminate
check_alive(Owner) when is_pid(Owner) ->
is_process_alive(Owner);
check_alive(_Owner) ->
false.
-spec archive_files(list(), list()) -> ok. -spec archive_files(list(), list()) -> ok.
%% @doc %% @doc
%% Archive any sst files in the folder that have not been used to build the %% Archive any sst files in the folder that have not been used to build the

View file

@ -103,6 +103,10 @@ many_put_fetch_head(_Config) ->
{TestObject, TestSpec} = testutil:generate_testobject(), {TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
testutil:check_forobject(Bookie1, TestObject), testutil:check_forobject(Bookie1, TestObject),
{ok, 1} = leveled_bookie:book_sqn(Bookie1,
testutil:get_bucket(TestObject),
testutil:get_key(TestObject),
?RIAK_TAG),
ok = leveled_bookie:book_close(Bookie1), ok = leveled_bookie:book_close(Bookie1),
StartOpts2 = [{root_path, RootPath}, StartOpts2 = [{root_path, RootPath},
{max_journalsize, 50000000}, {max_journalsize, 50000000},
@ -113,6 +117,10 @@ many_put_fetch_head(_Config) ->
ok = leveled_bookie:book_loglevel(Bookie2, error), ok = leveled_bookie:book_loglevel(Bookie2, error),
ok = leveled_bookie:book_addlogs(Bookie2, ["B0015"]), ok = leveled_bookie:book_addlogs(Bookie2, ["B0015"]),
testutil:check_forobject(Bookie2, TestObject), testutil:check_forobject(Bookie2, TestObject),
{ok, 1} = leveled_bookie:book_sqn(Bookie2,
testutil:get_bucket(TestObject),
testutil:get_key(TestObject),
?RIAK_TAG),
GenList = [2, 20002, 40002, 60002, 80002, GenList = [2, 20002, 40002, 60002, 80002,
100002, 120002, 140002, 160002, 180002], 100002, 120002, 140002, 160002, 180002],
CLs = testutil:load_objects(20000, GenList, Bookie2, TestObject, CLs = testutil:load_objects(20000, GenList, Bookie2, TestObject,
@ -137,6 +145,17 @@ many_put_fetch_head(_Config) ->
{ok, Bookie3} = leveled_bookie:book_start(StartOpts2), {ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
testutil:check_forlist(Bookie3, ChkList2A), testutil:check_forlist(Bookie3, ChkList2A),
testutil:check_forobject(Bookie3, TestObject), testutil:check_forobject(Bookie3, TestObject),
{ok, 1} = leveled_bookie:book_sqn(Bookie3,
testutil:get_bucket(TestObject),
testutil:get_key(TestObject),
?RIAK_TAG),
not_found = leveled_bookie:book_sqn(Bookie3,
testutil:get_bucket(TestObject),
testutil:get_key(TestObject),
?STD_TAG),
not_found = leveled_bookie:book_sqn(Bookie3,
testutil:get_bucket(TestObject),
testutil:get_key(TestObject)),
testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"), testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"),
ok = leveled_bookie:book_destroy(Bookie3). ok = leveled_bookie:book_destroy(Bookie3).