Fix unit tests - and make slot size configurable
parent f907fb5c97
commit ae9b03ab3c
6 changed files with 76 additions and 46 deletions
@@ -50,7 +50,8 @@
     {press_method = native
         :: leveled_sst:press_method(),
     log_options = leveled_log:get_opts()
-        :: leveled_log:log_options()}).
+        :: leveled_log:log_options(),
+    max_sstslots = 256 :: pos_integer()}).

 -record(inker_options,
     {cdb_max_size :: integer() | undefined,
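The new max_sstslots field caps the number of slots written into a single SST file, defaulting to 256. A minimal sketch of how code that includes leveled.hrl could read or override the field (the module wrapper and function names below are illustrative, not part of this commit):

-module(sst_opts_demo).
-include("include/leveled.hrl").
-export([default_slots/0, small_file_opts/0]).

default_slots() ->
    Opts = #sst_options{},            % press_method=native, max_sstslots=256
    Opts#sst_options.max_sstslots.    % returns the default of 256

small_file_opts() ->
    % a deliberately tiny cap, as used by the EQC tests further down
    #sst_options{max_sstslots = 2}.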
@@ -62,7 +62,6 @@
         book_headonly/4,
         book_snapshot/4,
         book_compactjournal/2,
-        book_eqccompactjournal/2,
         book_islastcompactionpending/1,
         book_trimjournal/1,
         book_hotbackup/1,
@@ -101,8 +100,8 @@
 -include_lib("eunit/include/eunit.hrl").

 -define(CACHE_SIZE, 2500).
--define(MIN_CACHE_SIZE, 1).
--define(MIN_PCL_CACHE_SIZE, 4).
+-define(MIN_CACHE_SIZE, 100).
+-define(MIN_PCL_CACHE_SIZE, 400).
 -define(MAX_PCL_CACHE_SIZE, 28000).
     % This is less than actual max - but COIN_SIDECOUNT
 -define(CACHE_SIZE_JITTER, 25).
@@ -125,6 +124,7 @@
         {snapshot_bookie, undefined},
         {cache_size, ?CACHE_SIZE},
         {max_journalsize, 1000000000},
+        {max_sstslots, 256},
         {sync_strategy, none},
         {head_only, false},
         {waste_retention_period, undefined},
@@ -221,6 +221,10 @@
        {max_journalsize, pos_integer()} |
            % The maximum size of a journal file in bytes. The absolute
            % maximum must be 4GB due to 4 byte file pointers being used
+       {max_sstslots, pos_integer()} |
+           % The maximum number of slots in a SST file. All testing is done
+           % at a size of 256 (except for Quickcheck tests), altering this
+           % value is not recommended
        {sync_strategy, sync_mode()} |
            % Should be sync if it is necessary to flush to disk after every
            % write, or none if not (allow the OS to schedule). This has a
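This is the user-facing side of the change: max_sstslots joins the existing startup proplist handled by leveled_bookie:book_start/1. A minimal, hedged usage sketch (the path, bucket and key below are illustrative; the other options shown are the documented defaults):

%% Illustrative only - open a store with an explicit SST slot cap
{ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/leveled_demo"},
                                          {max_journalsize, 1000000000},
                                          {max_sstslots, 256},
                                          {sync_strategy, none}]),
ok = leveled_bookie:book_put(Bookie, <<"bucket">>, <<"key">>, <<"value">>, []),
{ok, <<"value">>} = leveled_bookie:book_get(Bookie, <<"bucket">>, <<"key">>),
ok = leveled_bookie:book_close(Bookie).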
@@ -1006,7 +1010,6 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->


 -spec book_compactjournal(pid(), integer()) -> ok|busy.
--spec book_eqccompactjournal(pid(), integer()) -> {ok|busy, pid()|undefined}.
 -spec book_islastcompactionpending(pid()) -> boolean().
 -spec book_trimjournal(pid()) -> ok.

@@ -1015,9 +1018,6 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
 %% the scheduling of Journal compaction is called externally, so it is assumed
 %% in Riak it will be triggered by a vnode callback.

-book_eqccompactjournal(Pid, Timeout) ->
-    gen_server:call(Pid, {compact_journal, Timeout}, infinity).
-
 book_compactjournal(Pid, Timeout) ->
     {R, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity),
     R.
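With the EQC-only wrapper removed, book_compactjournal/2 is again the single public entry point, still returning ok|busy per the spec above. A short sketch of the call (the 30000 ms timeout is illustrative):

%% Bookie is a pid returned by leveled_bookie:book_start/1
case leveled_bookie:book_compactjournal(Bookie, 30000) of
    ok   -> ok;     % a journal compaction run has been scheduled
    busy -> busy    % a compaction run is already in progress
end.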
@@ -1640,6 +1640,8 @@ set_options(Opts) ->
                 false
         end,

+    MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
+
     {#inker_options{root_path = JournalFP,
                     reload_strategy = ReloadStrategy,
                     max_run_length = proplists:get_value(max_run_length, Opts),
@@ -1660,8 +1662,9 @@ set_options(Opts) ->
                         snaptimeout_short = SnapTimeoutShort,
                         snaptimeout_long = SnapTimeoutLong,
                         sst_options =
-                            #sst_options{press_method = CompressionMethod,
-                                        log_options=leveled_log:get_opts()}}
+                            #sst_options{press_method=CompressionMethod,
+                                        log_options=leveled_log:get_opts(),
+                                        max_sstslots=MaxSSTSlots}}
     }.

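Taken together, the two set_options/1 hunks show the plumbing: the proplist value is read once and copied into the #sst_options record that is handed down to leveled_sst. A condensed sketch of the same pattern (not the real set_options/1, which also builds the inker and penciller options; the default of 256 mirrors the record definition):

%% Sketch of the option plumbing only
build_sst_options(Opts, CompressionMethod) ->
    #sst_options{press_method = CompressionMethod,
                 log_options  = leveled_log:get_opts(),
                 max_sstslots = proplists:get_value(max_sstslots, Opts, 256)}.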
@@ -820,7 +820,7 @@ finished_rolling(CDB) ->

 -spec close_pendingdelete(file:io_device(), list(), list()|undefined) -> ok.
 %% @doc
-%% If delete is pending - thent he close behaviour needs to actuallly delete
+%% If delete is pending - then the close behaviour needs to actuallly delete
 %% the file
 close_pendingdelete(Handle, Filename, WasteFP) ->
     ok = file:close(Handle),
@@ -2606,6 +2606,24 @@ badly_written_test() ->
     ok = cdb_close(P2),
     file:delete(F1).

+pendingdelete_test() ->
+    F1 = "test/test_area/deletfile_test.pnd",
+    file:delete(F1),
+    {ok, P1} = cdb_open_writer(F1, #cdb_options{binary_mode=false}),
+    KVList = generate_sequentialkeys(1000, []),
+    ok = cdb_mput(P1, KVList),
+    ?assertMatch(probably, cdb_keycheck(P1, "Key1")),
+    ?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")),
+    ?assertMatch({"Key100", "Value100"}, cdb_get(P1, "Key100")),
+    {ok, F2} = cdb_complete(P1),
+    {ok, P2} = cdb_open_reader(F2, #cdb_options{binary_mode=false}),
+    ?assertMatch({"Key1", "Value1"}, cdb_get(P2, "Key1")),
+    ?assertMatch({"Key100", "Value100"}, cdb_get(P2, "Key100")),
+    file:delete(F2),
+    ok = cdb_deletepending(P2),
+    % No issues destroying even though the file has already been removed
+    ok = cdb_destroy(P2).
+

 nonsense_coverage_test() ->
     {ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []),
@@ -1431,22 +1431,26 @@ compact_journal_testto(WRP, ExpectedFiles) ->
     ActualManifest = ink_getmanifest(Ink1),
     ok = ink_printmanifest(Ink1),
     ?assertMatch(3, length(ActualManifest)),
-    ok = ink_compactjournal(Ink1,
-                            Checker,
-                            fun(X) -> {X, 55} end,
-                            fun(_F) -> ok end,
-                            fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
-                            5000),
+    {ok, _ICL1} = ink_compactjournal(Ink1,
+                                        Checker,
+                                        fun(X) -> {X, 55} end,
+                                        fun(_F) -> ok end,
+                                        fun(L, K, SQN) ->
+                                            lists:member({SQN, K}, L)
+                                        end,
+                                        5000),
     timer:sleep(1000),
     CompactedManifest1 = ink_getmanifest(Ink1),
     ?assertMatch(2, length(CompactedManifest1)),
     Checker2 = lists:sublist(Checker, 16),
-    ok = ink_compactjournal(Ink1,
-                            Checker2,
-                            fun(X) -> {X, 55} end,
-                            fun(_F) -> ok end,
-                            fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
-                            5000),
+    {ok, _ICL2} = ink_compactjournal(Ink1,
+                                        Checker2,
+                                        fun(X) -> {X, 55} end,
+                                        fun(_F) -> ok end,
+                                        fun(L, K, SQN) ->
+                                            lists:member({SQN, K}, L)
+                                        end,
+                                        5000),
     timer:sleep(1000),
     CompactedManifest2 = ink_getmanifest(Ink1),
     {ok, PrefixTest} = re:compile(?COMPACT_FP),
@@ -1475,12 +1479,12 @@ empty_manifest_test() ->

     CheckFun = fun(L, K, SQN) -> lists:member({SQN, key_converter(K)}, L) end,
     ?assertMatch(false, CheckFun([], "key", 1)),
-    ok = ink_compactjournal(Ink1,
-                            [],
-                            fun(X) -> {X, 55} end,
-                            fun(_F) -> ok end,
-                            CheckFun,
-                            5000),
+    {ok, _ICL1} = ink_compactjournal(Ink1,
+                                        [],
+                                        fun(X) -> {X, 55} end,
+                                        fun(_F) -> ok end,
+                                        CheckFun,
+                                        5000),
     timer:sleep(1000),
     ?assertMatch(1, length(ink_getmanifest(Ink1))),
     ok = ink_close(Ink1),
@@ -72,7 +72,6 @@

 -include("include/leveled.hrl").

--define(MAX_SLOTS, 2).
 -define(LOOK_SLOTSIZE, 128). % Maximum of 128
 -define(LOOK_BLOCKSIZE, {24, 32}). % 4x + y = ?LOOK_SLOTSIZE
 -define(NOLOOK_SLOTSIZE, 256).
@@ -258,7 +257,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
     PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
     OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
     {[], [], SlotList, FK} =
-        merge_lists(KVList, PressMethod0, IndexModDate),
+        merge_lists(KVList, OptsSST0, IndexModDate),
     case gen_fsm:sync_send_event(Pid,
                                 {sst_new,
                                     RootPath,
@@ -309,7 +308,7 @@ sst_new(RootPath, Filename,
     OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
     {Rem1, Rem2, SlotList, FK} =
         merge_lists(KVL1, KVL2, {IsBasement, Level},
-                    PressMethod0, IndexModDate),
+                    OptsSST0, IndexModDate),
     case SlotList of
         [] ->
             empty;
@@ -499,7 +498,7 @@ starting({sst_newlevelzero, RootPath, Filename,

     SW1 = os:timestamp(),
     {[], [], SlotList, FirstKey} =
-        merge_lists(KVList, PressMethod, IdxModDate),
+        merge_lists(KVList, OptsSST, IdxModDate),
     Time1 = timer:now_diff(os:timestamp(), SW1),

     SW2 = os:timestamp(),
@@ -2131,16 +2130,17 @@ revert_position(Pos) ->
 %% there are matching keys then the highest sequence number must be chosen and
 %% any lower sequence numbers should be compacted out of existence

--spec merge_lists(list(), press_method(), boolean())
+-spec merge_lists(list(), sst_options(), boolean())
                     -> {list(), list(), list(tuple()), tuple()|null}.
 %% @doc
 %%
 %% Merge from a single list (i.e. at Level 0)
-merge_lists(KVList1, PressMethod, IdxModDate) ->
+merge_lists(KVList1, SSTOpts, IdxModDate) ->
     SlotCount = length(KVList1) div ?LOOK_SLOTSIZE,
     {[],
         [],
-        split_lists(KVList1, [], SlotCount, PressMethod, IdxModDate),
+        split_lists(KVList1, [],
+                    SlotCount, SSTOpts#sst_options.press_method, IdxModDate),
         element(1, lists:nth(1, KVList1))}.

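The slot cap translates directly into a per-file key budget. As a rough, hedged calculation in the Erlang shell (using the LOOK_SLOTSIZE of 128 defined above; slots holding non-lookup data can take 256 entries, so real files vary):

%% Back-of-envelope capacity per SST file, assuming ~128 keys per slot
MaxKeysPerFile = fun(MaxSSTSlots) -> MaxSSTSlots * 128 end,
32768 = MaxKeysPerFile(256),   % the default used in production and most tests
256   = MaxKeysPerFile(2).     % the tiny cap the EQC tests use to force file rolls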
@@ -2157,33 +2157,34 @@ split_lists(KVList1, SlotLists, N, PressMethod, IdxModDate) ->
     split_lists(KVListRem, [SlotD|SlotLists], N - 1, PressMethod, IdxModDate).


--spec merge_lists(list(), list(), tuple(), press_method(), boolean()) ->
+-spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) ->
                     {list(), list(), list(tuple()), tuple()|null}.
 %% @doc
 %% Merge lists when merging across more than one file. KVLists that are
 %% provided may include pointers to fetch more Keys/Values from the source
 %% file
-merge_lists(KVList1, KVList2, LevelInfo, PressMethod, IndexModDate) ->
+merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) ->
     merge_lists(KVList1, KVList2,
                 LevelInfo,
                 [], null, 0,
-                PressMethod,
+                SSTOpts#sst_options.max_sstslots,
+                SSTOpts#sst_options.press_method,
                 IndexModDate,
                 #build_timings{}).


-merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, ?MAX_SLOTS,
+merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, MaxSlots, MaxSlots,
                 _PressMethod, _IdxModDate, T0) ->
     % This SST file is full, move to complete file, and return the
     % remainder
     log_buildtimings(T0, LI),
     {KVL1, KVL2, lists:reverse(SlotList), FirstKey};
-merge_lists([], [], LI, SlotList, FirstKey, _SlotCount,
+merge_lists([], [], LI, SlotList, FirstKey, _SlotCount, _MaxSlots,
                 _PressMethod, _IdxModDate, T0) ->
     % the source files are empty, complete the file
     log_buildtimings(T0, LI),
     {[], [], lists:reverse(SlotList), FirstKey};
-merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount,
+merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
                 PressMethod, IdxModDate, T0) ->
     % Form a slot by merging the two lists until the next 128 K/V pairs have
     % been determined
@@ -2200,6 +2201,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount,
                         SlotList,
                         FK0,
                         SlotCount,
+                        MaxSlots,
                         PressMethod,
                         IdxModDate,
                         T1);
@@ -2214,6 +2216,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount,
                         [SlotD|SlotList],
                         FK0,
                         SlotCount + 1,
+                        MaxSlots,
                         PressMethod,
                         IdxModDate,
                         T2)
@@ -2560,7 +2563,8 @@ merge_tombstonelist_test() ->
     R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5],
                     [SkippingKV2, SkippingKV4],
                     {true, 9999999},
-                    native,
+                    #sst_options{press_method = native,
+                                    max_sstslots = 256},
                     ?INDEX_MODDATE),
     ?assertMatch({[], [], [], null}, R).

@@ -76,7 +76,8 @@ init_backend_args(#{dir := Dir, sut := Name} = S) ->
     case maps:get(start_opts, S, undefined) of
         undefined ->
             [ default(?RIAK_TAG, ?STD_TAG),  %% Just test one tag at a time
-              [{root_path, Dir}, {log_level, error}, {cache_size, 10}, {max_pencillercachesize, 40}, {max_journalsize, 20000} | gen_opts()], Name ];
+              [{root_path, Dir}, {log_level, error},
+               {max_sstslots, 2}, {cache_size, 10}, {max_pencillercachesize, 40}, {max_journalsize, 20000} | gen_opts()], Name ];
         Opts ->
             %% root_path is part of existing options
             [ maps:get(tag, S), Opts, Name ]
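This is where the configurability pays off for the property tests: rather than depending on unusually small cache minimums, the EQC model asks for tiny SST files directly via {max_sstslots, 2}, so even small generated workloads exercise file rolls and multi-file behaviour. The assumed shape of the start options after this change, collected from the hunk above (gen_opts/0 is the suite's own generator and is not shown here):

%% Values as used in the hunk above; the helper name is a sketch
eqc_start_opts(Dir) ->
    [{root_path, Dir}, {log_level, error},
     {max_sstslots, 2},                 % roll SST files after only 2 slots
     {cache_size, 10},
     {max_pencillercachesize, 40},
     {max_journalsize, 20000}].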
@@ -619,8 +620,7 @@ compact_adapt(#{leveled := Leveled}, [_Pid, TS]) ->
     [ Leveled, TS ].

 compact(Pid, TS) ->
-    {R, _IClerk} = leveled_bookie:book_eqccompactjournal(Pid, TS),
-    R.
+    leveled_bookie:book_compactjournal(Pid, TS).

 compact_next(S, R, [_Pid, _TS]) ->
     case {R, maps:get(previous_compact, S, undefined)} of