commit 8388a147f2
4 changed files with 74 additions and 39 deletions
@@ -129,6 +129,7 @@
 -define(SNAPTIMEOUT_LONG, 43200). % 12 hours
 -define(SST_PAGECACHELEVEL_NOLOOKUP, 1).
 -define(SST_PAGECACHELEVEL_LOOKUP, 4).
+-define(CACHE_LOGPOINT, 50000).
 -define(OPTION_DEFAULTS,
         [{root_path, undefined},
             {snapshot_bookie, undefined},
@@ -178,6 +179,7 @@
                 get_countdown = 0 :: integer(),
                 fold_countdown = 0 :: integer(),
                 head_countdown = 0 :: integer(),
+                cache_ratio = {0, 0, 0} :: cache_ratio(),
                 get_timings = no_timing :: get_timings(),
                 put_timings = no_timing :: put_timings(),
                 fold_timings = no_timing :: fold_timings(),
@@ -210,6 +212,8 @@
 -type fold_timings() :: no_timing|#fold_timings{}.
 -type head_timings() :: no_timing|#head_timings{}.
 -type timing_types() :: head|get|put|fold.
+-type cache_ratio() ::
+        {non_neg_integer(), non_neg_integer(), non_neg_integer()}.


 -type open_options() ::
@@ -1307,10 +1311,13 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
         when State#state.head_only == false ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SWh = os:timestamp(),
+    {H0, UpdCR} =
+        fetch_head(LedgerKey,
+                    State#state.penciller,
+                    State#state.ledger_cache,
+                    State#state.cache_ratio),
     HeadResult =
-        case fetch_head(LedgerKey,
-                        State#state.penciller,
-                        State#state.ledger_cache) of
+        case H0 of
             not_present ->
                 not_found;
             Head ->
@@ -1347,16 +1354,22 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
         end,
     {Timings, CountDown} =
         update_statetimings(get, Timings2, State#state.get_countdown),
-    {reply, Reply, State#state{get_timings = Timings,
-                                get_countdown = CountDown}};
+    {reply,
+        Reply,
+        State#state{get_timings = Timings,
+                    get_countdown = CountDown,
+                    cache_ratio =
+                        maybelog_cacheratio(UpdCR, State#state.is_snapshot)}};
 handle_call({head, Bucket, Key, Tag, SQNOnly}, _From, State)
         when State#state.head_lookup == true ->
     SWp = os:timestamp(),
     LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
-    Head = fetch_head(LK,
-                        State#state.penciller,
-                        State#state.ledger_cache,
-                        State#state.head_only),
+    {Head, UpdCR} =
+        fetch_head(LK,
+                    State#state.penciller,
+                    State#state.ledger_cache,
+                    State#state.cache_ratio,
+                    State#state.head_only),
     {SWr, UpdTimingsP} =
         update_timings(SWp, {head, pcl}, State#state.head_timings),
     {LedgerMD, SQN, JournalCheckFrequency} =
@@ -1411,7 +1424,9 @@ handle_call({head, Bucket, Key, Tag, SQNOnly}, _From, State)
         Reply,
         State#state{head_timings = UpdTimings,
                     head_countdown = CountDown,
-                    ink_checking = JournalCheckFrequency}};
+                    ink_checking = JournalCheckFrequency,
+                    cache_ratio =
+                        maybelog_cacheratio(UpdCR, State#state.is_snapshot)}};
 handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
     % Snapshot the store, specifying if the snapshot should be long running
     % (i.e. will the snapshot be queued or be required for an extended period
@@ -2069,20 +2084,24 @@ scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) ->
     end.


--spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache())
-                    -> not_present|leveled_codec:ledger_value().
+-spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache(),
+                    cache_ratio()) ->
+                        {not_present|leveled_codec:ledger_value(),
+                            cache_ratio()}.
 %% @doc
 %% Fetch only the head of the object from the Ledger (or the bookie's recent
 %% ledger cache if it has just been updated). not_present is returned if the
 %% Key is not found
-fetch_head(Key, Penciller, LedgerCache) ->
-    fetch_head(Key, Penciller, LedgerCache, false).
+fetch_head(Key, Penciller, LedgerCache, CacheRatio) ->
+    fetch_head(Key, Penciller, LedgerCache, CacheRatio, false).

--spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache(), boolean())
-                    -> not_present|leveled_codec:ledger_value().
+-spec fetch_head(leveled_codec:ledger_key(), pid(), ledger_cache(),
+                    cache_ratio(), boolean())
+                        -> {not_present|leveled_codec:ledger_value(),
+                            cache_ratio()}.
 %% doc
 %% The L0Index needs to be bypassed when running head_only
-fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
+fetch_head(Key, Penciller, LedgerCache, {RC, CC, HC}, HeadOnly) ->
     SW = os:timestamp(),
     CacheResult =
         case LedgerCache#ledger_cache.mem of
@@ -2093,7 +2112,7 @@ fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
         end,
     case CacheResult of
         [{Key, Head}] ->
-            Head;
+            {Head, {RC + 1, CC + 1, HC + 1}};
         [] ->
             Hash = leveled_codec:segment_hash(Key),
             UseL0Idx = not HeadOnly,
@@ -2102,10 +2121,10 @@ fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
             case leveled_penciller:pcl_fetch(Penciller, Key, Hash, UseL0Idx) of
                 {Key, Head} ->
                     maybe_longrunning(SW, pcl_head),
-                    Head;
+                    {Head, {RC + 1, CC, HC + 1}};
                 not_present ->
                     maybe_longrunning(SW, pcl_head),
-                    not_present
+                    {not_present, {RC + 1, CC, HC}}
             end
     end.

@@ -2436,6 +2455,13 @@ update_timings(SW, {fold, setup}, Timings) ->
     Timings0 = Timings#fold_timings{setup_time = FST, sample_count = CNT},
     {no_timing, Timings0}.


+-spec maybelog_cacheratio(cache_ratio(), boolean()) -> cache_ratio().
+maybelog_cacheratio({?CACHE_LOGPOINT, CC, HC}, false) ->
+    leveled_log:log("B0021", [?CACHE_LOGPOINT, CC, HC]),
+    {0, 0, 0};
+maybelog_cacheratio(CR, _IsSnap) ->
+    CR.
+
 %%%============================================================================
 %%% Test
 %%%============================================================================

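Note on the change above: each head fetch now threads a {RequestCount, CacheCount, ObjectFoundCount} tuple through fetch_head and back into the bookie state, and maybelog_cacheratio/2 logs and resets the counters once RequestCount reaches ?CACHE_LOGPOINT (50,000) on a non-snapshot bookie. The standalone sketch below is illustrative only (the module and function names are invented for this note and are not part of leveled); it mirrors the counter increments and the log-and-reset step:

    %% cache_ratio_sketch.erl - minimal illustration of the counting scheme.
    -module(cache_ratio_sketch).
    -export([fetch/2, maybe_log/1]).

    -define(CACHE_LOGPOINT, 50000).

    -type cache_ratio() ::
            {non_neg_integer(), non_neg_integer(), non_neg_integer()}.

    %% Apply the same increments as fetch_head/5: every request bumps the
    %% request count; a ledger-cache hit bumps the cache count and the found
    %% count; a penciller hit bumps only the found count; a miss bumps neither.
    -spec fetch(ledger_cache | penciller | not_present, cache_ratio())
                -> cache_ratio().
    fetch(ledger_cache, {RC, CC, HC}) -> {RC + 1, CC + 1, HC + 1};
    fetch(penciller,    {RC, CC, HC}) -> {RC + 1, CC, HC + 1};
    fetch(not_present,  {RC, CC, HC}) -> {RC + 1, CC, HC}.

    %% Mirror maybelog_cacheratio/2 for a non-snapshot bookie: emit the
    %% B0021-style line once the request count reaches the log point, then
    %% reset the counters; otherwise pass the tuple through unchanged.
    -spec maybe_log(cache_ratio()) -> cache_ratio().
    maybe_log({?CACHE_LOGPOINT, CC, HC}) ->
        io:format("Bookie fetch RequestCount=~w and CacheCount=~w and "
                    "ObjectFoundCount=~w~n", [?CACHE_LOGPOINT, CC, HC]),
        {0, 0, 0};
    maybe_log(CR) ->
        CR.

Over each 50,000-request window, CacheCount/RequestCount approximates the ledger-cache hit rate and ObjectFoundCount/RequestCount the overall hit rate, which is what the new B0021 log line reports.
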
@@ -63,7 +63,6 @@
 -endif.

 -ifdef(slow_test).
--define(KEYCOUNT, 2048).
 -define(SPECIAL_DELFUN, fun(_F) -> ok end).
 % There are problems with the pendingdelete_test/0 in riak make test
 % The deletion of the file causes the process to crash and the test to
@@ -71,7 +70,6 @@
 % Workaround this problem by not performing the delete when running unit
 % tests in R16
 -else.
--define(KEYCOUNT, 16384).
 -define(SPECIAL_DELFUN, fun(F) -> file:delete(F) end).
 -endif.

@@ -2341,18 +2339,19 @@ get_keys_byposition_simple_test() ->
     ok = file:delete(F2).

 generate_sequentialkeys(0, KVList) ->
-    lists:reverse(KVList);
+    KVList;
 generate_sequentialkeys(Count, KVList) ->
     KV = {"Key" ++ integer_to_list(Count), "Value" ++ integer_to_list(Count)},
-    generate_sequentialkeys(Count - 1, KVList ++ [KV]).
+    generate_sequentialkeys(Count - 1, [KV|KVList]).

 get_keys_byposition_manykeys_test_() ->
     {timeout, 600, fun get_keys_byposition_manykeys_test_to/0}.

 get_keys_byposition_manykeys_test_to() ->
-    KeyCount = ?KEYCOUNT,
+    KeyCount = 16384,
     {ok, P1} = cdb_open_writer("test/test_area/poskeymany.pnd",
-                                #cdb_options{binary_mode=false}),
+                                #cdb_options{binary_mode=false,
+                                                sync_strategy=none}),
     KVList = generate_sequentialkeys(KeyCount, []),
     lists:foreach(fun({K, V}) -> cdb_put(P1, K, V) end, KVList),
     ok = cdb_roll(P1),

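Aside on the generate_sequentialkeys change above: appending with ++ copies the accumulator on every call (quadratic in the key count), while prepending is constant work per call, and because the function counts down, the prepended list comes out in the same order as the old append-then-reverse version, so the lists:reverse/1 can be dropped. The small comparison module below (hypothetical name, written only for this note) makes the equivalence explicit:

    -module(keygen_compare).
    -export([same_result/1]).

    %% Old shape: append each pair with ++ (copies Acc on every call), then
    %% reverse at the end.
    append_version(0, Acc) ->
        lists:reverse(Acc);
    append_version(N, Acc) ->
        KV = {"Key" ++ integer_to_list(N), "Value" ++ integer_to_list(N)},
        append_version(N - 1, Acc ++ [KV]).

    %% New shape: prepend (constant work per call); counting down means no
    %% final reverse is needed to get the same ordering.
    prepend_version(0, Acc) ->
        Acc;
    prepend_version(N, Acc) ->
        KV = {"Key" ++ integer_to_list(N), "Value" ++ integer_to_list(N)},
        prepend_version(N - 1, [KV|Acc]).

    %% Returns true when both generators agree for N keys.
    same_result(N) when is_integer(N), N >= 0 ->
        append_version(N, []) =:= prepend_version(N, []).

Together with opening the CDB file with sync_strategy=none, this appears to be what lets the test run with 16384 keys unconditionally, rather than switching key counts via the slow_test define.
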
@@ -80,6 +80,9 @@
     {"B0020",
         {warn, "Ratio of penciller cache size ~w to bookie's memory "
             ++ "cache size ~w is larger than expected"}},
+    {"B0021",
+        {info, "Bookie fetch RequestCount=~w and CacheCount=~w and "
+            ++ "ObjectFoundCount=~w"}},

     {"R0001",
         {debug, "Object fold to process batch of ~w objects"}},

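For reference, the new B0021 entry above takes its three parameters in the order request count, cache count, object-found count; rendering the format string with made-up values (illustrative only) shows the shape of the logged message:

    %% Illustrative only: the counter values here are invented.
    1> lists:flatten(
           io_lib:format("Bookie fetch RequestCount=~w and CacheCount=~w and "
                            ++ "ObjectFoundCount=~w", [50000, 32000, 48500])).
    "Bookie fetch RequestCount=50000 and CacheCount=32000 and ObjectFoundCount=48500"
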
@@ -603,6 +603,7 @@ busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) ->

 allkeydelta_journal_multicompact(_Config) ->
     RootPath = testutil:reset_filestructure(),
+    CompPath = filename:join(RootPath, "journal/journal_files/post_compact"),
     B = <<"test_bucket">>,
     StartOptsFun =
         fun(JOC) ->
@@ -621,14 +622,10 @@ allkeydelta_journal_multicompact(_Config) ->
                                         false),
     compact_and_wait(Bookie1, 0),
     compact_and_wait(Bookie1, 0),
-    {ok, FileList1} =
-        file:list_dir(
-            filename:join(RootPath, "journal/journal_files/post_compact")),
+    {ok, FileList1} = file:list_dir(CompPath),
     io:format("Number of files after compaction ~w~n", [length(FileList1)]),
     compact_and_wait(Bookie1, 0),
-    {ok, FileList2} =
-        file:list_dir(
-            filename:join(RootPath, "journal/journal_files/post_compact")),
+    {ok, FileList2} = file:list_dir(CompPath),
     io:format("Number of files after compaction ~w~n", [length(FileList2)]),
     true = FileList1 == FileList2,

@@ -652,9 +649,7 @@ allkeydelta_journal_multicompact(_Config) ->
                                         KSpcL2,
                                         false),
     compact_and_wait(Bookie2, 0),
-    {ok, FileList3} =
-        file:list_dir(
-            filename:join(RootPath, "journal/journal_files/post_compact")),
+    {ok, FileList3} = file:list_dir(CompPath),
     io:format("Number of files after compaction ~w~n", [length(FileList3)]),

     ok = leveled_bookie:book_close(Bookie2),
@@ -673,13 +668,25 @@ allkeydelta_journal_multicompact(_Config) ->
                                 B,
                                 KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
                                 V4),
-    {ok, FileList4} =
-        file:list_dir(
-            filename:join(RootPath, "journal/journal_files/post_compact")),
+    {ok, FileList4} = file:list_dir(CompPath),
     io:format("Number of files after compaction ~w~n", [length(FileList4)]),
     true = length(FileList4) >= length(FileList3) + 3,

     ok = leveled_bookie:book_close(Bookie3),

+    NewlyCompactedFiles = lists:subtract(FileList4, FileList3),
+    true = length(NewlyCompactedFiles) >= 3,
+    CDBFilterFun = fun(_K, _V, _P, Acc, _EF) -> {loop, Acc + 1} end,
+    CheckLengthFun =
+        fun(FN) ->
+            {ok, CF} =
+                leveled_cdb:cdb_open_reader(filename:join(CompPath, FN)),
+            {_LP, TK} =
+                leveled_cdb:cdb_scan(CF, CDBFilterFun, 0, undefined),
+            io:format("File ~s has ~w keys~n", [FN, TK]),
+            true = TK =< 7000
+        end,
+    lists:foreach(CheckLengthFun, NewlyCompactedFiles),
+
     testutil:reset_filestructure(10000).

 recompact_keydeltas(_Config) ->