Merge branch 'master' into mas-i189-tuplebuckets

This commit is contained in:
Martin Sumner 2018-09-27 16:53:18 +01:00
commit f95f078929
4 changed files with 102 additions and 12 deletions

View file

@ -48,6 +48,7 @@
code_change/3,
book_start/1,
book_start/4,
book_plainstart/1,
book_put/5,
book_put/6,
book_tempput/7,
@ -357,6 +358,14 @@ book_start(Opts) ->
gen_server:start_link(?MODULE, [set_defaults(Opts)], []).
-spec book_plainstart(list(tuple())) -> {ok, pid()}.
%% @doc
%% Start used in tests to start without linking
book_plainstart(Opts) ->
gen_server:start(?MODULE, [set_defaults(Opts)], []).
-spec book_tempput(pid(), key(), key(), any(),
leveled_codec:index_specs(),
leveled_codec:tag(), integer()) -> ok|pause.

View file

@ -625,11 +625,8 @@ reader({direct_fetch, PositionList, Info}, _From, State) ->
PositionList);
key_value_check ->
BM = State#state.binary_mode,
lists:filtermap(
fun(P) ->
FilterFalseKey(extract_key_value_check(H, P, BM))
end,
PositionList)
lists:map(fun(P) -> extract_key_value_check(H, P, BM) end,
PositionList)
end,
{reply, Reply, reader, State};
reader(cdb_complete, _From, State) ->
@ -1269,6 +1266,8 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) ->
case saferead_keyvalue(Handle) of
false ->
leveled_log:log("CDB09", [Position]),
% Bring file back to that position
{ok, Position} = file:position(Handle, {bof, Position}),
{eof, Output};
{Key, ValueAsBin, KeyLength, ValueLength} ->
NewPosition = case Key of
@ -1318,7 +1317,7 @@ saferead_keyvalue(Handle) ->
case read_next_2_integers(Handle) of
eof ->
false;
{KeyL, ValueL} ->
{KeyL, ValueL} when is_integer(KeyL), is_integer(ValueL) ->
case safe_read_next_keybin(Handle, KeyL) of
false ->
false;
@ -1330,7 +1329,9 @@ saferead_keyvalue(Handle) ->
% i.e. value with no CRC
{Key, TrueValue, KeyL, ValueL}
end
end
end;
_ ->
false
end.
@ -1437,7 +1438,7 @@ extract_valueandsize(ValueAsBin) ->
%% Note that the endian_flip is required to make the file format compatible
%% with CDB
read_next_2_integers(Handle) ->
case file:read(Handle,?DWORD_SIZE) of
case file:read(Handle, ?DWORD_SIZE) of
{ok, <<Int1:32,Int2:32>>} ->
{endian_flip(Int1), endian_flip(Int2)};
ReadError ->
@ -2444,7 +2445,7 @@ crc_corrupt_writer_test() ->
ok = file:close(Handle),
{ok, P2} = cdb_open_writer("../test/corruptwrt_test.pnd",
#cdb_options{binary_mode=false}),
?assertMatch(probably, cdb_keycheck(P2, "Key1")),
?assertMatch(probably, cdb_keycheck(P2, "Key1")),
?assertMatch({"Key1", "Value1"}, cdb_get(P2, "Key1")),
?assertMatch(missing, cdb_get(P2, "Key100")),
ok = cdb_put(P2, "Key100", "Value100"),
@ -2534,6 +2535,56 @@ safe_read_test() ->
file:delete(TestFN).
get_positions_corruption_test() ->
F1 = "../test/corruptpos_test.pnd",
file:delete(F1),
{ok, P1} = cdb_open_writer(F1, #cdb_options{binary_mode=false}),
KVList = generate_sequentialkeys(1000, []),
ok = cdb_mput(P1, KVList),
?assertMatch(probably, cdb_keycheck(P1, "Key1")),
?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")),
?assertMatch({"Key100", "Value100"}, cdb_get(P1, "Key100")),
{ok, F2} = cdb_complete(P1),
{ok, P2} = cdb_open_reader(F2, #cdb_options{binary_mode=false}),
PositionList = cdb_getpositions(P2, all),
?assertMatch(1000, length(PositionList)),
ok = cdb_close(P2),
{ok, Handle} = file:open(F2, ?WRITE_OPS),
Positions = lists:sublist(PositionList, 200, 10),
CorruptFun =
fun(Offset) ->
ok = file:pwrite(Handle, Offset, <<0:8/integer>>)
end,
ok = lists:foreach(CorruptFun, Positions),
ok = file:close(Handle),
{ok, P3} = cdb_open_reader(F2, #cdb_options{binary_mode=false}),
PositionList = cdb_getpositions(P3, all),
?assertMatch(1000, length(PositionList)),
KVCL = cdb_directfetch(P3, PositionList, key_size),
?assertMatch(true, length(KVCL) < 1000),
ok = cdb_close(P3),
file:delete(F2).
badly_written_test() ->
F1 = "../test/badfirstwrite_test.pnd",
file:delete(F1),
{ok, Handle} = file:open(F1, ?WRITE_OPS),
ok = file:pwrite(Handle, 256 * ?DWORD_SIZE, <<1:8/integer>>),
ok = file:close(Handle),
{ok, P1} = cdb_open_writer(F1, #cdb_options{binary_mode=false}),
ok = cdb_put(P1, "Key100", "Value100"),
?assertMatch({"Key100", "Value100"}, cdb_get(P1, "Key100")),
ok = cdb_close(P1),
{ok, P2} = cdb_open_writer(F1, #cdb_options{binary_mode=false}),
?assertMatch({"Key100", "Value100"}, cdb_get(P2, "Key100")),
ok = cdb_close(P2),
file:delete(F1).
nonsense_coverage_test() ->
{ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []),
ok = gen_fsm:send_all_state_event(Pid, nonsense),

View file

@ -364,7 +364,7 @@
++ "with totals of cycle_count=~w "
++ "fetch_time=~w index_time=~w"}},
{"CDB20",
{error, "Error ~w caught when safe reading a file to length ~w"}}
{warn, "Error ~w caught when safe reading a file to length ~w"}}
]).

View file

@ -11,7 +11,8 @@
space_clear_ondelete/1,
is_empty_test/1,
many_put_fetch_switchcompression/1,
bigjournal_littlejournal/1
bigjournal_littlejournal/1,
safereaderror_startup/1
]).
all() -> [
@ -24,7 +25,8 @@ all() -> [
space_clear_ondelete,
is_empty_test,
many_put_fetch_switchcompression,
bigjournal_littlejournal
bigjournal_littlejournal,
safereaderror_startup
].
@ -873,3 +875,31 @@ many_put_fetch_switchcompression(_Config) ->
testutil:check_forobject(Bookie3, TestObject),
testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"),
ok = leveled_bookie:book_destroy(Bookie3).
safereaderror_startup(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{compression_point, on_compact},
{max_journalsize, 1000}, {cache_size, 2060}],
{ok, Bookie1} = leveled_bookie:book_plainstart(StartOpts1),
B1 = <<98, 117, 99, 107, 101, 116, 51>>,
K1 =
<<38, 50, 201, 47, 167, 125, 57, 232, 84, 38, 14, 114, 24, 62,
12, 74>>,
Obj1 =
<<87, 150, 217, 230, 4, 81, 170, 68, 181, 224, 60, 232, 4, 74,
159, 12, 156, 56, 194, 181, 18, 158, 195, 207, 106, 191, 80,
111, 100, 81, 252, 248>>,
Obj2 =
<<86, 201, 253, 149, 213, 10, 32, 166, 33, 136, 42, 79, 103, 250,
139, 95, 42, 143, 161, 3, 185, 74, 149, 226, 232, 214, 183, 64,
69, 56, 167, 78>>,
ok = leveled_bookie:book_put(Bookie1, B1, K1, Obj1, []),
ok = leveled_bookie:book_put(Bookie1, B1, K1, Obj2, []),
exit(Bookie1, kill),
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
{ok, ReadBack} = leveled_bookie:book_get(Bookie2, B1, K1),
io:format("Read back ~w", [ReadBack]),
true = ReadBack == Obj2,
ok = leveled_bookie:book_close(Bookie2).