Merge pull request #184 from martinsumner/mas-i183-thowinasyncfold
Mas i183 thowinasyncfold
commit ab74d5c0dd
3 changed files with 107 additions and 31 deletions
@@ -591,6 +591,12 @@ book_returnfolder(Pid, RunnerType) ->
 %% values. In the Riak sense of secondary indexes, there are two types
 %% of indexes `_bin' indexes and `_int' indexes. Term regex may only
 %% be run against the `_bin' type.
+%%
+%% Any book_indexfold query will fold over the snapshot under the control
+%% of the worker process controlling the function - and that process can
+%% be interrupted by a throw, which will be forwarded to the worker (whilst
+%% still closing down the snapshot). This may be used, for example, to
+%% curtail a fold in the application at max_results
 -spec book_indexfold(pid(),
                         Constraint:: {Bucket, Key} | Bucket,
                         FoldAccT :: {FoldFun, Acc},
@@ -609,7 +615,8 @@ book_returnfolder(Pid, RunnerType) ->
                         TermRegex :: re:mp() | undefined.

 book_indexfold(Pid, Constraint, FoldAccT, Range, TermHandling) ->
-    RunnerType = {index_query, Constraint, FoldAccT, Range, TermHandling},
+    RunnerType =
+        {index_query, Constraint, FoldAccT, Range, TermHandling},
     book_returnfolder(Pid, RunnerType).


@@ -647,6 +654,12 @@ book_bucketlist(Pid, Tag, FoldAccT, Constraint) ->
 %% initial value of `Acc' is the second element of `FoldAccT'. Returns
 %% `{async, Runner}' where `Runner' is a function that will run the
 %% fold and return the final value of `Acc'
+%%
+%% Any book_keylist query will fold over the snapshot under the control
+%% of the worker process controlling the function - and that process can
+%% be interrupted by a throw, which will be forwarded to the worker (whilst
+%% still closing down the snapshot). This may be used, for example, to
+%% curtail a fold in the application at max_results
 -spec book_keylist(pid(), Tag, FoldAccT) -> {async, Runner} when
     Tag :: leveled_codec:tag(),
     FoldAccT :: {FoldFun, Acc},
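The new comments on book_indexfold and book_keylist describe a fold that the application can cut short by throwing from its fold fun, with the throw surfacing in the worker running the query. A minimal caller-side sketch, assuming a FoldKeysFun/3 shape, a {stop_fold, Acc} throw term and placeholder index values (none of which are defined by this change):

%% Hypothetical worker function - Bookie is an assumed leveled_bookie pid,
%% and the index field/range values are placeholders.
curtailed_index_fold(Bookie, MaxResults) ->
    FoldKeysFun =
        fun(_Bucket, _Key, Acc) when length(Acc) >= MaxResults ->
                %% curtail the fold at max_results by throwing
                throw({stop_fold, Acc});
           (_Bucket, Key, Acc) ->
                [Key|Acc]
        end,
    {async, Runner} =
        leveled_bookie:book_indexfold(Bookie,
                                        <<"bucket">>,
                                        {FoldKeysFun, []},
                                        {<<"idx1_bin">>, <<"a">>, <<"z">>},
                                        {false, undefined}),
    %% the throw is forwarded to this worker; the snapshot is still
    %% closed down on the way (see wrap_runner/2 in leveled_runner)
    try Runner()
    catch throw:{stop_fold, Acc0} -> Acc0
    end.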
@@ -176,6 +176,7 @@
         pcl_fetchlevelzero/2,
         pcl_fetch/4,
         pcl_fetchkeys/5,
+        pcl_fetchkeys/6,
         pcl_fetchkeysbysegment/6,
         pcl_fetchnextkey/5,
         pcl_checksequencenumber/3,
@@ -386,7 +387,7 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) ->
 -spec pcl_fetchkeys(pid(),
                     leveled_codec:ledger_key(),
                     leveled_codec:ledger_key(),
-                    fun(), any()) -> any().
+                    fun(), any(), as_pcl|by_runner) -> any().
 %% @doc
 %% Run a range query between StartKey and EndKey (inclusive). This will cover
 %% all keys in the range - so must only be run against snapshots of the
@@ -397,13 +398,18 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) ->
 %% the top of the range. Comparison with the start of the range is based on
 %% Erlang term order.
 pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
+    pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, as_pcl).
+
+pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) ->
     gen_server:call(Pid,
                     {fetch_keys,
                         StartKey, EndKey,
                         AccFun, InitAcc,
-                        false, -1},
+                        false, -1,
+                        By},
                     infinity).


 -spec pcl_fetchkeysbysegment(pid(),
                                 leveled_codec:ledger_key(),
                                 leveled_codec:ledger_key(),
@@ -426,7 +432,8 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) ->
                     {fetch_keys,
                         StartKey, EndKey,
                         AccFun, InitAcc,
-                        SegmentList, -1},
+                        SegmentList, -1,
+                        as_pcl},
                     infinity).

 -spec pcl_fetchnextkey(pid(),
@@ -442,7 +449,8 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
                     {fetch_keys,
                         StartKey, EndKey,
                         AccFun, InitAcc,
-                        false, 1},
+                        false, 1,
+                        as_pcl},
                     infinity).

 -spec pcl_checksequencenumber(pid(),
@@ -676,7 +684,7 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
 handle_call({fetch_keys,
                 StartKey, EndKey,
                 AccFun, InitAcc,
-                SegmentList, MaxKeys},
+                SegmentList, MaxKeys, By},
             _From,
             State=#state{snapshot_fully_loaded=Ready})
                 when Ready == true ->
@@ -707,13 +715,19 @@ handle_call({fetch_keys,
             end
         end,
     SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)),
-    Acc = keyfolder({L0AsList, SSTiter},
-                    {StartKey, EndKey},
-                    {AccFun, InitAcc},
-                    {SegmentList, MaxKeys}),
-    {reply, Acc, State};
+    Folder =
+        fun() ->
+            keyfolder({L0AsList, SSTiter},
+                        {StartKey, EndKey},
+                        {AccFun, InitAcc},
+                        {SegmentList, MaxKeys})
+        end,
+    case By of
+        as_pcl ->
+            {reply, Folder(), State};
+        by_runner ->
+            {reply, Folder, State}
+    end;
 handle_call(get_startup_sqn, _From, State) ->
     {reply, State#state.persisted_sqn, State};
 handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) ->
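The extra as_pcl | by_runner argument on pcl_fetchkeys selects where the fold executes. A short sketch of the two modes (the snapshot and fold variables are assumed to be set up as in the existing callers):

%% as_pcl: the fold runs inside the penciller call and the final
%% accumulator is returned directly (the pre-existing behaviour).
Acc0 = leveled_penciller:pcl_fetchkeys(PclSnap, StartKey, EndKey,
                                        AccFun, InitAcc, as_pcl),

%% by_runner: the penciller replies with a fun; the fold only runs when
%% the caller evaluates it, so a throw from AccFun is raised in the
%% calling worker rather than inside the penciller snapshot process.
Folder = leveled_penciller:pcl_fetchkeys(PclSnap, StartKey, EndKey,
                                            AccFun, InitAcc, by_runner),
Acc1 = Folder().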
@@ -65,10 +65,11 @@ bucket_sizestats(SnapFun, Bucket, Tag) ->
         fun() ->
             {ok, LedgerSnap, _JournalSnap} = SnapFun(),
             Acc = leveled_penciller:pcl_fetchkeys(LedgerSnap,
                                                     StartKey,
                                                     EndKey,
                                                     AccFun,
-                                                    {0, 0}),
+                                                    {0, 0},
+                                                    as_pcl),
             ok = leveled_penciller:pcl_close(LedgerSnap),
             Acc
         end,
@@ -108,6 +109,13 @@ bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) ->
                     fun_and_acc()) -> {async, fun()}.
 %% @doc
 %% Secondary index query
+%% This has the special capability that it will expect a message to be thrown
+%% during the query - and handle this without crashing the penciller snapshot
+%% This allows for this query to be used with a max_results check in the
+%% application - and to throw a stop message to be caught by the worker
+%% handling the runner. This behaviour will not prevent the snapshot from
+%% closing neatly, allowing delete_pending files to be cleared without waiting
+%% for a timeout
 index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
     {FoldKeysFun, InitAcc} = FoldAccT,
     {ReturnTerms, TermRegex} = TermHandling,
@@ -119,16 +127,21 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
                 fun add_keys/2
         end,
     AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun),

     Runner =
         fun() ->
             {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
-            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
-                                                    StartKey,
-                                                    EndKey,
-                                                    AccFun,
-                                                    InitAcc),
-            ok = leveled_penciller:pcl_close(LedgerSnapshot),
-            Acc
+            Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                        StartKey,
+                                                        EndKey,
+                                                        AccFun,
+                                                        InitAcc,
+                                                        by_runner),
+            AfterFun =
+                fun() ->
+                    ok = leveled_penciller:pcl_close(LedgerSnapshot)
+                end,
+            wrap_runner(Folder, AfterFun)
         end,
     {async, Runner}.

@@ -145,14 +158,18 @@ bucketkey_query(SnapFun, Tag, Bucket,
     AccFun = accumulate_keys(FoldKeysFun, TermRegex),
     Runner =
         fun() ->
             {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
-            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
-                                                    SK,
-                                                    EK,
-                                                    AccFun,
-                                                    InitAcc),
-            ok = leveled_penciller:pcl_close(LedgerSnapshot),
-            Acc
+            Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                        SK,
+                                                        EK,
+                                                        AccFun,
+                                                        InitAcc,
+                                                        by_runner),
+            AfterFun =
+                fun() ->
+                    ok = leveled_penciller:pcl_close(LedgerSnapshot)
+                end,
+            wrap_runner(Folder, AfterFun)
         end,
     {async, Runner}.

@@ -165,7 +182,7 @@ bucketkey_query(SnapFun, Tag, Bucket, FunAcc) ->

 -spec hashlist_query(fun(), leveled_codec:tag(), boolean()) -> {async, fun()}.
 %% @doc
-%% Fold pver the key accumulating the hashes
+%% Fold over the keys under a given Tag accumulating the hashes
 hashlist_query(SnapFun, Tag, JournalCheck) ->
     StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
     EndKey = leveled_codec:to_ledgerkey(null, null, Tag),

@@ -726,6 +743,21 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
             end end
     end.

+-spec wrap_runner(fun(), fun()) -> any().
+%% @doc
+%% Allow things to be thrown in folds, and ensure clean-up action is still
+%% undertaken if they are.
+%%
+%% It is assumed this is only used at present by index queries and key folds,
+%% but the wrap could be applied more generally with further work
+wrap_runner(FoldAction, AfterAction) ->
+    try FoldAction()
+    catch throw:Throw ->
+        throw(Throw)
+    after AfterAction()
+    end.
+
+
 %%%============================================================================
 %%% Test
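Because the after clause in wrap_runner/2 runs before the re-thrown exception leaves the try, the clean-up (here the pcl_close inside AfterFun) has always completed by the time the throw reaches the process evaluating the runner. A hedged caller-side sketch, using the stop_fold atom from the test added below (SnapFun, Query and FoldAccT are assumed to be prepared by the caller):

%% Illustrative only - {async, Runner} as returned by index_query/3 above.
{async, Runner} = leveled_runner:index_query(SnapFun, Query, FoldAccT),
Result =
    try Runner() of
        Acc -> {complete, Acc}
    catch
        throw:stop_fold ->
            %% AfterFun has already closed the penciller snapshot here
            partial
    end.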
@@ -733,6 +765,23 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->

 -ifdef(TEST).

+throw_test() ->
+    StoppedFolder =
+        fun() ->
+            throw(stop_fold)
+        end,
+    CompletedFolder =
+        fun() ->
+            {ok, ['1']}
+        end,
+    AfterAction =
+        fun() ->
+            error
+        end,
+    ?assertMatch({ok, ['1']},
+                    wrap_runner(CompletedFolder, AfterAction)),
+    ?assertException(throw, stop_fold,
+                        wrap_runner(StoppedFolder, AfterAction)).
+
+
 -endif.