From a9b097e39250955178cdc6f5c9a984da2a488b53 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 24 Sep 2018 19:54:28 +0100 Subject: [PATCH] Add a wrapper to fold_keys queries Queries that in Riak will be based on fold_keys need to be able to catch throws, and re-throw them to be detected by the worker (whilst still clearing up the snapshot) --- src/leveled_penciller.erl | 34 ++++++++++++----- src/leveled_runner.erl | 77 +++++++++++++++++++++++++++++---------- 2 files changed, 82 insertions(+), 29 deletions(-) diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 776da87..4692bfc 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -176,6 +176,7 @@ pcl_fetchlevelzero/2, pcl_fetch/4, pcl_fetchkeys/5, + pcl_fetchkeys/6, pcl_fetchkeysbysegment/6, pcl_fetchnextkey/5, pcl_checksequencenumber/3, @@ -386,7 +387,7 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) -> -spec pcl_fetchkeys(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), - fun(), any()) -> any(). + fun(), any(), as_pcl|by_runner) -> any(). %% @doc %% Run a range query between StartKey and EndKey (inclusive). This will cover %% all keys in the range - so must only be run against snapshots of the @@ -397,13 +398,18 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) -> %% the top of the range. Comparison with the start of the range is based on %% Erlang term order. pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) -> + pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, as_pcl). + +pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> gen_server:call(Pid, {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - false, -1}, + false, -1, + By}, infinity). + -spec pcl_fetchkeysbysegment(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), @@ -426,7 +432,8 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) -> {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - SegmentList, -1}, + SegmentList, -1, + as_pcl}, infinity). -spec pcl_fetchnextkey(pid(), @@ -442,7 +449,8 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - false, 1}, + false, 1, + as_pcl}, infinity). -spec pcl_checksequencenumber(pid(), @@ -676,7 +684,7 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, - SegmentList, MaxKeys}, + SegmentList, MaxKeys, By}, _From, State=#state{snapshot_fully_loaded=Ready}) when Ready == true -> @@ -707,13 +715,19 @@ handle_call({fetch_keys, end end, SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)), - - Acc = keyfolder({L0AsList, SSTiter}, + Folder = + fun() -> + keyfolder({L0AsList, SSTiter}, {StartKey, EndKey}, {AccFun, InitAcc}, - {SegmentList, MaxKeys}), - - {reply, Acc, State}; + {SegmentList, MaxKeys}) + end, + case By of + as_pcl -> + {reply, Folder(), State}; + by_runner -> + {reply, Folder, State} + end; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) -> diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 90e27d5..3db2267 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -65,10 +65,11 @@ bucket_sizestats(SnapFun, Bucket, Tag) -> fun() -> {ok, LedgerSnap, _JournalSnap} = SnapFun(), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnap, - StartKey, - EndKey, - AccFun, - {0, 0}), + StartKey, + EndKey, + AccFun, + {0, 0}, + as_pcl), ok = leveled_penciller:pcl_close(LedgerSnap), Acc end, @@ -119,16 +120,21 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) -> fun add_keys/2 end, AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun), + Runner = fun() -> {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - StartKey, - EndKey, - AccFun, - InitAcc), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - Acc + Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, + StartKey, + EndKey, + AccFun, + InitAcc, + by_runner), + AfterFun = + fun() -> + ok = leveled_penciller:pcl_close(LedgerSnapshot) + end, + wrap_runner(Folder, AfterFun) end, {async, Runner}. @@ -145,14 +151,18 @@ bucketkey_query(SnapFun, Tag, Bucket, AccFun = accumulate_keys(FoldKeysFun, TermRegex), Runner = fun() -> - {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - SK, - EK, - AccFun, - InitAcc), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - Acc + {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), + Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, + SK, + EK, + AccFun, + InitAcc, + by_runner), + AfterFun = + fun() -> + ok = leveled_penciller:pcl_close(LedgerSnapshot) + end, + wrap_runner(Folder, AfterFun) end, {async, Runner}. @@ -726,6 +736,18 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) -> end end end. +-spec wrap_runner(fun(), fun()) -> any(). +%% @doc +%% Allow things to be thrown in folds, and ensure clean-up action is still +%% undertaken if they are +wrap_runner(FoldAction, AfterAction) -> + try FoldAction() + catch throw:Throw -> + throw(Throw) + after AfterAction() + end. + + %%%============================================================================ %%% Test @@ -733,6 +755,23 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) -> -ifdef(TEST). +throw_test() -> + StoppedFolder = + fun() -> + throw(stop_fold) + end, + CompletedFolder = + fun() -> + {ok, ['1']} + end, + AfterAction = + fun() -> + error + end, + ?assertMatch({ok, ['1']}, + wrap_runner(CompletedFolder, AfterAction)), + ?assertException(throw, stop_fold, + wrap_runner(StoppedFolder, AfterAction)). -endif.