Tidy up protecting against corrupt Keys

this was previously not an issue as leveled_codec:segment_hash/1 would handle anything that could be hashed.  This now has to be a tuple, and one with a first element - so corrupted tuples are failing.

Add a guard checking for a corrupted tuple, but we only need this when doing journal compaction.

Change user_defined keys to be `retain` as a tag strategy
This commit is contained in:
Martin Sumner 2018-12-07 09:07:22 +00:00
parent cee5a60ceb
commit 714e128df8
4 changed files with 51 additions and 26 deletions

View file

@ -460,7 +460,7 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
leveled_codec:index_specs(),
leveled_codec:tag(), infinity|integer()) -> ok|pause.
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_atom(Tag) ->
gen_server:call(Pid,
{put, Bucket, Key, Object, IndexSpecs, Tag, TTL},
infinity).

View file

@ -28,6 +28,7 @@
to_ledgerkey/5,
from_ledgerkey/1,
from_ledgerkey/2,
isvalid_ledgerkey/1,
to_inkerkey/2,
to_inkerkv/6,
from_inkerkv/1,
@ -53,7 +54,7 @@
-define(NRT_IDX, "$aae.").
-type tag() ::
leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG.
leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG|atom().
-type key() ::
binary()|string()|{binary(), binary()}.
% Keys SHOULD be binary()
@ -325,6 +326,15 @@ to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG) ->
to_ledgerkey(Bucket, Key, Tag) ->
{Tag, Bucket, Key, null}.
%% No spec - due to tests
%% @doc
%% Confirm the ledger key is a well-formed 4-tuple with an atom tag.
%% Keys recovered without checksum cover (such as from the Journal) may be
%% corrupted, so this check is used to filter them out before processing.
isvalid_ledgerkey({Tag, _Bucket, _Key, _SubKey}) when is_atom(Tag) ->
    true;
isvalid_ledgerkey(_Other) ->
    false.
-spec endkey_passed(ledger_key(), ledger_key()) -> boolean().
%% @doc
%% Compare a key against a query key, only comparing elements that are non-null
@ -370,7 +380,7 @@ get_tagstrategy({Tag, _, _, _}, Strategy) ->
TagStrat;
false ->
leveled_log:log("IC012", [Tag, Strategy]),
skip
retain
end.
%%%============================================================================
@ -713,6 +723,14 @@ next_key({Type, Bucket}) when is_binary(Type), is_binary(Bucket) ->
-ifdef(TEST).
valid_ledgerkey_test() ->
    %% A user-defined (non-standard) tag still makes a valid ledger key
    GoodKey = {user_defined, <<"B">>, <<"K">>, null},
    ?assertMatch(true, isvalid_ledgerkey(GoodKey)),
    %% Keys must be tuples - a list of the same elements is invalid
    ?assertMatch(false, isvalid_ledgerkey([?STD_TAG, <<"B">>, <<"K">>, null])),
    %% The tag element must be an atom, not a string
    ?assertMatch(false, isvalid_ledgerkey({"tag", <<"B">>, <<"K">>, null})),
    %% Unrecognised user-defined tags fall back to the retain strategy
    ?assertMatch(retain, get_tagstrategy(GoodKey, inker_reload_strategy([]))).
indexspecs_test() ->
IndexSpecs = [{add, "t1_int", 456},

View file

@ -357,7 +357,8 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
ink_compactjournal(Pid, Bookie, Timeout) ->
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
CheckerCloseFun = fun leveled_penciller:pcl_close/1,
CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3,
CheckerFilterFun =
wrap_checkfilterfun(fun leveled_penciller:pcl_checksequencenumber/3),
gen_server:call(Pid,
{compact,
Bookie,
@ -1185,6 +1186,20 @@ initiate_penciller_snapshot(Bookie) ->
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
{LedgerSnap, MaxSQN}.
-spec wrap_checkfilterfun(fun()) -> fun().
%% @doc
%% Wrap a CheckFilterFun so that malformed ledger keys (e.g. keys recovered
%% corrupted from the Journal) are rejected outright rather than being
%% passed on to crash the check.
wrap_checkfilterfun(CheckFilterFun) ->
    fun(Pcl, LK, SQN) ->
        %% Only consult the underlying filter when the key shape is valid;
        %% an invalid key always fails the check
        leveled_codec:isvalid_ledgerkey(LK)
            andalso CheckFilterFun(Pcl, LK, SQN)
    end.
%%%============================================================================
%%% Test
%%%============================================================================
@ -1438,6 +1453,16 @@ empty_manifest_test() ->
ink_close(Ink2),
clean_testdir(RootPath).
wrapper_test() ->
    %% The wrapped filter must reject malformed keys before the inner fun
    %% (which here would always pass) is ever consulted
    AlwaysTrue = fun(_Pcl, _LK, _SQN) -> true end,
    Wrapped = wrap_checkfilterfun(AlwaysTrue),
    ?assertMatch(false, Wrapped(null, [?STD_TAG, <<"B">>, <<"K">>, null], 1)),
    ?assertMatch(false, Wrapped(null, {"tag", <<"B">>, <<"K">>, null}, 1)).
coverage_cheat_test() ->
    %% Exercise otherwise-unreached gen_server callbacks for coverage
    {ok, _StateCC} = code_change(null, #state{}, null),
    {noreply, _StateTO} = handle_info(timeout, #state{}).

View file

@ -483,18 +483,10 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
%% If the key is not present, it will be assumed that a higher sequence number
%% tombstone once existed, and false will be returned.
pcl_checksequencenumber(Pid, Key, SQN) ->
try
Hash = leveled_codec:segment_hash(Key),
if
Hash /= no_lookup ->
gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity)
end
catch
% Can't let this crash here, as when journal files are corrupted,
% corrupted input might be received by the penciller for this check.
% Want to be able to compact away this corruption - not end up with
% perpetually failing compaction jobs
_Type:_Error -> false
Hash = leveled_codec:segment_hash(Key),
if
Hash /= no_lookup ->
gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity)
end.
-spec pcl_workforclerk(pid()) -> ok.
@ -2012,16 +2004,6 @@ simple_server_test() ->
"Key0004",
null},
3004)),
% Try a busted key - and get false, as the exception should be handled
% Mimics a bad ledger key being discovered in the Journal, want to get
% false rather than just crashing.
?assertMatch(false, pcl_checksequencenumber(PclSnap,
[o,
"Bucket0004",
"Key0004",
null],
3004)),
% Add some more keys and confirm that check sequence number still
% sees the old version in the previous snapshot, but will see the new