Add a wrapper to fold_keys queries

Queries that in Riak will be based on fold_keys need to be able to catch throws, and re-throw them to be detected by the worker (whilst still clearing up the snapshot)
This commit is contained in:
Martin Sumner 2018-09-24 19:54:28 +01:00
parent 041e86fed9
commit a9b097e392
2 changed files with 82 additions and 29 deletions

View file

@ -176,6 +176,7 @@
pcl_fetchlevelzero/2, pcl_fetchlevelzero/2,
pcl_fetch/4, pcl_fetch/4,
pcl_fetchkeys/5, pcl_fetchkeys/5,
pcl_fetchkeys/6,
pcl_fetchkeysbysegment/6, pcl_fetchkeysbysegment/6,
pcl_fetchnextkey/5, pcl_fetchnextkey/5,
pcl_checksequencenumber/3, pcl_checksequencenumber/3,
@ -386,7 +387,7 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) ->
-spec pcl_fetchkeys(pid(), -spec pcl_fetchkeys(pid(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
fun(), any()) -> any(). fun(), any(), as_pcl|by_runner) -> any().
%% @doc %% @doc
%% Run a range query between StartKey and EndKey (inclusive). This will cover %% Run a range query between StartKey and EndKey (inclusive). This will cover
%% all keys in the range - so must only be run against snapshots of the %% all keys in the range - so must only be run against snapshots of the
@ -397,13 +398,18 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) ->
%% the top of the range. Comparison with the start of the range is based on %% the top of the range. Comparison with the start of the range is based on
%% Erlang term order. %% Erlang term order.
pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) -> pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, as_pcl).
pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) ->
gen_server:call(Pid, gen_server:call(Pid,
{fetch_keys, {fetch_keys,
StartKey, EndKey, StartKey, EndKey,
AccFun, InitAcc, AccFun, InitAcc,
false, -1}, false, -1,
By},
infinity). infinity).
-spec pcl_fetchkeysbysegment(pid(), -spec pcl_fetchkeysbysegment(pid(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
@ -426,7 +432,8 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) ->
{fetch_keys, {fetch_keys,
StartKey, EndKey, StartKey, EndKey,
AccFun, InitAcc, AccFun, InitAcc,
SegmentList, -1}, SegmentList, -1,
as_pcl},
infinity). infinity).
-spec pcl_fetchnextkey(pid(), -spec pcl_fetchnextkey(pid(),
@ -442,7 +449,8 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
{fetch_keys, {fetch_keys,
StartKey, EndKey, StartKey, EndKey,
AccFun, InitAcc, AccFun, InitAcc,
false, 1}, false, 1,
as_pcl},
infinity). infinity).
-spec pcl_checksequencenumber(pid(), -spec pcl_checksequencenumber(pid(),
@ -676,7 +684,7 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
handle_call({fetch_keys, handle_call({fetch_keys,
StartKey, EndKey, StartKey, EndKey,
AccFun, InitAcc, AccFun, InitAcc,
SegmentList, MaxKeys}, SegmentList, MaxKeys, By},
_From, _From,
State=#state{snapshot_fully_loaded=Ready}) State=#state{snapshot_fully_loaded=Ready})
when Ready == true -> when Ready == true ->
@ -707,13 +715,19 @@ handle_call({fetch_keys,
end end
end, end,
SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)), SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)),
Folder =
Acc = keyfolder({L0AsList, SSTiter}, fun() ->
keyfolder({L0AsList, SSTiter},
{StartKey, EndKey}, {StartKey, EndKey},
{AccFun, InitAcc}, {AccFun, InitAcc},
{SegmentList, MaxKeys}), {SegmentList, MaxKeys})
end,
{reply, Acc, State}; case By of
as_pcl ->
{reply, Folder(), State};
by_runner ->
{reply, Folder, State}
end;
handle_call(get_startup_sqn, _From, State) -> handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State}; {reply, State#state.persisted_sqn, State};
handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) -> handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) ->

View file

@ -65,10 +65,11 @@ bucket_sizestats(SnapFun, Bucket, Tag) ->
fun() -> fun() ->
{ok, LedgerSnap, _JournalSnap} = SnapFun(), {ok, LedgerSnap, _JournalSnap} = SnapFun(),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnap, Acc = leveled_penciller:pcl_fetchkeys(LedgerSnap,
StartKey, StartKey,
EndKey, EndKey,
AccFun, AccFun,
{0, 0}), {0, 0},
as_pcl),
ok = leveled_penciller:pcl_close(LedgerSnap), ok = leveled_penciller:pcl_close(LedgerSnap),
Acc Acc
end, end,
@ -119,16 +120,21 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
fun add_keys/2 fun add_keys/2
end, end,
AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun), AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun),
Runner = Runner =
fun() -> fun() ->
{ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
StartKey, StartKey,
EndKey, EndKey,
AccFun, AccFun,
InitAcc), InitAcc,
ok = leveled_penciller:pcl_close(LedgerSnapshot), by_runner),
Acc AfterFun =
fun() ->
ok = leveled_penciller:pcl_close(LedgerSnapshot)
end,
wrap_runner(Folder, AfterFun)
end, end,
{async, Runner}. {async, Runner}.
@ -145,14 +151,18 @@ bucketkey_query(SnapFun, Tag, Bucket,
AccFun = accumulate_keys(FoldKeysFun, TermRegex), AccFun = accumulate_keys(FoldKeysFun, TermRegex),
Runner = Runner =
fun() -> fun() ->
{ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
SK, SK,
EK, EK,
AccFun, AccFun,
InitAcc), InitAcc,
ok = leveled_penciller:pcl_close(LedgerSnapshot), by_runner),
Acc AfterFun =
fun() ->
ok = leveled_penciller:pcl_close(LedgerSnapshot)
end,
wrap_runner(Folder, AfterFun)
end, end,
{async, Runner}. {async, Runner}.
@ -726,6 +736,18 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
end end end end
end. end.
-spec wrap_runner(fun(), fun()) -> any().
%% @doc
%% Allow things to be thrown in folds, and ensure clean-up action is still
%% undertaken if they are
wrap_runner(FoldAction, AfterAction) ->
try FoldAction()
catch throw:Throw ->
throw(Throw)
after AfterAction()
end.
%%%============================================================================ %%%============================================================================
%%% Test %%% Test
@ -733,6 +755,23 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
-ifdef(TEST). -ifdef(TEST).
throw_test() ->
StoppedFolder =
fun() ->
throw(stop_fold)
end,
CompletedFolder =
fun() ->
{ok, ['1']}
end,
AfterAction =
fun() ->
error
end,
?assertMatch({ok, ['1']},
wrap_runner(CompletedFolder, AfterAction)),
?assertException(throw, stop_fold,
wrap_runner(StoppedFolder, AfterAction)).
-endif. -endif.