diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 3fbf027..63b22f1 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -129,6 +129,7 @@
 -define(SNAPTIMEOUT_LONG, 43200). % 12 hours
 -define(SST_PAGECACHELEVEL_NOLOOKUP, 1).
 -define(SST_PAGECACHELEVEL_LOOKUP, 4).
+-define(CACHE_LOGPOINT, 50000).
 -define(OPTION_DEFAULTS,
         [{root_path, undefined},
             {snapshot_bookie, undefined},
@@ -178,6 +179,7 @@
                 get_countdown = 0 :: integer(),
                 fold_countdown = 0 :: integer(),
                 head_countdown = 0 :: integer(),
+                cache_ratio = {0, 0, 0} :: cache_ratio(),
                 get_timings = no_timing :: get_timings(),
                 put_timings = no_timing :: put_timings(),
                 fold_timings = no_timing :: fold_timings(),
@@ -210,6 +212,8 @@
 -type fold_timings() :: no_timing|#fold_timings{}.
 -type head_timings() :: no_timing|#head_timings{}.
 -type timing_types() :: head|get|put|fold.
+-type cache_ratio() ::
+        {non_neg_integer(), non_neg_integer(), non_neg_integer()}.
 
 
 -type open_options() ::
@@ -1307,10 +1311,13 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
         when State#state.head_only == false ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SWh = os:timestamp(),
+    {H0, UpdCR} =
+        fetch_head(LedgerKey,
+                    State#state.penciller,
+                    State#state.ledger_cache,
+                    State#state.cache_ratio),
     HeadResult =
-        case fetch_head(LedgerKey,
-                        State#state.penciller,
-                        State#state.ledger_cache) of
+        case H0 of
             not_present ->
                 not_found;
             Head ->
@@ -1347,16 +1354,22 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
     end,
     {Timings, CountDown} =
         update_statetimings(get, Timings2, State#state.get_countdown),
-    {reply, Reply, State#state{get_timings = Timings,
-                                get_countdown = CountDown}};
+    {reply,
+        Reply,
+        State#state{get_timings = Timings,
+                    get_countdown = CountDown,
+                    cache_ratio =
+                        maybelog_cacheratio(UpdCR, State#state.is_snapshot)}};
 handle_call({head, Bucket, Key, Tag, SQNOnly}, _From, State)
         when State#state.head_lookup == true ->
     SWp = os:timestamp(),
     LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
-    Head = fetch_head(LK,
-                        State#state.penciller,
-                        State#state.ledger_cache,
-                        State#state.head_only),
+    {Head, UpdCR} =
+        fetch_head(LK,
+                    State#state.penciller,
+                    State#state.ledger_cache,
+                    State#state.cache_ratio,
+                    State#state.head_only),
     {SWr, UpdTimingsP} =
         update_timings(SWp, {head, pcl}, State#state.head_timings),
     {LedgerMD, SQN, JournalCheckFrequency} =
@@ -1411,7 +1424,9 @@ handle_call({head, Bucket, Key, Tag, SQNOnly}, _From, State)
         Reply,
         State#state{head_timings = UpdTimings,
                     head_countdown = CountDown,
-                    ink_checking = JournalCheckFrequency}};
+                    ink_checking = JournalCheckFrequency,
+                    cache_ratio =
+                        maybelog_cacheratio(UpdCR, State#state.is_snapshot)}};
 handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
     % Snapshot the store, specifying if the snapshot should be long running
     % (i.e. will the snapshot be queued or be required for an extended period
@@ -2069,20 +2084,24 @@ scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) ->
     end.
 
 
--spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache())
-        -> not_present|leveled_codec:ledger_value().
+-spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache(),
+                    cache_ratio()) ->
+                        {not_present|leveled_codec:ledger_value(),
+                            cache_ratio()}.
 %% @doc
 %% Fetch only the head of the object from the Ledger (or the bookie's recent
 %% ledger cache if it has just been updated). not_present is returned if the
 %% Key is not found
-fetch_head(Key, Penciller, LedgerCache) ->
-    fetch_head(Key, Penciller, LedgerCache, false).
+fetch_head(Key, Penciller, LedgerCache, CacheRatio) ->
+    fetch_head(Key, Penciller, LedgerCache, CacheRatio, false).
 
--spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache(), boolean())
-        -> not_present|leveled_codec:ledger_value().
+-spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache(),
+                    cache_ratio(), boolean())
+                        -> {not_present|leveled_codec:ledger_value(),
+                            cache_ratio()}.
 %% doc
 %% The L0Index needs to be bypassed when running head_only
-fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
+fetch_head(Key, Penciller, LedgerCache, {RC, CC, HC}, HeadOnly) ->
     SW = os:timestamp(),
     CacheResult =
         case LedgerCache#ledger_cache.mem of
@@ -2093,7 +2112,7 @@ fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
     end,
     case CacheResult of
         [{Key, Head}] ->
-            Head;
+            {Head, {RC + 1, CC + 1, HC + 1}};
         [] ->
             Hash = leveled_codec:segment_hash(Key),
             UseL0Idx = not HeadOnly,
@@ -2102,10 +2121,10 @@ fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
             case leveled_penciller:pcl_fetch(Penciller, Key, Hash, UseL0Idx) of
                 {Key, Head} ->
                     maybe_longrunning(SW, pcl_head),
-                    Head;
+                    {Head, {RC + 1, CC, HC + 1}};
                 not_present ->
                     maybe_longrunning(SW, pcl_head),
-                    not_present
+                    {not_present, {RC + 1, CC, HC}}
             end
     end.
 
@@ -2436,6 +2455,13 @@ update_timings(SW, {fold, setup}, Timings) ->
     Timings0 = Timings#fold_timings{setup_time = FST, sample_count = CNT},
     {no_timing, Timings0}.
 
+
+-spec maybelog_cacheratio(cache_ratio(), boolean()) -> cache_ratio().
+maybelog_cacheratio({?CACHE_LOGPOINT, CC, HC}, false) ->
+    leveled_log:log("B0021", [?CACHE_LOGPOINT, CC, HC]),
+    {0, 0, 0};
+maybelog_cacheratio(CR, _IsSnap) ->
+    CR.
 %%%============================================================================
 %%% Test
 %%%============================================================================
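Taken together, the leveled_bookie.erl changes thread a {RequestCount, CacheCount, ObjectFoundCount} triple through every fetch_head call and hand the updated triple to maybelog_cacheratio/2, which emits log B0021 and resets the counters each time RequestCount reaches ?CACHE_LOGPOINT (50000), while snapshot bookies never log. The module below is a minimal, self-contained sketch of that accounting pattern; the module name, the map-backed cache and store, and the reduced log point are illustrative assumptions, not part of leveled.

%% Illustrative sketch only: maps stand in for the ledger cache and
%% penciller, and the log point is shrunk so the demo actually logs.
-module(cache_ratio_sketch).
-export([demo/0]).

-define(CACHE_LOGPOINT, 5). % leveled uses 50000

%% Every request bumps RC; a hit in the in-memory cache bumps CC; any
%% successful find (cache or backing store) bumps HC.
lookup(Key, Cache, Store, {RC, CC, HC}) ->
    case maps:find(Key, Cache) of
        {ok, _Head} ->
            {found, {RC + 1, CC + 1, HC + 1}};
        error ->
            case maps:find(Key, Store) of
                {ok, _Head} -> {found, {RC + 1, CC, HC + 1}};
                error -> {not_present, {RC + 1, CC, HC}}
            end
    end.

%% Log and reset once the request count reaches the log point; the
%% boolean mirrors is_snapshot, which suppresses logging in the bookie.
maybelog_cacheratio({?CACHE_LOGPOINT, CC, HC}, false) ->
    io:format("RequestCount=~w CacheCount=~w ObjectFoundCount=~w~n",
                [?CACHE_LOGPOINT, CC, HC]),
    {0, 0, 0};
maybelog_cacheratio(CR, _IsSnap) ->
    CR.

demo() ->
    Cache = #{k1 => head1},
    Store = #{k1 => head1, k2 => head2},
    Requests = [k1, k2, k3, k1, k2, k3, k1],
    lists:foldl(
        fun(K, CR0) ->
            {_Result, CR1} = lookup(K, Cache, Store, CR0),
            maybelog_cacheratio(CR1, false)
        end,
        {0, 0, 0},
        Requests).

Because the request count increments by exactly one per call and maybelog_cacheratio runs after every request, the exact-match pattern on ?CACHE_LOGPOINT can never be skipped over, which is why the patch can match on equality rather than testing with >=.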
diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index a2ff728..49418ff 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -63,7 +63,6 @@
 -endif.
 
 -ifdef(slow_test).
--define(KEYCOUNT, 2048).
 -define(SPECIAL_DELFUN, fun(_F) -> ok end).
 % There are problems with the pendingdelete_test/0 in riak make test
 % The deletion of the file causes the process to crash and the test to
@@ -71,7 +70,6 @@
 % Workaround this problem by not performing the delete when running unit
 % tests in R16
 -else.
--define(KEYCOUNT, 16384).
 -define(SPECIAL_DELFUN, fun(F) -> file:delete(F) end).
 -endif.
 
@@ -2341,18 +2339,19 @@ get_keys_byposition_simple_test() ->
     ok = file:delete(F2).
 
 generate_sequentialkeys(0, KVList) ->
-    lists:reverse(KVList);
+    KVList;
 generate_sequentialkeys(Count, KVList) ->
     KV = {"Key" ++ integer_to_list(Count), "Value" ++ integer_to_list(Count)},
-    generate_sequentialkeys(Count - 1, KVList ++ [KV]).
+    generate_sequentialkeys(Count - 1, [KV|KVList]).
 
 get_keys_byposition_manykeys_test_() ->
     {timeout, 600, fun get_keys_byposition_manykeys_test_to/0}.
 
 get_keys_byposition_manykeys_test_to() ->
-    KeyCount = ?KEYCOUNT,
+    KeyCount = 16384,
     {ok, P1} = cdb_open_writer("test/test_area/poskeymany.pnd",
-                                #cdb_options{binary_mode=false}),
+                                #cdb_options{binary_mode=false,
+                                                sync_strategy=none}),
     KVList = generate_sequentialkeys(KeyCount, []),
     lists:foreach(fun({K, V}) -> cdb_put(P1, K, V) end, KVList),
     ok = cdb_roll(P1),
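The generate_sequentialkeys/2 rewrite above replaces a quadratic accumulator (KVList ++ [KV] copies the whole accumulator on every iteration) with an O(1) cons onto the head. Because the function counts down from Count to 1, prepending leaves the list already in ascending key order, so the lists:reverse/1 in the base case can be dropped without changing the result. A standalone illustration of the idiom; the module and function names here are illustrative, not leveled code.

-module(seq_keys_sketch).
-export([slow_seq/1, fast_seq/1]).

%% Both build [{"Key1","Value1"}, ..., {"KeyN","ValueN"}]; only the
%% cost differs, so slow_seq(N) == fast_seq(N) for all N >= 0.

%% Acc ++ [kv(N)] copies the accumulator on each step: O(N^2) overall.
slow_seq(N) -> slow_seq(N, []).
slow_seq(0, Acc) -> lists:reverse(Acc);
slow_seq(N, Acc) -> slow_seq(N - 1, Acc ++ [kv(N)]).

%% Consing is O(1), and counting down means the result is already in
%% ascending order, so no final reverse is needed: O(N) overall.
fast_seq(N) -> fast_seq(N, []).
fast_seq(0, Acc) -> Acc;
fast_seq(N, Acc) -> fast_seq(N - 1, [kv(N) | Acc]).

kv(N) ->
    {"Key" ++ integer_to_list(N), "Value" ++ integer_to_list(N)}.

At the test's 16384 keys the append version performs roughly N(N-1)/2, about 134 million, element copies, which is why this shows up in test runtime; the sync_strategy=none option in the same hunk presumably trims the test further by avoiding per-write syncs.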
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index 1a78d36..27344dc 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -80,6 +80,9 @@
     {"B0020",
         {warn, "Ratio of penciller cache size ~w to bookie's memory "
                 ++ "cache size ~w is larger than expected"}},
+    {"B0021",
+        {info, "Bookie fetch RequestCount=~w and CacheCount=~w and "
+                ++ "ObjectFoundCount=~w"}},
 
     {"R0001",
         {debug, "Object fold to process batch of ~w objects"}},
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index d832c7d..a3890f8 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -603,6 +603,7 @@ busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) ->
 
 allkeydelta_journal_multicompact(_Config) ->
     RootPath = testutil:reset_filestructure(),
+    CompPath = filename:join(RootPath, "journal/journal_files/post_compact"),
     B = <<"test_bucket">>,
     StartOptsFun =
         fun(JOC) ->
@@ -621,14 +622,10 @@ allkeydelta_journal_multicompact(_Config) ->
                                             false),
     compact_and_wait(Bookie1, 0),
     compact_and_wait(Bookie1, 0),
-    {ok, FileList1} =
-        file:list_dir(
-            filename:join(RootPath, "journal/journal_files/post_compact")),
+    {ok, FileList1} = file:list_dir(CompPath),
     io:format("Number of files after compaction ~w~n", [length(FileList1)]),
     compact_and_wait(Bookie1, 0),
-    {ok, FileList2} =
-        file:list_dir(
-            filename:join(RootPath, "journal/journal_files/post_compact")),
+    {ok, FileList2} = file:list_dir(CompPath),
     io:format("Number of files after compaction ~w~n", [length(FileList2)]),
 
     true = FileList1 == FileList2,
@@ -652,9 +649,7 @@ allkeydelta_journal_multicompact(_Config) ->
                                             KSpcL2,
                                             false),
     compact_and_wait(Bookie2, 0),
-    {ok, FileList3} =
-        file:list_dir(
-            filename:join(RootPath, "journal/journal_files/post_compact")),
+    {ok, FileList3} = file:list_dir(CompPath),
     io:format("Number of files after compaction ~w~n", [length(FileList3)]),
     ok = leveled_bookie:book_close(Bookie2),
 
@@ -673,13 +668,25 @@ allkeydelta_journal_multicompact(_Config) ->
                                             B,
                                             KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
                                             V4),
-    {ok, FileList4} =
-        file:list_dir(
-            filename:join(RootPath, "journal/journal_files/post_compact")),
+    {ok, FileList4} = file:list_dir(CompPath),
     io:format("Number of files after compaction ~w~n", [length(FileList4)]),
-    true = length(FileList4) >= length(FileList3) + 3,
 
     ok = leveled_bookie:book_close(Bookie3),
+
+    NewlyCompactedFiles = lists:subtract(FileList4, FileList3),
+    true = length(NewlyCompactedFiles) >= 3,
+    CDBFilterFun = fun(_K, _V, _P, Acc, _EF) -> {loop, Acc + 1} end,
+    CheckLengthFun =
+        fun(FN) ->
+            {ok, CF} =
+                leveled_cdb:cdb_open_reader(filename:join(CompPath, FN)),
+            {_LP, TK} =
+                leveled_cdb:cdb_scan(CF, CDBFilterFun, 0, undefined),
+            io:format("File ~s has ~w keys~n", [FN, TK]),
+            true = TK =< 7000
+        end,
+    lists:foreach(CheckLengthFun, NewlyCompactedFiles),
+
     testutil:reset_filestructure(10000).
 
 recompact_keydeltas(_Config) ->
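The strengthened assertion in allkeydelta_journal_multicompact no longer just counts files; it folds leveled_cdb:cdb_scan/4 over each newly compacted journal file with a filter fun that always returns {loop, Acc + 1}, so the accumulator ends up as the file's key count, and asserts at most 7000 keys per file. The same pattern generalises to a small directory census; the helper below is an illustrative sketch (the module and function names are assumptions), relying on the cdb_open_reader/1 and cdb_scan/4 calls used in the test plus leveled_cdb's cdb_close/1.

%% Illustrative helper, not part of leveled: report the key count of
%% every CDB file in Dir with the same counting scan used in the test.
-module(cdb_census).
-export([key_counts/1]).

key_counts(Dir) ->
    {ok, FNs} = file:list_dir(Dir),
    %% The filter fun sees each {Key, Value} with its position and an
    %% extract fun; returning {loop, Acc + 1} counts without stopping.
    CountFun = fun(_K, _V, _Pos, Acc, _ExtractFun) -> {loop, Acc + 1} end,
    lists:map(
        fun(FN) ->
            {ok, CF} = leveled_cdb:cdb_open_reader(filename:join(Dir, FN)),
            %% Passing undefined as the start position scans from the
            %% first key; the result tuple is {LastPosition, FinalAcc}.
            {_LastPos, TotalKeys} =
                leveled_cdb:cdb_scan(CF, CountFun, 0, undefined),
            ok = leveled_cdb:cdb_close(CF),
            {FN, TotalKeys}
        end,
        FNs).

Unlike the suite's CheckLengthFun, which crashes the test on a file above the 7000-key bound, this sketch only reports {FileName, KeyCount} pairs and leaves any assertion to the caller.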