Mas d31 i413 (#415)
* Allow snapshots to be reused in queries Allow for a full bookie snapshot to be re-used for multiple queries, not just KV fetches. * Reduce log noise The internal dummy tag is expected so should not prompt a log on reload * Snapshot should have same status of active db wrt head_only and head_lookup * Allow logging to specified on snapshots * Shutdown snapshot bookie is primary goes down Inker and Penciller already will shut down based on `erlang:monitor/2` * Review feedback Formatting and code readability fixes
This commit is contained in:
parent
9e804924a8
commit
d544db5461
9 changed files with 289 additions and 183 deletions
|
@ -74,7 +74,9 @@
|
||||||
book_logsettings/1,
|
book_logsettings/1,
|
||||||
book_loglevel/2,
|
book_loglevel/2,
|
||||||
book_addlogs/2,
|
book_addlogs/2,
|
||||||
book_removelogs/2]).
|
book_removelogs/2,
|
||||||
|
book_headstatus/1
|
||||||
|
]).
|
||||||
|
|
||||||
%% folding API
|
%% folding API
|
||||||
-export([
|
-export([
|
||||||
|
@ -155,6 +157,7 @@
|
||||||
head_only = false :: boolean(),
|
head_only = false :: boolean(),
|
||||||
head_lookup = true :: boolean(),
|
head_lookup = true :: boolean(),
|
||||||
ink_checking = ?MAX_KEYCHECK_FREQUENCY :: integer(),
|
ink_checking = ?MAX_KEYCHECK_FREQUENCY :: integer(),
|
||||||
|
bookie_monref :: reference() | undefined,
|
||||||
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
|
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
|
||||||
|
|
||||||
|
|
||||||
|
@ -1134,6 +1137,12 @@ book_addlogs(Pid, ForcedLogs) ->
|
||||||
book_removelogs(Pid, ForcedLogs) ->
|
book_removelogs(Pid, ForcedLogs) ->
|
||||||
gen_server:cast(Pid, {remove_logs, ForcedLogs}).
|
gen_server:cast(Pid, {remove_logs, ForcedLogs}).
|
||||||
|
|
||||||
|
-spec book_headstatus(pid()) -> {boolean(), boolean()}.
|
||||||
|
%% @doc
|
||||||
|
%% Return booleans to state the bookie is in head_only mode, and supporting
|
||||||
|
%% lookups
|
||||||
|
book_headstatus(Pid) ->
|
||||||
|
gen_server:call(Pid, head_status, infinity).
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
|
@ -1235,10 +1244,17 @@ init([Opts]) ->
|
||||||
{Bookie, undefined} ->
|
{Bookie, undefined} ->
|
||||||
{ok, Penciller, Inker} =
|
{ok, Penciller, Inker} =
|
||||||
book_snapshot(Bookie, store, undefined, true),
|
book_snapshot(Bookie, store, undefined, true),
|
||||||
|
BookieMonitor = erlang:monitor(process, Bookie),
|
||||||
|
NewETS = ets:new(mem, [ordered_set]),
|
||||||
|
{HeadOnly, Lookup} = leveled_bookie:book_headstatus(Bookie),
|
||||||
leveled_log:log(b0002, [Inker, Penciller]),
|
leveled_log:log(b0002, [Inker, Penciller]),
|
||||||
{ok,
|
{ok,
|
||||||
#state{penciller = Penciller,
|
#state{penciller = Penciller,
|
||||||
inker = Inker,
|
inker = Inker,
|
||||||
|
ledger_cache = #ledger_cache{mem = NewETS},
|
||||||
|
head_only = HeadOnly,
|
||||||
|
head_lookup = Lookup,
|
||||||
|
bookie_monref = BookieMonitor,
|
||||||
is_snapshot = true}}
|
is_snapshot = true}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -1490,6 +1506,8 @@ handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
handle_call(return_actors, _From, State) ->
|
handle_call(return_actors, _From, State) ->
|
||||||
{reply, {ok, State#state.inker, State#state.penciller}, State};
|
{reply, {ok, State#state.inker, State#state.penciller}, State};
|
||||||
|
handle_call(head_status, _From, State) ->
|
||||||
|
{reply, {State#state.head_only, State#state.head_lookup}, State};
|
||||||
handle_call(Msg, _From, State) ->
|
handle_call(Msg, _From, State) ->
|
||||||
{reply, {unsupported_message, element(1, Msg)}, State}.
|
{reply, {unsupported_message, element(1, Msg)}, State}.
|
||||||
|
|
||||||
|
@ -1497,35 +1515,53 @@ handle_call(Msg, _From, State) ->
|
||||||
handle_cast({log_level, LogLevel}, State) ->
|
handle_cast({log_level, LogLevel}, State) ->
|
||||||
PCL = State#state.penciller,
|
PCL = State#state.penciller,
|
||||||
INK = State#state.inker,
|
INK = State#state.inker,
|
||||||
Monitor = element(1, State#state.monitor),
|
|
||||||
ok = leveled_penciller:pcl_loglevel(PCL, LogLevel),
|
ok = leveled_penciller:pcl_loglevel(PCL, LogLevel),
|
||||||
ok = leveled_inker:ink_loglevel(INK, LogLevel),
|
ok = leveled_inker:ink_loglevel(INK, LogLevel),
|
||||||
ok = leveled_monitor:log_level(Monitor, LogLevel),
|
case element(1, State#state.monitor) of
|
||||||
|
no_monitor ->
|
||||||
|
ok;
|
||||||
|
Monitor ->
|
||||||
|
leveled_monitor:log_level(Monitor, LogLevel)
|
||||||
|
end,
|
||||||
ok = leveled_log:set_loglevel(LogLevel),
|
ok = leveled_log:set_loglevel(LogLevel),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_cast({add_logs, ForcedLogs}, State) ->
|
handle_cast({add_logs, ForcedLogs}, State) ->
|
||||||
PCL = State#state.penciller,
|
PCL = State#state.penciller,
|
||||||
INK = State#state.inker,
|
INK = State#state.inker,
|
||||||
Monitor = element(1, State#state.monitor),
|
|
||||||
ok = leveled_penciller:pcl_addlogs(PCL, ForcedLogs),
|
ok = leveled_penciller:pcl_addlogs(PCL, ForcedLogs),
|
||||||
ok = leveled_inker:ink_addlogs(INK, ForcedLogs),
|
ok = leveled_inker:ink_addlogs(INK, ForcedLogs),
|
||||||
ok = leveled_monitor:log_add(Monitor, ForcedLogs),
|
case element(1, State#state.monitor) of
|
||||||
|
no_monitor ->
|
||||||
|
ok;
|
||||||
|
Monitor ->
|
||||||
|
leveled_monitor:log_add(Monitor, ForcedLogs)
|
||||||
|
end,
|
||||||
ok = leveled_log:add_forcedlogs(ForcedLogs),
|
ok = leveled_log:add_forcedlogs(ForcedLogs),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_cast({remove_logs, ForcedLogs}, State) ->
|
handle_cast({remove_logs, ForcedLogs}, State) ->
|
||||||
PCL = State#state.penciller,
|
PCL = State#state.penciller,
|
||||||
INK = State#state.inker,
|
INK = State#state.inker,
|
||||||
Monitor = element(1, State#state.monitor),
|
|
||||||
ok = leveled_penciller:pcl_removelogs(PCL, ForcedLogs),
|
ok = leveled_penciller:pcl_removelogs(PCL, ForcedLogs),
|
||||||
ok = leveled_inker:ink_removelogs(INK, ForcedLogs),
|
ok = leveled_inker:ink_removelogs(INK, ForcedLogs),
|
||||||
ok = leveled_monitor:log_remove(Monitor, ForcedLogs),
|
case element(1, State#state.monitor) of
|
||||||
|
no_monitor ->
|
||||||
|
ok;
|
||||||
|
Monitor ->
|
||||||
|
leveled_monitor:log_remove(Monitor, ForcedLogs)
|
||||||
|
end,
|
||||||
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
||||||
|
%% handle the bookie stopping and stop this snapshot
|
||||||
|
handle_info({'DOWN', BookieMonRef, process, BookiePid, Info},
|
||||||
|
State=#state{bookie_monref = BookieMonRef, is_snapshot = true}) ->
|
||||||
|
leveled_log:log(b0004, [BookiePid, Info]),
|
||||||
|
{stop, normal, State};
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
||||||
terminate(Reason, _State) ->
|
terminate(Reason, _State) ->
|
||||||
leveled_log:log(b0003, [Reason]).
|
leveled_log:log(b0003, [Reason]).
|
||||||
|
|
||||||
|
@ -1827,10 +1863,11 @@ set_options(Opts, Monitor) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
||||||
-spec return_snapfun(book_state(), store|ledger,
|
-spec return_snapfun(
|
||||||
tuple()|no_lookup|undefined,
|
book_state(), store|ledger,
|
||||||
boolean(), boolean())
|
tuple()|no_lookup|undefined,
|
||||||
-> fun(() -> {ok, pid(), pid()|null}).
|
boolean(), boolean())
|
||||||
|
-> fun(() -> {ok, pid(), pid()|null, fun(() -> ok)}).
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Generates a function from which a snapshot can be created. The primary
|
%% Generates a function from which a snapshot can be created. The primary
|
||||||
%% factor here is the SnapPreFold boolean. If this is true then the snapshot
|
%% factor here is the SnapPreFold boolean. If this is true then the snapshot
|
||||||
|
@ -1838,11 +1875,31 @@ set_options(Opts, Monitor) ->
|
||||||
%% false then the snapshot will be taken when the Fold function is called.
|
%% false then the snapshot will be taken when the Fold function is called.
|
||||||
%%
|
%%
|
||||||
%% SnapPrefold is to be used when the intention is to queue the fold, and so
|
%% SnapPrefold is to be used when the intention is to queue the fold, and so
|
||||||
%% claling of the fold may be delayed, but it is still desired that the fold
|
%% calling of the fold may be delayed, but it is still desired that the fold
|
||||||
%% represent the point in time that the query was requested.
|
%% represent the point in time that the query was requested.
|
||||||
|
%%
|
||||||
|
%% Also returns a function which will close any snapshots to be used in the
|
||||||
|
%% runners post-query cleanup action
|
||||||
|
%%
|
||||||
|
%% When the bookie is a snapshot, a fresh snapshot should not be taken, the
|
||||||
|
%% previous snapshot should be used instead. Also the snapshot should not be
|
||||||
|
%% closed as part of the post-query activity as the snapshot may be reused, and
|
||||||
|
%% should be manually closed.
|
||||||
return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
|
return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
|
||||||
case SnapPreFold of
|
CloseFun =
|
||||||
true ->
|
fun(LS0, JS0) ->
|
||||||
|
fun() ->
|
||||||
|
ok = leveled_penciller:pcl_close(LS0),
|
||||||
|
case JS0 of
|
||||||
|
JS0 when is_pid(JS0) ->
|
||||||
|
leveled_inker:ink_close(JS0);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
case {SnapPreFold, State#state.is_snapshot} of
|
||||||
|
{true, false} ->
|
||||||
{ok, LS, JS} =
|
{ok, LS, JS} =
|
||||||
snapshot_store(
|
snapshot_store(
|
||||||
State#state.ledger_cache,
|
State#state.ledger_cache,
|
||||||
|
@ -1852,15 +1909,23 @@ return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
|
||||||
SnapType,
|
SnapType,
|
||||||
Query,
|
Query,
|
||||||
LongRunning),
|
LongRunning),
|
||||||
fun() -> {ok, LS, JS} end;
|
fun() -> {ok, LS, JS, CloseFun(LS, JS)} end;
|
||||||
false ->
|
{false, false} ->
|
||||||
Self = self(),
|
Self = self(),
|
||||||
% Timeout will be ignored, as will Requestor
|
% Timeout will be ignored, as will Requestor
|
||||||
%
|
%
|
||||||
% This uses the external snapshot - as the snapshot will need
|
% This uses the external snapshot - as the snapshot will need
|
||||||
% to have consistent state between Bookie and Penciller when
|
% to have consistent state between Bookie and Penciller when
|
||||||
% it is made.
|
% it is made.
|
||||||
fun() -> book_snapshot(Self, SnapType, Query, LongRunning) end
|
fun() ->
|
||||||
|
{ok, LS, JS} =
|
||||||
|
book_snapshot(Self, SnapType, Query, LongRunning),
|
||||||
|
{ok, LS, JS, CloseFun(LS, JS)}
|
||||||
|
end;
|
||||||
|
{_ , true} ->
|
||||||
|
LS = State#state.penciller,
|
||||||
|
JS = State#state.inker,
|
||||||
|
fun() -> {ok, LS, JS, fun() -> ok end} end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec snaptype_by_presence(boolean()) -> store|ledger.
|
-spec snaptype_by_presence(boolean()) -> store|ledger.
|
||||||
|
@ -2191,14 +2256,7 @@ fetch_head(Key, Penciller, LedgerCache) ->
|
||||||
%% The L0Index needs to be bypassed when running head_only
|
%% The L0Index needs to be bypassed when running head_only
|
||||||
fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
|
fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
CacheResult =
|
case ets:lookup(LedgerCache#ledger_cache.mem, Key) of
|
||||||
case LedgerCache#ledger_cache.mem of
|
|
||||||
undefined ->
|
|
||||||
[];
|
|
||||||
Tab ->
|
|
||||||
ets:lookup(Tab, Key)
|
|
||||||
end,
|
|
||||||
case CacheResult of
|
|
||||||
[{Key, Head}] ->
|
[{Key, Head}] ->
|
||||||
{Head, true};
|
{Head, true};
|
||||||
[] ->
|
[] ->
|
||||||
|
|
|
@ -403,8 +403,8 @@ inker_reload_strategy(AltList) ->
|
||||||
lists:ukeysort(1, DefaultList)).
|
lists:ukeysort(1, DefaultList)).
|
||||||
|
|
||||||
|
|
||||||
-spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy())
|
-spec get_tagstrategy(
|
||||||
-> compaction_method().
|
ledger_key()|tag()|dummy, compaction_strategy()) -> compaction_method().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Work out the compaction strategy for the key
|
%% Work out the compaction strategy for the key
|
||||||
get_tagstrategy({Tag, _, _, _}, Strategy) ->
|
get_tagstrategy({Tag, _, _, _}, Strategy) ->
|
||||||
|
@ -413,6 +413,10 @@ get_tagstrategy(Tag, Strategy) ->
|
||||||
case lists:keyfind(Tag, 1, Strategy) of
|
case lists:keyfind(Tag, 1, Strategy) of
|
||||||
{Tag, TagStrat} ->
|
{Tag, TagStrat} ->
|
||||||
TagStrat;
|
TagStrat;
|
||||||
|
false when Tag == dummy ->
|
||||||
|
%% dummy is not a strategy, but this is expected to see this when
|
||||||
|
%% running in head_only mode - so don't warn
|
||||||
|
retain;
|
||||||
false ->
|
false ->
|
||||||
leveled_log:log(ic012, [Tag, Strategy]),
|
leveled_log:log(ic012, [Tag, Strategy]),
|
||||||
retain
|
retain
|
||||||
|
|
|
@ -754,25 +754,34 @@ handle_cast({release_snapshot, Snapshot}, State) ->
|
||||||
{noreply, State#state{registered_snapshots=Rs}}
|
{noreply, State#state{registered_snapshots=Rs}}
|
||||||
end;
|
end;
|
||||||
handle_cast({log_level, LogLevel}, State) ->
|
handle_cast({log_level, LogLevel}, State) ->
|
||||||
INC = State#state.clerk,
|
case State#state.clerk of
|
||||||
ok = leveled_iclerk:clerk_loglevel(INC, LogLevel),
|
undefined ->
|
||||||
|
ok;
|
||||||
|
INC ->
|
||||||
|
leveled_iclerk:clerk_loglevel(INC, LogLevel)
|
||||||
|
end,
|
||||||
ok = leveled_log:set_loglevel(LogLevel),
|
ok = leveled_log:set_loglevel(LogLevel),
|
||||||
CDBopts = State#state.cdb_options,
|
CDBopts0 = update_cdb_logoptions(State#state.cdb_options),
|
||||||
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
|
|
||||||
{noreply, State#state{cdb_options = CDBopts0}};
|
{noreply, State#state{cdb_options = CDBopts0}};
|
||||||
handle_cast({add_logs, ForcedLogs}, State) ->
|
handle_cast({add_logs, ForcedLogs}, State) ->
|
||||||
INC = State#state.clerk,
|
case State#state.clerk of
|
||||||
ok = leveled_iclerk:clerk_addlogs(INC, ForcedLogs),
|
undefined ->
|
||||||
|
ok;
|
||||||
|
INC ->
|
||||||
|
leveled_iclerk:clerk_addlogs(INC, ForcedLogs)
|
||||||
|
end,
|
||||||
ok = leveled_log:add_forcedlogs(ForcedLogs),
|
ok = leveled_log:add_forcedlogs(ForcedLogs),
|
||||||
CDBopts = State#state.cdb_options,
|
CDBopts0 = update_cdb_logoptions(State#state.cdb_options),
|
||||||
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
|
|
||||||
{noreply, State#state{cdb_options = CDBopts0}};
|
{noreply, State#state{cdb_options = CDBopts0}};
|
||||||
handle_cast({remove_logs, ForcedLogs}, State) ->
|
handle_cast({remove_logs, ForcedLogs}, State) ->
|
||||||
INC = State#state.clerk,
|
case State#state.clerk of
|
||||||
ok = leveled_iclerk:clerk_removelogs(INC, ForcedLogs),
|
undefined ->
|
||||||
|
ok;
|
||||||
|
INC ->
|
||||||
|
leveled_iclerk:clerk_removelogs(INC, ForcedLogs)
|
||||||
|
end,
|
||||||
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
||||||
CDBopts = State#state.cdb_options,
|
CDBopts0 = update_cdb_logoptions(State#state.cdb_options),
|
||||||
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
|
|
||||||
{noreply, State#state{cdb_options = CDBopts0}};
|
{noreply, State#state{cdb_options = CDBopts0}};
|
||||||
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
|
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
|
||||||
case length(State#state.registered_snapshots) of
|
case length(State#state.registered_snapshots) of
|
||||||
|
@ -816,8 +825,10 @@ handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(Reason, _State=#state{is_snapshot=Snap}) when Snap == true ->
|
||||||
ok.
|
leveled_log:log(i0027, [Reason]);
|
||||||
|
terminate(Reason, _State) ->
|
||||||
|
leveled_log:log(i0028, [Reason]).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -1291,6 +1302,14 @@ wrap_checkfilterfun(CheckFilterFun) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
-spec update_cdb_logoptions(
|
||||||
|
#cdb_options{}|undefined) -> #cdb_options{}|undefined.
|
||||||
|
update_cdb_logoptions(undefined) ->
|
||||||
|
undefined;
|
||||||
|
update_cdb_logoptions(CDBopts) ->
|
||||||
|
CDBopts#cdb_options{log_options = leveled_log:get_opts()}.
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Test
|
%%% Test
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
|
@ -48,6 +48,8 @@
|
||||||
{info, <<"Snapshot starting with Ink ~w Pcl ~w">>},
|
{info, <<"Snapshot starting with Ink ~w Pcl ~w">>},
|
||||||
b0003 =>
|
b0003 =>
|
||||||
{info, <<"Bookie closing for reason ~w">>},
|
{info, <<"Bookie closing for reason ~w">>},
|
||||||
|
b0004 =>
|
||||||
|
{warn, <<"Bookie snapshot exiting as master store ~w is down for reason ~p">>},
|
||||||
b0005 =>
|
b0005 =>
|
||||||
{info, <<"LedgerSQN=~w at startup">>},
|
{info, <<"LedgerSQN=~w at startup">>},
|
||||||
b0006 =>
|
b0006 =>
|
||||||
|
@ -83,17 +85,17 @@
|
||||||
p0005 =>
|
p0005 =>
|
||||||
{debug, <<"Delete confirmed as file ~s is removed from Manifest">>},
|
{debug, <<"Delete confirmed as file ~s is removed from Manifest">>},
|
||||||
p0007 =>
|
p0007 =>
|
||||||
{debug, <<"Sent release message for cloned Penciller following close for reason ~w">>},
|
{debug, <<"Shutdown complete for cloned Penciller for reason ~w">>},
|
||||||
p0008 =>
|
p0008 =>
|
||||||
{info, <<"Penciller closing for reason ~w">>},
|
{info, <<"Penciller closing for reason ~w">>},
|
||||||
p0010 =>
|
p0010 =>
|
||||||
{info, <<"level zero discarded_count=~w on close of Penciller">>},
|
{info, <<"level zero discarded_count=~w on close of Penciller">>},
|
||||||
p0011 =>
|
p0011 =>
|
||||||
{info, <<"Shutdown complete for Penciller for reason ~w">>},
|
{debug, <<"Shutdown complete for Penciller for reason ~w">>},
|
||||||
p0012 =>
|
p0012 =>
|
||||||
{info, <<"Store to be started based on manifest sequence number of ~w">>},
|
{info, <<"Store to be started based on manifest sequence number of ~w">>},
|
||||||
p0013 =>
|
p0013 =>
|
||||||
{warn, <<"Seqence number of 0 indicates no valid manifest">>},
|
{info, <<"Seqence number of 0 indicates no valid manifest">>},
|
||||||
p0014 =>
|
p0014 =>
|
||||||
{info, <<"Maximum sequence number of ~w found in nonzero levels">>},
|
{info, <<"Maximum sequence number of ~w found in nonzero levels">>},
|
||||||
p0015 =>
|
p0015 =>
|
||||||
|
@ -246,6 +248,10 @@
|
||||||
{warn, <<"Journal SQN of ~w is below Ledger SQN of ~w anti-entropy will be required">>},
|
{warn, <<"Journal SQN of ~w is below Ledger SQN of ~w anti-entropy will be required">>},
|
||||||
i0026 =>
|
i0026 =>
|
||||||
{info, <<"Deferring shutdown due to snapshot_count=~w">>},
|
{info, <<"Deferring shutdown due to snapshot_count=~w">>},
|
||||||
|
i0027 =>
|
||||||
|
{debug, <<"Shutdown complete for cloned Inker for reason ~w">>},
|
||||||
|
i0028 =>
|
||||||
|
{debug, <<"Shutdown complete for Inker for reason ~w">>},
|
||||||
ic001 =>
|
ic001 =>
|
||||||
{info, <<"Closed for reason ~w so maybe leaving garbage">>},
|
{info, <<"Closed for reason ~w so maybe leaving garbage">>},
|
||||||
ic002 =>
|
ic002 =>
|
||||||
|
|
|
@ -628,16 +628,16 @@ init([LogOpts, PCLopts]) ->
|
||||||
%% exits
|
%% exits
|
||||||
BookieMonitor =
|
BookieMonitor =
|
||||||
erlang:monitor(process, PCLopts#penciller_options.bookies_pid),
|
erlang:monitor(process, PCLopts#penciller_options.bookies_pid),
|
||||||
|
{ok, State} =
|
||||||
{ok, State} = pcl_registersnapshot(SrcPenciller,
|
pcl_registersnapshot(
|
||||||
self(),
|
SrcPenciller, self(), Query, BookiesMem, LongRunning),
|
||||||
Query,
|
|
||||||
BookiesMem,
|
|
||||||
LongRunning),
|
|
||||||
leveled_log:log(p0001, [self()]),
|
leveled_log:log(p0001, [self()]),
|
||||||
{ok, State#state{is_snapshot = true,
|
{ok,
|
||||||
bookie_monref = BookieMonitor,
|
State#state{
|
||||||
source_penciller = SrcPenciller}};
|
is_snapshot = true,
|
||||||
|
clerk = undefined,
|
||||||
|
bookie_monref = BookieMonitor,
|
||||||
|
source_penciller = SrcPenciller}};
|
||||||
{_RootPath, _Snapshot=false, _Q, _BM} ->
|
{_RootPath, _Snapshot=false, _Q, _BM} ->
|
||||||
start_from_file(PCLopts)
|
start_from_file(PCLopts)
|
||||||
end.
|
end.
|
||||||
|
@ -1131,22 +1131,21 @@ handle_cast({fetch_levelzero, Slot, ReturnFun}, State) ->
|
||||||
ReturnFun(lists:nth(Slot, State#state.levelzero_cache)),
|
ReturnFun(lists:nth(Slot, State#state.levelzero_cache)),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_cast({log_level, LogLevel}, State) ->
|
handle_cast({log_level, LogLevel}, State) ->
|
||||||
PC = State#state.clerk,
|
update_clerk(
|
||||||
ok = leveled_pclerk:clerk_loglevel(PC, LogLevel),
|
State#state.clerk, fun leveled_pclerk:clerk_loglevel/2, LogLevel),
|
||||||
ok = leveled_log:set_loglevel(LogLevel),
|
|
||||||
SSTopts = State#state.sst_options,
|
SSTopts = State#state.sst_options,
|
||||||
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
|
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
|
||||||
{noreply, State#state{sst_options = SSTopts0}};
|
{noreply, State#state{sst_options = SSTopts0}};
|
||||||
handle_cast({add_logs, ForcedLogs}, State) ->
|
handle_cast({add_logs, ForcedLogs}, State) ->
|
||||||
PC = State#state.clerk,
|
update_clerk(
|
||||||
ok = leveled_pclerk:clerk_addlogs(PC, ForcedLogs),
|
State#state.clerk, fun leveled_pclerk:clerk_addlogs/2, ForcedLogs),
|
||||||
ok = leveled_log:add_forcedlogs(ForcedLogs),
|
ok = leveled_log:add_forcedlogs(ForcedLogs),
|
||||||
SSTopts = State#state.sst_options,
|
SSTopts = State#state.sst_options,
|
||||||
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
|
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
|
||||||
{noreply, State#state{sst_options = SSTopts0}};
|
{noreply, State#state{sst_options = SSTopts0}};
|
||||||
handle_cast({remove_logs, ForcedLogs}, State) ->
|
handle_cast({remove_logs, ForcedLogs}, State) ->
|
||||||
PC = State#state.clerk,
|
update_clerk(
|
||||||
ok = leveled_pclerk:clerk_removelogs(PC, ForcedLogs),
|
State#state.clerk, fun leveled_pclerk:clerk_removelogs/2, ForcedLogs),
|
||||||
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
||||||
SSTopts = State#state.sst_options,
|
SSTopts = State#state.sst_options,
|
||||||
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
|
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
|
||||||
|
@ -1224,6 +1223,12 @@ sst_filename(ManSQN, Level, Count) ->
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
|
-spec update_clerk(pid()|undefined, fun((pid(), term()) -> ok), term()) -> ok.
|
||||||
|
update_clerk(undefined, _F, _T) ->
|
||||||
|
ok;
|
||||||
|
update_clerk(Clerk, F, T) when is_pid(Clerk) ->
|
||||||
|
F(Clerk, T).
|
||||||
|
|
||||||
-spec start_from_file(penciller_options()) -> {ok, pcl_state()}.
|
-spec start_from_file(penciller_options()) -> {ok, pcl_state()}.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Normal start of a penciller (i.e. not a snapshot), needs to read the
|
%% Normal start of a penciller (i.e. not a snapshot), needs to read the
|
||||||
|
|
|
@ -81,14 +81,11 @@ bucket_sizestats(SnapFun, Bucket, Tag) ->
|
||||||
AccFun = accumulate_size(),
|
AccFun = accumulate_size(),
|
||||||
Runner =
|
Runner =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, LedgerSnap, _JournalSnap} = SnapFun(),
|
{ok, LedgerSnap, _JournalSnap, AfterFun} = SnapFun(),
|
||||||
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnap,
|
Acc =
|
||||||
StartKey,
|
leveled_penciller:pcl_fetchkeys(
|
||||||
EndKey,
|
LedgerSnap, StartKey, EndKey, AccFun, {0, 0}, as_pcl),
|
||||||
AccFun,
|
AfterFun(),
|
||||||
{0, 0},
|
|
||||||
as_pcl),
|
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnap),
|
|
||||||
Acc
|
Acc
|
||||||
end,
|
end,
|
||||||
{async, Runner}.
|
{async, Runner}.
|
||||||
|
@ -112,19 +109,16 @@ bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc) ->
|
||||||
bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) ->
|
bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) ->
|
||||||
Runner =
|
Runner =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
|
{ok, LedgerSnapshot, _JournalSnapshot, AfterFun} = SnapFun(),
|
||||||
BucketAcc =
|
BucketAcc =
|
||||||
get_nextbucket(null, null,
|
get_nextbucket(
|
||||||
Tag, LedgerSnapshot, [], {0, MaxBuckets}),
|
null, null, Tag, LedgerSnapshot, [], {0, MaxBuckets}),
|
||||||
AfterFun =
|
|
||||||
fun() ->
|
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnapshot)
|
|
||||||
end,
|
|
||||||
FoldRunner =
|
FoldRunner =
|
||||||
fun() ->
|
fun() ->
|
||||||
lists:foldr(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end,
|
lists:foldr(
|
||||||
InitAcc,
|
fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end,
|
||||||
BucketAcc)
|
InitAcc,
|
||||||
|
BucketAcc)
|
||||||
% Buckets in reverse alphabetical order so foldr
|
% Buckets in reverse alphabetical order so foldr
|
||||||
end,
|
end,
|
||||||
% For this fold, the fold over the store is actually completed
|
% For this fold, the fold over the store is actually completed
|
||||||
|
@ -160,21 +154,18 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
|
||||||
_ ->
|
_ ->
|
||||||
fun add_keys/2
|
fun add_keys/2
|
||||||
end,
|
end,
|
||||||
AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun),
|
|
||||||
|
|
||||||
Runner =
|
Runner =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
|
{ok, LedgerSnapshot, _JournalSnapshot, AfterFun} = SnapFun(),
|
||||||
Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
Folder =
|
||||||
StartKey,
|
leveled_penciller:pcl_fetchkeys(
|
||||||
EndKey,
|
LedgerSnapshot,
|
||||||
AccFun,
|
StartKey,
|
||||||
InitAcc,
|
EndKey,
|
||||||
by_runner),
|
accumulate_index(TermRegex, AddFun, FoldKeysFun),
|
||||||
AfterFun =
|
InitAcc,
|
||||||
fun() ->
|
by_runner),
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnapshot)
|
|
||||||
end,
|
|
||||||
wrap_runner(Folder, AfterFun)
|
wrap_runner(Folder, AfterFun)
|
||||||
end,
|
end,
|
||||||
{async, Runner}.
|
{async, Runner}.
|
||||||
|
@ -197,17 +188,10 @@ bucketkey_query(SnapFun, Tag, Bucket,
|
||||||
AccFun = accumulate_keys(FoldKeysFun, TermRegex),
|
AccFun = accumulate_keys(FoldKeysFun, TermRegex),
|
||||||
Runner =
|
Runner =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
|
{ok, LedgerSnapshot, _JournalSnapshot, AfterFun} = SnapFun(),
|
||||||
Folder = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
Folder =
|
||||||
SK,
|
leveled_penciller:pcl_fetchkeys(
|
||||||
EK,
|
LedgerSnapshot, SK, EK, AccFun, InitAcc, by_runner),
|
||||||
AccFun,
|
|
||||||
InitAcc,
|
|
||||||
by_runner),
|
|
||||||
AfterFun =
|
|
||||||
fun() ->
|
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnapshot)
|
|
||||||
end,
|
|
||||||
wrap_runner(Folder, AfterFun)
|
wrap_runner(Folder, AfterFun)
|
||||||
end,
|
end,
|
||||||
{async, Runner}.
|
{async, Runner}.
|
||||||
|
@ -231,20 +215,12 @@ hashlist_query(SnapFun, Tag, JournalCheck) ->
|
||||||
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
||||||
Runner =
|
Runner =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
|
{ok, LedgerSnapshot, JournalSnapshot, AfterFun} = SnapFun(),
|
||||||
AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
|
AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
|
||||||
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
Acc =
|
||||||
StartKey,
|
leveled_penciller:pcl_fetchkeys(
|
||||||
EndKey,
|
LedgerSnapshot, StartKey, EndKey, AccFun, []),
|
||||||
AccFun,
|
AfterFun(),
|
||||||
[]),
|
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
|
||||||
case JournalCheck of
|
|
||||||
false ->
|
|
||||||
ok;
|
|
||||||
true ->
|
|
||||||
leveled_inker:ink_close(JournalSnapshot)
|
|
||||||
end,
|
|
||||||
Acc
|
Acc
|
||||||
end,
|
end,
|
||||||
{async, Runner}.
|
{async, Runner}.
|
||||||
|
@ -262,7 +238,7 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
|
||||||
Tree = leveled_tictac:new_tree(temp, TreeSize),
|
Tree = leveled_tictac:new_tree(temp, TreeSize),
|
||||||
Runner =
|
Runner =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, LedgerSnap, JournalSnap} = SnapFun(),
|
{ok, LedgerSnap, JournalSnap, AfterFun} = SnapFun(),
|
||||||
% The start key and end key will vary depending on whether the
|
% The start key and end key will vary depending on whether the
|
||||||
% fold is to fold over an index or a key range
|
% fold is to fold over an index or a key range
|
||||||
EnsureKeyBinaryFun =
|
EnsureKeyBinaryFun =
|
||||||
|
@ -294,19 +270,9 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
|
||||||
AccFun =
|
AccFun =
|
||||||
accumulate_tree(Filter, JournalCheck, JournalSnap, ExtractFun),
|
accumulate_tree(Filter, JournalCheck, JournalSnap, ExtractFun),
|
||||||
Acc =
|
Acc =
|
||||||
leveled_penciller:pcl_fetchkeys(LedgerSnap,
|
leveled_penciller:pcl_fetchkeys(
|
||||||
StartKey, EndKey,
|
LedgerSnap, StartKey, EndKey, AccFun, Tree),
|
||||||
AccFun, Tree),
|
AfterFun(),
|
||||||
|
|
||||||
% Close down snapshot when complete so as not to hold removed
|
|
||||||
% files open
|
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnap),
|
|
||||||
case JournalCheck of
|
|
||||||
false ->
|
|
||||||
ok;
|
|
||||||
true ->
|
|
||||||
leveled_inker:ink_close(JournalSnap)
|
|
||||||
end,
|
|
||||||
Acc
|
Acc
|
||||||
end,
|
end,
|
||||||
{async, Runner}.
|
{async, Runner}.
|
||||||
|
@ -357,17 +323,16 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
|
||||||
% initial accumulator
|
% initial accumulator
|
||||||
FoldObjectsFun;
|
FoldObjectsFun;
|
||||||
false ->
|
false ->
|
||||||
% no initial accumulatr passed, and so should be just a list
|
% no initial accumulator passed, and so should be just a list
|
||||||
{FoldObjectsFun, []}
|
{FoldObjectsFun, []}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
FilterFun =
|
FilterFun =
|
||||||
fun(JKey, JVal, _Pos, Acc, ExtractFun) ->
|
fun(JKey, JVal, _Pos, Acc, ExtractFun) ->
|
||||||
|
|
||||||
{SQN, InkTag, LedgerKey} = JKey,
|
{SQN, InkTag, LedgerKey} = JKey,
|
||||||
case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of
|
case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of
|
||||||
{?INKT_STND, {B, K}} ->
|
{?INKT_STND, {B, K}} ->
|
||||||
% Ignore tombstones and non-matching Tags and Key changes
|
% Ignore tombstones and non-matching Tags and Key changes
|
||||||
% objects.
|
% objects.
|
||||||
{MinSQN, MaxSQN, BatchAcc} = Acc,
|
{MinSQN, MaxSQN, BatchAcc} = Acc,
|
||||||
case SQN of
|
case SQN of
|
||||||
|
@ -377,7 +342,8 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
|
||||||
{stop, Acc};
|
{stop, Acc};
|
||||||
_ ->
|
_ ->
|
||||||
{VBin, _VSize} = ExtractFun(JVal),
|
{VBin, _VSize} = ExtractFun(JVal),
|
||||||
{Obj, _IdxSpecs} = leveled_codec:split_inkvalue(VBin),
|
{Obj, _IdxSpecs} =
|
||||||
|
leveled_codec:split_inkvalue(VBin),
|
||||||
ToLoop =
|
ToLoop =
|
||||||
case SQN of
|
case SQN of
|
||||||
MaxSQN -> stop;
|
MaxSQN -> stop;
|
||||||
|
@ -395,20 +361,17 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
|
||||||
|
|
||||||
Folder =
|
Folder =
|
||||||
fun() ->
|
fun() ->
|
||||||
|
{ok, LedgerSnapshot, JournalSnapshot, AfterFun} = SnapFun(),
|
||||||
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
|
|
||||||
{ok, JournalSQN} = leveled_inker:ink_getjournalsqn(JournalSnapshot),
|
{ok, JournalSQN} = leveled_inker:ink_getjournalsqn(JournalSnapshot),
|
||||||
IsValidFun =
|
IsValidFun =
|
||||||
fun(Bucket, Key, SQN) ->
|
fun(Bucket, Key, SQN) ->
|
||||||
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
|
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
|
||||||
CheckSQN =
|
CheckSQN =
|
||||||
leveled_penciller:pcl_checksequencenumber(LedgerSnapshot,
|
leveled_penciller:pcl_checksequencenumber(
|
||||||
LedgerKey,
|
LedgerSnapshot, LedgerKey, SQN),
|
||||||
SQN),
|
|
||||||
% Need to check that we have not folded past the point
|
% Need to check that we have not folded past the point
|
||||||
% at which the snapshot was taken
|
% at which the snapshot was taken
|
||||||
(JournalSQN >= SQN) and (CheckSQN == current)
|
(JournalSQN >= SQN) and (CheckSQN == current)
|
||||||
|
|
||||||
end,
|
end,
|
||||||
|
|
||||||
BatchFoldFun =
|
BatchFoldFun =
|
||||||
|
@ -427,17 +390,11 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
|
||||||
end,
|
end,
|
||||||
|
|
||||||
InkFolder =
|
InkFolder =
|
||||||
leveled_inker:ink_fold(JournalSnapshot,
|
leveled_inker:ink_fold(
|
||||||
0,
|
JournalSnapshot,
|
||||||
{FilterFun,
|
0,
|
||||||
InitAccFun,
|
{FilterFun, InitAccFun, BatchFoldFun},
|
||||||
BatchFoldFun},
|
InitAcc),
|
||||||
InitAcc),
|
|
||||||
AfterFun =
|
|
||||||
fun() ->
|
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
|
||||||
ok = leveled_inker:ink_close(JournalSnapshot)
|
|
||||||
end,
|
|
||||||
wrap_runner(InkFolder, AfterFun)
|
wrap_runner(InkFolder, AfterFun)
|
||||||
end,
|
end,
|
||||||
{async, Folder}.
|
{async, Folder}.
|
||||||
|
@ -451,12 +408,8 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Fold over all objects within a given key range in a bucket
|
%% Fold over all objects within a given key range in a bucket
|
||||||
foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) ->
|
foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) ->
|
||||||
foldobjects(SnapFun,
|
foldobjects(
|
||||||
Tag,
|
SnapFun, Tag, KeyRanges, FoldFun, false, false).
|
||||||
KeyRanges,
|
|
||||||
FoldFun,
|
|
||||||
false,
|
|
||||||
false).
|
|
||||||
|
|
||||||
-spec foldheads_bybucket(snap_fun(),
|
-spec foldheads_bybucket(snap_fun(),
|
||||||
leveled_codec:tag(),
|
leveled_codec:tag(),
|
||||||
|
@ -505,7 +458,6 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -584,12 +536,10 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
|
||||||
|
|
||||||
Folder =
|
Folder =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
|
{ok, LedgerSnapshot, JournalSnapshot, AfterFun} = SnapFun(),
|
||||||
AccFun =
|
AccFun =
|
||||||
accumulate_objects(FoldFun,
|
accumulate_objects(
|
||||||
JournalSnapshot,
|
FoldFun, JournalSnapshot, Tag, DeferredFetch),
|
||||||
Tag,
|
|
||||||
DeferredFetch),
|
|
||||||
FoldFunGen =
|
FoldFunGen =
|
||||||
fun({StartKey, EndKey}, FoldAcc) ->
|
fun({StartKey, EndKey}, FoldAcc) ->
|
||||||
leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
|
leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
|
||||||
|
@ -601,16 +551,6 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
|
||||||
LastModRange,
|
LastModRange,
|
||||||
LimitByCount)
|
LimitByCount)
|
||||||
end,
|
end,
|
||||||
AfterFun =
|
|
||||||
fun() ->
|
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
|
||||||
case DeferredFetch of
|
|
||||||
{true, false} ->
|
|
||||||
ok;
|
|
||||||
_ ->
|
|
||||||
ok = leveled_inker:ink_close(JournalSnapshot)
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
ListFoldFun =
|
ListFoldFun =
|
||||||
fun(KeyRange, Acc) ->
|
fun(KeyRange, Acc) ->
|
||||||
Folder = FoldFunGen(KeyRange, Acc),
|
Folder = FoldFunGen(KeyRange, Acc),
|
||||||
|
|
|
@ -404,6 +404,8 @@ fetchput_snapshot(_Config) ->
|
||||||
testutil:check_forlist(Bookie1, ChkList1),
|
testutil:check_forlist(Bookie1, ChkList1),
|
||||||
testutil:check_forlist(SnapBookie1, ChkList1),
|
testutil:check_forlist(SnapBookie1, ChkList1),
|
||||||
|
|
||||||
|
compare_foldwithsnap(Bookie1, SnapBookie1, ChkList1),
|
||||||
|
|
||||||
% Close the snapshot, check the original store still has the objects
|
% Close the snapshot, check the original store still has the objects
|
||||||
|
|
||||||
ok = leveled_bookie:book_close(SnapBookie1),
|
ok = leveled_bookie:book_close(SnapBookie1),
|
||||||
|
@ -480,6 +482,8 @@ fetchput_snapshot(_Config) ->
|
||||||
testutil:check_forlist(SnapBookie3, ChkList2),
|
testutil:check_forlist(SnapBookie3, ChkList2),
|
||||||
testutil:check_forlist(SnapBookie2, ChkList1),
|
testutil:check_forlist(SnapBookie2, ChkList1),
|
||||||
io:format("Started new snapshot and check for new objects~n"),
|
io:format("Started new snapshot and check for new objects~n"),
|
||||||
|
|
||||||
|
compare_foldwithsnap(Bookie2, SnapBookie3, ChkList3),
|
||||||
|
|
||||||
% Load yet more objects, these are replacement objects for the last load
|
% Load yet more objects, these are replacement objects for the last load
|
||||||
|
|
||||||
|
@ -563,6 +567,28 @@ fetchput_snapshot(_Config) ->
|
||||||
testutil:reset_filestructure().
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
|
|
||||||
|
compare_foldwithsnap(Bookie, SnapBookie, ChkList) ->
|
||||||
|
HeadFoldFun = fun(B, K, _Hd, Acc) -> [{B, K}|Acc] end,
|
||||||
|
KeyFoldFun = fun(B, K, Acc) -> [{B, K}|Acc] end,
|
||||||
|
{async, HeadFoldDB} =
|
||||||
|
leveled_bookie:book_headfold(
|
||||||
|
Bookie, ?RIAK_TAG, {HeadFoldFun, []}, true, false, false
|
||||||
|
),
|
||||||
|
{async, HeadFoldSnap} =
|
||||||
|
leveled_bookie:book_headfold(
|
||||||
|
SnapBookie, ?RIAK_TAG, {HeadFoldFun, []}, true, false, false
|
||||||
|
),
|
||||||
|
true = HeadFoldDB() == HeadFoldSnap(),
|
||||||
|
|
||||||
|
testutil:check_forlist(SnapBookie, ChkList),
|
||||||
|
|
||||||
|
{async, KeyFoldSnap} =
|
||||||
|
leveled_bookie:book_keylist(
|
||||||
|
SnapBookie, ?RIAK_TAG, {KeyFoldFun, []}
|
||||||
|
),
|
||||||
|
true = HeadFoldSnap() == KeyFoldSnap().
|
||||||
|
|
||||||
|
|
||||||
load_and_count(_Config) ->
|
load_and_count(_Config) ->
|
||||||
% Use artificially small files, and the load keys, counting they're all
|
% Use artificially small files, and the load keys, counting they're all
|
||||||
% present
|
% present
|
||||||
|
|
|
@ -74,7 +74,21 @@ replace_everything(_Config) ->
|
||||||
compact_and_wait(Book1, 1000),
|
compact_and_wait(Book1, 1000),
|
||||||
{ok, FileList2} = file:list_dir(CompPath),
|
{ok, FileList2} = file:list_dir(CompPath),
|
||||||
io:format("Number of files after compaction ~w~n", [length(FileList2)]),
|
io:format("Number of files after compaction ~w~n", [length(FileList2)]),
|
||||||
true = FileList1 == FileList2,
|
true = FileList1 =< FileList2,
|
||||||
|
%% There will normally be 5 journal files after 50K write then alter
|
||||||
|
%% That may be two files with entirely altered objects - which will be
|
||||||
|
%% compacted, and will be compacted to nothing.
|
||||||
|
%% The "middle" file - which will be 50% compactable may be scored to
|
||||||
|
%% be part of the first run, or may end up in the second run. If in
|
||||||
|
%% the first run, the second run will not compact and FL1 == FL2.
|
||||||
|
%% Otherwise FL1 could be 0 and FL2 1. Hard to control this as there
|
||||||
|
%% is randomisation in both the scoring and the journal size (due to
|
||||||
|
%% jittering of parameters).
|
||||||
|
compact_and_wait(Book1, 1000),
|
||||||
|
{ok, FileList3} = file:list_dir(CompPath),
|
||||||
|
io:format("Number of files after compaction ~w~n", [length(FileList3)]),
|
||||||
|
%% By the third compaction there should be no further changes
|
||||||
|
true = FileList2 == FileList3,
|
||||||
{async, BackupFun} = leveled_bookie:book_hotbackup(Book1),
|
{async, BackupFun} = leveled_bookie:book_hotbackup(Book1),
|
||||||
ok = BackupFun(BackupPath),
|
ok = BackupFun(BackupPath),
|
||||||
|
|
||||||
|
@ -131,9 +145,9 @@ replace_everything(_Config) ->
|
||||||
{OSpcL6, RSpcL6} = lists:split(200, lists:ukeysort(1, KSpcL6)),
|
{OSpcL6, RSpcL6} = lists:split(200, lists:ukeysort(1, KSpcL6)),
|
||||||
{KSpcL7, V7} =
|
{KSpcL7, V7} =
|
||||||
testutil:put_altered_indexed_objects(Book6, BKT3, RSpcL6),
|
testutil:put_altered_indexed_objects(Book6, BKT3, RSpcL6),
|
||||||
{ok, FileList3} = file:list_dir(CompPath),
|
|
||||||
compact_and_wait(Book6),
|
|
||||||
{ok, FileList4} = file:list_dir(CompPath),
|
{ok, FileList4} = file:list_dir(CompPath),
|
||||||
|
compact_and_wait(Book6),
|
||||||
|
{ok, FileList5} = file:list_dir(CompPath),
|
||||||
{OSpcL6A, V7} =
|
{OSpcL6A, V7} =
|
||||||
testutil:put_altered_indexed_objects(Book6, BKT3, OSpcL6, true, V7),
|
testutil:put_altered_indexed_objects(Book6, BKT3, OSpcL6, true, V7),
|
||||||
{async, BackupFun6} = leveled_bookie:book_hotbackup(Book6),
|
{async, BackupFun6} = leveled_bookie:book_hotbackup(Book6),
|
||||||
|
@ -141,7 +155,7 @@ replace_everything(_Config) ->
|
||||||
ok = leveled_bookie:book_close(Book6),
|
ok = leveled_bookie:book_close(Book6),
|
||||||
|
|
||||||
io:format("Checking object count in newly compacted journal files~n"),
|
io:format("Checking object count in newly compacted journal files~n"),
|
||||||
NewlyCompactedFiles = lists:subtract(FileList4, FileList3),
|
NewlyCompactedFiles = lists:subtract(FileList5, FileList4),
|
||||||
true = length(NewlyCompactedFiles) >= 1,
|
true = length(NewlyCompactedFiles) >= 1,
|
||||||
CDBFilterFun = fun(_K, _V, _P, Acc, _EF) -> {loop, Acc + 1} end,
|
CDBFilterFun = fun(_K, _V, _P, Acc, _EF) -> {loop, Acc + 1} end,
|
||||||
CheckLengthFun =
|
CheckLengthFun =
|
||||||
|
|
|
@ -738,11 +738,45 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) ->
|
||||||
Bucket0,
|
Bucket0,
|
||||||
Key0),
|
Key0),
|
||||||
CheckHeadFun =
|
CheckHeadFun =
|
||||||
fun({add, SegID, B, K, H}) ->
|
fun(DB) ->
|
||||||
{ok, H} =
|
fun({add, SegID, B, K, H}) ->
|
||||||
leveled_bookie:book_headonly(Bookie1, SegID, B, K)
|
{ok, H} =
|
||||||
|
leveled_bookie:book_headonly(DB, SegID, B, K)
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
lists:foreach(CheckHeadFun, ObjectSpecL);
|
lists:foreach(CheckHeadFun(Bookie1), ObjectSpecL),
|
||||||
|
{ok, Snapshot} =
|
||||||
|
leveled_bookie:book_start([{snapshot_bookie, Bookie1}]),
|
||||||
|
ok = leveled_bookie:book_loglevel(Snapshot, warn),
|
||||||
|
ok =
|
||||||
|
leveled_bookie:book_addlogs(
|
||||||
|
Snapshot, [b0001, b0002, b0003, i0027, p0007]
|
||||||
|
),
|
||||||
|
ok =
|
||||||
|
leveled_bookie:book_removelogs(
|
||||||
|
Snapshot, [b0019]
|
||||||
|
),
|
||||||
|
io:format(
|
||||||
|
"Checking for ~w objects against Snapshot ~w~n",
|
||||||
|
[length(ObjectSpecL), Snapshot]),
|
||||||
|
lists:foreach(CheckHeadFun(Snapshot), ObjectSpecL),
|
||||||
|
io:format("Closing snapshot ~w~n", [Snapshot]),
|
||||||
|
ok = leveled_bookie:book_close(Snapshot),
|
||||||
|
{ok, AltSnapshot} =
|
||||||
|
leveled_bookie:book_start([{snapshot_bookie, Bookie1}]),
|
||||||
|
ok =
|
||||||
|
leveled_bookie:book_addlogs(
|
||||||
|
AltSnapshot, [b0001, b0002, b0003, b0004, i0027, p0007]
|
||||||
|
),
|
||||||
|
true = is_process_alive(AltSnapshot),
|
||||||
|
io:format(
|
||||||
|
"Closing actual store ~w with snapshot ~w open~n",
|
||||||
|
[Bookie1, AltSnapshot]
|
||||||
|
),
|
||||||
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
|
% Sleep a beat so as not to race with the 'DOWN' message
|
||||||
|
timer:sleep(10),
|
||||||
|
false = is_process_alive(AltSnapshot);
|
||||||
no_lookup ->
|
no_lookup ->
|
||||||
{unsupported_message, head} =
|
{unsupported_message, head} =
|
||||||
leveled_bookie:book_head(Bookie1,
|
leveled_bookie:book_head(Bookie1,
|
||||||
|
@ -753,11 +787,11 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) ->
|
||||||
leveled_bookie:book_headonly(Bookie1,
|
leveled_bookie:book_headonly(Bookie1,
|
||||||
SegmentID0,
|
SegmentID0,
|
||||||
Bucket0,
|
Bucket0,
|
||||||
Key0)
|
Key0),
|
||||||
|
io:format("Closing actual store ~w~n", [Bookie1]),
|
||||||
|
ok = leveled_bookie:book_close(Bookie1)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
|
||||||
ok = leveled_bookie:book_close(Bookie1),
|
|
||||||
{ok, FinalJournals} = file:list_dir(JFP),
|
{ok, FinalJournals} = file:list_dir(JFP),
|
||||||
io:format("Trim has reduced journal count from " ++
|
io:format("Trim has reduced journal count from " ++
|
||||||
"~w to ~w and ~w after restart~n",
|
"~w to ~w and ~w after restart~n",
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue