diff --git a/include/leveled.hrl b/include/leveled.hrl index a945f0c..9357bd5 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -50,7 +50,8 @@ {press_method = native :: leveled_sst:press_method(), log_options = leveled_log:get_opts() - :: leveled_log:log_options()}). + :: leveled_log:log_options(), + max_sstslots = 256 :: pos_integer()}). -record(inker_options, {cdb_max_size :: integer() | undefined, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 02b66e7..11a5312 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -62,7 +62,6 @@ book_headonly/4, book_snapshot/4, book_compactjournal/2, - book_eqccompactjournal/2, book_islastcompactionpending/1, book_trimjournal/1, book_hotbackup/1, @@ -101,8 +100,8 @@ -include_lib("eunit/include/eunit.hrl"). -define(CACHE_SIZE, 2500). --define(MIN_CACHE_SIZE, 1). --define(MIN_PCL_CACHE_SIZE, 4). +-define(MIN_CACHE_SIZE, 100). +-define(MIN_PCL_CACHE_SIZE, 400). -define(MAX_PCL_CACHE_SIZE, 28000). % This is less than actual max - but COIN_SIDECOUNT -define(CACHE_SIZE_JITTER, 25). @@ -125,6 +124,7 @@ {snapshot_bookie, undefined}, {cache_size, ?CACHE_SIZE}, {max_journalsize, 1000000000}, + {max_sstslots, 256}, {sync_strategy, none}, {head_only, false}, {waste_retention_period, undefined}, @@ -221,6 +221,10 @@ {max_journalsize, pos_integer()} | % The maximum size of a journal file in bytes. The abolute % maximum must be 4GB due to 4 byte file pointers being used + {max_sstslots, pos_integer()} | + % The maximum number of slots in a SST file. All testing is done + % at a size of 256 (except for Quickcheck tests), altering this + % value is not recommended {sync_strategy, sync_mode()} | % Should be sync if it is necessary to flush to disk after every % write, or none if not (allow the OS to schecdule). This has a @@ -1006,7 +1010,6 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> -spec book_compactjournal(pid(), integer()) -> ok|busy. 
--spec book_eqccompactjournal(pid(), integer()) -> {ok|busy, pid()|undefined}. -spec book_islastcompactionpending(pid()) -> boolean(). -spec book_trimjournal(pid()) -> ok. @@ -1015,9 +1018,6 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> %% the scheduling of Journla compaction is called externally, so it is assumed %% in Riak it will be triggered by a vnode callback. -book_eqccompactjournal(Pid, Timeout) -> - gen_server:call(Pid, {compact_journal, Timeout}, infinity). - book_compactjournal(Pid, Timeout) -> {R, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), R. @@ -1639,6 +1639,8 @@ set_options(Opts) -> % If using lz4 this is not recommended false end, + + MaxSSTSlots = proplists:get_value(max_sstslots, Opts), {#inker_options{root_path = JournalFP, reload_strategy = ReloadStrategy, @@ -1660,8 +1662,9 @@ set_options(Opts) -> snaptimeout_short = SnapTimeoutShort, snaptimeout_long = SnapTimeoutLong, sst_options = - #sst_options{press_method = CompressionMethod, - log_options=leveled_log:get_opts()}} + #sst_options{press_method=CompressionMethod, + log_options=leveled_log:get_opts(), + max_sstslots=MaxSSTSlots}} }. diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 1693ed3..8d8dcb0 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -820,7 +820,7 @@ finished_rolling(CDB) -> -spec close_pendingdelete(file:io_device(), list(), list()|undefined) -> ok. %% @doc -%% If delete is pending - thent he close behaviour needs to actuallly delete +%% If delete is pending - then the close behaviour needs to actually delete %% the file close_pendingdelete(Handle, Filename, WasteFP) -> ok = file:close(Handle), @@ -2606,6 +2606,24 @@ badly_written_test() -> ok = cdb_close(P2), file:delete(F1). 
+pendingdelete_test() -> + F1 = "test/test_area/deletfile_test.pnd", + file:delete(F1), + {ok, P1} = cdb_open_writer(F1, #cdb_options{binary_mode=false}), + KVList = generate_sequentialkeys(1000, []), + ok = cdb_mput(P1, KVList), + ?assertMatch(probably, cdb_keycheck(P1, "Key1")), + ?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")), + ?assertMatch({"Key100", "Value100"}, cdb_get(P1, "Key100")), + {ok, F2} = cdb_complete(P1), + {ok, P2} = cdb_open_reader(F2, #cdb_options{binary_mode=false}), + ?assertMatch({"Key1", "Value1"}, cdb_get(P2, "Key1")), + ?assertMatch({"Key100", "Value100"}, cdb_get(P2, "Key100")), + file:delete(F2), + ok = cdb_deletepending(P2), + % No issues destroying even though the file has already been removed + ok = cdb_destroy(P2). + nonsense_coverage_test() -> {ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 379f417..4044ff7 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -1431,22 +1431,26 @@ compact_journal_testto(WRP, ExpectedFiles) -> ActualManifest = ink_getmanifest(Ink1), ok = ink_printmanifest(Ink1), ?assertMatch(3, length(ActualManifest)), - ok = ink_compactjournal(Ink1, - Checker, - fun(X) -> {X, 55} end, - fun(_F) -> ok end, - fun(L, K, SQN) -> lists:member({SQN, K}, L) end, - 5000), + {ok, _ICL1} = ink_compactjournal(Ink1, + Checker, + fun(X) -> {X, 55} end, + fun(_F) -> ok end, + fun(L, K, SQN) -> + lists:member({SQN, K}, L) + end, + 5000), timer:sleep(1000), CompactedManifest1 = ink_getmanifest(Ink1), ?assertMatch(2, length(CompactedManifest1)), Checker2 = lists:sublist(Checker, 16), - ok = ink_compactjournal(Ink1, - Checker2, - fun(X) -> {X, 55} end, - fun(_F) -> ok end, - fun(L, K, SQN) -> lists:member({SQN, K}, L) end, - 5000), + {ok, _ICL2} = ink_compactjournal(Ink1, + Checker2, + fun(X) -> {X, 55} end, + fun(_F) -> ok end, + fun(L, K, SQN) -> + lists:member({SQN, K}, L) + end, + 5000), timer:sleep(1000), CompactedManifest2 = 
ink_getmanifest(Ink1), {ok, PrefixTest} = re:compile(?COMPACT_FP), @@ -1475,12 +1479,12 @@ empty_manifest_test() -> CheckFun = fun(L, K, SQN) -> lists:member({SQN, key_converter(K)}, L) end, ?assertMatch(false, CheckFun([], "key", 1)), - ok = ink_compactjournal(Ink1, - [], - fun(X) -> {X, 55} end, - fun(_F) -> ok end, - CheckFun, - 5000), + {ok, _ICL1} = ink_compactjournal(Ink1, + [], + fun(X) -> {X, 55} end, + fun(_F) -> ok end, + CheckFun, + 5000), timer:sleep(1000), ?assertMatch(1, length(ink_getmanifest(Ink1))), ok = ink_close(Ink1), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index efe4441..3e7b156 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -72,7 +72,6 @@ -include("include/leveled.hrl"). --define(MAX_SLOTS, 2). -define(LOOK_SLOTSIZE, 128). % Maximum of 128 -define(LOOK_BLOCKSIZE, {24, 32}). % 4x + y = ?LOOK_SLOTSIZE -define(NOLOOK_SLOTSIZE, 256). @@ -258,7 +257,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) -> PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method), OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {[], [], SlotList, FK} = - merge_lists(KVList, PressMethod0, IndexModDate), + merge_lists(KVList, OptsSST0, IndexModDate), case gen_fsm:sync_send_event(Pid, {sst_new, RootPath, @@ -309,7 +308,7 @@ sst_new(RootPath, Filename, OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {Rem1, Rem2, SlotList, FK} = merge_lists(KVL1, KVL2, {IsBasement, Level}, - PressMethod0, IndexModDate), + OptsSST0, IndexModDate), case SlotList of [] -> empty; @@ -499,7 +498,7 @@ starting({sst_newlevelzero, RootPath, Filename, SW1 = os:timestamp(), {[], [], SlotList, FirstKey} = - merge_lists(KVList, PressMethod, IdxModDate), + merge_lists(KVList, OptsSST, IdxModDate), Time1 = timer:now_diff(os:timestamp(), SW1), SW2 = os:timestamp(), @@ -2131,16 +2130,17 @@ revert_position(Pos) -> %% there are matching keys then the highest sequence number must be chosen and %% any lower 
sequence numbers should be compacted out of existence --spec merge_lists(list(), press_method(), boolean()) +-spec merge_lists(list(), sst_options(), boolean()) -> {list(), list(), list(tuple()), tuple()|null}. %% @doc %% %% Merge from asingle list (i.e. at Level 0) -merge_lists(KVList1, PressMethod, IdxModDate) -> +merge_lists(KVList1, SSTOpts, IdxModDate) -> SlotCount = length(KVList1) div ?LOOK_SLOTSIZE, {[], [], - split_lists(KVList1, [], SlotCount, PressMethod, IdxModDate), + split_lists(KVList1, [], + SlotCount, SSTOpts#sst_options.press_method, IdxModDate), element(1, lists:nth(1, KVList1))}. @@ -2157,33 +2157,34 @@ split_lists(KVList1, SlotLists, N, PressMethod, IdxModDate) -> split_lists(KVListRem, [SlotD|SlotLists], N - 1, PressMethod, IdxModDate). --spec merge_lists(list(), list(), tuple(), press_method(), boolean()) -> +-spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) -> {list(), list(), list(tuple()), tuple()|null}. %% @doc %% Merge lists when merging across more thna one file. KVLists that are %% provided may include pointers to fetch more Keys/Values from the source %% file -merge_lists(KVList1, KVList2, LevelInfo, PressMethod, IndexModDate) -> +merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) -> merge_lists(KVList1, KVList2, LevelInfo, [], null, 0, - PressMethod, + SSTOpts#sst_options.max_sstslots, + SSTOpts#sst_options.press_method, IndexModDate, #build_timings{}). 
-merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, ?MAX_SLOTS, +merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, MaxSlots, MaxSlots, _PressMethod, _IdxModDate, T0) -> % This SST file is full, move to complete file, and return the % remainder log_buildtimings(T0, LI), {KVL1, KVL2, lists:reverse(SlotList), FirstKey}; -merge_lists([], [], LI, SlotList, FirstKey, _SlotCount, +merge_lists([], [], LI, SlotList, FirstKey, _SlotCount, _MaxSlots, _PressMethod, _IdxModDate, T0) -> % the source files are empty, complete the file log_buildtimings(T0, LI), {[], [], lists:reverse(SlotList), FirstKey}; -merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, +merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots, PressMethod, IdxModDate, T0) -> % Form a slot by merging the two lists until the next 128 K/V pairs have % been determined @@ -2200,6 +2201,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, SlotList, FK0, SlotCount, + MaxSlots, PressMethod, IdxModDate, T1); @@ -2214,6 +2216,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, [SlotD|SlotList], FK0, SlotCount + 1, + MaxSlots, PressMethod, IdxModDate, T2) @@ -2560,7 +2563,8 @@ merge_tombstonelist_test() -> R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5], [SkippingKV2, SkippingKV4], {true, 9999999}, - native, + #sst_options{press_method = native, + max_sstslots = 256}, ?INDEX_MODDATE), ?assertMatch({[], [], [], null}, R). 
diff --git a/test/leveledjc_eqc.erl b/test/leveledjc_eqc.erl index 21639eb..c6bcb8c 100644 --- a/test/leveledjc_eqc.erl +++ b/test/leveledjc_eqc.erl @@ -76,7 +76,8 @@ init_backend_args(#{dir := Dir, sut := Name} = S) -> case maps:get(start_opts, S, undefined) of undefined -> [ default(?RIAK_TAG, ?STD_TAG), %% Just test one tag at a time - [{root_path, Dir}, {log_level, error}, {cache_size, 10}, {max_pencillercachesize, 40}, {max_journalsize, 20000} | gen_opts()], Name ]; + [{root_path, Dir}, {log_level, error}, + {max_sstslots, 2}, {cache_size, 10}, {max_pencillercachesize, 40}, {max_journalsize, 20000} | gen_opts()], Name ]; Opts -> %% root_path is part of existing options [ maps:get(tag, S), Opts, Name ] @@ -619,8 +620,7 @@ compact_adapt(#{leveled := Leveled}, [_Pid, TS]) -> [ Leveled, TS ]. compact(Pid, TS) -> - {R, _IClerk} = leveled_bookie:book_eqccompactjournal(Pid, TS), - R. + leveled_bookie:book_compactjournal(Pid, TS). compact_next(S, R, [_Pid, _TS]) -> case {R, maps:get(previous_compact, S, undefined)} of