diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 7804800..bc30bd9 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -83,9 +83,10 @@
 -define(LONG_RUNNING, 80000).
 
 -record(ledger_cache, {mem :: ets:tab(),
-                        loader = leveled_tree:empty(?CACHE_TYPE) :: tuple(),
+                        loader = leveled_tree:empty(?CACHE_TYPE)
+                                    :: tuple()|empty_cache,
                         load_queue = [] :: list(),
-                        index = leveled_pmem:new_index(), % array
+                        index = leveled_pmem:new_index(), % array or empty_index
                         min_sqn = infinity :: integer()|infinity,
                         max_sqn = 0 :: integer()}).
 
@@ -502,9 +503,17 @@ handle_call({return_folder, FolderType}, _From, State) ->
                             {FoldKeysFun, Acc},
                             {IdxField, StartValue, EndValue},
                             {ReturnTerms, TermRegex}} ->
+            {Bucket, StartObjKey} =
+                case Constraint of
+                    {B, SK} ->
+                        {B, SK};
+                    B ->
+                        {B, null}
+                end,
             {reply,
                 index_query(State,
-                            Constraint,
+                            Bucket,
+                            StartObjKey,
                             {FoldKeysFun, Acc},
                             {IdxField, StartValue, EndValue},
                             {ReturnTerms, TermRegex}),
@@ -579,6 +588,10 @@ code_change(_OldVsn, State, _Extra) ->
 %%% External functions
 %%%============================================================================
 
+%% @doc Load a snapshot of the penciller with the contents of the ledger cache.
+%% If the snapshot is to be loaded for a query then #ledger_cache.index may
+%% be empty_index (as there is no need to update lookups); #ledger_cache.loader
+%% may be empty_cache if there are no relevant results in the LedgerCache.
 load_snapshot(LedgerSnapshot, LedgerCache) ->
     CacheToLoad = {LedgerCache#ledger_cache.loader,
                     LedgerCache#ledger_cache.index,
@@ -586,9 +599,14 @@ load_snapshot(LedgerSnapshot, LedgerCache) ->
                     LedgerCache#ledger_cache.max_sqn},
     ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
 
+%% @doc Empty the ledger cache table following a push.
 empty_ledgercache() ->
     #ledger_cache{mem = ets:new(empty, [ordered_set])}.
 
+%% @doc Push the ledger cache to the Penciller - which should respond ok or
+%% returned. If the response is ok the cache can be flushed, but if the
+%% response is returned the cache should continue to build, and a flush
+%% should be attempted at a later date.
 push_ledgercache(Penciller, Cache) ->
     CacheToLoad = {Cache#ledger_cache.loader,
                     Cache#ledger_cache.index,
@@ -596,6 +614,11 @@ push_ledgercache(Penciller, Cache) ->
                     Cache#ledger_cache.max_sqn},
     leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
 
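
%% Illustrative sketch, not part of the patch: one way a caller might act on
%% the ok|returned contract described for push_ledgercache/2 above. The
%% helper name maybe_push_to_penciller/2 is hypothetical.
maybe_push_to_penciller(Penciller, Cache) ->
    case push_ledgercache(Penciller, Cache) of
        ok ->
            %% Push accepted - the cache has been flushed, start afresh
            empty_ledgercache();
        returned ->
            %% Penciller busy - keep building this cache and retry later
            Cache
    end.
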
+%% @doc The ledger cache can be built from a queue, for example when
+%% loading the ledger from the head of the journal on startup.
+%%
+%% The queue should be built using [NewKey|Acc] so that the most recent key
+%% is the one retained by the sort (lists:ukeysort/2 keeps the first).
 loadqueue_ledgercache(Cache) ->
     SL = lists:ukeysort(1, Cache#ledger_cache.load_queue),
     T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
@@ -605,8 +628,6 @@ loadqueue_ledgercache(Cache) ->
 %%% Internal functions
 %%%============================================================================
 
-
-
 maybe_longrunning(SW, Aspect) ->
     case timer:now_diff(os:timestamp(), SW) of
         N when N > ?LONG_RUNNING ->
@@ -621,7 +642,7 @@ cache_size(LedgerCache) ->
 bucket_stats(State, Bucket, Tag) ->
     {ok,
         {LedgerSnapshot, LedgerCache},
-        _JournalSnapshot} = snapshot_store(State, ledger),
+        _JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
     Folder = fun() ->
                 leveled_log:log("B0004", [cache_size(LedgerCache)]),
                 load_snapshot(LedgerSnapshot, LedgerCache),
@@ -643,7 +664,7 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
     % List buckets for tag, assuming bucket names are all binary type
     {ok,
         {LedgerSnapshot, LedgerCache},
-        _JournalSnapshot} = snapshot_store(State, ledger),
+        _JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
     Folder = fun() ->
                 leveled_log:log("B0004", [cache_size(LedgerCache)]),
                 load_snapshot(LedgerSnapshot, LedgerCache),
@@ -684,33 +705,27 @@ get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) ->
 
 
 index_query(State,
-                Constraint,
+                Bucket,
+                StartObjKey,
                 {FoldKeysFun, InitAcc},
                 {IdxField, StartValue, EndValue},
                 {ReturnTerms, TermRegex}) ->
+    StartKey = leveled_codec:to_ledgerkey(Bucket,
+                                            StartObjKey,
+                                            ?IDX_TAG,
+                                            IdxField,
+                                            StartValue),
+    EndKey = leveled_codec:to_ledgerkey(Bucket,
+                                        null,
+                                        ?IDX_TAG,
+                                        IdxField,
+                                        EndValue),
     {ok,
         {LedgerSnapshot, LedgerCache},
-        _JournalSnapshot} = snapshot_store(State, ledger),
-    {Bucket, StartObjKey} =
-        case Constraint of
-            {B, SK} ->
-                {B, SK};
-            B ->
-                {B, null}
-        end,
+        _JournalSnapshot} = snapshot_store(State, ledger, {StartKey, EndKey}),
     Folder = fun() ->
                 leveled_log:log("B0004", [cache_size(LedgerCache)]),
                 load_snapshot(LedgerSnapshot, LedgerCache),
-                StartKey = leveled_codec:to_ledgerkey(Bucket,
-                                                        StartObjKey,
-                                                        ?IDX_TAG,
-                                                        IdxField,
-                                                        StartValue),
-                EndKey = leveled_codec:to_ledgerkey(Bucket,
-                                                        null,
-                                                        ?IDX_TAG,
-                                                        IdxField,
-                                                        EndValue),
                 AddFun = case ReturnTerms of
                             true ->
                                 fun add_terms/2;
@@ -738,7 +753,7 @@ hashtree_query(State, Tag, JournalCheck) ->
         end,
     {ok,
         {LedgerSnapshot, LedgerCache},
-        JournalSnapshot} = snapshot_store(State, SnapType),
+        JournalSnapshot} = snapshot_store(State, SnapType, no_lookup),
     Folder = fun() ->
                 leveled_log:log("B0004", [cache_size(LedgerCache)]),
                 load_snapshot(LedgerSnapshot, LedgerCache),
@@ -808,7 +823,7 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
 bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
     {ok,
         {LedgerSnapshot, LedgerCache},
-        _JournalSnapshot} = snapshot_store(State, ledger),
+        _JournalSnapshot} = snapshot_store(State, ledger, no_lookup),
     Folder = fun() ->
                 leveled_log:log("B0004", [cache_size(LedgerCache)]),
                 load_snapshot(LedgerSnapshot, LedgerCache),
@@ -830,10 +845,13 @@ allkey_query(State, Tag, {FoldKeysFun, InitAcc}) ->
 
 
 snapshot_store(State, SnapType) ->
+    snapshot_store(State, SnapType, undefined).
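
%% Illustrative sketch, not part of the patch: the two Constraint shapes the
%% reworked index_query accepts, assuming the return_folder request is the
%% index_query tuple matched in handle_call above. Bucket, field, term and
%% key values are invented for the example.
index_constraint_example(Bookie) ->
    FoldKeysFun = fun(_B, K, Acc) -> [K|Acc] end,
    Range = {<<"F1-bin">>, <<"AA0">>, <<"AA9">>},
    %% Constraint as a bare bucket - fold from the start of the bucket
    QueryA = {index_query, <<"B1">>,
                {FoldKeysFun, []}, Range, {false, undefined}},
    %% Constraint as {Bucket, StartObjKey} - resume from a given object key
    QueryB = {index_query, {<<"B1">>, <<"K100">>},
                {FoldKeysFun, []}, Range, {false, undefined}},
    {async, FolderA} = book_returnfolder(Bookie, QueryA),
    {async, FolderB} = book_returnfolder(Bookie, QueryB),
    {FolderA(), FolderB()}.
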
+
+snapshot_store(State, SnapType, Query) ->
     PCLopts = #penciller_options{start_snapshot=true,
                                     source_penciller=State#state.penciller},
     {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
-    LedgerCache = readycache_forsnapshot(State#state.ledger_cache),
+    LedgerCache = readycache_forsnapshot(State#state.ledger_cache, Query),
     case SnapType of
         store ->
             InkerOpts = #inker_options{start_snapshot=true,
@@ -844,14 +862,70 @@ snapshot_store(State, SnapType) ->
             {ok, {LedgerSnapshot, LedgerCache}, null}
     end.
 
-readycache_forsnapshot(LedgerCache) ->
+readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
+    {KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem,
+                                        StartKey,
+                                        EndKey),
+    case KL of
+        [] ->
+            #ledger_cache{loader=empty_cache,
+                            index=empty_index,
+                            min_sqn=MinSQN,
+                            max_sqn=MaxSQN};
+        _ ->
+            #ledger_cache{loader=leveled_tree:from_orderedlist(KL,
+                                                                ?CACHE_TYPE),
+                            index=empty_index,
+                            min_sqn=MinSQN,
+                            max_sqn=MaxSQN}
+    end;
+readycache_forsnapshot(LedgerCache, Query) ->
     % Need to convert the Ledger Cache away from using the ETS table
     Tree = leveled_tree:from_orderedset(LedgerCache#ledger_cache.mem,
                                         ?CACHE_TYPE),
-    Idx = LedgerCache#ledger_cache.index,
-    MinSQN = LedgerCache#ledger_cache.min_sqn,
-    MaxSQN = LedgerCache#ledger_cache.max_sqn,
-    #ledger_cache{loader=Tree, index=Idx, min_sqn=MinSQN, max_sqn=MaxSQN}.
+    case leveled_tree:tsize(Tree) of
+        0 ->
+            #ledger_cache{loader=empty_cache,
+                            index=empty_index,
+                            min_sqn=LedgerCache#ledger_cache.min_sqn,
+                            max_sqn=LedgerCache#ledger_cache.max_sqn};
+        _ ->
+            Idx =
+                case Query of
+                    no_lookup ->
+                        empty_index;
+                    _ ->
+                        LedgerCache#ledger_cache.index
+                end,
+            #ledger_cache{loader=Tree,
+                            index=Idx,
+                            min_sqn=LedgerCache#ledger_cache.min_sqn,
+                            max_sqn=LedgerCache#ledger_cache.max_sqn}
+    end.
+
+scan_table(Table, StartKey, EndKey) ->
+    scan_table(Table, StartKey, EndKey, [], infinity, 0).
+
+scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) ->
+    case ets:next(Table, StartKey) of
+        '$end_of_table' ->
+            {lists:reverse(Acc), MinSQN, MaxSQN};
+        NextKey ->
+            case leveled_codec:endkey_passed(EndKey, NextKey) of
+                true ->
+                    {lists:reverse(Acc), MinSQN, MaxSQN};
+                false ->
+                    [{NextKey, NextVal}] = ets:lookup(Table, NextKey),
+                    SQN = leveled_codec:strip_to_seqonly({NextKey, NextVal}),
+                    scan_table(Table,
+                                NextKey,
+                                EndKey,
+                                [{NextKey, NextVal}|Acc],
+                                min(MinSQN, SQN),
+                                max(MaxSQN, SQN))
+            end
+    end.
+
 
 set_options(Opts) ->
     MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
@@ -1402,6 +1476,63 @@ foldobjects_vs_hashtree_test() ->
     ok = book_close(Bookie1),
     reset_filestructure().
 
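
%% Illustrative sketch, not part of the patch: scan_table/3 above relies on
%% ets:next/2 being exclusive of the probe key - the probe need not exist in
%% the table, and iteration starts strictly after it. That is why the index
%% StartKey carries a null object key (or, when resuming, the last object
%% key already seen): the atom null sorts before any binary object key, so
%% no key in range is skipped.
ets_next_semantics_example() ->
    T = ets:new(demo, [ordered_set]),
    true = ets:insert(T, [{1, a}, {2, b}, {3, c}]),
    2 = ets:next(T, 1),               %% strictly after the probe key
    1 = ets:next(T, 0),               %% probe key need not be present
    '$end_of_table' = ets:next(T, 3),
    true = ets:delete(T).
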
+scan_table_test() ->
+    K1 = leveled_codec:to_ledgerkey(<<"B1">>,
+                                        <<"K1">>,
+                                        ?IDX_TAG,
+                                        <<"F1-bin">>,
+                                        <<"AA1">>),
+    K2 = leveled_codec:to_ledgerkey(<<"B1">>,
+                                        <<"K2">>,
+                                        ?IDX_TAG,
+                                        <<"F1-bin">>,
+                                        <<"AA1">>),
+    K3 = leveled_codec:to_ledgerkey(<<"B1">>,
+                                        <<"K3">>,
+                                        ?IDX_TAG,
+                                        <<"F1-bin">>,
+                                        <<"AB1">>),
+    K4 = leveled_codec:to_ledgerkey(<<"B1">>,
+                                        <<"K4">>,
+                                        ?IDX_TAG,
+                                        <<"F1-bin">>,
+                                        <<"AA2">>),
+    K5 = leveled_codec:to_ledgerkey(<<"B2">>,
+                                        <<"K5">>,
+                                        ?IDX_TAG,
+                                        <<"F1-bin">>,
+                                        <<"AA2">>),
+    Tab0 = ets:new(mem, [ordered_set]),
+
+    SK_A0 = leveled_codec:to_ledgerkey(<<"B1">>,
+                                        null,
+                                        ?IDX_TAG,
+                                        <<"F1-bin">>,
+                                        <<"AA0">>),
+    EK_A9 = leveled_codec:to_ledgerkey(<<"B1">>,
+                                        null,
+                                        ?IDX_TAG,
+                                        <<"F1-bin">>,
+                                        <<"AA9">>),
+    Empty = {[], infinity, 0},
+    ?assertMatch(Empty,
+                    scan_table(Tab0, SK_A0, EK_A9)),
+    ets:insert(Tab0, [{K1, {1, active, no_lookup, null}}]),
+    ?assertMatch({[{K1, _}], 1, 1},
+                    scan_table(Tab0, SK_A0, EK_A9)),
+    ets:insert(Tab0, [{K2, {2, active, no_lookup, null}}]),
+    ?assertMatch({[{K1, _}, {K2, _}], 1, 2},
+                    scan_table(Tab0, SK_A0, EK_A9)),
+    ets:insert(Tab0, [{K3, {3, active, no_lookup, null}}]),
+    ?assertMatch({[{K1, _}, {K2, _}], 1, 2},
+                    scan_table(Tab0, SK_A0, EK_A9)),
+    ets:insert(Tab0, [{K4, {4, active, no_lookup, null}}]),
+    ?assertMatch({[{K1, _}, {K2, _}, {K4, _}], 1, 4},
+                    scan_table(Tab0, SK_A0, EK_A9)),
+    ets:insert(Tab0, [{K5, {5, active, no_lookup, null}}]),
+    ?assertMatch({[{K1, _}, {K2, _}, {K4, _}], 1, 4},
+                    scan_table(Tab0, SK_A0, EK_A9)).
+
 longrunning_test() ->
     SW = os:timestamp(),
     timer:sleep(100),
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 77c97e9..853a01c 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -312,7 +312,6 @@ pcl_releasesnapshot(Pid, Snapshot) ->
 pcl_loadsnapshot(Pid, Increment) ->
     gen_server:call(Pid, {load_snapshot, Increment}, infinity).
 
-
 pcl_close(Pid) ->
     gen_server:call(Pid, close, 60000).
 
@@ -437,14 +436,27 @@ handle_call({register_snapshot, Snapshot}, _From, State) ->
     {reply, {ok, State}, State#state{manifest = Manifest0}};
 handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
                 _From, State) ->
-    L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
-                                    {BookieIncrTree, MinSQN, MaxSQN},
-                                    State#state.ledger_sqn,
-                                    State#state.levelzero_cache),
-    {LedgerSQN, L0Size, L0Cache} = L0D,
-    L0Index = leveled_pmem:add_to_index(BookieIdx,
-                                        State#state.levelzero_index,
-                                        length(L0Cache)),
+    {LedgerSQN, L0Size, L0Cache} =
+        case BookieIncrTree of
+            empty_cache ->
+                {State#state.ledger_sqn,
+                    State#state.levelzero_size,
+                    State#state.levelzero_cache};
+            _ ->
+                leveled_pmem:add_to_cache(State#state.levelzero_size,
+                                            {BookieIncrTree, MinSQN, MaxSQN},
+                                            State#state.ledger_sqn,
+                                            State#state.levelzero_cache)
+        end,
+    L0Index =
+        case BookieIdx of
+            empty_index ->
+                State#state.levelzero_index;
+            _ ->
+                leveled_pmem:add_to_index(BookieIdx,
+                                            State#state.levelzero_index,
+                                            length(L0Cache))
+        end,
     {reply, ok, State#state{levelzero_cache=L0Cache,
                             levelzero_size=L0Size,
                             levelzero_index=L0Index,
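
A closing sketch, not part of the patch: pcl_loadsnapshot/2 still takes the
4-tuple built by load_snapshot/2 in leveled_bookie, but the first two elements
may now be the empty_cache and empty_index atoms, in which case the penciller
leaves its level-zero cache and index untouched. The function name below is
hypothetical, and the infinity/0 SQN bounds are scan_table/3's empty-result
defaults.

load_empty_query_snapshot(LedgerSnapshot) ->
    %% A query snapshot whose ledger cache held no keys in the query range
    %% can be loaded without adding anything to the level-zero state
    CacheToLoad = {empty_cache, empty_index, infinity, 0},
    ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).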