Check SQN order fold does not fold beyond end of snapshot
the Journla snapshot is not a true snapshot, in that the active file in the snapshot can still be taking appends. So when getting a snapshot it is necessary to check if folding over the snapshot that the SQN is <= JournalSQN when the snapshot is taken. Normally consistency of the snapshot is managed as the operation depends on the penciller, and the penciller *is* a snapshot. Not in this case, as the penciller will return true on a sqn check if the pcl SQN is behind the Journal. So the Journal folder, has been given an additionla check to stop at the JournalSQN. This is perhaps a fault in the pcl check sqn, which should only return true on an exact match? I'm nervous about changing this though, so we have a less pure fix for now.
This commit is contained in:
parent
5799f06452
commit
db1486fa36
4 changed files with 102 additions and 14 deletions
|
@ -3021,6 +3021,71 @@ erase_journal_test() ->
|
||||||
?assertMatch(500, HeadsNotFound2),
|
?assertMatch(500, HeadsNotFound2),
|
||||||
ok = book_destroy(Bookie2).
|
ok = book_destroy(Bookie2).
|
||||||
|
|
||||||
|
sqnorder_fold_test() ->
|
||||||
|
RootPath = reset_filestructure(),
|
||||||
|
{ok, Bookie1} = book_start([{root_path, RootPath},
|
||||||
|
{max_journalsize, 1000000},
|
||||||
|
{cache_size, 500}]),
|
||||||
|
ok = book_put(Bookie1,
|
||||||
|
<<"B">>, <<"K1">>, {value, <<"V1">>}, [],
|
||||||
|
?STD_TAG),
|
||||||
|
ok = book_put(Bookie1,
|
||||||
|
<<"B">>, <<"K2">>, {value, <<"V2">>}, [],
|
||||||
|
?STD_TAG),
|
||||||
|
|
||||||
|
FoldObjectsFun = fun(B, K, V, Acc) -> Acc ++ [{B, K, V}] end,
|
||||||
|
{async, ObjFPre} =
|
||||||
|
book_objectfold(Bookie1,
|
||||||
|
?STD_TAG, {FoldObjectsFun, []}, true, sqn_order),
|
||||||
|
{async, ObjFPost} =
|
||||||
|
book_objectfold(Bookie1,
|
||||||
|
?STD_TAG, {FoldObjectsFun, []}, false, sqn_order),
|
||||||
|
|
||||||
|
ok = book_put(Bookie1,
|
||||||
|
<<"B">>, <<"K3">>, {value, <<"V3">>}, [],
|
||||||
|
?STD_TAG),
|
||||||
|
|
||||||
|
ObjLPre = ObjFPre(),
|
||||||
|
?assertMatch([{<<"B">>, <<"K1">>, {value, <<"V1">>}},
|
||||||
|
{<<"B">>, <<"K2">>, {value, <<"V2">>}}], ObjLPre),
|
||||||
|
ObjLPost = ObjFPost(),
|
||||||
|
?assertMatch([{<<"B">>, <<"K1">>, {value, <<"V1">>}},
|
||||||
|
{<<"B">>, <<"K2">>, {value, <<"V2">>}},
|
||||||
|
{<<"B">>, <<"K3">>, {value, <<"V3">>}}], ObjLPost),
|
||||||
|
|
||||||
|
ok = book_destroy(Bookie1).
|
||||||
|
|
||||||
|
sqnorder_mutatefold_test() ->
|
||||||
|
RootPath = reset_filestructure(),
|
||||||
|
{ok, Bookie1} = book_start([{root_path, RootPath},
|
||||||
|
{max_journalsize, 1000000},
|
||||||
|
{cache_size, 500}]),
|
||||||
|
ok = book_put(Bookie1,
|
||||||
|
<<"B">>, <<"K1">>, {value, <<"V1">>}, [],
|
||||||
|
?STD_TAG),
|
||||||
|
ok = book_put(Bookie1,
|
||||||
|
<<"B">>, <<"K1">>, {value, <<"V2">>}, [],
|
||||||
|
?STD_TAG),
|
||||||
|
|
||||||
|
FoldObjectsFun = fun(B, K, V, Acc) -> Acc ++ [{B, K, V}] end,
|
||||||
|
{async, ObjFPre} =
|
||||||
|
book_objectfold(Bookie1,
|
||||||
|
?STD_TAG, {FoldObjectsFun, []}, true, sqn_order),
|
||||||
|
{async, ObjFPost} =
|
||||||
|
book_objectfold(Bookie1,
|
||||||
|
?STD_TAG, {FoldObjectsFun, []}, false, sqn_order),
|
||||||
|
|
||||||
|
ok = book_put(Bookie1,
|
||||||
|
<<"B">>, <<"K1">>, {value, <<"V3">>}, [],
|
||||||
|
?STD_TAG),
|
||||||
|
|
||||||
|
ObjLPre = ObjFPre(),
|
||||||
|
?assertMatch([{<<"B">>, <<"K1">>, {value, <<"V2">>}}], ObjLPre),
|
||||||
|
ObjLPost = ObjFPost(),
|
||||||
|
?assertMatch([{<<"B">>, <<"K1">>, {value, <<"V3">>}}], ObjLPost),
|
||||||
|
|
||||||
|
ok = book_destroy(Bookie1).
|
||||||
|
|
||||||
check_notfound_test() ->
|
check_notfound_test() ->
|
||||||
ProbablyFun = fun() -> probably end,
|
ProbablyFun = fun() -> probably end,
|
||||||
MissingFun = fun() -> missing end,
|
MissingFun = fun() -> missing end,
|
||||||
|
|
|
@ -117,7 +117,8 @@
|
||||||
ink_checksqn/2,
|
ink_checksqn/2,
|
||||||
ink_loglevel/2,
|
ink_loglevel/2,
|
||||||
ink_addlogs/2,
|
ink_addlogs/2,
|
||||||
ink_removelogs/2]).
|
ink_removelogs/2,
|
||||||
|
ink_getjournalsqn/1]).
|
||||||
|
|
||||||
-export([build_dummy_journal/0,
|
-export([build_dummy_journal/0,
|
||||||
clean_testdir/1,
|
clean_testdir/1,
|
||||||
|
@ -452,6 +453,13 @@ ink_addlogs(Pid, ForcedLogs) ->
|
||||||
ink_removelogs(Pid, ForcedLogs) ->
|
ink_removelogs(Pid, ForcedLogs) ->
|
||||||
gen_server:cast(Pid, {remove_logs, ForcedLogs}).
|
gen_server:cast(Pid, {remove_logs, ForcedLogs}).
|
||||||
|
|
||||||
|
-spec ink_getjournalsqn(pid()) -> {ok, pos_integer()}.
|
||||||
|
%% @doc
|
||||||
|
%% Return the current Journal SQN, which may be in the actual past if the Inker
|
||||||
|
%% is in fact a snapshot
|
||||||
|
ink_getjournalsqn(Pid) ->
|
||||||
|
gen_server:call(Pid, get_journalsqn).
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -481,12 +489,14 @@ init([LogOpts, InkerOpts]) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
handle_call({put, Key, Object, KeyChanges}, _From, State) ->
|
handle_call({put, Key, Object, KeyChanges}, _From,
|
||||||
|
State=#state{is_snapshot=Snap}) when Snap == false ->
|
||||||
case put_object(Key, Object, KeyChanges, State) of
|
case put_object(Key, Object, KeyChanges, State) of
|
||||||
{_, UpdState, ObjSize} ->
|
{_, UpdState, ObjSize} ->
|
||||||
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
|
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
|
||||||
end;
|
end;
|
||||||
handle_call({mput, Key, ObjChanges}, _From, State) ->
|
handle_call({mput, Key, ObjChanges}, _From,
|
||||||
|
State=#state{is_snapshot=Snap}) when Snap == false ->
|
||||||
case put_object(Key, head_only, ObjChanges, State) of
|
case put_object(Key, head_only, ObjChanges, State) of
|
||||||
{_, UpdState, _ObjSize} ->
|
{_, UpdState, _ObjSize} ->
|
||||||
{reply, {ok, UpdState#state.journal_sqn}, UpdState}
|
{reply, {ok, UpdState#state.journal_sqn}, UpdState}
|
||||||
|
@ -504,10 +514,8 @@ handle_call({get, Key, SQN}, _From, State) ->
|
||||||
handle_call({key_check, Key, SQN}, _From, State) ->
|
handle_call({key_check, Key, SQN}, _From, State) ->
|
||||||
{reply, key_check(Key, SQN, State#state.manifest), State};
|
{reply, key_check(Key, SQN, State#state.manifest), State};
|
||||||
handle_call({fold,
|
handle_call({fold,
|
||||||
StartSQN,
|
StartSQN, {FilterFun, InitAccFun, FoldFun}, Acc, By},
|
||||||
{FilterFun, InitAccFun, FoldFun},
|
_From, State) ->
|
||||||
Acc,
|
|
||||||
By}, _From, State) ->
|
|
||||||
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
|
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
|
||||||
Folder =
|
Folder =
|
||||||
fun() ->
|
fun() ->
|
||||||
|
@ -522,7 +530,8 @@ handle_call({fold,
|
||||||
by_runner ->
|
by_runner ->
|
||||||
{reply, Folder, State}
|
{reply, Folder, State}
|
||||||
end;
|
end;
|
||||||
handle_call({register_snapshot, Requestor}, _From , State) ->
|
handle_call({register_snapshot, Requestor},
|
||||||
|
_From , State=#state{is_snapshot=Snap}) when Snap == false ->
|
||||||
Rs = [{Requestor,
|
Rs = [{Requestor,
|
||||||
os:timestamp(),
|
os:timestamp(),
|
||||||
State#state.manifest_sqn}|State#state.registered_snapshots],
|
State#state.manifest_sqn}|State#state.registered_snapshots],
|
||||||
|
@ -564,7 +573,7 @@ handle_call({compact,
|
||||||
InitiateFun,
|
InitiateFun,
|
||||||
CloseFun,
|
CloseFun,
|
||||||
FilterFun},
|
FilterFun},
|
||||||
_From, State) ->
|
_From, State=#state{is_snapshot=Snap}) when Snap == false ->
|
||||||
Clerk = State#state.clerk,
|
Clerk = State#state.clerk,
|
||||||
Manifest = leveled_imanifest:to_list(State#state.manifest),
|
Manifest = leveled_imanifest:to_list(State#state.manifest),
|
||||||
leveled_iclerk:clerk_compact(State#state.clerk,
|
leveled_iclerk:clerk_compact(State#state.clerk,
|
||||||
|
@ -576,11 +585,12 @@ handle_call({compact,
|
||||||
{reply, {ok, Clerk}, State#state{compaction_pending=true}};
|
{reply, {ok, Clerk}, State#state{compaction_pending=true}};
|
||||||
handle_call(compaction_pending, _From, State) ->
|
handle_call(compaction_pending, _From, State) ->
|
||||||
{reply, State#state.compaction_pending, State};
|
{reply, State#state.compaction_pending, State};
|
||||||
handle_call({trim, PersistedSQN}, _From, State) ->
|
handle_call({trim, PersistedSQN}, _From, State=#state{is_snapshot=Snap})
|
||||||
|
when Snap == false ->
|
||||||
Manifest = leveled_imanifest:to_list(State#state.manifest),
|
Manifest = leveled_imanifest:to_list(State#state.manifest),
|
||||||
ok = leveled_iclerk:clerk_trim(State#state.clerk, PersistedSQN, Manifest),
|
ok = leveled_iclerk:clerk_trim(State#state.clerk, PersistedSQN, Manifest),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
handle_call(roll, _From, State) ->
|
handle_call(roll, _From, State=#state{is_snapshot=Snap}) when Snap == false ->
|
||||||
case leveled_cdb:cdb_lastkey(State#state.active_journaldb) of
|
case leveled_cdb:cdb_lastkey(State#state.active_journaldb) of
|
||||||
empty ->
|
empty ->
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
@ -664,6 +674,8 @@ handle_call({check_sqn, LedgerSQN}, _From, State) ->
|
||||||
_JSQN ->
|
_JSQN ->
|
||||||
{reply, ok, State}
|
{reply, ok, State}
|
||||||
end;
|
end;
|
||||||
|
handle_call(get_journalsqn, _From, State) ->
|
||||||
|
{reply, {ok, State#state.journal_sqn}, State};
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State) ->
|
||||||
case State#state.is_snapshot of
|
case State#state.is_snapshot of
|
||||||
true ->
|
true ->
|
||||||
|
|
|
@ -1482,6 +1482,11 @@ compare_to_sqn(Obj, SQN) ->
|
||||||
SQNToCompare > SQN ->
|
SQNToCompare > SQN ->
|
||||||
false;
|
false;
|
||||||
true ->
|
true ->
|
||||||
|
% Normally we would expect the SQN to be equal here, but
|
||||||
|
% this also allows for the Journal to have a more advanced
|
||||||
|
% value. We return true here as we wouldn't want to
|
||||||
|
% compact thta more advanced value, but this may cause
|
||||||
|
% confusion in snapshots.
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -363,12 +363,18 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
|
||||||
fun() ->
|
fun() ->
|
||||||
|
|
||||||
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
|
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
|
||||||
|
{ok, JournalSQN} = leveled_inker:ink_getjournalsqn(JournalSnapshot),
|
||||||
IsValidFun =
|
IsValidFun =
|
||||||
fun(Bucket, Key, SQN) ->
|
fun(Bucket, Key, SQN) ->
|
||||||
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
|
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
|
||||||
|
CheckSQN =
|
||||||
leveled_penciller:pcl_checksequencenumber(LedgerSnapshot,
|
leveled_penciller:pcl_checksequencenumber(LedgerSnapshot,
|
||||||
LedgerKey,
|
LedgerKey,
|
||||||
SQN)
|
SQN),
|
||||||
|
% Need to check that we have not folded past the point
|
||||||
|
% at which the snapshot was taken
|
||||||
|
(JournalSQN >= SQN) and CheckSQN
|
||||||
|
|
||||||
end,
|
end,
|
||||||
|
|
||||||
BatchFoldFun =
|
BatchFoldFun =
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue