Merge pull request #47 from martinsumner/mas-pushmem-i46
Mas pushmem i46
This commit is contained in:
commit
7f50540120
5 changed files with 620 additions and 269 deletions
|
@ -1181,7 +1181,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
|
|||
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
|
||||
if
|
||||
TimeToPush ->
|
||||
CacheToLoad = {leveled_tree:from_orderedset(Tab, ?CACHE_TYPE),
|
||||
CacheToLoad = {Tab,
|
||||
Cache#ledger_cache.index,
|
||||
Cache#ledger_cache.min_sqn,
|
||||
Cache#ledger_cache.max_sqn},
|
||||
|
|
|
@ -61,7 +61,8 @@
|
|||
generate_uuid/0,
|
||||
integer_now/0,
|
||||
riak_extract_metadata/2,
|
||||
magic_hash/1]).
|
||||
magic_hash/1,
|
||||
to_lookup/1]).
|
||||
|
||||
-define(V1_VERS, 1).
|
||||
-define(MAGIC, 53). % riak_kv -> riak_object
|
||||
|
@ -73,6 +74,14 @@
|
|||
%% what they are -
|
||||
%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
|
||||
|
||||
to_lookup(Key) ->
|
||||
case element(1, Key) of
|
||||
?IDX_TAG ->
|
||||
no_lookup;
|
||||
_ ->
|
||||
lookup
|
||||
end.
|
||||
|
||||
magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) ->
|
||||
magic_hash({Bucket, Key});
|
||||
magic_hash({?STD_TAG, Bucket, Key, _SubKey}) ->
|
||||
|
@ -198,32 +207,41 @@ compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
|
|||
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
|
||||
skip;
|
||||
compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
|
||||
{Tag, _, _, _} = LK,
|
||||
{Tag, TagStrat} = lists:keyfind(Tag, 1, Strategy),
|
||||
case TagStrat of
|
||||
case get_tagstrategy(LK, Strategy) of
|
||||
skip ->
|
||||
skip;
|
||||
retain ->
|
||||
{retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}};
|
||||
TagStrat ->
|
||||
{TagStrat, null}
|
||||
end;
|
||||
compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
|
||||
{Tag, _, _, _} = LK,
|
||||
case lists:keyfind(Tag, 1, Strategy) of
|
||||
{Tag, TagStrat} ->
|
||||
case TagStrat of
|
||||
retain ->
|
||||
{_V, KeyDeltas} = split_inkvalue(V),
|
||||
{TagStrat, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
|
||||
TagStrat ->
|
||||
{TagStrat, null}
|
||||
end;
|
||||
false ->
|
||||
leveled_log:log("IC012", [Tag, Strategy]),
|
||||
skip
|
||||
case get_tagstrategy(LK, Strategy) of
|
||||
skip ->
|
||||
skip;
|
||||
retain ->
|
||||
{_V, KeyDeltas} = split_inkvalue(V),
|
||||
{retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
|
||||
TagStrat ->
|
||||
{TagStrat, null}
|
||||
end;
|
||||
compact_inkerkvc(_KVC, _Strategy) ->
|
||||
skip.
|
||||
|
||||
get_tagstrategy(LK, Strategy) ->
|
||||
case LK of
|
||||
{Tag, _, _, _} ->
|
||||
case lists:keyfind(Tag, 1, Strategy) of
|
||||
{Tag, TagStrat} ->
|
||||
TagStrat;
|
||||
false ->
|
||||
leveled_log:log("IC012", [Tag, Strategy]),
|
||||
skip
|
||||
end;
|
||||
_ ->
|
||||
skip
|
||||
end.
|
||||
|
||||
split_inkvalue(VBin) ->
|
||||
case is_binary(VBin) of
|
||||
true ->
|
||||
|
@ -429,6 +447,73 @@ endkey_passed_test() ->
|
|||
?assertMatch(true, endkey_passed(TestKey, K2)).
|
||||
|
||||
|
||||
corrupted_ledgerkey_test() ->
|
||||
% When testing for compacted journal which has been corrupted, there may
|
||||
% be a corruptes ledger key. Always skip these keys
|
||||
% Key has become a 3-tuple not a 4-tuple
|
||||
TagStrat1 = compact_inkerkvc({{1,
|
||||
?INKT_STND,
|
||||
{?STD_TAG, "B1", "K1andSK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, retain}]),
|
||||
?assertMatch(skip, TagStrat1),
|
||||
TagStrat2 = compact_inkerkvc({{1,
|
||||
?INKT_KEYD,
|
||||
{?STD_TAG, "B1", "K1andSK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, retain}]),
|
||||
?assertMatch(skip, TagStrat2).
|
||||
|
||||
general_skip_strategy_test() ->
|
||||
% Confirm that we will skip if the strategy says so
|
||||
TagStrat1 = compact_inkerkvc({{1,
|
||||
?INKT_STND,
|
||||
{?STD_TAG, "B1", "K1andSK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}]),
|
||||
?assertMatch(skip, TagStrat1),
|
||||
TagStrat2 = compact_inkerkvc({{1,
|
||||
?INKT_KEYD,
|
||||
{?STD_TAG, "B1", "K1andSK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}]),
|
||||
?assertMatch(skip, TagStrat2),
|
||||
TagStrat3 = compact_inkerkvc({{1,
|
||||
?INKT_KEYD,
|
||||
{?IDX_TAG, "B1", "K1", "SK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}]),
|
||||
?assertMatch(skip, TagStrat3),
|
||||
TagStrat4 = compact_inkerkvc({{1,
|
||||
?INKT_KEYD,
|
||||
{?IDX_TAG, "B1", "K1", "SK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}, {?IDX_TAG, recalc}]),
|
||||
?assertMatch({recalc, null}, TagStrat4),
|
||||
TagStrat5 = compact_inkerkvc({{1,
|
||||
?INKT_TOMB,
|
||||
{?IDX_TAG, "B1", "K1", "SK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}, {?IDX_TAG, recalc}]),
|
||||
?assertMatch(skip, TagStrat5).
|
||||
|
||||
corrupted_inker_tag_test() ->
|
||||
% Confirm that we will skip on unknown inker tag
|
||||
TagStrat1 = compact_inkerkvc({{1,
|
||||
foo,
|
||||
{?STD_TAG, "B1", "K1andSK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, retain}]),
|
||||
?assertMatch(skip, TagStrat1).
|
||||
|
||||
%% Test below proved that the overhead of performing hashes was trivial
|
||||
%% Maybe 5 microseconds per hash
|
||||
|
||||
|
|
|
@ -166,7 +166,8 @@
|
|||
{"PC015",
|
||||
{info, "File created"}},
|
||||
{"PC016",
|
||||
{info, "Slow fetch from SFT ~w of ~w microseconds with result ~w"}},
|
||||
{info, "Slow fetch from SFT ~w of ~w microseconds at level ~w "
|
||||
++ "with result ~w"}},
|
||||
{"PC017",
|
||||
{info, "Notified clerk of manifest change"}},
|
||||
{"PC018",
|
||||
|
@ -259,14 +260,14 @@
|
|||
{error, "False result returned from SST with filename ~s as "
|
||||
++ "slot ~w has failed crc check"}},
|
||||
{"SST03",
|
||||
{info, "Opening SST file with filename ~s keys ~w slots ~w and"
|
||||
{info, "Opening SST file with filename ~s slots ~w and"
|
||||
++ " max sqn ~w"}},
|
||||
{"SST04",
|
||||
{info, "Exit called for reason ~w on filename ~s"}},
|
||||
{"SST05",
|
||||
{warn, "Rename rogue filename ~s to ~s"}},
|
||||
{"SST06",
|
||||
{info, "File ~s has been set for delete"}},
|
||||
{debug, "File ~s has been set for delete"}},
|
||||
{"SST07",
|
||||
{info, "Exit called and now clearing ~s"}},
|
||||
{"SST08",
|
||||
|
|
|
@ -341,7 +341,7 @@ init([PCLopts]) ->
|
|||
end.
|
||||
|
||||
|
||||
handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}},
|
||||
handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
|
||||
From,
|
||||
State=#state{is_snapshot=Snap}) when Snap == false ->
|
||||
% The push_mem process is as follows:
|
||||
|
@ -370,6 +370,15 @@ handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}},
|
|||
{reply, returned, State};
|
||||
false ->
|
||||
leveled_log:log("P0018", [ok, false, false]),
|
||||
PushedTree =
|
||||
case is_tuple(LedgerTable) of
|
||||
true ->
|
||||
LedgerTable;
|
||||
false ->
|
||||
leveled_tree:from_orderedset(LedgerTable,
|
||||
?CACHE_TYPE)
|
||||
end,
|
||||
% Reply must happen after the table has been converted
|
||||
gen_server:reply(From, ok),
|
||||
{noreply,
|
||||
update_levelzero(State#state.levelzero_size,
|
||||
|
@ -853,7 +862,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
|
|||
L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
|
||||
case L0Check of
|
||||
{false, not_found} ->
|
||||
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3);
|
||||
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4);
|
||||
{true, KV} ->
|
||||
{KV, 0}
|
||||
end.
|
||||
|
@ -865,7 +874,7 @@ fetch(Key, Hash, Manifest, Level, FetchFun) ->
|
|||
false ->
|
||||
fetch(Key, Hash, Manifest, Level + 1, FetchFun);
|
||||
FP ->
|
||||
case FetchFun(FP, Key, Hash) of
|
||||
case FetchFun(FP, Key, Hash, Level) of
|
||||
not_present ->
|
||||
fetch(Key, Hash, Manifest, Level + 1, FetchFun);
|
||||
ObjectFound ->
|
||||
|
@ -873,21 +882,21 @@ fetch(Key, Hash, Manifest, Level, FetchFun) ->
|
|||
end
|
||||
end.
|
||||
|
||||
timed_sst_get(PID, Key, Hash) ->
|
||||
timed_sst_get(PID, Key, Hash, Level) ->
|
||||
SW = os:timestamp(),
|
||||
R = leveled_sst:sst_get(PID, Key, Hash),
|
||||
T0 = timer:now_diff(os:timestamp(), SW),
|
||||
log_slowfetch(T0, R, PID, ?SLOW_FETCH).
|
||||
log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH).
|
||||
|
||||
log_slowfetch(T0, R, PID, FetchTolerance) ->
|
||||
log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
|
||||
case {T0, R} of
|
||||
{T, R} when T < FetchTolerance ->
|
||||
R;
|
||||
{T, not_present} ->
|
||||
leveled_log:log("PC016", [PID, T, not_present]),
|
||||
leveled_log:log("PC016", [PID, T, Level, not_present]),
|
||||
not_present;
|
||||
{T, R} ->
|
||||
leveled_log:log("PC016", [PID, T, found]),
|
||||
leveled_log:log("PC016", [PID, T, Level, found]),
|
||||
R
|
||||
end.
|
||||
|
||||
|
@ -1498,7 +1507,7 @@ create_file_test() ->
|
|||
?assertMatch("hello", binary_to_term(Bin)).
|
||||
|
||||
slow_fetch_test() ->
|
||||
?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 1)).
|
||||
?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)).
|
||||
|
||||
checkready(Pid) ->
|
||||
try
|
||||
|
|
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue