diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 53f931e..7eb4e7a 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -57,8 +57,7 @@
         book_head/3,
         book_head/4,
         book_returnfolder/2,
-        book_snapshotstore/3,
-        book_snapshotledger/3,
+        book_snapshot/4,
         book_compactjournal/2,
         book_islastcompactionpending/1,
         book_close/1,
@@ -69,7 +68,7 @@
         empty_ledgercache/0,
         loadqueue_ledgercache/1,
         push_ledgercache/2,
-        snapshot_store/5,
+        snapshot_store/6,
         fetch_value/2]).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -78,7 +77,6 @@
 -define(JOURNAL_FP, "journal").
 -define(LEDGER_FP, "ledger").
 -define(SNAPSHOT_TIMEOUT, 300000).
--define(CHECKJOURNAL_PROB, 0.2).
 -define(CACHE_SIZE_JITTER, 25).
 -define(JOURNAL_SIZE_JITTER, 20).
 -define(LONG_RUNNING, 80000).
@@ -102,6 +100,7 @@
                 put_timing :: tuple() | undefined,
                 get_timing :: tuple() | undefined}).
 
+
 -type book_state() :: #state{}.
 
 %%%============================================================================
@@ -280,7 +279,10 @@ book_head(Pid, Bucket, Key, Tag) ->
 %%
 %% If there is a snapshot request (e.g. to iterate over the keys) the Bookie
 %% may request a clone of the Penciller, or clones of both the Penciller and
-%% the Inker should values also need to be accessed.
+%% the Inker should values also need to be accessed. The snapshot clone is
+%% made available through a "runner" - a new transportable PID through which
+%% the previous state of the store can be queried. So, for example, a
+%% riak_kv_vnode_worker in the pool could host the runner.
 %%
 %% The clone is seeded with the manifest SQN. The clone should be registered
 %% with the real Inker/Penciller, so that the real Inker/Penciller may prevent
@@ -296,7 +298,8 @@ book_head(Pid, Bucket, Key, Tag) ->
 %% StateData.
 %%
 %% There are a series of specific folders implemented that provide pre-canned
-%% snapshot functionality:
+%% snapshot functionality, more folders can be seen in the get_runner/2
+%% function:
 %%
 %% {bucket_stats, Bucket} -> return a key count and total object size within
 %% a bucket
@@ -311,26 +314,6 @@
 %% {keylist, Tag, {FoldKeysFun, Acc}} -> list all keys with tag
 %% {keylist, Tag, Bucket, {FoldKeysFun, Acc}} -> list all keys within given
 %% bucket
-%% {hashlist_query, Tag, JournalCheck} -> return keys and hashes for all
-%% objects with a given tag
-%% {tictactree_idx,
-%%        {Bucket, IdxField, StartValue, EndValue},
-%%        TreeSize,
-%%        PartitionFilter}
-%% -> compile a hashtree for the items on the index. A partition filter is
-%% required to avoid adding an index entry in this vnode as a fallback.
-%% There is no de-duplicate of results, duplicate reuslts corrupt the tree.
-%% {tictactree_obj,
-%%        {Bucket, StartKey, EndKey, CheckPresence},
-%%        TreeSize,
-%%        PartitionFilter}
-%% -> compile a hashtree for all the objects in the range. A partition filter
-%% may be passed to restrict the query to a given partition on this vnode. The
-%% filter should bea function that takes (Bucket, Key) as inputs and outputs
-%% one of the atoms accumulate or pass. There is no de-duplicate of results,
-%% duplicate reuslts corrupt the tree.
-%% CheckPresence can be used if there is a need to do a deeper check to ensure
-%% that the object is in the Journal (or at least indexed within the Journal).
%% {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> fold over all objects %% in a given bucket %% {foldobjects_byindex, @@ -340,14 +323,18 @@ book_head(Pid, Bucket, Key, Tag) -> %% FoldObjectsFun} -> fold over all objects with an entry in a given %% range on a given index -book_returnfolder(Pid, FolderType) -> - gen_server:call(Pid, {return_folder, FolderType}, infinity). +book_returnfolder(Pid, RunnerType) -> + gen_server:call(Pid, {return_runner, RunnerType}, infinity). -book_snapshotstore(Pid, Requestor, Timeout) -> - gen_server:call(Pid, {snapshot, Requestor, store, Timeout}, infinity). +%% @doc create a snapshot of the store +%% +%% Snapshot can be based on a pre-defined query (which will be used to filter +%% caches prior to copying for the snapshot), and can be defined as long +%% running to avoid timeouts (snapshots are generally expected to be required +%% for < 60s) -book_snapshotledger(Pid, Requestor, Timeout) -> - gen_server:call(Pid, {snapshot, Requestor, ledger, Timeout}, infinity). +book_snapshot(Pid, SnapType, Query, LongRunning) -> + gen_server:call(Pid, {snapshot, SnapType, Query, LongRunning}, infinity). %% @doc Call for compaction of the Journal %% @@ -411,9 +398,8 @@ init([Opts]) -> ledger_cache=#ledger_cache{mem = NewETS}, is_snapshot=false}}; Bookie -> - {ok, Penciller, Inker} = book_snapshotstore(Bookie, - self(), - ?SNAPSHOT_TIMEOUT), + {ok, Penciller, Inker} = + book_snapshot(Bookie, store, undefined, true), leveled_log:log("B0002", [Inker, Penciller]), {ok, #state{penciller=Penciller, inker=Inker, @@ -518,122 +504,14 @@ handle_call({head, Bucket, Key, Tag}, _From, State) -> end end end; -handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) -> - % TODO: clean-up passing of Requestor (which was previously just used in - % logs) and so can now be ignored, and timeout which is ignored - but - % probably shouldn't be. - Reply = snapshot_store(State, SnapType), +handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) -> + % Snapshot the store, specifying if the snapshot should be long running + % (i.e. will the snapshot be queued or be required for an extended period + % e.g. 
many minutes) + Reply = snapshot_store(State, SnapType, Query, LongRunning), {reply, Reply, State}; -handle_call({return_folder, FolderType}, _From, State) -> - case FolderType of - {bucket_stats, Bucket} -> - {reply, - bucket_stats(State, Bucket, ?STD_TAG), - State}; - {riakbucket_stats, Bucket} -> - {reply, - bucket_stats(State, Bucket, ?RIAK_TAG), - State}; - {binary_bucketlist, Tag, {FoldKeysFun, Acc}} -> - {reply, - binary_bucketlist(State, Tag, {FoldKeysFun, Acc}), - State}; - {index_query, - Constraint, - {FoldKeysFun, Acc}, - {IdxField, StartValue, EndValue}, - {ReturnTerms, TermRegex}} -> - {Bucket, StartObjKey} = - case Constraint of - {B, SK} -> - {B, SK}; - B -> - {B, null} - end, - {reply, - index_query(State, - Bucket, - StartObjKey, - {FoldKeysFun, Acc}, - {IdxField, StartValue, EndValue}, - {ReturnTerms, TermRegex}), - State}; - {keylist, Tag, {FoldKeysFun, Acc}} -> - {reply, - allkey_query(State, Tag, {FoldKeysFun, Acc}), - State}; - {keylist, Tag, Bucket, {FoldKeysFun, Acc}} -> - {reply, - bucketkey_query(State, Tag, Bucket, {FoldKeysFun, Acc}), - State}; - {hashlist_query, Tag, JournalCheck} -> - {reply, - hashlist_query(State, Tag, JournalCheck), - State}; - {tictactree_obj, - {Tag, Bucket, StartKey, EndKey, CheckPresence}, - TreeSize, - PartitionFilter} -> - {reply, - tictactree(State, - Tag, - Bucket, - {StartKey, EndKey}, - CheckPresence, - TreeSize, - PartitionFilter), - State}; - {tictactree_idx, - {Bucket, IdxField, StartValue, EndValue}, - TreeSize, - PartitionFilter} -> - {reply, - tictactree(State, - ?IDX_TAG, - Bucket, - {IdxField, StartValue, EndValue}, - false, - TreeSize, - PartitionFilter), - State}; - {foldheads_allkeys, Tag, FoldHeadsFun, - CheckPresence, SnapPreFold} -> - {reply, - foldheads_allkeys(State, Tag, - FoldHeadsFun, - CheckPresence, - SnapPreFold), - State}; - {foldheads_bybucket, Tag, Bucket, KeyRange, FoldHeadsFun, - CheckPresence, SnapPreFold} -> - {reply, - foldheads_bybucket(State, Tag, Bucket, - KeyRange, - FoldHeadsFun, - CheckPresence, - SnapPreFold), - State}; - {foldobjects_allkeys, Tag, FoldObjectsFun} -> - {reply, - foldobjects_allkeys(State, Tag, FoldObjectsFun), - State}; - {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> - {reply, - foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun), - State}; - {foldobjects_byindex, - Tag, - Bucket, - {Field, FromTerm, ToTerm}, - FoldObjectsFun} -> - {reply, - foldobjects_byindex(State, - Tag, Bucket, - Field, FromTerm, ToTerm, - FoldObjectsFun), - State} - - end; +handle_call({return_runner, QueryType}, _From, State) -> + {reply, get_runner(State, QueryType), State}; handle_call({compact_journal, Timeout}, _From, State) -> ok = leveled_inker:ink_compactjournal(State#state.inker, self(), @@ -707,27 +585,32 @@ loadqueue_ledgercache(Cache) -> %% range queries and not direct fetch requests. {StartKey, EndKey} if the the %% snapshot is to be used for one specific query only (this is much quicker to %% setup, assuming the range is a small subset of the overall key space). 
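%% ---------------------------------------------------------------------------
%% Editor's illustration (not part of the patch): a minimal sketch of the
%% reworked snapshot API, assuming a running Bookie pid. The function name,
%% the fold function and the tag value are placeholders; the call shapes
%% follow book_snapshot/4 and book_returnfolder/2 as changed in this diff.
%% ---------------------------------------------------------------------------
example_snapshot_use(Bookie) ->
    % A long-running snapshot of the whole store (ledger and journal)
    {ok, LedgerSnap, JournalSnap} =
        leveled_bookie:book_snapshot(Bookie, store, undefined, true),
    % ... query the clones here, then release them ...
    ok = leveled_penciller:pcl_close(LedgerSnap),
    leveled_inker:ink_close(JournalSnap),
    % Alternatively, request a pre-canned runner and evaluate it lazily;
    % the atom o is assumed here as the standard object tag (?STD_TAG)
    FoldKeysFun = fun(B, K, Acc) -> [{B, K}|Acc] end,
    {async, Runner} =
        leveled_bookie:book_returnfolder(Bookie,
                                            {keylist, o, {FoldKeysFun, []}}),
    Runner().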
-snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) ->
-    LedgerCache = readycache_forsnapshot(LedgerCache0, Query),
-    BookiesMem = {LedgerCache#ledger_cache.loader,
-                    LedgerCache#ledger_cache.index,
-                    LedgerCache#ledger_cache.min_sqn,
-                    LedgerCache#ledger_cache.max_sqn},
-    LongRunning =
-        case Query of
-            undefined ->
-                true;
-            no_lookup ->
-                true;
-            _ ->
-                % If a specific query has been defined, then not expected
-                % to be long running
-                false
+snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) ->
+    LedgerCacheReady = readycache_forsnapshot(LedgerCache, Query),
+    BookiesMem = {LedgerCacheReady#ledger_cache.loader,
+                    LedgerCacheReady#ledger_cache.index,
+                    LedgerCacheReady#ledger_cache.min_sqn,
+                    LedgerCacheReady#ledger_cache.max_sqn},
+    LongRunning0 =
+        case LongRunning of
+            undefined ->
+                case Query of
+                    undefined ->
+                        true;
+                    no_lookup ->
+                        true;
+                    _ ->
+                        % If a specific query has been defined, then not
+                        % expected to be long running
+                        false
+                end;
+            TrueOrFalse ->
+                TrueOrFalse
         end,
     PCLopts = #penciller_options{start_snapshot = true,
                                     source_penciller = Penciller,
                                     snapshot_query = Query,
-                                    snapshot_longrunning = LongRunning,
+                                    snapshot_longrunning = LongRunning0,
                                     bookies_mem = BookiesMem},
     {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
     case SnapType of
@@ -740,15 +623,13 @@ snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) ->
             {ok, LedgerSnapshot, null}
     end.
 
-snapshot_store(State, SnapType) ->
-    snapshot_store(State, SnapType, undefined).
-
-snapshot_store(State, SnapType, Query) ->
+snapshot_store(State, SnapType, Query, LongRunning) ->
     snapshot_store(State#state.ledger_cache,
                     State#state.penciller,
                     State#state.inker,
                     SnapType,
-                    Query).
+                    Query,
+                    LongRunning).
 
 fetch_value(Inker, {Key, SQN}) ->
     SW = os:timestamp(),
@@ -765,6 +646,178 @@
 %%% Internal functions
 %%%============================================================================
 
+return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
+    case SnapPreFold of
+        true ->
+            {ok, LS, JS} = snapshot_store(State, SnapType, Query, LongRunning),
+            fun() -> {ok, LS, JS} end;
+        false ->
+            Self = self(),
+            % The snapshot is deferred until the runner is called
+            %
+            % This uses the external snapshot - as the snapshot will need
+            % to have consistent state between Bookie and Penciller when
+            % it is made.
+            fun() -> book_snapshot(Self, SnapType, Query, LongRunning) end
+    end.
+
+snaptype_by_presence(true) ->
+    store;
+snaptype_by_presence(false) ->
+    ledger.
+
+-spec get_runner(book_state(), tuple()) -> {async, fun()}.
+%% @doc
+%% Get an {async, Runner} for a given fold type. Fold types have different
+%% tuple inputs
+get_runner(State, {index_query, Constraint, FoldAccT, Range, TermHandling}) ->
+    {IdxFld, StartT, EndT} = Range,
+    {Bucket, ObjKey0} =
+        case Constraint of
+            {B, SK} ->
+                {B, SK};
+            B ->
+                {B, null}
+        end,
+    StartKey =
+        leveled_codec:to_ledgerkey(Bucket, ObjKey0, ?IDX_TAG, IdxFld, StartT),
+    EndKey =
+        leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, IdxFld, EndT),
+    SnapFun = return_snapfun(State, ledger, {StartKey, EndKey}, false, false),
+    leveled_runner:index_query(SnapFun,
+                                {StartKey, EndKey, TermHandling},
+                                FoldAccT);
+get_runner(State, {keylist, Tag, FoldAccT}) ->
+    SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
+    leveled_runner:bucketkey_query(SnapFun, Tag, null, FoldAccT);
+get_runner(State, {keylist, Tag, Bucket, FoldAccT}) ->
+    SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
+    leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, FoldAccT);
+
+%% Set of runners for object or metadata folds
+get_runner(State,
+            {foldheads_allkeys, Tag, FoldFun, JournalCheck, SnapPreFold}) ->
+    SnapType = snaptype_by_presence(JournalCheck),
+    SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
+    leveled_runner:foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck);
+get_runner(State,
+            {foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) ->
+    SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold),
+    leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun);
+get_runner(State,
+            {foldheads_bybucket,
+                Tag, Bucket, KeyRange,
+                FoldFun,
+                JournalCheck, SnapPreFold}) ->
+    {StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
+    SnapType = snaptype_by_presence(JournalCheck),
+    SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold),
+    leveled_runner:foldheads_bybucket(SnapFun,
+                                        {Tag, StartKey, EndKey},
+                                        FoldFun,
+                                        JournalCheck);
+get_runner(State,
+            {foldobjects_bybucket,
+                Tag, Bucket, KeyRange,
+                FoldFun,
+                SnapPreFold}) ->
+    {StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
+    SnapFun = return_snapfun(State, store, SnapQ, true, SnapPreFold),
+    leveled_runner:foldobjects_bybucket(SnapFun,
+                                        {Tag, StartKey, EndKey},
+                                        FoldFun);
+get_runner(State,
+            {foldobjects_byindex,
+                Tag, Bucket, {Field, FromTerm, ToTerm},
+                FoldObjectsFun,
+                SnapPreFold}) ->
+    SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold),
+    leveled_runner:foldobjects_byindex(SnapFun,
+                                        {Tag, Bucket, Field, FromTerm, ToTerm},
+                                        FoldObjectsFun);
+
+%% Set of specific runners, primarily used as examples for tests
+get_runner(State, DeprecatedQuery) ->
+    get_deprecatedrunner(State, DeprecatedQuery).
+
+
+-spec get_deprecatedrunner(book_state(), tuple()) -> {async, fun()}.
+%% @doc
+%% Get an {async, Runner} for a given fold type. Fold types have different
+%% tuple inputs. These folds are currently used in tests, but are deprecated.
+%% Most of these folds should be achievable through other available folds.
+get_deprecatedrunner(State, {bucket_stats, Bucket}) -> + SnapFun = return_snapfun(State, ledger, no_lookup, true, true), + leveled_runner:bucket_sizestats(SnapFun, Bucket, ?STD_TAG); +get_deprecatedrunner(State, {riakbucket_stats, Bucket}) -> + SnapFun = return_snapfun(State, ledger, no_lookup, true, true), + leveled_runner:bucket_sizestats(SnapFun, Bucket, ?RIAK_TAG); +get_deprecatedrunner(State, {binary_bucketlist, Tag, FoldAccT}) -> + {FoldKeysFun, Acc} = FoldAccT, + SnapFun = return_snapfun(State, ledger, no_lookup, false, false), + leveled_runner:binary_bucketlist(SnapFun, Tag, FoldKeysFun, Acc); +get_deprecatedrunner(State, {hashlist_query, Tag, JournalCheck}) -> + SnapType = snaptype_by_presence(JournalCheck), + SnapFun = return_snapfun(State, SnapType, no_lookup, true, true), + leveled_runner:hashlist_query(SnapFun, Tag, JournalCheck); +get_deprecatedrunner(State, + {tictactree_obj, + {Tag, Bucket, StartK, EndK, JournalCheck}, + TreeSize, + PartitionFilter}) -> + SnapType = snaptype_by_presence(JournalCheck), + SnapFun = return_snapfun(State, SnapType, no_lookup, true, true), + leveled_runner:tictactree(SnapFun, + {Tag, Bucket, {StartK, EndK}}, + JournalCheck, + TreeSize, + PartitionFilter); +get_deprecatedrunner(State, + {tictactree_idx, + {Bucket, IdxField, StartK, EndK}, + TreeSize, + PartitionFilter}) -> + SnapFun = return_snapfun(State, ledger, no_lookup, true, true), + leveled_runner:tictactree(SnapFun, + {?IDX_TAG, Bucket, {IdxField, StartK, EndK}}, + false, + TreeSize, + PartitionFilter). + + +-spec return_ledger_keyrange(atom(), any(), tuple()) -> + {tuple(), tuple(), tuple()|no_lookup}. +%% @doc +%% Convert a range of binary keys into a ledger key range, returning +%% {StartLK, EndLK, Query} where Query is to indicate whether the query +%% range is worth using to minimise the cost of the snapshot +return_ledger_keyrange(Tag, Bucket, KeyRange) -> + {StartKey, EndKey, Snap} = + case KeyRange of + all -> + {leveled_codec:to_ledgerkey(Bucket, null, Tag), + leveled_codec:to_ledgerkey(Bucket, null, Tag), + false}; + {StartTerm, <<"$all">>} -> + {leveled_codec:to_ledgerkey(Bucket, StartTerm, Tag), + leveled_codec:to_ledgerkey(Bucket, null, Tag), + false}; + {StartTerm, EndTerm} -> + {leveled_codec:to_ledgerkey(Bucket, StartTerm, Tag), + leveled_codec:to_ledgerkey(Bucket, EndTerm, Tag), + true} + end, + SnapQuery = + case Snap of + true -> + {StartKey, EndKey}; + false -> + no_lookup + end, + {StartKey, EndKey, SnapQuery}. + + + maybe_longrunning(SW, Aspect) -> case timer:now_diff(os:timestamp(), SW) of N when N > ?LONG_RUNNING -> @@ -773,342 +826,6 @@ maybe_longrunning(SW, Aspect) -> ok end. -bucket_stats(State, Bucket, Tag) -> - {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State, - ledger, - no_lookup), - Folder = fun() -> - StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), - EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), - AccFun = accumulate_size(), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - StartKey, - EndKey, - AccFun, - {0, 0}), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - Acc - end, - {async, Folder}. 
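%% ---------------------------------------------------------------------------
%% Editor's illustration (not part of the patch): how the SnapPreFold flag
%% consumed by return_snapfun/5 above changes when the snapshot is taken.
%% The function name, bucket, tag and fold function are placeholders; the
%% query tuple follows the foldheads_bybucket clause of get_runner/2.
%% ---------------------------------------------------------------------------
snap_prefold_example(Bookie, FoldHeadsFun) ->
    % SnapPreFold = true: the snapshot is taken now, at request time, so the
    % runner sees the store as it was when book_returnfolder/2 was called
    {async, RunnerPre} =
        leveled_bookie:book_returnfolder(Bookie,
                                            {foldheads_bybucket,
                                                o, <<"B">>, all,
                                                FoldHeadsFun,
                                                false, true}),
    % SnapPreFold = false: the snapshot is deferred until the runner is run
    {async, RunnerPost} =
        leveled_bookie:book_returnfolder(Bookie,
                                            {foldheads_bybucket,
                                                o, <<"B">>, all,
                                                FoldHeadsFun,
                                                false, false}),
    {RunnerPre(), RunnerPost()}.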
-
-
-binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
-    % List buckets for tag, assuming bucket names are all binary type
-    {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
-                                                            ledger,
-                                                            no_lookup),
-    Folder = fun() ->
-                BucketAcc = get_nextbucket(null,
-                                            null,
-                                            Tag,
-                                            LedgerSnapshot,
-                                            []),
-                ok = leveled_penciller:pcl_close(LedgerSnapshot),
-                lists:foldl(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end,
-                            InitAcc,
-                            BucketAcc)
-                end,
-    {async, Folder}.
-
-get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
-    Now = leveled_codec:integer_now(),
-    StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag),
-    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
-    ExtractFun =
-        fun(LK, V, _Acc) ->
-            {leveled_codec:from_ledgerkey(LK), V}
-        end,
-    R = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
-                                            StartKey,
-                                            EndKey,
-                                            ExtractFun,
-                                            null),
-    case R of
-        null ->
-            leveled_log:log("B0008",[]),
-            BKList;
-        {{B, K}, V} when is_binary(B), is_binary(K) ->
-            case leveled_codec:is_active({B, K}, V, Now) of
-                true ->
-                    leveled_log:log("B0009",[B]),
-                    get_nextbucket(<<B/binary, 0>>,
-                                    null,
-                                    Tag,
-                                    LedgerSnapshot,
-                                    [{B, K}|BKList]);
-                false ->
-                    get_nextbucket(B,
-                                    <<K/binary, 0>>,
-                                    Tag,
-                                    LedgerSnapshot,
-                                    BKList)
-            end;
-        {NB, _V} ->
-            leveled_log:log("B0010",[NB]),
-            []
-    end.
-
-
-index_query(State,
-            Bucket,
-            StartObjKey,
-            {FoldKeysFun, InitAcc},
-            {IdxField, StartValue, EndValue},
-            {ReturnTerms, TermRegex}) ->
-    StartKey = leveled_codec:to_ledgerkey(Bucket,
-                                            StartObjKey,
-                                            ?IDX_TAG,
-                                            IdxField,
-                                            StartValue),
-    EndKey = leveled_codec:to_ledgerkey(Bucket,
-                                        null,
-                                        ?IDX_TAG,
-                                        IdxField,
-                                        EndValue),
-    {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
-                                                            ledger,
-                                                            {StartKey,
-                                                                EndKey}),
-    Folder = fun() ->
-                AddFun = case ReturnTerms of
-                            true ->
-                                fun add_terms/2;
-                            _ ->
-                                fun add_keys/2
-                            end,
-                AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun),
-                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
-                                                        StartKey,
-                                                        EndKey,
-                                                        AccFun,
-                                                        InitAcc),
-                ok = leveled_penciller:pcl_close(LedgerSnapshot),
-                Acc
-                end,
-    {async, Folder}.
-
-
-hashlist_query(State, Tag, JournalCheck) ->
-    SnapType = case JournalCheck of
-                    false ->
-                        ledger;
-                    check_presence ->
-                        store
-                end,
-    {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State,
-                                                            SnapType,
-                                                            no_lookup),
-    Folder = fun() ->
-                StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
-                EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
-                AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
-                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
-                                                        StartKey,
-                                                        EndKey,
-                                                        AccFun,
-                                                        []),
-                ok = leveled_penciller:pcl_close(LedgerSnapshot),
-                case JournalCheck of
-                    false ->
-                        ok;
-                    check_presence ->
-                        leveled_inker:ink_close(JournalSnapshot)
-                end,
-                Acc
-                end,
-    {async, Folder}.
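%% ---------------------------------------------------------------------------
%% Editor's note (not part of the patch): the bucket walk in get_nextbucket
%% above (re-homed in leveled_runner later in this diff) skips ahead by
%% appending a zero byte to the last binary seen - the smallest binary that
%% sorts strictly after it. A sketch of that step in isolation; the function
%% name is hypothetical:
%% ---------------------------------------------------------------------------
next_start_key(LastSeen) when is_binary(LastSeen) ->
    % e.g. <<"B">> becomes <<66, 0>>, the first candidate key after <<"B">>
    <<LastSeen/binary, 0>>.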
- -tictactree(State, Tag, Bucket, Query, JournalCheck, TreeSize, Filter) -> - % Journal check can be used for object key folds to confirm that the - % object is still indexed within the journal - SnapType = case JournalCheck of - false -> - ledger; - check_presence -> - store - end, - {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, - SnapType, - no_lookup), - Tree = leveled_tictac:new_tree(temp, TreeSize), - Folder = - fun() -> - % The start key and end key will vary depending on whether the - % fold is to fold over an index or a key range - {StartKey, EndKey, ExtractFun} = - case Tag of - ?IDX_TAG -> - {IdxField, StartIdx, EndIdx} = Query, - {leveled_codec:to_ledgerkey(Bucket, - null, - ?IDX_TAG, - IdxField, - StartIdx), - leveled_codec:to_ledgerkey(Bucket, - null, - ?IDX_TAG, - IdxField, - EndIdx), - fun(K, T) -> {K, T} end}; - _ -> - {StartObjKey, EndObjKey} = Query, - {leveled_codec:to_ledgerkey(Bucket, - StartObjKey, - Tag), - leveled_codec:to_ledgerkey(Bucket, - EndObjKey, - Tag), - fun(K, H) -> {K, {is_hash, H}} end} - end, - AccFun = accumulate_tree(Filter, - JournalCheck, - JournalSnapshot, - ExtractFun), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - StartKey, - EndKey, - AccFun, - Tree), - - % Close down snapshot when complete so as not to hold removed - % files open - ok = leveled_penciller:pcl_close(LedgerSnapshot), - case JournalCheck of - false -> - ok; - check_presence -> - leveled_inker:ink_close(JournalSnapshot) - end, - Acc - end, - {async, Folder}. - -foldobjects_allkeys(State, Tag, FoldObjectsFun) -> - StartKey = leveled_codec:to_ledgerkey(null, null, Tag), - EndKey = leveled_codec:to_ledgerkey(null, null, Tag), - foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, - false, true). - -foldheads_allkeys(State, Tag, FoldHeadsFun, CheckPresence, SnapPreFold) -> - StartKey = leveled_codec:to_ledgerkey(null, null, Tag), - EndKey = leveled_codec:to_ledgerkey(null, null, Tag), - foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, - {true, CheckPresence}, SnapPreFold). - -foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) -> - StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), - EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), - foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, - false, true). - -foldheads_bybucket(State, Tag, Bucket, KeyRange, FoldHeadsFun, - CheckPresence, SnapPreFold) -> - {StartKey, EndKey} = - case KeyRange of - all -> - {leveled_codec:to_ledgerkey(Bucket, null, Tag), - leveled_codec:to_ledgerkey(Bucket, null, Tag)}; - {StartTerm, <<"$all">>} -> - {leveled_codec:to_ledgerkey(Bucket, StartTerm, Tag), - leveled_codec:to_ledgerkey(Bucket, null, Tag)}; - {StartTerm, EndTerm} -> - {leveled_codec:to_ledgerkey(Bucket, StartTerm, Tag), - leveled_codec:to_ledgerkey(Bucket, EndTerm, Tag)} - end, - foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, - {true, CheckPresence}, SnapPreFold). - -foldobjects_byindex(State, Tag, Bucket, - Field, FromTerm, ToTerm, FoldObjectsFun) -> - StartKey = - leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm), - EndKey = - leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm), - foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, - false, true). - --spec foldobjects(book_state(), atom(), tuple(), tuple(), fun(), - false|{true, boolean()}, boolean()) -> - {async, fun()}. -%% @doc -%% The object folder should be passed DeferredFetch and SnapPreFold. 
-%% DeferredFetch can either be false (which will return to the fold function -%% the full object), or {true, CheckPresence} - in which case a proxy object -%% will be created that if understood by the fold function will allow the fold -%% function to work on the head of the object, and defer fetching the body in -%% case such a fetch is unecessary. -%% SnapPreFold determines if the snapshot of the database is done prior to -%% returning the Folder function (SnapPreFold=true) or when the Folder function -%% is called as Folder() {SnapPreFold=false} -foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, - DeferredFetch, SnapPreFold) -> - {FoldFun, InitAcc} = - case is_tuple(FoldObjectsFun) of - true -> - % FoldObjectsFun is already a tuple with a Fold function and an - % initial accumulator - FoldObjectsFun; - false -> - % no initial accumulatr passed, and so should be just a list - {FoldObjectsFun, []} - end, - SnapFun = - case SnapPreFold of - true -> - {ok, LS, JS} = snapshot_store(State, store, undefined), - fun() -> {ok, LS, JS} end; - false -> - Self = self(), - % Timeout will be ignored, as will Requestor - % - % This uses the external snapshot - as the snapshot will need - % to have consistent state between Bookie and Penciller when - % it is made. - fun() -> book_snapshotstore(Self, Self, 5400) end - end, - - Folder = - fun() -> - {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(), - - AccFun = accumulate_objects(FoldFun, - JournalSnapshot, - Tag, - DeferredFetch), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - StartKey, - EndKey, - AccFun, - InitAcc), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - ok = leveled_inker:ink_close(JournalSnapshot), - Acc - end, - {async, Folder}. - - -bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) -> - {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State, - ledger, - no_lookup), - Folder = fun() -> - SK = leveled_codec:to_ledgerkey(Bucket, null, Tag), - EK = leveled_codec:to_ledgerkey(Bucket, null, Tag), - AccFun = accumulate_keys(FoldKeysFun), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - SK, - EK, - AccFun, - InitAcc), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - Acc - end, - {async, Folder}. - -allkey_query(State, Tag, {FoldKeysFun, InitAcc}) -> - bucketkey_query(State, Tag, null, {FoldKeysFun, InitAcc}). - - readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) -> {KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem, StartKey, @@ -1242,200 +959,6 @@ fetch_head(Key, Penciller, LedgerCache) -> end. -accumulate_size() -> - Now = leveled_codec:integer_now(), - AccFun = fun(Key, Value, {Size, Count}) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {Size + leveled_codec:get_size(Key, Value), - Count + 1}; - false -> - {Size, Count} - end - end, - AccFun. - -accumulate_hashes(JournalCheck, InkerClone) -> - AddKeyFun = - fun(B, K, H, Acc) -> - [{B, K, H}|Acc] - end, - get_hashaccumulator(JournalCheck, - InkerClone, - AddKeyFun). - -accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) -> - AddKeyFun = - fun(B, K, H, Tree) -> - case FilterFun(B, K) of - accumulate -> - leveled_tictac:add_kv(Tree, K, H, HashFun, false); - pass -> - Tree - end - end, - get_hashaccumulator(JournalCheck, - InkerClone, - AddKeyFun). 
- -get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) -> - Now = leveled_codec:integer_now(), - AccFun = - fun(LK, V, Acc) -> - case leveled_codec:is_active(LK, V, Now) of - true -> - {B, K, H} = leveled_codec:get_keyandobjhash(LK, V), - Check = leveled_rand:uniform() < ?CHECKJOURNAL_PROB, - case {JournalCheck, Check} of - {check_presence, true} -> - case check_presence(LK, V, InkerClone) of - true -> - AddKeyFun(B, K, H, Acc); - false -> - Acc - end; - _ -> - AddKeyFun(B, K, H, Acc) - end; - false -> - Acc - end - end, - AccFun. - - -accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> - Now = leveled_codec:integer_now(), - AccFun = - fun(LK, V, Acc) -> - % The function takes the Ledger Key and the value from the - % ledger (with the value being the object metadata) - % - % Need to check if this is an active object (so TTL has not - % expired). - % If this is a deferred_fetch (i.e. the fold is a fold_heads not - % a fold_objects), then a metadata object needs to be built to be - % returned - but a quick check that Key is present in the Journal - % is made first - case leveled_codec:is_active(LK, V, Now) of - true -> - {SQN, _St, _MH, MD} = - leveled_codec:striphead_to_details(V), - {B, K} = - case leveled_codec:from_ledgerkey(LK) of - {B0, K0} -> - {B0, K0}; - {B0, K0, _T0} -> - {B0, K0} - end, - JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN}, - case DeferredFetch of - {true, true} -> - InJournal = - leveled_inker:ink_keycheck(InkerClone, - LK, - SQN), - case InJournal of - probably -> - ProxyObj = make_proxy_object(LK, JK, - MD, V, - InkerClone), - FoldObjectsFun(B, K, ProxyObj, Acc); - missing -> - Acc - end; - {true, false} -> - ProxyObj = make_proxy_object(LK, JK, - MD, V, - InkerClone), - FoldObjectsFun(B, K,ProxyObj, Acc); - false -> - R = fetch_value(InkerClone, JK), - case R of - not_present -> - Acc; - Value -> - FoldObjectsFun(B, K, Value, Acc) - - end - end; - false -> - Acc - end - end, - AccFun. - -make_proxy_object(LK, JK, MD, V, InkerClone) -> - Size = leveled_codec:get_size(LK, V), - MDBin = leveled_codec:build_metadata_object(LK, MD), - term_to_binary({proxy_object, - MDBin, - Size, - {fun fetch_value/2, InkerClone, JK}}). - -check_presence(Key, Value, InkerClone) -> - {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}), - case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of - probably -> - true; - missing -> - false - end. - -accumulate_keys(FoldKeysFun) -> - Now = leveled_codec:integer_now(), - AccFun = fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {B, K} = leveled_codec:from_ledgerkey(Key), - FoldKeysFun(B, K, Acc); - false -> - Acc - end - end, - AccFun. - -add_keys(ObjKey, _IdxValue) -> - ObjKey. - -add_terms(ObjKey, IdxValue) -> - {IdxValue, ObjKey}. - -accumulate_index(TermRe, AddFun, FoldKeysFun) -> - Now = leveled_codec:integer_now(), - case TermRe of - undefined -> - fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {Bucket, - ObjKey, - IdxValue} = leveled_codec:from_ledgerkey(Key), - FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc); - false -> - Acc - end end; - TermRe -> - fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {Bucket, - ObjKey, - IdxValue} = leveled_codec:from_ledgerkey(Key), - case re:run(IdxValue, TermRe) of - nomatch -> - Acc; - _ -> - FoldKeysFun(Bucket, - AddFun(ObjKey, IdxValue), - Acc) - end; - false -> - Acc - end end - end. 
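%% ---------------------------------------------------------------------------
%% Editor's note (not part of the patch): get_hashaccumulator above (moved to
%% leveled_runner later in this diff) does not check every key against the
%% Journal - it samples at ?CHECKJOURNAL_PROB (0.2), trading completeness of
%% the presence check for fold speed. The guard, extracted as a sketch using
%% the new boolean form of JournalCheck; the function name is hypothetical:
%% ---------------------------------------------------------------------------
should_check_journal(JournalCheck) ->
    JournalCheck andalso leveled_rand:uniform() < 0.2.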
- - preparefor_ledgercache(?INKT_KEYD, LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}, _AAE) -> @@ -1753,7 +1276,7 @@ hashlist_query_withjournalcheck_testto() -> {async, HTFolder2} = book_returnfolder(Bookie1, {hashlist_query, ?STD_TAG, - check_presence}), + true}), ?assertMatch(KeyHashList, HTFolder2()), ok = book_close(Bookie1), reset_filestructure(). @@ -1782,9 +1305,11 @@ foldobjects_vs_hashtree_testto() -> FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(term_to_binary(V))}|Acc] end, - {async, HTFolder2} = - book_returnfolder(Bookie1, - {foldobjects_allkeys, ?STD_TAG, FoldObjectsFun}), + {async, HTFolder2} = book_returnfolder(Bookie1, + {foldobjects_allkeys, + ?STD_TAG, + FoldObjectsFun, + true}), KeyHashList2 = HTFolder2(), ?assertMatch(KeyHashList1, lists:usort(KeyHashList2)), @@ -1861,14 +1386,18 @@ foldobjects_vs_foldheads_bybucket_testto() -> {foldobjects_bybucket, ?STD_TAG, "BucketA", - FoldObjectsFun}), + all, + FoldObjectsFun, + false}), KeyHashList1A = HTFolder1A(), {async, HTFolder1B} = book_returnfolder(Bookie1, {foldobjects_bybucket, ?STD_TAG, "BucketB", - FoldObjectsFun}), + all, + FoldObjectsFun, + true}), KeyHashList1B = HTFolder1B(), ?assertMatch(false, lists:usort(KeyHashList1A) == lists:usort(KeyHashList1B)), @@ -1900,7 +1429,7 @@ foldobjects_vs_foldheads_bybucket_testto() -> "BucketB", all, FoldHeadsFun, - false, + true, false}), KeyHashList2B = HTFolder2B(), @@ -1916,7 +1445,7 @@ foldobjects_vs_foldheads_bybucket_testto() -> "BucketB", {"Key", <<"$all">>}, FoldHeadsFun, - false, + true, false}), KeyHashList2C = HTFolder2C(), {async, HTFolder2D} = @@ -1926,8 +1455,8 @@ foldobjects_vs_foldheads_bybucket_testto() -> "BucketB", {"Key", "Keyzzzzz"}, FoldHeadsFun, - false, - false}), + true, + true}), KeyHashList2D = HTFolder2D(), ?assertMatch(true, lists:usort(KeyHashList2B) == lists:usort(KeyHashList2C)), @@ -1941,7 +1470,7 @@ foldobjects_vs_foldheads_bybucket_testto() -> "BucketB", {"Key", "Key4zzzz"}, FoldHeadsFun, - false, + true, false}), KeyHashList2E = HTFolder2E(), {async, HTFolder2F} = @@ -1951,7 +1480,7 @@ foldobjects_vs_foldheads_bybucket_testto() -> "BucketB", {"Key5", <<"all">>}, FoldHeadsFun, - false, + true, false}), KeyHashList2F = HTFolder2F(), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index ede3d9c..209abc4 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -839,7 +839,8 @@ filepath(CompactFilePath, NewSQN, compact_journal) -> initiate_penciller_snapshot(Bookie) -> - {ok, LedgerSnap, _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined), + {ok, LedgerSnap, _} = + leveled_bookie:book_snapshot(Bookie, ledger, undefined, true), MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. 
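%% ---------------------------------------------------------------------------
%% Editor's note (not part of the patch): initiate_penciller_snapshot above
%% passes LongRunning as true explicitly; callers may instead pass undefined
%% and let snapshot_store/6 (earlier in this diff) derive it from the Query.
%% The defaulting rule, extracted as a sketch; the function name is
%% hypothetical:
%% ---------------------------------------------------------------------------
default_longrunning(undefined, undefined) -> true;
default_longrunning(undefined, no_lookup) -> true;
default_longrunning(undefined, _RangeQuery) -> false;  % bounded range query
default_longrunning(TrueOrFalse, _Query) -> TrueOrFalse.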
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a45184e..4321ee6 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -1457,7 +1457,8 @@ simple_server_test() -> PCLr, null, ledger, - undefined), + undefined, + false), ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})), @@ -1510,7 +1511,8 @@ simple_server_test() -> PCLr, null, ledger, - undefined), + undefined, + false), ?assertMatch(false, pcl_checksequencenumber(PclSnap2, {o, diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl new file mode 100644 index 0000000..ebef076 --- /dev/null +++ b/src/leveled_runner.erl @@ -0,0 +1,543 @@ +%% -------- RUNNER --------- +%% +%% A bookie's runner would traditionally allow remote actors to place bets +%% via the runner. In this case the runner will allow a remote actor to +%% have query access to the ledger or journal. Runners provide a snapshot of +%% the book for querying the backend. +%% +%% Runners implement the {async, Folder} within Riak backends - returning an +%% {async, Runner}, where the Runner() can be called as a function. The +%% Runner may make the snapshot at the point it is called, or the snapshot can +%% be generated and encapsulated within the function (known as snap_prefold). +%% +%% Runners which view only the Ledger (the Penciller view of the state) may +%% have a CheckPresence option - which causes the function to perform a basic +%% check that the item is available in the Journal via the Inker as part of +%% the fold. This may be useful for anti-entropy folds + + +-module(leveled_runner). + +-include("include/leveled.hrl"). + +-export([ + bucket_sizestats/3, + binary_bucketlist/4, + index_query/3, + bucketkey_query/4, + hashlist_query/3, + tictactree/5, + foldheads_allkeys/4, + foldobjects_allkeys/3, + foldheads_bybucket/4, + foldobjects_bybucket/3, + foldobjects_byindex/3 + ]). + + +-include_lib("eunit/include/eunit.hrl"). + +-define(CHECKJOURNAL_PROB, 0.2). + +%%%============================================================================ +%%% External functions +%%%============================================================================ + + +-spec bucket_sizestats(fun(), any(), atom()) -> {async, fun()}. +%% @doc +%% Fold over a bucket accumulating the count of objects and their total sizes +bucket_sizestats(SnapFun, Bucket, Tag) -> + StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), + EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), + AccFun = accumulate_size(), + Runner = + fun() -> + {ok, LedgerSnap, _JournalSnap} = SnapFun(), + Acc = leveled_penciller:pcl_fetchkeys(LedgerSnap, + StartKey, + EndKey, + AccFun, + {0, 0}), + ok = leveled_penciller:pcl_close(LedgerSnap), + Acc + end, + {async, Runner}. + +-spec binary_bucketlist(fun(), atom(), fun(), any()) -> {async, fun()}. +%% @doc +%% List buckets for tag, assuming bucket names are all binary type +binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc) -> + Runner = + fun() -> + {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), + BucketAcc = get_nextbucket(null, null, Tag, LedgerSnapshot, []), + ok = leveled_penciller:pcl_close(LedgerSnapshot), + lists:foldl(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end, + InitAcc, + BucketAcc) + end, + {async, Runner}. + +-spec index_query(fun(), tuple(), tuple()) -> {async, fun()}. 
+%% @doc
+%% Secondary index query
+index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
+    {FoldKeysFun, InitAcc} = FoldAccT,
+    {ReturnTerms, TermRegex} = TermHandling,
+    AddFun =
+        case ReturnTerms of
+            true ->
+                fun add_terms/2;
+            _ ->
+                fun add_keys/2
+        end,
+    AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun),
+    Runner =
+        fun() ->
+            {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
+            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                    StartKey,
+                                                    EndKey,
+                                                    AccFun,
+                                                    InitAcc),
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            Acc
+        end,
+    {async, Runner}.
+
+-spec bucketkey_query(fun(), atom(), any(), tuple()) -> {async, fun()}.
+%% @doc
+%% Fold over all keys under a tag (potentially restricted to a given bucket)
+bucketkey_query(SnapFun, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
+    SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
+    EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
+    AccFun = accumulate_keys(FoldKeysFun),
+    Runner =
+        fun() ->
+            {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
+            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                    SK,
+                                                    EK,
+                                                    AccFun,
+                                                    InitAcc),
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            Acc
+        end,
+    {async, Runner}.
+
+-spec hashlist_query(fun(), atom(), boolean()) -> {async, fun()}.
+%% @doc
+%% Fold over the keys accumulating the hashes
+hashlist_query(SnapFun, Tag, JournalCheck) ->
+    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    Runner =
+        fun() ->
+            {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
+            AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
+            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                    StartKey,
+                                                    EndKey,
+                                                    AccFun,
+                                                    []),
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            case JournalCheck of
+                false ->
+                    ok;
+                true ->
+                    leveled_inker:ink_close(JournalSnapshot)
+            end,
+            Acc
+        end,
+    {async, Runner}.
+
+-spec tictactree(fun(), {atom(), any(), tuple()}, boolean(), atom(), fun())
+                                                            -> {async, fun()}.
+%% @doc
+%% Return a merkle tree from the fold, directly accessing hashes cached in the
+%% metadata
+tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
+    % Journal check can be used for object key folds to confirm that the
+    % object is still indexed within the journal
+    Tree = leveled_tictac:new_tree(temp, TreeSize),
+    Runner =
+        fun() ->
+            {ok, LedgerSnap, JournalSnap} = SnapFun(),
+            % The start key and end key will vary depending on whether the
+            % fold is to fold over an index or a key range
+            {StartKey, EndKey, ExtractFun} =
+                case Tag of
+                    ?IDX_TAG ->
+                        {IdxFld, StartIdx, EndIdx} = Query,
+                        KeyDefFun = fun leveled_codec:to_ledgerkey/5,
+                        {KeyDefFun(Bucket, null, ?IDX_TAG, IdxFld, StartIdx),
+                            KeyDefFun(Bucket, null, ?IDX_TAG, IdxFld, EndIdx),
+                            fun(K, T) -> {K, T} end};
+                    _ ->
+                        {StartOKey, EndOKey} = Query,
+                        {leveled_codec:to_ledgerkey(Bucket, StartOKey, Tag),
+                            leveled_codec:to_ledgerkey(Bucket, EndOKey, Tag),
+                            fun(K, H) -> {K, {is_hash, H}} end}
+                end,
+            AccFun =
+                accumulate_tree(Filter, JournalCheck, JournalSnap, ExtractFun),
+            Acc =
+                leveled_penciller:pcl_fetchkeys(LedgerSnap,
+                                                StartKey, EndKey,
+                                                AccFun, Tree),
+
+            % Close down snapshot when complete so as not to hold removed
+            % files open
+            ok = leveled_penciller:pcl_close(LedgerSnap),
+            case JournalCheck of
+                false ->
+                    ok;
+                true ->
+                    leveled_inker:ink_close(JournalSnap)
+            end,
+            Acc
+        end,
+    {async, Runner}.
+
+-spec foldheads_allkeys(fun(), atom(), fun(), boolean()) -> {async, fun()}.
+%% @doc
+%% Fold over all heads in the store for a given tag - applying the passed
+%% function to each proxy object
+foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck) ->
+    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}).
+
+-spec foldobjects_allkeys(fun(), atom(), fun()) -> {async, fun()}.
+%% @doc
+%% Fold over all objects for a given tag
+foldobjects_allkeys(SnapFun, Tag, FoldFun) ->
+    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).
+
+-spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) ->
+                                                            {async, fun()}.
+%% @doc
+%% Fold over all objects within a given key range in a bucket
+foldobjects_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun) ->
+    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).
+
+-spec foldheads_bybucket(fun(), {atom(), any(), any()}, fun(), boolean()) ->
+                                                            {async, fun()}.
+%% @doc
+%% Fold over all object metadata within a given key range in a bucket
+foldheads_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun, JournalCheck) ->
+    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}).
+
+-spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}.
+%% @doc
+%% Folds over an index, fetching the objects associated with the keys returned
+%% and passing those objects into the fold function
+foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
+    StartKey =
+        leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm),
+    EndKey =
+        leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
+    foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).
+
+
+
+
+%%%============================================================================
+%%% Internal functions
+%%%============================================================================
+
+get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
+    Now = leveled_codec:integer_now(),
+    StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag),
+    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    ExtractFun =
+        fun(LK, V, _Acc) ->
+            {leveled_codec:from_ledgerkey(LK), V}
+        end,
+    R = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
+                                            StartKey,
+                                            EndKey,
+                                            ExtractFun,
+                                            null),
+    case R of
+        null ->
+            leveled_log:log("B0008",[]),
+            BKList;
+        {{B, K}, V} when is_binary(B), is_binary(K) ->
+            case leveled_codec:is_active({B, K}, V, Now) of
+                true ->
+                    leveled_log:log("B0009",[B]),
+                    get_nextbucket(<<B/binary, 0>>,
+                                    null,
+                                    Tag,
+                                    LedgerSnapshot,
+                                    [{B, K}|BKList]);
+                false ->
+                    get_nextbucket(B,
+                                    <<K/binary, 0>>,
+                                    Tag,
+                                    LedgerSnapshot,
+                                    BKList)
+            end;
+        {NB, _V} ->
+            leveled_log:log("B0010",[NB]),
+            []
+    end.
+
+
+-spec foldobjects(fun(), atom(), tuple(), tuple(), fun(),
+                                    false|{true, boolean()}) ->
+                                                            {async, fun()}.
+%% @doc
+%% The object folder should be passed DeferredFetch.
+%% DeferredFetch can either be false (which will return to the fold function
+%% the full object), or {true, CheckPresence} - in which case a proxy object
+%% will be created that if understood by the fold function will allow the fold
+%% function to work on the head of the object, and defer fetching the body in
+%% case such a fetch is unnecessary.
+foldobjects(SnapFun, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
+    {FoldFun, InitAcc} =
+        case is_tuple(FoldObjectsFun) of
+            true ->
+                % FoldObjectsFun is already a tuple with a Fold function and an
+                % initial accumulator
+                FoldObjectsFun;
+            false ->
+                % no initial accumulator passed, and so should be just a list
+                {FoldObjectsFun, []}
+        end,
+
+    Folder =
+        fun() ->
+            {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
+
+            AccFun = accumulate_objects(FoldFun,
+                                        JournalSnapshot,
+                                        Tag,
+                                        DeferredFetch),
+            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                    StartKey,
+                                                    EndKey,
+                                                    AccFun,
+                                                    InitAcc),
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            case DeferredFetch of
+                {true, false} ->
+                    ok;
+                _ ->
+                    ok = leveled_inker:ink_close(JournalSnapshot)
+            end,
+            Acc
+        end,
+    {async, Folder}.
+
+
+accumulate_size() ->
+    Now = leveled_codec:integer_now(),
+    AccFun = fun(Key, Value, {Size, Count}) ->
+                    case leveled_codec:is_active(Key, Value, Now) of
+                        true ->
+                            {Size + leveled_codec:get_size(Key, Value),
+                                Count + 1};
+                        false ->
+                            {Size, Count}
+                    end
+                end,
+    AccFun.
+
+accumulate_hashes(JournalCheck, InkerClone) ->
+    AddKeyFun =
+        fun(B, K, H, Acc) ->
+            [{B, K, H}|Acc]
+        end,
+    get_hashaccumulator(JournalCheck,
+                        InkerClone,
+                        AddKeyFun).
+
+accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) ->
+    AddKeyFun =
+        fun(B, K, H, Tree) ->
+            case FilterFun(B, K) of
+                accumulate ->
+                    leveled_tictac:add_kv(Tree, K, H, HashFun, false);
+                pass ->
+                    Tree
+            end
+        end,
+    get_hashaccumulator(JournalCheck,
+                        InkerClone,
+                        AddKeyFun).
+
+get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
+    Now = leveled_codec:integer_now(),
+    AccFun =
+        fun(LK, V, Acc) ->
+            case leveled_codec:is_active(LK, V, Now) of
+                true ->
+                    {B, K, H} = leveled_codec:get_keyandobjhash(LK, V),
+                    Check = leveled_rand:uniform() < ?CHECKJOURNAL_PROB,
+                    case {JournalCheck, Check} of
+                        {true, true} ->
+                            case check_presence(LK, V, InkerClone) of
+                                true ->
+                                    AddKeyFun(B, K, H, Acc);
+                                false ->
+                                    Acc
+                            end;
+                        _ ->
+                            AddKeyFun(B, K, H, Acc)
+                    end;
+                false ->
+                    Acc
+            end
+        end,
+    AccFun.
+
+
+accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
+    Now = leveled_codec:integer_now(),
+    AccFun =
+        fun(LK, V, Acc) ->
+            % The function takes the Ledger Key and the value from the
+            % ledger (with the value being the object metadata)
+            %
+            % Need to check if this is an active object (so TTL has not
+            % expired).
+            % If this is a deferred_fetch (i.e. the fold is a fold_heads not
+            % a fold_objects), then a metadata object needs to be built to be
+            % returned - but a quick check that Key is present in the Journal
+            % is made first
+            case leveled_codec:is_active(LK, V, Now) of
+                true ->
+                    {SQN, _St, _MH, MD} =
+                        leveled_codec:striphead_to_details(V),
+                    {B, K} =
+                        case leveled_codec:from_ledgerkey(LK) of
+                            {B0, K0} ->
+                                {B0, K0};
+                            {B0, K0, _T0} ->
+                                {B0, K0}
+                        end,
+                    JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
+                    case DeferredFetch of
+                        {true, true} ->
+                            InJournal =
+                                leveled_inker:ink_keycheck(InkerClone,
+                                                            LK,
+                                                            SQN),
+                            case InJournal of
+                                probably ->
+                                    ProxyObj = make_proxy_object(LK, JK,
+                                                                    MD, V,
+                                                                    InkerClone),
+                                    FoldObjectsFun(B, K, ProxyObj, Acc);
+                                missing ->
+                                    Acc
+                            end;
+                        {true, false} ->
+                            ProxyObj = make_proxy_object(LK, JK,
+                                                            MD, V,
+                                                            InkerClone),
+                            FoldObjectsFun(B, K, ProxyObj, Acc);
+                        false ->
+                            R = leveled_bookie:fetch_value(InkerClone, JK),
+                            case R of
+                                not_present ->
+                                    Acc;
+                                Value ->
+                                    FoldObjectsFun(B, K, Value, Acc)
+
+                            end
+                    end;
+                false ->
+                    Acc
+            end
+        end,
+    AccFun.
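%% ---------------------------------------------------------------------------
%% Editor's illustration (not part of the patch): a fold_heads consumer can
%% unpack the proxy object built by make_proxy_object/5 (immediately below),
%% deferring the Journal fetch until the body is genuinely needed. The
%% function name is hypothetical; the tuple shape follows the code.
%% ---------------------------------------------------------------------------
maybe_fetch_body(ProxyBin) ->
    {proxy_object, MDBin, Size, {FetchFun, InkerClone, JournalKey}} =
        binary_to_term(ProxyBin),
    % Only now is the full object read from the Journal; FetchFun may
    % return not_present if the object is no longer in the Journal
    Body = FetchFun(InkerClone, JournalKey),
    {MDBin, Size, Body}.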
+ +make_proxy_object(LK, JK, MD, V, InkerClone) -> + Size = leveled_codec:get_size(LK, V), + MDBin = leveled_codec:build_metadata_object(LK, MD), + term_to_binary({proxy_object, + MDBin, + Size, + {fun leveled_bookie:fetch_value/2, InkerClone, JK}}). + +check_presence(Key, Value, InkerClone) -> + {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}), + case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of + probably -> + true; + missing -> + false + end. + +accumulate_keys(FoldKeysFun) -> + Now = leveled_codec:integer_now(), + AccFun = fun(Key, Value, Acc) -> + case leveled_codec:is_active(Key, Value, Now) of + true -> + {B, K} = leveled_codec:from_ledgerkey(Key), + FoldKeysFun(B, K, Acc); + false -> + Acc + end + end, + AccFun. + +add_keys(ObjKey, _IdxValue) -> + ObjKey. + +add_terms(ObjKey, IdxValue) -> + {IdxValue, ObjKey}. + +accumulate_index(TermRe, AddFun, FoldKeysFun) -> + Now = leveled_codec:integer_now(), + case TermRe of + undefined -> + fun(Key, Value, Acc) -> + case leveled_codec:is_active(Key, Value, Now) of + true -> + {Bucket, + ObjKey, + IdxValue} = leveled_codec:from_ledgerkey(Key), + FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc); + false -> + Acc + end end; + TermRe -> + fun(Key, Value, Acc) -> + case leveled_codec:is_active(Key, Value, Now) of + true -> + {Bucket, + ObjKey, + IdxValue} = leveled_codec:from_ledgerkey(Key), + case re:run(IdxValue, TermRe) of + nomatch -> + Acc; + _ -> + FoldKeysFun(Bucket, + AddFun(ObjKey, IdxValue), + Acc) + end; + false -> + Acc + end end + end. + + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + + + +-endif. + + + \ No newline at end of file diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index cbb6b5d..92ca46e 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -156,11 +156,8 @@ journal_compaction(_Config) -> testutil:wait_for_compaction(Bookie1), % Start snapshot - should not stop deletions - {ok, - PclClone, - InkClone} = leveled_bookie:book_snapshotstore(Bookie1, - self(), - 300000), + {ok, PclClone, InkClone} = + leveled_bookie:book_snapshot(Bookie1, store, undefined, false), % Wait 2 seconds for files to be deleted WasteFP = RootPath ++ "/journal/journal_files/waste", lists:foldl(fun(X, Found) -> @@ -500,7 +497,8 @@ space_clear_ondelete(_Config) -> {async, HTreeF1} = leveled_bookie:book_returnfolder(Book1, {foldobjects_allkeys, ?RIAK_TAG, - FoldObjectsFun}), + FoldObjectsFun, + false}), {async, KF1} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery), % Delete the keys @@ -534,8 +532,8 @@ space_clear_ondelete(_Config) -> io:format("Waiting for journal deletes - blocked~n"), timer:sleep(20000), - % for this query snapshot is made at fold time - EDIT now uses a snapshot! 
- true = length(HTreeF1()) == 80000, + % for this query snapshot is made at fold time + true = length(HTreeF1()) == 0, % This query uses a genuine async fold on a snasphot made at request time true = length(KF1()) == 80000, diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index dd03edb..562a402 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -128,13 +128,16 @@ small_load_with2i(_Config) -> {async, HTreeF1} = leveled_bookie:book_returnfolder(Bookie1, {foldobjects_allkeys, ?RIAK_TAG, - FoldObjectsFun}), + FoldObjectsFun, + false}), KeyHashList1 = HTreeF1(), {async, HTreeF2} = leveled_bookie:book_returnfolder(Bookie1, {foldobjects_bybucket, ?RIAK_TAG, "Bucket", - FoldObjectsFun}), + all, + FoldObjectsFun, + false}), KeyHashList2 = HTreeF2(), {async, HTreeF3} = leveled_bookie:book_returnfolder(Bookie1, {foldobjects_byindex, @@ -142,7 +145,8 @@ small_load_with2i(_Config) -> "Bucket", {"idx1_bin", "#", "|"}, - FoldObjectsFun}), + FoldObjectsFun, + false}), KeyHashList3 = HTreeF3(), true = 9901 == length(KeyHashList1), % also includes the test object true = 9900 == length(KeyHashList2), @@ -152,7 +156,8 @@ small_load_with2i(_Config) -> {I, _Bin} = testutil:get_value(Obj), Acc + I end, - BucketObjQ = {foldobjects_bybucket, ?RIAK_TAG, "Bucket", {SumIntFun, 0}}, + BucketObjQ = + {foldobjects_bybucket, ?RIAK_TAG, "Bucket", all, {SumIntFun, 0}, true}, {async, Sum1} = leveled_bookie:book_returnfolder(Bookie1, BucketObjQ), Total1 = Sum1(), true = Total1 > 100000, diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index de32e5a..8df2e21 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -202,7 +202,7 @@ aae_bustedjournal(_Config) -> {async, HashTreeF2} = leveled_bookie:book_returnfolder(Bookie2, {hashlist_query, ?RIAK_TAG, - check_presence}), + true}), KeyHashList2 = HashTreeF2(), % The file is still there, and the hashtree is not corrupted KeyHashList2 = KeyHashList1, @@ -219,7 +219,8 @@ aae_bustedjournal(_Config) -> {async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2, {foldobjects_allkeys, ?RIAK_TAG, - FoldObjectsFun}), + FoldObjectsFun, + false}), KeyHashList3 = HashTreeF3(), true = length(KeyHashList3) > 19000, @@ -241,7 +242,8 @@ aae_bustedjournal(_Config) -> {async, HashTreeF4} = leveled_bookie:book_returnfolder(Bookie3, {foldobjects_allkeys, ?RIAK_TAG, - FoldObjectsFun}), + FoldObjectsFun, + false}), KeyHashList4 = HashTreeF4(), true = length(KeyHashList4) == 20001, @@ -258,7 +260,8 @@ aae_bustedjournal(_Config) -> {async, HashTreeF5} = leveled_bookie:book_returnfolder(Bookie4, {foldobjects_allkeys, ?RIAK_TAG, - FoldObjectsFun}), + FoldObjectsFun, + false}), KeyHashList5 = HashTreeF5(), true = length(KeyHashList5) > 19000, @@ -274,7 +277,7 @@ aae_bustedjournal(_Config) -> {async, HashTreeF6} = leveled_bookie:book_returnfolder(Bookie4, {hashlist_query, ?RIAK_TAG, - check_presence}), + true}), KeyHashList6 = HashTreeF6(), true = length(KeyHashList6) > 19000, true = length(KeyHashList6) < HeadCount, @@ -289,7 +292,8 @@ aae_bustedjournal(_Config) -> {async, HashTreeF7} = leveled_bookie:book_returnfolder(Bookie5, {foldobjects_allkeys, ?RIAK_TAG, - FoldObjectsFun}), + FoldObjectsFun, + false}), KeyHashList7 = HashTreeF7(), true = length(KeyHashList7) == 20001,
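%% ---------------------------------------------------------------------------
%% Editor's summary sketch (not part of the patch): the reshaped query tuples
%% exercised by the updated tests above. The function name, Bookie pid and
%% the tag atom o (assumed to be ?STD_TAG) are placeholders; the tuple shapes
%% follow the new get_runner/2 clauses.
%% ---------------------------------------------------------------------------
reshaped_queries(Bookie) ->
    FoldObjectsFun =
        fun(B, K, V, Acc) -> [{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
    % foldobjects_allkeys now carries a SnapPreFold boolean
    {async, ObjFolder} =
        leveled_bookie:book_returnfolder(Bookie,
                                            {foldobjects_allkeys,
                                                o, FoldObjectsFun, false}),
    % hashlist_query now takes a boolean JournalCheck (was check_presence)
    {async, HashFolder} =
        leveled_bookie:book_returnfolder(Bookie,
                                            {hashlist_query, o, true}),
    {ObjFolder(), HashFolder()}.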