diff --git a/include/leveled.hrl b/include/leveled.hrl index dd0b90b..43b1064 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -56,6 +56,9 @@ root_path :: string() | undefined, cdb_options :: #cdb_options{} | undefined, start_snapshot = false :: boolean(), + %% so a snapshot can monitor the bookie and + %% terminate when it does + bookies_pid :: pid() | undefined, source_inker :: pid() | undefined, reload_strategy = [] :: list(), waste_retention_period :: integer() | undefined, @@ -70,6 +73,9 @@ max_inmemory_tablesize :: integer() | undefined, start_snapshot = false :: boolean(), snapshot_query, + %% so a snapshot can monitor the bookie and + %% terminate when it does + bookies_pid :: pid() | undefined, bookies_mem :: tuple() | undefined, source_penciller :: pid() | undefined, snapshot_longrunning = true :: boolean(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index c612485..c25d4f1 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -58,7 +58,6 @@ book_get/4, book_head/3, book_head/4, - book_returnfolder/2, book_snapshot/4, book_compactjournal/2, book_islastcompactionpending/1, @@ -68,6 +67,21 @@ book_destroy/1, book_isempty/2]). +%% folding API +-export([ + book_returnfolder/2, + book_indexfold/5, + book_bucketlist/4, + book_keylist/3, + book_keylist/4, + book_keylist/5, + book_objectfold/4, + book_objectfold/5, + book_objectfold/6, + book_headfold/6, + book_headfold/7 + ]). + -export([empty_ledgercache/0, loadqueue_ledgercache/1, push_ledgercache/2, @@ -529,6 +543,298 @@ book_head(Pid, Bucket, Key) -> book_returnfolder(Pid, RunnerType) -> gen_server:call(Pid, {return_runner, RunnerType}, infinity). +%% @doc Builds and returns an `{async, Runner}' pair for secondary +%% index queries. Calling `Runner' will fold over keys (ledger) tagged +%% with the index `?IDX_TAG' and Constrain the fold to a specific +%% `Bucket''s index fields, as specified by the `Constraint' +%% argument. 
If `Constraint' is a tuple of `{Bucket, Key}' the fold +%% starts at `Key' (this is useful for implementing pagination, for +%% example.) Provide a `FoldAccT' tuple of fold fun ( which is 3 +%% arity fun that will be called once per-matching index entry, with +%% the Bucket, Primary Key (or {IndexVal and Primary key} if +%% `ReturnTerms' is true)) and an initial Accumulator, which will be +%% passed as the 3rd argument in the initial call to +%% FoldFun. Subsequent calls to FoldFun will use the previous return +%% of FoldFun as the 3rd argument, and the final return of `Runner' is +%% the final return of `FoldFun', the final Accumulator value. The +%% query can filter inputs based on `Range' and `TermHandling'. +%% `Range' specifies the name of `IndexField' to query, and `Start' +%% and `End' optionally provide the range to query over. +%% `TermHandling' is a 2-tuple, the first element is a `boolean()', +%% `true' meaning return terms, (see fold fun above), `false' meaning +%% just return primary keys. `TermRegex' is either a regular +%% expression of type `re:mp()' (that will be run against each index +%% term value, and only those that match will be accumulated) or +%% `undefined', which means no regular expression filtering of index +%% values. +-spec book_indexfold(pid(), + Constraint:: {Bucket, Key} | Bucket, + FoldAccT :: {FoldFun, Acc}, + Range :: {IndexField, Start, End}, + TermHandling :: {ReturnTerms, TermRegex}) -> + {async, Runner::fun()} + when Bucket::term(), + Key::term(), + FoldFun::fun((Bucket, Key | {IndexVal, Key}, Acc) -> Acc), + Acc::term(), + IndexField::term(), + IndexVal::term(), + Start::IndexVal, + End::IndexVal, + ReturnTerms::boolean(), + TermRegex :: re:mp() | undefined. + +book_indexfold(Pid, Constraint, FoldAccT, Range, TermHandling) -> + RunnerType = {index_query, Constraint, FoldAccT, Range, TermHandling}, + book_returnfolder(Pid, RunnerType). + + +%% @doc list buckets. Folds over the ledger only. 
Given a `Tag' folds +%% over the keyspace calling `FoldFun' from `FoldAccT' for each +%% `Bucket'. `FoldFun' is a 2-arity function that is passed `Bucket' +%% and `Acc'. On first call `Acc' is the initial `Acc' from +%% `FoldAccT', thereafter the result of the previous call to +%% `FoldFun'. `Constraint' can be either atom `all' or `first' meaning +%% return all buckets, or just the first one found. Returns `{async, +%% Runner}' where `Runner' is a fun that returns the final value of +%% `FoldFun', the final `Acc' accumulator. +-spec book_bucketlist(pid(), Tag, FoldAccT, Constraint) -> + {async, Runner} when + Tag :: leveled_codec:tag(), + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Acc) -> Acc), + Acc :: term(), + Constraint :: first | all, + Bucket :: term(), + Acc :: term(), + Runner :: fun(() -> Acc). +book_bucketlist(Pid, Tag, FoldAccT, Constraint) -> + RunnerType= + case Constraint of + first-> {first_bucket, Tag, FoldAccT}; + all -> {bucket_list, Tag, FoldAccT} + end, + book_returnfolder(Pid, RunnerType). + + +%% @doc fold over the keys (ledger only) for a given `Tag'. Each key +%% will result in a call to `FoldFun' from `FoldAccT'. `FoldFun' is a +%% 3-arity function, called with `Bucket', `Key' and `Acc'. The +%% initial value of `Acc' is the second element of `FoldAccT'. Returns +%% `{async, Runner}' where `Runner' is a function that will run the +%% fold and return the final value of `Acc' +-spec book_keylist(pid(), Tag, FoldAccT) -> {async, Runner} when + Tag :: leveled_codec:tag(), + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + Key :: term(), + Runner :: fun(() -> Acc). +book_keylist(Pid, Tag, FoldAccT) -> + RunnerType = {keylist, Tag, FoldAccT}, + book_returnfolder(Pid, RunnerType). 
+ +%% @doc as for book_keylist/3 but constrained to only those keys in +%% `Bucket' +-spec book_keylist(pid(), Tag, Bucket, FoldAccT) -> {async, Runner} when + Tag :: leveled_codec:tag(), + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + Key :: term(), + Runner :: fun(() -> Acc). +book_keylist(Pid, Tag, Bucket, FoldAccT) -> + RunnerType = {keylist, Tag, Bucket, FoldAccT}, + book_returnfolder(Pid, RunnerType). + +%% @doc as for book_keylist/4 with additional constraint that only +%% keys in the `KeyRange' tuple will be folder over, where `KeyRange' +%% is `StartKey', the first key in the range and `EndKey' the last, +%% (inclusive.) Or the atom `all', which will return all keys in the +%% `Bucket'. +-spec book_keylist(pid(), Tag, Bucket, KeyRange, FoldAccT) -> {async, Runner} when + Tag :: leveled_codec:tag(), + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + KeyRange :: {StartKey, EndKey} | all, + StartKey :: Key, + EndKey :: Key, + Key :: term(), + Runner :: fun(() -> Acc). +book_keylist(Pid, Tag, Bucket, KeyRange, FoldAccT) -> + RunnerType = {keylist, Tag, Bucket, KeyRange, FoldAccT}, + book_returnfolder(Pid, RunnerType). + +%% @doc fold over all the objects/values in the store in key +%% order. `Tag' is the tagged type of object. `FoldAccT' is a 2-tuple, +%% the first element being a 4-arity fun, that is called once for each +%% key with the arguments `Bucket', `Key', `Value', `Acc'. The 2nd +%% element is the initial accumulator `Acc' which is passed to +%% `FoldFun' on it's first call. Thereafter the return value from +%% `FoldFun' is the 4th argument to the next call of +%% `FoldFun'. `SnapPreFold' is a boolean where `true' means take the +%% snapshot at once, and `false' means take the snapshot when the +%% returned `Runner' is executed. 
Return `{async, Runner}' where +%% `Runner' is a 0-arity function that returns the final accumulator +%% from `FoldFun' +-spec book_objectfold(pid(), Tag, FoldAccT, SnapPreFold) -> {async, Runner} when + Tag :: leveled_codec:tag(), + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + Key :: term(), + Value :: term(), + SnapPreFold :: boolean(), + Runner :: fun(() -> Acc). +book_objectfold(Pid, Tag, FoldAccT, SnapPreFold) -> + RunnerType = {foldobjects_allkeys, Tag, FoldAccT, SnapPreFold}, + book_returnfolder(Pid, RunnerType). + +%% @doc exactly as book_objectfold/4 with the additional parameter +%% `Order'. `Order' can be `sqn_order' or `key_order'. In +%% book_objectfold/4 and book_objectfold/6 `key_order' is +%% implied. This function called with `Order == key_order' is +%% identical to book_objectfold/4. NOTE: if you must fold over ALL +%% objects, this is quicker than `key_order' due to accessing the +%% journal objects in their on-disk order, not via a fold over the +%% ledger. +-spec book_objectfold(pid(), Tag, FoldAccT, SnapPreFold, Order) -> {async, Runner} when + Tag :: leveled_codec:tag(), + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + Key :: term(), + Value :: term(), + SnapPreFold :: boolean(), + Runner :: fun(() -> Acc), + Order :: key_order | sqn_order. +book_objectfold(Pid, Tag, FoldAccT, SnapPreFold, Order) -> + RunnerType = {foldobjects_allkeys, Tag, FoldAccT, SnapPreFold, Order}, + book_returnfolder(Pid, RunnerType). + +%% @doc as book_objectfold/4, with the addition of some constraints on +%% the range of objects folded over. The 3rd argument `Bucket' limits +%% this fold to that specific bucket only. The 4th argument `Limiter' +%% further constrains the fold. `Limiter' can be either a `Range' or +%% `Index' query. 
`Range' is either that atom `all', meaning {min, +%% max}, or, a two tuple of start key and end key, inclusive. Index +%% Query is a 3-tuple of `{IndexField, StartTerm, EndTerm}`, just as +%% in book_indexfold/5 +-spec book_objectfold(pid(), Tag, Bucket, Limiter, FoldAccT, SnapPreFold) -> + {async, Runner} when + Tag :: leveled_codec:tag(), + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + Key :: term(), + Value :: term(), + Limiter :: Range | Index, + Range :: {StartKey, EndKey} | all, + Index :: {IndexField, Start, End}, + IndexField::term(), + IndexVal::term(), + Start::IndexVal, + End::IndexVal, + StartKey :: Key, + EndKey :: Key, + SnapPreFold :: boolean(), + Runner :: fun(() -> Acc). +book_objectfold(Pid, Tag, Bucket, Limiter, FoldAccT, SnapPreFold) -> + RunnerType = + case Limiter of + all -> + {foldobjects_bybucket, Tag, Bucket, all, FoldAccT, SnapPreFold}; + Range when is_tuple(Range) andalso size(Range) == 2 -> + {foldobjects_bybucket, Tag, Bucket, Range, FoldAccT, SnapPreFold}; + IndexQuery when is_tuple(IndexQuery) andalso size(IndexQuery) == 3 -> + IndexQuery = Limiter, + {foldobjects_byindex, Tag, Bucket, IndexQuery, FoldAccT, SnapPreFold} + end, + book_returnfolder(Pid, RunnerType). + + +%% @doc LevelEd stores not just Keys in the ledger, but also may store +%% object metadata, referred to as heads (after Riak head request for +%% object metadata) Often when folding over objects all that is really +%% required is the object metadata. These "headfolds" are an efficient +%% way to fold over the ledger (possibly wholly in memory) and get +%% object metadata. +%% +%% Fold over the object's head. `Tag' is the tagged type of the +%% objects to fold over. `FoldAccT' is a 2-tuple. The 1st element is a +%% 4-arity fold fun, that takes a Bucket, Key, ProxyObject, and the +%% `Acc'. The ProxyObject is an object that only contains the +%% head/metadata, and no object data from the journal. 
The `Acc' in +%% the first call is that provided as the second element of `FoldAccT' +%% and thereafter the return of the previous call to the fold fun. If +%% `JournalCheck' is `true' then the journal is checked to see if the +%% object in the ledger is present, which means a snapshot of the +%% whole store is required, if `false', then no such check is +%% performed, and only the ledger need be snapshotted. `SnapPreFold' is a +%% boolean that determines if the snapshot is taken when the folder is +%% requested `true', or when run `false'. `SegmentList' can be +%% `false' meaning all heads, or a list of integers that designate +%% segments in a TicTac Tree. +-spec book_headfold(pid(), Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> + {async, Runner} when + Tag :: leveled_codec:tag(), + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + Key :: term(), + Value :: term(), + JournalCheck :: boolean(), + SnapPreFold :: boolean(), + SegmentList :: false | list(integer()), + Runner :: fun(() -> Acc). +book_headfold(Pid, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> + RunnerType = {foldheads_allkeys, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList}, + book_returnfolder(Pid, RunnerType). + +%% @doc as book_headfold/6, but with the addition of a `Limiter' that +%% restricts the set of objects folded over. `Limiter' can either be a +%% bucket list, or a key range of a single bucket. For bucket list, +%% the `Limiter' should be a 2-tuple, the first element the tag +%% `bucket_list' and the second a `list()' of `Bucket'. Only heads +%% from the listed buckets will be folded over. A single bucket key +%% range may also be used as a `Limiter', in which case the argument +%% is a 3-tuple of `{range, Bucket, Range}' where `Bucket' is a +%% bucket, and `Range' is a 2-tuple of start key and end key, +%% inclusive, or the atom `all'. 
The rest of the arguments are as +%% `book_headfold/6' +-spec book_headfold(pid(), Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> + {async, Runner} when + Tag :: leveled_codec:tag(), + Limiter :: BucketList | BucketKeyRange, + BucketList :: {bucket_list, list(Bucket)}, + BucketKeyRange :: {range, Bucket, KeyRange}, + KeyRange :: {StartKey, EndKey} | all, + StartKey :: Key, + EndKey :: Key, + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + Key :: term(), + Value :: term(), + JournalCheck :: boolean(), + SnapPreFold :: boolean(), + SegmentList :: false | list(integer()), + Runner :: fun(() -> Acc). +book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> + RunnerType = {foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT, JournalCheck, SnapPreFold, SegmentList}, + book_returnfolder(Pid, RunnerType); +book_headfold(Pid, Tag, {range, Bucket, KeyRange}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> + RunnerType = {foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT, JournalCheck, SnapPreFold, SegmentList}, + book_returnfolder(Pid, RunnerType). -spec book_snapshot(pid(), store|ledger, @@ -605,8 +911,7 @@ book_hotbackup(Pid) -> %% given tag book_isempty(Pid, Tag) -> FoldAccT = {fun(_B, _Acc) -> false end, true}, - {async, Runner} = - gen_server:call(Pid, {return_runner, {first_bucket, Tag, FoldAccT}}), + {async, Runner} = book_bucketlist(Pid, Tag, FoldAccT, first), Runner(). 
%%%============================================================================ @@ -956,12 +1261,14 @@ snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) -> source_penciller = Penciller, snapshot_query = Query, snapshot_longrunning = LongRunning, + bookies_pid = self(), bookies_mem = BookiesMem}, {ok, LedgerSnapshot} = leveled_penciller:pcl_snapstart(PCLopts), case SnapType of store -> InkerOpts = #inker_options{start_snapshot=true, - source_inker=Inker}, + bookies_pid = self(), + source_inker=Inker}, {ok, JournalSnapshot} = leveled_inker:ink_snapstart(InkerOpts), {ok, LedgerSnapshot, JournalSnapshot}; ledger -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 04418a6..959c80b 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -142,6 +142,7 @@ cdb_options :: #cdb_options{} | undefined, clerk :: pid() | undefined, compaction_pending = false :: boolean(), + bookie_monref :: reference() | undefined, is_snapshot = false :: boolean(), compression_method = native :: lz4|native, compress_on_receipt = false :: boolean(), @@ -440,6 +441,9 @@ init([InkerOpts]) -> case {InkerOpts#inker_options.root_path, InkerOpts#inker_options.start_snapshot} of {undefined, true} -> + %% monitor the bookie, and close the snapshot when bookie + %% exits + BookieMonitor = erlang:monitor(process, InkerOpts#inker_options.bookies_pid), SrcInker = InkerOpts#inker_options.source_inker, {Manifest, ActiveJournalDB, @@ -448,6 +452,7 @@ init([InkerOpts]) -> active_journaldb = ActiveJournalDB, source_inker = SrcInker, journal_sqn = JournalSQN, + bookie_monref = BookieMonitor, is_snapshot = true}}; %% Need to do something about timeout {_RootPath, false} -> @@ -642,6 +647,12 @@ handle_cast({release_snapshot, Snapshot}, State) -> leveled_log:log("I0004", [length(Rs)]), {noreply, State#state{registered_snapshots=Rs}}. 
+%% handle the bookie stopping and stop this snapshot +handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info}, + State=#state{bookie_monref = BookieMonRef}) -> + %% Monitor only registered on snapshots + ok = ink_releasesnapshot(State#state.source_inker, self()), + {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}. @@ -1379,4 +1390,43 @@ coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null). +handle_down_test() -> + RootPath = "../test/journal", + build_dummy_journal(), + CDBopts = #cdb_options{max_size=300000, binary_mode=true}, + {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, + cdb_options=CDBopts, + compression_method=native, + compress_on_receipt=true}), + + FakeBookie = spawn(fun loop/0), + + Mon = erlang:monitor(process, FakeBookie), + + SnapOpts = #inker_options{start_snapshot=true, + bookies_pid = FakeBookie, + source_inker=Ink1}, + + {ok, Snap1} = ink_snapstart(SnapOpts), + + FakeBookie ! stop, + + receive + {'DOWN', Mon, process, FakeBookie, normal} -> + %% Now we know that inker should have received this too! + %% (better than timer:sleep/1) + ok + end, + + ?assertEqual(undefined, erlang:process_info(Snap1)), + + ink_close(Ink1), + clean_testdir(RootPath). + +loop() -> + receive + stop -> + ok + end. + -endif. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index ca0a5b6..c7e8b10 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -257,6 +257,7 @@ is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), source_penciller :: pid() | undefined, + bookie_monref :: reference() | undefined, levelzero_astree :: list() | undefined, work_ongoing = false :: boolean(), % i.e. 
compaction work @@ -583,6 +584,10 @@ init([PCLopts]) -> {undefined, _Snapshot=true, Query, BookiesMem} -> SrcPenciller = PCLopts#penciller_options.source_penciller, LongRunning = PCLopts#penciller_options.snapshot_longrunning, + %% monitor the bookie, and close the snapshot when bookie + %% exits + BookieMonitor = erlang:monitor(process, PCLopts#penciller_options.bookies_pid), + {ok, State} = pcl_registersnapshot(SrcPenciller, self(), Query, @@ -590,7 +595,8 @@ init([PCLopts]) -> LongRunning), leveled_log:log("P0001", [self()]), {ok, State#state{is_snapshot=true, - source_penciller=SrcPenciller}}; + bookie_monref = BookieMonitor, + source_penciller=SrcPenciller}}; {_RootPath, _Snapshot=false, _Q, _BM} -> start_from_file(PCLopts) end. @@ -943,6 +949,11 @@ handle_cast(work_for_clerk, State) -> end. +%% handle the bookie stopping and stop this snapshot +handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info}, + State=#state{bookie_monref = BookieMonRef}) -> + ok = pcl_releasesnapshot(State#state.source_penciller, self()), + {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}. @@ -2145,4 +2156,57 @@ coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null). +handle_down_test() -> + RootPath = "../test/ledger", + clean_testdir(RootPath), + {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000, + compression_method=native}), + FakeBookie = spawn(fun loop/0), + + Mon = erlang:monitor(process, FakeBookie), + + FakeBookie ! {snap, PCLr, self()}, + + {ok, PclSnap, null} = + receive + {FakeBookie, {ok, Snap, null}} -> + {ok, Snap, null} + end, + + FakeBookie ! stop, + + receive + {'DOWN', Mon, process, FakeBookie, normal} -> + %% Now we know that pclr should have received this too! + %% (better than timer:sleep/1) + ok + end, + + ?assertEqual(undefined, erlang:process_info(PclSnap)), + + pcl_close(PCLr), + clean_testdir(RootPath). 
+ +%% the fake bookie. Some calls to leveled_bookie (like the two below) +%% do not go via the gen_server (but it looks like they expect to be +%% called by the gen_server, internally!) they use "self()" to +%% populate the bookie's pid in the pclr. This process wrapping the +%% calls ensures that the TEST controls the bookie's Pid. The +%% FakeBookie. +loop() -> + receive + {snap, PCLr, TestPid} -> + Res = leveled_bookie:snapshot_store(leveled_bookie:empty_ledgercache(), + PCLr, + null, + ledger, + undefined, + false), + TestPid ! {self(), Res}, + loop(); + stop -> + ok + end. + -endif. diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 72028be..32991d4 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -579,8 +579,9 @@ space_clear_ondelete(_Config) -> G2), FoldKeysFun = fun(B, K, Acc) -> [{B, K}|Acc] end, - AllKeyQuery = {keylist, o_rkv, {FoldKeysFun, []}}, - {async, F1} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery), + + {async, F1} = leveled_bookie:book_keylist(Book1, o_rkv, {FoldKeysFun, []}), + SW1 = os:timestamp(), KL1 = F1(), ok = case length(KL1) of @@ -594,19 +595,20 @@ space_clear_ondelete(_Config) -> {ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"), io:format("FNsA - Bookie created ~w journal files and ~w ledger files~n", [length(FNsA_J), length(FNsA_L)]), - + % Get an iterator to lock the inker during compaction FoldObjectsFun = fun(B, K, ObjBin, Acc) -> [{B, K, erlang:phash2(ObjBin)}|Acc] end, - {async, HTreeF1} = leveled_bookie:book_returnfolder(Book1, - {foldobjects_allkeys, - ?RIAK_TAG, - FoldObjectsFun, - false}), + + {async, HTreeF1} = leveled_bookie:book_objectfold(Book1, + ?RIAK_TAG, + {FoldObjectsFun, []}, + false), + % This query does not Snap PreFold - and so will not prevent % pending deletes from prompting actual deletes - {async, KF1} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery), + {async, KF1} = leveled_bookie:book_keylist(Book1, 
o_rkv, {FoldKeysFun, []}), % This query does Snap PreFold, and so will prevent deletes from % the ledger @@ -662,7 +664,7 @@ space_clear_ondelete(_Config) -> "after deletes~n", [PointB_Journals, length(FNsB_L)]), - {async, F2} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery), + {async, F2} = leveled_bookie:book_keylist(Book1, o_rkv, {FoldKeysFun, []}), SW3 = os:timestamp(), KL2 = F2(), ok = case length(KL2) of @@ -674,7 +676,7 @@ space_clear_ondelete(_Config) -> ok = leveled_bookie:book_close(Book1), {ok, Book2} = leveled_bookie:book_start(StartOpts1), - {async, F3} = leveled_bookie:book_returnfolder(Book2, AllKeyQuery), + {async, F3} = leveled_bookie:book_keylist(Book2, o_rkv, {FoldKeysFun, []}), SW4 = os:timestamp(), KL3 = F3(), ok = case length(KL3) of @@ -842,4 +844,4 @@ many_put_fetch_switchcompression(_Config) -> testutil:check_forlist(Bookie3, ChkListFixed), testutil:check_forobject(Bookie3, TestObject), testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"), - ok = leveled_bookie:book_destroy(Bookie3). \ No newline at end of file + ok = leveled_bookie:book_destroy(Bookie3). diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index 4a78ed6..e7940d8 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -10,6 +10,7 @@ small_load_with2i/1, query_count/1, multibucket_fold/1, + foldobjects_bybucket_range/1, rotating_objects/1]). all() -> [ @@ -17,7 +18,8 @@ all() -> [ small_load_with2i, query_count, multibucket_fold, - rotating_objects + rotating_objects, + foldobjects_bybucket_range ]. 
@@ -34,14 +36,15 @@ single_object_with2i(_Config) -> TestSpec = [{add, list_to_binary("integer_int"), 100}, {add, list_to_binary("binary_bin"), <<100:32/integer>>}], ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), - - IdxQ1 = {index_query, - "Bucket1", - {fun testutil:foldkeysfun/3, []}, - {list_to_binary("binary_bin"), - <<99:32/integer>>, <<101:32/integer>>}, - {true, undefined}}, - {async, IdxFolder1} = leveled_bookie:book_returnfolder(Bookie1, IdxQ1), + + %% @TODO replace all index queries with new Top-Level API if tests + %% pass + {async, IdxFolder1} = leveled_bookie:book_indexfold(Bookie1, + "Bucket1", + {fun testutil:foldkeysfun/3, []}, + {list_to_binary("binary_bin"), + <<99:32/integer>>, <<101:32/integer>>}, + {true, undefined}), R1 = IdxFolder1(), io:format("R1 of ~w~n", [R1]), true = [{<<100:32/integer>>,"Key1"}] == R1, @@ -127,28 +130,26 @@ small_load_with2i(_Config) -> %% Get the Buckets Keys and Hashes for the whole bucket FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc] end, - {async, HTreeF1} = leveled_bookie:book_returnfolder(Bookie1, - {foldobjects_allkeys, - ?RIAK_TAG, - FoldObjectsFun, - false}), + + {async, HTreeF1} = leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {FoldObjectsFun, []}, + false), + KeyHashList1 = HTreeF1(), - {async, HTreeF2} = leveled_bookie:book_returnfolder(Bookie1, - {foldobjects_bybucket, - ?RIAK_TAG, - "Bucket", - all, - FoldObjectsFun, - false}), + {async, HTreeF2} = leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + "Bucket", + all, + {FoldObjectsFun, []}, + false), KeyHashList2 = HTreeF2(), - {async, HTreeF3} = leveled_bookie:book_returnfolder(Bookie1, - {foldobjects_byindex, - ?RIAK_TAG, - "Bucket", - {"idx1_bin", - "#", "|"}, - FoldObjectsFun, - false}), + {async, HTreeF3} = leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + "Bucket", + {"idx1_bin", "#", "|"}, + {FoldObjectsFun, []}, + false), KeyHashList3 = HTreeF3(), true = 9901 == length(KeyHashList1), % also includes 
the test object true = 9900 == length(KeyHashList2), @@ -173,14 +174,12 @@ small_load_with2i(_Config) -> true = Total2 == Total1, FoldBucketsFun = fun(B, Acc) -> sets:add_element(B, Acc) end, + % this should find Bucket and Bucket1 - as we can now find string-based % buckets using bucket_list - i.e. it isn't just binary buckets now - BucketListQuery = {bucket_list, - ?RIAK_TAG, - {FoldBucketsFun, sets:new()}}, - {async, BL} = leveled_bookie:book_returnfolder(Bookie2, BucketListQuery), + {async, BL} = leveled_bookie:book_bucketlist(Bookie2, ?RIAK_TAG, {FoldBucketsFun, sets:new()}, all), true = sets:size(BL()) == 2, - + ok = leveled_bookie:book_close(Bookie2), testutil:reset_filestructure(). @@ -511,16 +510,20 @@ multibucket_fold(_Config) -> IndexGen, <<"Bucket4">>), testutil:riakload(Bookie1, ObjL4), - Q1 = {foldheads_bybucket, - ?RIAK_TAG, - [<<"Bucket1">>, <<"Bucket4">>], bucket_list, - fun(B, K, _PO, Acc) -> + + FF = fun(B, K, _PO, Acc) -> [{B, K}|Acc] - end, - false, - true, - false}, - {async, R1} = leveled_bookie:book_returnfolder(Bookie1, Q1), + end, + FoldAccT = {FF, []}, + + {async, R1} = leveled_bookie:book_headfold(Bookie1, + ?RIAK_TAG, + {bucket_list, [<<"Bucket1">>, <<"Bucket4">>]}, + FoldAccT, + false, + true, + false), + O1 = length(R1()), io:format("Result R1 of length ~w~n", [O1]), @@ -544,10 +547,6 @@ multibucket_fold(_Config) -> ok = leveled_bookie:book_close(Bookie1), testutil:reset_filestructure(). - - - - rotating_objects(_Config) -> RootPath = testutil:reset_filestructure(), ok = testutil:rotating_object_check(RootPath, "Bucket1", 10), @@ -558,8 +557,50 @@ rotating_objects(_Config) -> ok = testutil:rotating_object_check(RootPath, "Bucket6", 9600), testutil:reset_filestructure(). 
+foldobjects_bybucket_range(_Config) -> + RootPath = testutil:reset_filestructure(), + {ok, Bookie1} = leveled_bookie:book_start(RootPath, + 2000, + 50000000, + testutil:sync_strategy()), + ObjectGen = testutil:get_compressiblevalue_andinteger(), + IndexGen = fun() -> [] end, + ObjL1 = testutil:generate_objects(1300, + {fixed_binary, 1}, + [], + ObjectGen, + IndexGen, + <<"Bucket1">>), + testutil:riakload(Bookie1, ObjL1), + FoldKeysFun = fun(_B, K,_V, Acc) -> + [ K |Acc] + end, + StartKey = testutil:fixed_bin_key(123), + EndKey = testutil:fixed_bin_key(779), - + {async, Folder} = leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + <<"Bucket1">>, + {StartKey, EndKey}, {FoldKeysFun, []}, + true + ), + ResLen = length(Folder()), + io:format("Length of Result of folder ~w~n", [ResLen]), + true = 657 == ResLen, + {async, AllFolder} = leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + <<"Bucket1">>, + all, + {FoldKeysFun, []}, + true + ), + + AllResLen = length(AllFolder()), + io:format("Length of Result of all keys folder ~w~n", [AllResLen]), + true = 1300 == AllResLen, + + ok = leveled_bookie:book_close(Bookie1), + testutil:reset_filestructure(). 
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 19109aa..50ab518 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -174,11 +174,12 @@ aae_missingjournal(_Config) -> fun(B, K, _V, Acc) -> [{B, K}|Acc] end, {async, AllHeadF1} = - leveled_bookie:book_returnfolder(Bookie1, - {foldheads_allkeys, - ?RIAK_TAG, - FoldHeadsFun, - true, true, false}), + leveled_bookie:book_headfold(Bookie1, + ?RIAK_TAG, + {FoldHeadsFun, []}, + true, + true, + false), HeadL1 = length(AllHeadF1()), io:format("Fold head returned ~w objects~n", [HeadL1]), diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 6deb6b4..8e688cd 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -425,26 +425,12 @@ handoff(_Config) -> end, % Handoff the data from the first store to the other three stores - HandoffFolder2 = - {foldobjects_allkeys, - ?RIAK_TAG, - {FoldObjectsFun(Bookie2), ok}, - false, - key_order}, - HandoffFolder3 = - {foldobjects_allkeys, - ?RIAK_TAG, - {FoldObjectsFun(Bookie3), ok}, - true, - sqn_order}, - HandoffFolder4 = - {foldobjects_allkeys, - ?RIAK_TAG, - {FoldObjectsFun(Bookie4), ok}, - true, - sqn_order}, {async, Handoff2} = - leveled_bookie:book_returnfolder(Bookie1, HandoffFolder2), + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {FoldObjectsFun(Bookie2), ok}, + false, + key_order), SW2 = os:timestamp(), ok = Handoff2(), Time_HO2 = timer:now_diff(os:timestamp(), SW2)/1000, @@ -452,14 +438,23 @@ handoff(_Config) -> [Time_HO2]), SW3 = os:timestamp(), {async, Handoff3} = - leveled_bookie:book_returnfolder(Bookie1, HandoffFolder3), + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {FoldObjectsFun(Bookie3), ok}, + true, + sqn_order), ok = Handoff3(), Time_HO3 = timer:now_diff(os:timestamp(), SW3)/1000, io:format("Handoff to Book3 in sqn_order took ~w milliseconds ~n", [Time_HO3]), SW4 = os:timestamp(), {async, Handoff4} = - 
leveled_bookie:book_returnfolder(Bookie1, HandoffFolder4), + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {FoldObjectsFun(Bookie4), ok}, + true, + sqn_order), + ok = Handoff4(), Time_HO4 = timer:now_diff(os:timestamp(), SW4)/1000, io:format("Handoff to Book4 in sqn_order took ~w milliseconds ~n", @@ -529,9 +524,12 @@ dollar_key_index(_Config) -> StartKey = testutil:fixed_bin_key(123), EndKey = testutil:fixed_bin_key(779), - Query = {keylist, ?RIAK_TAG, <<"Bucket1">>, {StartKey, EndKey}, {FoldKeysFun, []}}, - {async, Folder} = leveled_bookie:book_returnfolder(Bookie1, Query), + {async, Folder} = leveled_bookie:book_keylist(Bookie1, + ?RIAK_TAG, + <<"Bucket1">>, + {StartKey, EndKey}, {FoldKeysFun, []} + ), ResLen = length(Folder()), io:format("Length of Result of folder ~w~n", [ResLen]), true = 657 == ResLen, @@ -575,10 +573,9 @@ dollar_bucket_index(_Config) -> FoldKeysFun = fun(B, K, Acc) -> [{B, K}|Acc] end, + FoldAccT = {FoldKeysFun, []}, - Query = {keylist, ?RIAK_TAG, <<"Bucket2">>, {FoldKeysFun, []}}, - - {async, Folder} = leveled_bookie:book_returnfolder(Bookie1, Query), + {async, Folder} = leveled_bookie:book_keylist(Bookie1, ?RIAK_TAG, <<"Bucket2">>, FoldAccT), ResLen = length(Folder()), io:format("Length of Result of folder ~w~n", [ResLen]), diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl index 1c5f3f3..63cb632 100644 --- a/test/end_to_end/tictac_SUITE.erl +++ b/test/end_to_end/tictac_SUITE.erl @@ -147,14 +147,16 @@ many_put_compare(_Config) -> leveled_tictac:add_kv(Acc, Key, Value, ExtractClockFun) end, - FoldQ0 = {foldheads_bybucket, - o_rkv, - "Bucket", - all, - {FoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)}, - false, true, false}, + FoldAccT = {FoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)}, {async, TreeAObjFolder0} = - leveled_bookie:book_returnfolder(Bookie2, FoldQ0), + leveled_bookie:book_headfold(Bookie2, + o_rkv, + {range, "Bucket", all}, + FoldAccT, + false, + true, + false), + SWB0Obj = 
os:timestamp(), TreeAObj0 = TreeAObjFolder0(), io:format("Build tictac tree via object fold with no "++ @@ -1338,4 +1340,4 @@ get_segment(K, SegmentCount) -> end, {SegmentID, ExtraHash} = leveled_codec:segment_hash(BinKey), SegHash = (ExtraHash band 65535) bsl 16 + SegmentID, - leveled_tictac:get_segment(SegHash, SegmentCount). \ No newline at end of file + leveled_tictac:get_segment(SegHash, SegmentCount).