diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 730085b..28049f7 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -74,7 +74,9 @@ book_logsettings/1, book_loglevel/2, book_addlogs/2, - book_removelogs/2]). + book_removelogs/2, + book_headstatus/1 + ]). %% folding API -export([ @@ -155,6 +157,7 @@ head_only = false :: boolean(), head_lookup = true :: boolean(), ink_checking = ?MAX_KEYCHECK_FREQUENCY :: integer(), + bookie_monref :: reference() | undefined, monitor = {no_monitor, 0} :: leveled_monitor:monitor()}). @@ -1134,6 +1137,12 @@ book_addlogs(Pid, ForcedLogs) -> book_removelogs(Pid, ForcedLogs) -> gen_server:cast(Pid, {remove_logs, ForcedLogs}). +-spec book_headstatus(pid()) -> {boolean(), boolean()}. +%% @doc +%% Return booleans to state the bookie is in head_only mode, and supporting +%% lookups +book_headstatus(Pid) -> + gen_server:call(Pid, head_status, infinity). %%%============================================================================ %%% gen_server callbacks @@ -1235,10 +1244,17 @@ init([Opts]) -> {Bookie, undefined} -> {ok, Penciller, Inker} = book_snapshot(Bookie, store, undefined, true), + BookieMonitor = erlang:monitor(process, Bookie), + NewETS = ets:new(mem, [ordered_set]), + {HeadOnly, Lookup} = leveled_bookie:book_headstatus(Bookie), leveled_log:log(b0002, [Inker, Penciller]), {ok, #state{penciller = Penciller, inker = Inker, + ledger_cache = #ledger_cache{mem = NewETS}, + head_only = HeadOnly, + head_lookup = Lookup, + bookie_monref = BookieMonitor, is_snapshot = true}} end. @@ -1490,6 +1506,8 @@ handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false -> {stop, normal, ok, State}; handle_call(return_actors, _From, State) -> {reply, {ok, State#state.inker, State#state.penciller}, State}; +handle_call(head_status, _From, State) -> + {reply, {State#state.head_only, State#state.head_lookup}, State}; handle_call(Msg, _From, State) -> {reply, {unsupported_message, element(1, Msg)}, State}. @@ -1497,35 +1515,53 @@ handle_call(Msg, _From, State) -> handle_cast({log_level, LogLevel}, State) -> PCL = State#state.penciller, INK = State#state.inker, - Monitor = element(1, State#state.monitor), ok = leveled_penciller:pcl_loglevel(PCL, LogLevel), ok = leveled_inker:ink_loglevel(INK, LogLevel), - ok = leveled_monitor:log_level(Monitor, LogLevel), + case element(1, State#state.monitor) of + no_monitor -> + ok; + Monitor -> + leveled_monitor:log_level(Monitor, LogLevel) + end, ok = leveled_log:set_loglevel(LogLevel), {noreply, State}; handle_cast({add_logs, ForcedLogs}, State) -> PCL = State#state.penciller, INK = State#state.inker, - Monitor = element(1, State#state.monitor), ok = leveled_penciller:pcl_addlogs(PCL, ForcedLogs), ok = leveled_inker:ink_addlogs(INK, ForcedLogs), - ok = leveled_monitor:log_add(Monitor, ForcedLogs), + case element(1, State#state.monitor) of + no_monitor -> + ok; + Monitor -> + leveled_monitor:log_add(Monitor, ForcedLogs) + end, ok = leveled_log:add_forcedlogs(ForcedLogs), {noreply, State}; handle_cast({remove_logs, ForcedLogs}, State) -> PCL = State#state.penciller, INK = State#state.inker, - Monitor = element(1, State#state.monitor), ok = leveled_penciller:pcl_removelogs(PCL, ForcedLogs), ok = leveled_inker:ink_removelogs(INK, ForcedLogs), - ok = leveled_monitor:log_remove(Monitor, ForcedLogs), + case element(1, State#state.monitor) of + no_monitor -> + ok; + Monitor -> + leveled_monitor:log_remove(Monitor, ForcedLogs) + end, ok = leveled_log:remove_forcedlogs(ForcedLogs), {noreply, State}. +%% handle the bookie stopping and stop this snapshot +handle_info({'DOWN', BookieMonRef, process, BookiePid, Info}, + State=#state{bookie_monref = BookieMonRef, is_snapshot = true}) -> + leveled_log:log(b0004, [BookiePid, Info]), + {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}. + terminate(Reason, _State) -> leveled_log:log(b0003, [Reason]). @@ -1827,10 +1863,11 @@ set_options(Opts, Monitor) -> }. --spec return_snapfun(book_state(), store|ledger, - tuple()|no_lookup|undefined, - boolean(), boolean()) - -> fun(() -> {ok, pid(), pid()|null}). +-spec return_snapfun( + book_state(), store|ledger, + tuple()|no_lookup|undefined, + boolean(), boolean()) + -> fun(() -> {ok, pid(), pid()|null, fun(() -> ok)}). %% @doc %% Generates a function from which a snapshot can be created. The primary %% factor here is the SnapPreFold boolean. If this is true then the snapshot @@ -1838,11 +1875,31 @@ set_options(Opts, Monitor) -> %% false then the snapshot will be taken when the Fold function is called. %% %% SnapPrefold is to be used when the intention is to queue the fold, and so -%% claling of the fold may be delayed, but it is still desired that the fold +%% calling of the fold may be delayed, but it is still desired that the fold %% represent the point in time that the query was requested. +%% +%% Also returns a function which will close any snapshots to be used in the +%% runners post-query cleanup action +%% +%% When the bookie is a snapshot, a fresh snapshot should not be taken, the +%% previous snapshot should be used instead. Also the snapshot should not be +%% closed as part of the post-query activity as the snapshot may be reused, and +%% should be manually closed. return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) -> - case SnapPreFold of - true -> + CloseFun = + fun(LS0, JS0) -> + fun() -> + ok = leveled_penciller:pcl_close(LS0), + case JS0 of + JS0 when is_pid(JS0) -> + leveled_inker:ink_close(JS0); + _ -> + ok + end + end + end, + case {SnapPreFold, State#state.is_snapshot} of + {true, false} -> {ok, LS, JS} = snapshot_store( State#state.ledger_cache, @@ -1852,15 +1909,23 @@ return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) -> SnapType, Query, LongRunning), - fun() -> {ok, LS, JS} end; - false -> + fun() -> {ok, LS, JS, CloseFun(LS, JS)} end; + {false, false} -> Self = self(), % Timeout will be ignored, as will Requestor % % This uses the external snapshot - as the snapshot will need % to have consistent state between Bookie and Penciller when % it is made. - fun() -> book_snapshot(Self, SnapType, Query, LongRunning) end + fun() -> + {ok, LS, JS} = + book_snapshot(Self, SnapType, Query, LongRunning), + {ok, LS, JS, CloseFun(LS, JS)} + end; + {_ , true} -> + LS = State#state.penciller, + JS = State#state.inker, + fun() -> {ok, LS, JS, fun() -> ok end} end end. -spec snaptype_by_presence(boolean()) -> store|ledger. @@ -2191,14 +2256,7 @@ fetch_head(Key, Penciller, LedgerCache) -> %% The L0Index needs to be bypassed when running head_only fetch_head(Key, Penciller, LedgerCache, HeadOnly) -> SW = os:timestamp(), - CacheResult = - case LedgerCache#ledger_cache.mem of - undefined -> - []; - Tab -> - ets:lookup(Tab, Key) - end, - case CacheResult of + case ets:lookup(LedgerCache#ledger_cache.mem, Key) of [{Key, Head}] -> {Head, true}; [] -> diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 5df4f20..462a395 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -403,8 +403,8 @@ inker_reload_strategy(AltList) -> lists:ukeysort(1, DefaultList)). --spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy()) - -> compaction_method(). +-spec get_tagstrategy( + ledger_key()|tag()|dummy, compaction_strategy()) -> compaction_method(). %% @doc %% Work out the compaction strategy for the key get_tagstrategy({Tag, _, _, _}, Strategy) -> @@ -413,6 +413,10 @@ get_tagstrategy(Tag, Strategy) -> case lists:keyfind(Tag, 1, Strategy) of {Tag, TagStrat} -> TagStrat; + false when Tag == dummy -> + %% dummy is not a strategy, but this is expected to see this when + %% running in head_only mode - so don't warn + retain; false -> leveled_log:log(ic012, [Tag, Strategy]), retain diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 0c8cbe8..0e5958f 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -754,25 +754,34 @@ handle_cast({release_snapshot, Snapshot}, State) -> {noreply, State#state{registered_snapshots=Rs}} end; handle_cast({log_level, LogLevel}, State) -> - INC = State#state.clerk, - ok = leveled_iclerk:clerk_loglevel(INC, LogLevel), + case State#state.clerk of + undefined -> + ok; + INC -> + leveled_iclerk:clerk_loglevel(INC, LogLevel) + end, ok = leveled_log:set_loglevel(LogLevel), - CDBopts = State#state.cdb_options, - CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, + CDBopts0 = update_cdb_logoptions(State#state.cdb_options), {noreply, State#state{cdb_options = CDBopts0}}; handle_cast({add_logs, ForcedLogs}, State) -> - INC = State#state.clerk, - ok = leveled_iclerk:clerk_addlogs(INC, ForcedLogs), + case State#state.clerk of + undefined -> + ok; + INC -> + leveled_iclerk:clerk_addlogs(INC, ForcedLogs) + end, ok = leveled_log:add_forcedlogs(ForcedLogs), - CDBopts = State#state.cdb_options, - CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, + CDBopts0 = update_cdb_logoptions(State#state.cdb_options), {noreply, State#state{cdb_options = CDBopts0}}; handle_cast({remove_logs, ForcedLogs}, State) -> - INC = State#state.clerk, - ok = leveled_iclerk:clerk_removelogs(INC, ForcedLogs), + case State#state.clerk of + undefined -> + ok; + INC -> + leveled_iclerk:clerk_removelogs(INC, ForcedLogs) + end, ok = leveled_log:remove_forcedlogs(ForcedLogs), - CDBopts = State#state.cdb_options, - CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, + CDBopts0 = update_cdb_logoptions(State#state.cdb_options), {noreply, State#state{cdb_options = CDBopts0}}; handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) -> case length(State#state.registered_snapshots) of @@ -816,8 +825,10 @@ handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info}, handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(Reason, _State=#state{is_snapshot=Snap}) when Snap == true -> + leveled_log:log(i0027, [Reason]); +terminate(Reason, _State) -> + leveled_log:log(i0028, [Reason]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -1291,6 +1302,14 @@ wrap_checkfilterfun(CheckFilterFun) -> end end. + +-spec update_cdb_logoptions( + #cdb_options{}|undefined) -> #cdb_options{}|undefined. +update_cdb_logoptions(undefined) -> + undefined; +update_cdb_logoptions(CDBopts) -> + CDBopts#cdb_options{log_options = leveled_log:get_opts()}. + %%%============================================================================ %%% Test %%%============================================================================ diff --git a/src/leveled_log.erl b/src/leveled_log.erl index faec5a4..dfdc43e 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -48,6 +48,8 @@ {info, <<"Snapshot starting with Ink ~w Pcl ~w">>}, b0003 => {info, <<"Bookie closing for reason ~w">>}, + b0004 => + {warn, <<"Bookie snapshot exiting as master store ~w is down for reason ~p">>}, b0005 => {info, <<"LedgerSQN=~w at startup">>}, b0006 => @@ -83,17 +85,17 @@ p0005 => {debug, <<"Delete confirmed as file ~s is removed from Manifest">>}, p0007 => - {debug, <<"Sent release message for cloned Penciller following close for reason ~w">>}, + {debug, <<"Shutdown complete for cloned Penciller for reason ~w">>}, p0008 => {info, <<"Penciller closing for reason ~w">>}, p0010 => {info, <<"level zero discarded_count=~w on close of Penciller">>}, p0011 => - {info, <<"Shutdown complete for Penciller for reason ~w">>}, + {debug, <<"Shutdown complete for Penciller for reason ~w">>}, p0012 => {info, <<"Store to be started based on manifest sequence number of ~w">>}, p0013 => - {warn, <<"Seqence number of 0 indicates no valid manifest">>}, + {info, <<"Seqence number of 0 indicates no valid manifest">>}, p0014 => {info, <<"Maximum sequence number of ~w found in nonzero levels">>}, p0015 => @@ -246,6 +248,10 @@ {warn, <<"Journal SQN of ~w is below Ledger SQN of ~w anti-entropy will be required">>}, i0026 => {info, <<"Deferring shutdown due to snapshot_count=~w">>}, + i0027 => + {debug, <<"Shutdown complete for cloned Inker for reason ~w">>}, + i0028 => + {debug, <<"Shutdown complete for Inker for reason ~w">>}, ic001 => {info, <<"Closed for reason ~w so maybe leaving garbage">>}, ic002 => diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a44d5ea..d4929f2 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -628,16 +628,16 @@ init([LogOpts, PCLopts]) -> %% exits BookieMonitor = erlang:monitor(process, PCLopts#penciller_options.bookies_pid), - - {ok, State} = pcl_registersnapshot(SrcPenciller, - self(), - Query, - BookiesMem, - LongRunning), + {ok, State} = + pcl_registersnapshot( + SrcPenciller, self(), Query, BookiesMem, LongRunning), leveled_log:log(p0001, [self()]), - {ok, State#state{is_snapshot = true, - bookie_monref = BookieMonitor, - source_penciller = SrcPenciller}}; + {ok, + State#state{ + is_snapshot = true, + clerk = undefined, + bookie_monref = BookieMonitor, + source_penciller = SrcPenciller}}; {_RootPath, _Snapshot=false, _Q, _BM} -> start_from_file(PCLopts) end. @@ -1131,22 +1131,21 @@ handle_cast({fetch_levelzero, Slot, ReturnFun}, State) -> ReturnFun(lists:nth(Slot, State#state.levelzero_cache)), {noreply, State}; handle_cast({log_level, LogLevel}, State) -> - PC = State#state.clerk, - ok = leveled_pclerk:clerk_loglevel(PC, LogLevel), - ok = leveled_log:set_loglevel(LogLevel), + update_clerk( + State#state.clerk, fun leveled_pclerk:clerk_loglevel/2, LogLevel), SSTopts = State#state.sst_options, SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()}, {noreply, State#state{sst_options = SSTopts0}}; handle_cast({add_logs, ForcedLogs}, State) -> - PC = State#state.clerk, - ok = leveled_pclerk:clerk_addlogs(PC, ForcedLogs), + update_clerk( + State#state.clerk, fun leveled_pclerk:clerk_addlogs/2, ForcedLogs), ok = leveled_log:add_forcedlogs(ForcedLogs), SSTopts = State#state.sst_options, SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()}, {noreply, State#state{sst_options = SSTopts0}}; handle_cast({remove_logs, ForcedLogs}, State) -> - PC = State#state.clerk, - ok = leveled_pclerk:clerk_removelogs(PC, ForcedLogs), + update_clerk( + State#state.clerk, fun leveled_pclerk:clerk_removelogs/2, ForcedLogs), ok = leveled_log:remove_forcedlogs(ForcedLogs), SSTopts = State#state.sst_options, SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()}, @@ -1224,6 +1223,12 @@ sst_filename(ManSQN, Level, Count) -> %%% Internal functions %%%============================================================================ +-spec update_clerk(pid()|undefined, fun((pid(), term()) -> ok), term()) -> ok. +update_clerk(undefined, _F, _T) -> + ok; +update_clerk(Clerk, F, T) when is_pid(Clerk) -> + F(Clerk, T). + -spec start_from_file(penciller_options()) -> {ok, pcl_state()}. %% @doc %% Normal start of a penciller (i.e. not a snapshot), needs to read the diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index af2c6df..86f753d 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -81,14 +81,11 @@ bucket_sizestats(SnapFun, Bucket, Tag) -> AccFun = accumulate_size(), Runner = fun() -> - {ok, LedgerSnap, _JournalSnap} = SnapFun(), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnap, - StartKey, - EndKey, - AccFun, - {0, 0}, - as_pcl), - ok = leveled_penciller:pcl_close(LedgerSnap), + {ok, LedgerSnap, _JournalSnap, AfterFun} = SnapFun(), + Acc = + leveled_penciller:pcl_fetchkeys( + LedgerSnap, StartKey, EndKey, AccFun, {0, 0}, as_pcl), + AfterFun(), Acc end, {async, Runner}. @@ -112,19 +109,16 @@ bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc) -> bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) -> Runner = fun() -> - {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), + {ok, LedgerSnapshot, _JournalSnapshot, AfterFun} = SnapFun(), BucketAcc = - get_nextbucket(null, null, - Tag, LedgerSnapshot, [], {0, MaxBuckets}), - AfterFun = - fun() -> - ok = leveled_penciller:pcl_close(LedgerSnapshot) - end, + get_nextbucket( + null, null, Tag, LedgerSnapshot, [], {0, MaxBuckets}), FoldRunner = fun() -> - lists:foldr(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end, - InitAcc, - BucketAcc) + lists:foldr( + fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end, + InitAcc, + BucketAcc) % Buckets in reverse alphabetical order so foldr end, % For this fold, the fold over the store is actually completed @@ -160,21 +154,18 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) -> _ -> fun add_keys/2 end, - AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun), Runner = fun() -> - {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), - Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - StartKey, - EndKey, - AccFun, - InitAcc, - by_runner), - AfterFun = - fun() -> - ok = leveled_penciller:pcl_close(LedgerSnapshot) - end, + {ok, LedgerSnapshot, _JournalSnapshot, AfterFun} = SnapFun(), + Folder = + leveled_penciller:pcl_fetchkeys( + LedgerSnapshot, + StartKey, + EndKey, + accumulate_index(TermRegex, AddFun, FoldKeysFun), + InitAcc, + by_runner), wrap_runner(Folder, AfterFun) end, {async, Runner}. @@ -197,17 +188,10 @@ bucketkey_query(SnapFun, Tag, Bucket, AccFun = accumulate_keys(FoldKeysFun, TermRegex), Runner = fun() -> - {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(), - Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - SK, - EK, - AccFun, - InitAcc, - by_runner), - AfterFun = - fun() -> - ok = leveled_penciller:pcl_close(LedgerSnapshot) - end, + {ok, LedgerSnapshot, _JournalSnapshot, AfterFun} = SnapFun(), + Folder = + leveled_penciller:pcl_fetchkeys( + LedgerSnapshot, SK, EK, AccFun, InitAcc, by_runner), wrap_runner(Folder, AfterFun) end, {async, Runner}. @@ -231,20 +215,12 @@ hashlist_query(SnapFun, Tag, JournalCheck) -> EndKey = leveled_codec:to_ledgerkey(null, null, Tag), Runner = fun() -> - {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(), + {ok, LedgerSnapshot, JournalSnapshot, AfterFun} = SnapFun(), AccFun = accumulate_hashes(JournalCheck, JournalSnapshot), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - StartKey, - EndKey, - AccFun, - []), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - case JournalCheck of - false -> - ok; - true -> - leveled_inker:ink_close(JournalSnapshot) - end, + Acc = + leveled_penciller:pcl_fetchkeys( + LedgerSnapshot, StartKey, EndKey, AccFun, []), + AfterFun(), Acc end, {async, Runner}. @@ -262,7 +238,7 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) -> Tree = leveled_tictac:new_tree(temp, TreeSize), Runner = fun() -> - {ok, LedgerSnap, JournalSnap} = SnapFun(), + {ok, LedgerSnap, JournalSnap, AfterFun} = SnapFun(), % The start key and end key will vary depending on whether the % fold is to fold over an index or a key range EnsureKeyBinaryFun = @@ -294,19 +270,9 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) -> AccFun = accumulate_tree(Filter, JournalCheck, JournalSnap, ExtractFun), Acc = - leveled_penciller:pcl_fetchkeys(LedgerSnap, - StartKey, EndKey, - AccFun, Tree), - - % Close down snapshot when complete so as not to hold removed - % files open - ok = leveled_penciller:pcl_close(LedgerSnap), - case JournalCheck of - false -> - ok; - true -> - leveled_inker:ink_close(JournalSnap) - end, + leveled_penciller:pcl_fetchkeys( + LedgerSnap, StartKey, EndKey, AccFun, Tree), + AfterFun(), Acc end, {async, Runner}. @@ -357,17 +323,16 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> % initial accumulator FoldObjectsFun; false -> - % no initial accumulatr passed, and so should be just a list + % no initial accumulator passed, and so should be just a list {FoldObjectsFun, []} end, FilterFun = fun(JKey, JVal, _Pos, Acc, ExtractFun) -> - {SQN, InkTag, LedgerKey} = JKey, case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of {?INKT_STND, {B, K}} -> - % Ignore tombstones and non-matching Tags and Key changes + % Ignore tombstones and non-matching Tags and Key changes % objects. {MinSQN, MaxSQN, BatchAcc} = Acc, case SQN of @@ -377,7 +342,8 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> {stop, Acc}; _ -> {VBin, _VSize} = ExtractFun(JVal), - {Obj, _IdxSpecs} = leveled_codec:split_inkvalue(VBin), + {Obj, _IdxSpecs} = + leveled_codec:split_inkvalue(VBin), ToLoop = case SQN of MaxSQN -> stop; @@ -395,20 +361,17 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> Folder = fun() -> - - {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(), + {ok, LedgerSnapshot, JournalSnapshot, AfterFun} = SnapFun(), {ok, JournalSQN} = leveled_inker:ink_getjournalsqn(JournalSnapshot), IsValidFun = fun(Bucket, Key, SQN) -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), CheckSQN = - leveled_penciller:pcl_checksequencenumber(LedgerSnapshot, - LedgerKey, - SQN), + leveled_penciller:pcl_checksequencenumber( + LedgerSnapshot, LedgerKey, SQN), % Need to check that we have not folded past the point % at which the snapshot was taken (JournalSQN >= SQN) and (CheckSQN == current) - end, BatchFoldFun = @@ -427,17 +390,11 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> end, InkFolder = - leveled_inker:ink_fold(JournalSnapshot, - 0, - {FilterFun, - InitAccFun, - BatchFoldFun}, - InitAcc), - AfterFun = - fun() -> - ok = leveled_penciller:pcl_close(LedgerSnapshot), - ok = leveled_inker:ink_close(JournalSnapshot) - end, + leveled_inker:ink_fold( + JournalSnapshot, + 0, + {FilterFun, InitAccFun, BatchFoldFun}, + InitAcc), wrap_runner(InkFolder, AfterFun) end, {async, Folder}. @@ -451,12 +408,8 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> %% @doc %% Fold over all objects within a given key range in a bucket foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) -> - foldobjects(SnapFun, - Tag, - KeyRanges, - FoldFun, - false, - false). + foldobjects( + SnapFun, Tag, KeyRanges, FoldFun, false, false). -spec foldheads_bybucket(snap_fun(), leveled_codec:tag(), @@ -505,7 +458,6 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) -> - %%%============================================================================ %%% Internal functions %%%============================================================================ @@ -584,12 +536,10 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, Folder = fun() -> - {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(), + {ok, LedgerSnapshot, JournalSnapshot, AfterFun} = SnapFun(), AccFun = - accumulate_objects(FoldFun, - JournalSnapshot, - Tag, - DeferredFetch), + accumulate_objects( + FoldFun, JournalSnapshot, Tag, DeferredFetch), FoldFunGen = fun({StartKey, EndKey}, FoldAcc) -> leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot, @@ -601,16 +551,6 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, LastModRange, LimitByCount) end, - AfterFun = - fun() -> - ok = leveled_penciller:pcl_close(LedgerSnapshot), - case DeferredFetch of - {true, false} -> - ok; - _ -> - ok = leveled_inker:ink_close(JournalSnapshot) - end - end, ListFoldFun = fun(KeyRange, Acc) -> Folder = FoldFunGen(KeyRange, Acc), diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index b50a60f..e401b8a 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -404,6 +404,8 @@ fetchput_snapshot(_Config) -> testutil:check_forlist(Bookie1, ChkList1), testutil:check_forlist(SnapBookie1, ChkList1), + compare_foldwithsnap(Bookie1, SnapBookie1, ChkList1), + % Close the snapshot, check the original store still has the objects ok = leveled_bookie:book_close(SnapBookie1), @@ -480,6 +482,8 @@ fetchput_snapshot(_Config) -> testutil:check_forlist(SnapBookie3, ChkList2), testutil:check_forlist(SnapBookie2, ChkList1), io:format("Started new snapshot and check for new objects~n"), + + compare_foldwithsnap(Bookie2, SnapBookie3, ChkList3), % Load yet more objects, these are replacement objects for the last load @@ -563,6 +567,28 @@ fetchput_snapshot(_Config) -> testutil:reset_filestructure(). +compare_foldwithsnap(Bookie, SnapBookie, ChkList) -> + HeadFoldFun = fun(B, K, _Hd, Acc) -> [{B, K}|Acc] end, + KeyFoldFun = fun(B, K, Acc) -> [{B, K}|Acc] end, + {async, HeadFoldDB} = + leveled_bookie:book_headfold( + Bookie, ?RIAK_TAG, {HeadFoldFun, []}, true, false, false + ), + {async, HeadFoldSnap} = + leveled_bookie:book_headfold( + SnapBookie, ?RIAK_TAG, {HeadFoldFun, []}, true, false, false + ), + true = HeadFoldDB() == HeadFoldSnap(), + + testutil:check_forlist(SnapBookie, ChkList), + + {async, KeyFoldSnap} = + leveled_bookie:book_keylist( + SnapBookie, ?RIAK_TAG, {KeyFoldFun, []} + ), + true = HeadFoldSnap() == KeyFoldSnap(). + + load_and_count(_Config) -> % Use artificially small files, and the load keys, counting they're all % present diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index b163fa8..222a747 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -74,7 +74,21 @@ replace_everything(_Config) -> compact_and_wait(Book1, 1000), {ok, FileList2} = file:list_dir(CompPath), io:format("Number of files after compaction ~w~n", [length(FileList2)]), - true = FileList1 == FileList2, + true = FileList1 =< FileList2, + %% There will normally be 5 journal files after 50K write then alter + %% That may be two files with entirely altered objects - which will be + %% compacted, and will be compacted to nothing. + %% The "middle" file - which will be 50% compactable may be scored to + %% be part of the first run, or may end up in the second run. If in + %% the first run, the second run will not compact and FL1 == FL2. + %% Otherwise FL1 could be 0 and FL2 1. Hard to control this as there + %% is randomisation in both the scoring and the journal size (due to + %% jittering of parameters). + compact_and_wait(Book1, 1000), + {ok, FileList3} = file:list_dir(CompPath), + io:format("Number of files after compaction ~w~n", [length(FileList3)]), + %% By the third compaction there should be no further changes + true = FileList2 == FileList3, {async, BackupFun} = leveled_bookie:book_hotbackup(Book1), ok = BackupFun(BackupPath), @@ -131,9 +145,9 @@ replace_everything(_Config) -> {OSpcL6, RSpcL6} = lists:split(200, lists:ukeysort(1, KSpcL6)), {KSpcL7, V7} = testutil:put_altered_indexed_objects(Book6, BKT3, RSpcL6), - {ok, FileList3} = file:list_dir(CompPath), - compact_and_wait(Book6), {ok, FileList4} = file:list_dir(CompPath), + compact_and_wait(Book6), + {ok, FileList5} = file:list_dir(CompPath), {OSpcL6A, V7} = testutil:put_altered_indexed_objects(Book6, BKT3, OSpcL6, true, V7), {async, BackupFun6} = leveled_bookie:book_hotbackup(Book6), @@ -141,7 +155,7 @@ replace_everything(_Config) -> ok = leveled_bookie:book_close(Book6), io:format("Checking object count in newly compacted journal files~n"), - NewlyCompactedFiles = lists:subtract(FileList4, FileList3), + NewlyCompactedFiles = lists:subtract(FileList5, FileList4), true = length(NewlyCompactedFiles) >= 1, CDBFilterFun = fun(_K, _V, _P, Acc, _EF) -> {loop, Acc + 1} end, CheckLengthFun = diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl index ba422b0..16ea215 100644 --- a/test/end_to_end/tictac_SUITE.erl +++ b/test/end_to_end/tictac_SUITE.erl @@ -738,11 +738,45 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) -> Bucket0, Key0), CheckHeadFun = - fun({add, SegID, B, K, H}) -> - {ok, H} = - leveled_bookie:book_headonly(Bookie1, SegID, B, K) + fun(DB) -> + fun({add, SegID, B, K, H}) -> + {ok, H} = + leveled_bookie:book_headonly(DB, SegID, B, K) + end end, - lists:foreach(CheckHeadFun, ObjectSpecL); + lists:foreach(CheckHeadFun(Bookie1), ObjectSpecL), + {ok, Snapshot} = + leveled_bookie:book_start([{snapshot_bookie, Bookie1}]), + ok = leveled_bookie:book_loglevel(Snapshot, warn), + ok = + leveled_bookie:book_addlogs( + Snapshot, [b0001, b0002, b0003, i0027, p0007] + ), + ok = + leveled_bookie:book_removelogs( + Snapshot, [b0019] + ), + io:format( + "Checking for ~w objects against Snapshot ~w~n", + [length(ObjectSpecL), Snapshot]), + lists:foreach(CheckHeadFun(Snapshot), ObjectSpecL), + io:format("Closing snapshot ~w~n", [Snapshot]), + ok = leveled_bookie:book_close(Snapshot), + {ok, AltSnapshot} = + leveled_bookie:book_start([{snapshot_bookie, Bookie1}]), + ok = + leveled_bookie:book_addlogs( + AltSnapshot, [b0001, b0002, b0003, b0004, i0027, p0007] + ), + true = is_process_alive(AltSnapshot), + io:format( + "Closing actual store ~w with snapshot ~w open~n", + [Bookie1, AltSnapshot] + ), + ok = leveled_bookie:book_close(Bookie1), + % Sleep a beat so as not to race with the 'DOWN' message + timer:sleep(10), + false = is_process_alive(AltSnapshot); no_lookup -> {unsupported_message, head} = leveled_bookie:book_head(Bookie1, @@ -753,11 +787,11 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) -> leveled_bookie:book_headonly(Bookie1, SegmentID0, Bucket0, - Key0) + Key0), + io:format("Closing actual store ~w~n", [Bookie1]), + ok = leveled_bookie:book_close(Bookie1) end, - - - ok = leveled_bookie:book_close(Bookie1), + {ok, FinalJournals} = file:list_dir(JFP), io:format("Trim has reduced journal count from " ++ "~w to ~w and ~w after restart~n",