From a9b097e39250955178cdc6f5c9a984da2a488b53 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 24 Sep 2018 19:54:28 +0100 Subject: [PATCH 1/3] Add a wrapper to fold_keys queries Queries that in Riak will be based on fold_keys need to be able to catch throws, and re-throw them to be detected by the worker (whilst still clearing up the snapshot) --- src/leveled_penciller.erl | 34 ++++++++++++----- src/leveled_runner.erl | 77 +++++++++++++++++++++++++++++---------- 2 files changed, 82 insertions(+), 29 deletions(-) diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 776da87..4692bfc 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -176,6 +176,7 @@ pcl_fetchlevelzero/2, pcl_fetch/4, pcl_fetchkeys/5, + pcl_fetchkeys/6, pcl_fetchkeysbysegment/6, pcl_fetchnextkey/5, pcl_checksequencenumber/3, @@ -386,7 +387,7 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) -> -spec pcl_fetchkeys(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), - fun(), any()) -> any(). + fun(), any(), as_pcl|by_runner) -> any(). %% @doc %% Run a range query between StartKey and EndKey (inclusive). This will cover %% all keys in the range - so must only be run against snapshots of the @@ -397,13 +398,18 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) -> %% the top of the range. Comparison with the start of the range is based on %% Erlang term order. pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) -> + pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, as_pcl). + +pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> gen_server:call(Pid, {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - false, -1}, + false, -1, + By}, infinity). + -spec pcl_fetchkeysbysegment(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), @@ -426,7 +432,8 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) -> {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - SegmentList, -1}, + SegmentList, -1, + as_pcl}, infinity). -spec pcl_fetchnextkey(pid(), @@ -442,7 +449,8 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - false, 1}, + false, 1, + as_pcl}, infinity). -spec pcl_checksequencenumber(pid(), @@ -676,7 +684,7 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, - SegmentList, MaxKeys}, + SegmentList, MaxKeys, By}, _From, State=#state{snapshot_fully_loaded=Ready}) when Ready == true -> @@ -707,13 +715,19 @@ handle_call({fetch_keys, end end, SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)), - - Acc = keyfolder({L0AsList, SSTiter}, + Folder = + fun() -> + keyfolder({L0AsList, SSTiter}, {StartKey, EndKey}, {AccFun, InitAcc}, - {SegmentList, MaxKeys}), - - {reply, Acc, State}; + {SegmentList, MaxKeys}) + end, + case By of + as_pcl -> + {reply, Folder(), State}; + by_runner -> + {reply, Folder, State} + end; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) -> diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 90e27d5..3db2267 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -65,10 +65,11 @@ bucket_sizestats(SnapFun, Bucket, Tag) -> fun() -> {ok, LedgerSnap, _JournalSnap} = SnapFun(), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnap, - StartKey, - EndKey, - AccFun, - {0, 0}), + StartKey, + EndKey, + AccFun, + {0, 0}, + as_pcl), ok = leveled_penciller:pcl_close(LedgerSnap), Acc end, @@ -119,16 +120,21 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) -> fun add_keys/2 end, AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun), + Runner = fun() -> {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - StartKey, - EndKey, - AccFun, - InitAcc), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - Acc + Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, + StartKey, + EndKey, + AccFun, + InitAcc, + by_runner), + AfterFun = + fun() -> + ok = leveled_penciller:pcl_close(LedgerSnapshot) + end, + wrap_runner(Folder, AfterFun) end, {async, Runner}. @@ -145,14 +151,18 @@ bucketkey_query(SnapFun, Tag, Bucket, AccFun = accumulate_keys(FoldKeysFun, TermRegex), Runner = fun() -> - {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - SK, - EK, - AccFun, - InitAcc), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - Acc + {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), + Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, + SK, + EK, + AccFun, + InitAcc, + by_runner), + AfterFun = + fun() -> + ok = leveled_penciller:pcl_close(LedgerSnapshot) + end, + wrap_runner(Folder, AfterFun) end, {async, Runner}. @@ -726,6 +736,18 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) -> end end end. +-spec wrap_runner(fun(), fun()) -> any(). +%% @doc +%% Allow things to be thrown in folds, and ensure clean-up action is still +%% undertaken if they are +wrap_runner(FoldAction, AfterAction) -> + try FoldAction() + catch throw:Throw -> + throw(Throw) + after AfterAction() + end. + + %%%============================================================================ %%% Test @@ -733,6 +755,23 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) -> -ifdef(TEST). +throw_test() -> + StoppedFolder = + fun() -> + throw(stop_fold) + end, + CompletedFolder = + fun() -> + {ok, ['1']} + end, + AfterAction = + fun() -> + error + end, + ?assertMatch({ok, ['1']}, + wrap_runner(CompletedFolder, AfterAction)), + ?assertException(throw, stop_fold, + wrap_runner(StoppedFolder, AfterAction)). -endif. From bed155761b7dbe3012dd8b7ac05c1f6462f775bc Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 24 Sep 2018 20:05:48 +0100 Subject: [PATCH 2/3] Added comments This is still a clumsy feature, in terms of implementation. Is the fact that some folds handle a throw, and some don't an issue? --- src/leveled_runner.erl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 3db2267..7ce25ac 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -109,6 +109,13 @@ bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) -> fun_and_acc()) -> {async, fun()}. %% @doc %% Secondary index query +%% This has the special capability that it will expect a message to be thrown +%% during the query - and handle this without crashing the penciller snapshot +%% This allows for this query to be used with a max_results check in the +%% applictaion - and to throw a stop message to be caught by the worker +%% handling the runner. This behaviour will not prevent the snapshot from +%% closing neatly, allowing delete_pending files to be cleared without waiting +%% for a timeout index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) -> {FoldKeysFun, InitAcc} = FoldAccT, {ReturnTerms, TermRegex} = TermHandling, @@ -739,7 +746,10 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) -> -spec wrap_runner(fun(), fun()) -> any(). %% @doc %% Allow things to be thrown in folds, and ensure clean-up action is still -%% undertaken if they are +%% undertaken if they are. +%% +%% It is assumed this is only used at present by index queries and key folds, +%% but the wrap could be applied more generally with further work wrap_runner(FoldAction, AfterAction) -> try FoldAction() catch throw:Throw -> From f4b365438c01ec60e749814df6dbf38bc959c8f5 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 24 Sep 2018 20:43:21 +0100 Subject: [PATCH 3/3] Further comments in API docs --- src/leveled_bookie.erl | 15 ++++++++++++++- src/leveled_runner.erl | 2 +- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index b0b8a96..8292a04 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -591,6 +591,12 @@ book_returnfolder(Pid, RunnerType) -> %% values. In the Riak sense of secondary indexes, there are two types %% of indexes `_bin' indexes and `_int' indexes. Term regex may only %% be run against the `_bin' type. +%% +%% Any book_indexfold query will fold over the snapshot under the control +%% of the worker process controlling the function - and that process can +%% be interrupted by a throw, which will be forwarded to the worker (whilst +%% still closing down the snapshot). This may be used, for example, to +%% curtail a fold in the application at max_results -spec book_indexfold(pid(), Constraint:: {Bucket, Key} | Bucket, FoldAccT :: {FoldFun, Acc}, @@ -609,7 +615,8 @@ book_returnfolder(Pid, RunnerType) -> TermRegex :: re:mp() | undefined. book_indexfold(Pid, Constraint, FoldAccT, Range, TermHandling) -> - RunnerType = {index_query, Constraint, FoldAccT, Range, TermHandling}, + RunnerType = + {index_query, Constraint, FoldAccT, Range, TermHandling}, book_returnfolder(Pid, RunnerType). @@ -647,6 +654,12 @@ book_bucketlist(Pid, Tag, FoldAccT, Constraint) -> %% initial value of `Acc' is the second element of `FoldAccT'. Returns %% `{async, Runner}' where `Runner' is a function that will run the %% fold and return the final value of `Acc' +%% +%% Any book_keylist query will fold over the snapshot under the control +%% of the worker process controlling the function - and that process can +%% be interrupted by a throw, which will be forwarded to the worker (whilst +%% still closing down the snapshot). This may be used, for example, to +%% curtail a fold in the application at max_results -spec book_keylist(pid(), Tag, FoldAccT) -> {async, Runner} when Tag :: leveled_codec:tag(), FoldAccT :: {FoldFun, Acc}, diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 7ce25ac..7b88c21 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -182,7 +182,7 @@ bucketkey_query(SnapFun, Tag, Bucket, FunAcc) -> -spec hashlist_query(fun(), leveled_codec:tag(), boolean()) -> {async, fun()}. %% @doc -%% Fold pver the key accumulating the hashes +%% Fold over the keys under a given Tag accumulating the hashes hashlist_query(SnapFun, Tag, JournalCheck) -> StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag),