Handle timeout/message race
When there is hevay PUT load, leveled_sst files could go into the delete-pending state befre the GC message is receieved - and the GC message would then interrupt the timeout cycle and lead ot the file not being GC'd until close.
This commit is contained in:
parent
984a2aba5a
commit
744a521289
1 changed files with 46 additions and 16 deletions
|
@ -180,7 +180,7 @@
|
||||||
-record(state,
|
-record(state,
|
||||||
{summary,
|
{summary,
|
||||||
handle :: file:fd() | undefined,
|
handle :: file:fd() | undefined,
|
||||||
penciller :: pid() | undefined,
|
penciller :: pid() | undefined | false,
|
||||||
root_path,
|
root_path,
|
||||||
filename,
|
filename,
|
||||||
yield_blockquery = false :: boolean(),
|
yield_blockquery = false :: boolean(),
|
||||||
|
@ -762,9 +762,13 @@ delete_pending(close, _From, State) ->
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
delete_pending(timeout, State) ->
|
delete_pending(timeout, State) ->
|
||||||
ok = leveled_penciller:pcl_confirmdelete(State#state.penciller,
|
case State#state.penciller of
|
||||||
State#state.filename,
|
false ->
|
||||||
self()),
|
ok = leveled_sst:sst_deleteconfirmed(self());
|
||||||
|
PCL ->
|
||||||
|
FN = State#state.filename,
|
||||||
|
ok = leveled_penciller:pcl_confirmdelete(PCL, FN, self())
|
||||||
|
end,
|
||||||
% If the next thing is another timeout - may be long-running snapshot, so
|
% If the next thing is another timeout - may be long-running snapshot, so
|
||||||
% back-off
|
% back-off
|
||||||
{next_state, delete_pending, State, leveled_rand:uniform(10) * ?DELETE_TIMEOUT};
|
{next_state, delete_pending, State, leveled_rand:uniform(10) * ?DELETE_TIMEOUT};
|
||||||
|
@ -781,6 +785,10 @@ handle_sync_event(_Msg, _From, StateName, State) ->
|
||||||
handle_event(_Msg, StateName, State) ->
|
handle_event(_Msg, StateName, State) ->
|
||||||
{next_state, StateName, State}.
|
{next_state, StateName, State}.
|
||||||
|
|
||||||
|
handle_info(tidyup_after_startup, delete_pending, State) ->
|
||||||
|
% No need to GC, this file is to be shutdown. This message may have
|
||||||
|
% interrupted the delete timeout, so timeout straight away
|
||||||
|
{next_state, delete_pending, State, 0};
|
||||||
handle_info(tidyup_after_startup, StateName, State) ->
|
handle_info(tidyup_after_startup, StateName, State) ->
|
||||||
case is_process_alive(State#state.starting_pid) of
|
case is_process_alive(State#state.starting_pid) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -2583,6 +2591,8 @@ update_timings(SW, Timings, Stage, Continue) ->
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
-define(TEST_AREA, "test/test_area/").
|
||||||
|
|
||||||
testsst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) ->
|
testsst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) ->
|
||||||
OptsSST =
|
OptsSST =
|
||||||
#sst_options{press_method=PressMethod,
|
#sst_options{press_method=PressMethod,
|
||||||
|
@ -2935,6 +2945,7 @@ test_binary_slot(FullBin, Key, Hash, ExpectedValue) ->
|
||||||
|
|
||||||
|
|
||||||
merge_test() ->
|
merge_test() ->
|
||||||
|
filelib:ensure_dir(?TEST_AREA),
|
||||||
merge_tester(fun testsst_new/6, fun testsst_new/8).
|
merge_tester(fun testsst_new/6, fun testsst_new/8).
|
||||||
|
|
||||||
|
|
||||||
|
@ -2945,9 +2956,9 @@ merge_tester(NewFunS, NewFunM) ->
|
||||||
KVL3 = lists:ukeymerge(1, KVL1, KVL2),
|
KVL3 = lists:ukeymerge(1, KVL1, KVL2),
|
||||||
SW0 = os:timestamp(),
|
SW0 = os:timestamp(),
|
||||||
{ok, P1, {FK1, LK1}, _Bloom1} =
|
{ok, P1, {FK1, LK1}, _Bloom1} =
|
||||||
NewFunS("test/test_area/", "level1_src", 1, KVL1, 6000, native),
|
NewFunS(?TEST_AREA, "level1_src", 1, KVL1, 6000, native),
|
||||||
{ok, P2, {FK2, LK2}, _Bloom2} =
|
{ok, P2, {FK2, LK2}, _Bloom2} =
|
||||||
NewFunS("test/test_area/", "level2_src", 2, KVL2, 3000, native),
|
NewFunS(?TEST_AREA, "level2_src", 2, KVL2, 3000, native),
|
||||||
ExpFK1 = element(1, lists:nth(1, KVL1)),
|
ExpFK1 = element(1, lists:nth(1, KVL1)),
|
||||||
ExpLK1 = element(1, lists:last(KVL1)),
|
ExpLK1 = element(1, lists:last(KVL1)),
|
||||||
ExpFK2 = element(1, lists:nth(1, KVL2)),
|
ExpFK2 = element(1, lists:nth(1, KVL2)),
|
||||||
|
@ -2959,7 +2970,7 @@ merge_tester(NewFunS, NewFunM) ->
|
||||||
ML1 = [{next, #manifest_entry{owner = P1}, FK1}],
|
ML1 = [{next, #manifest_entry{owner = P1}, FK1}],
|
||||||
ML2 = [{next, #manifest_entry{owner = P2}, FK2}],
|
ML2 = [{next, #manifest_entry{owner = P2}, FK2}],
|
||||||
NewR =
|
NewR =
|
||||||
NewFunM("test/test_area/", "level2_merge", ML1, ML2, false, 2, N * 2, native),
|
NewFunM(?TEST_AREA, "level2_merge", ML1, ML2, false, 2, N * 2, native),
|
||||||
{ok, P3, {{Rem1, Rem2}, FK3, LK3}, _Bloom3} = NewR,
|
{ok, P3, {{Rem1, Rem2}, FK3, LK3}, _Bloom3} = NewR,
|
||||||
?assertMatch([], Rem1),
|
?assertMatch([], Rem1),
|
||||||
?assertMatch([], Rem2),
|
?assertMatch([], Rem2),
|
||||||
|
@ -2982,16 +2993,16 @@ merge_tester(NewFunS, NewFunM) ->
|
||||||
ok = sst_close(P1),
|
ok = sst_close(P1),
|
||||||
ok = sst_close(P2),
|
ok = sst_close(P2),
|
||||||
ok = sst_close(P3),
|
ok = sst_close(P3),
|
||||||
ok = file:delete("test/test_area/level1_src.sst"),
|
ok = file:delete(?TEST_AREA ++ "/level1_src.sst"),
|
||||||
ok = file:delete("test/test_area/level2_src.sst"),
|
ok = file:delete(?TEST_AREA ++ "/level2_src.sst"),
|
||||||
ok = file:delete("test/test_area/level2_merge.sst").
|
ok = file:delete(?TEST_AREA ++ "/level2_merge.sst").
|
||||||
|
|
||||||
|
|
||||||
simple_persisted_range_test() ->
|
simple_persisted_range_test() ->
|
||||||
simple_persisted_range_tester(fun testsst_new/6).
|
simple_persisted_range_tester(fun testsst_new/6).
|
||||||
|
|
||||||
simple_persisted_range_tester(SSTNewFun) ->
|
simple_persisted_range_tester(SSTNewFun) ->
|
||||||
{RP, Filename} = {"test/test_area/", "simple_test"},
|
{RP, Filename} = {?TEST_AREA, "simple_test"},
|
||||||
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 16, 1, 20),
|
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 16, 1, 20),
|
||||||
KVList1 = lists:ukeysort(1, KVList0),
|
KVList1 = lists:ukeysort(1, KVList0),
|
||||||
[{FirstKey, _FV}|_Rest] = KVList1,
|
[{FirstKey, _FV}|_Rest] = KVList1,
|
||||||
|
@ -3033,7 +3044,7 @@ simple_persisted_rangesegfilter_test() ->
|
||||||
simple_persisted_rangesegfilter_tester(fun testsst_new/6).
|
simple_persisted_rangesegfilter_tester(fun testsst_new/6).
|
||||||
|
|
||||||
simple_persisted_rangesegfilter_tester(SSTNewFun) ->
|
simple_persisted_rangesegfilter_tester(SSTNewFun) ->
|
||||||
{RP, Filename} = {"test/test_area/", "range_segfilter_test"},
|
{RP, Filename} = {?TEST_AREA, "range_segfilter_test"},
|
||||||
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 16, 1, 20),
|
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 16, 1, 20),
|
||||||
KVList1 = lists:ukeysort(1, KVList0),
|
KVList1 = lists:ukeysort(1, KVList0),
|
||||||
[{FirstKey, _FV}|_Rest] = KVList1,
|
[{FirstKey, _FV}|_Rest] = KVList1,
|
||||||
|
@ -3125,7 +3136,7 @@ additional_range_test() ->
|
||||||
lists:seq(?NOLOOK_SLOTSIZE + Gap + 1,
|
lists:seq(?NOLOOK_SLOTSIZE + Gap + 1,
|
||||||
2 * ?NOLOOK_SLOTSIZE + Gap)),
|
2 * ?NOLOOK_SLOTSIZE + Gap)),
|
||||||
{ok, P1, {{Rem1, Rem2}, SK, EK}, _Bloom1} =
|
{ok, P1, {{Rem1, Rem2}, SK, EK}, _Bloom1} =
|
||||||
testsst_new("test/test_area/", "range1_src", IK1, IK2, false, 1, 9999, native),
|
testsst_new(?TEST_AREA, "range1_src", IK1, IK2, false, 1, 9999, native),
|
||||||
?assertMatch([], Rem1),
|
?assertMatch([], Rem1),
|
||||||
?assertMatch([], Rem2),
|
?assertMatch([], Rem2),
|
||||||
?assertMatch(SK, element(1, lists:nth(1, IK1))),
|
?assertMatch(SK, element(1, lists:nth(1, IK1))),
|
||||||
|
@ -3180,7 +3191,7 @@ simple_persisted_slotsize_test() ->
|
||||||
|
|
||||||
|
|
||||||
simple_persisted_slotsize_tester(SSTNewFun) ->
|
simple_persisted_slotsize_tester(SSTNewFun) ->
|
||||||
{RP, Filename} = {"test/test_area/", "simple_slotsize_test"},
|
{RP, Filename} = {?TEST_AREA, "simple_slotsize_test"},
|
||||||
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 2, 1, 20),
|
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 2, 1, 20),
|
||||||
KVList1 = lists:sublist(lists:ukeysort(1, KVList0),
|
KVList1 = lists:sublist(lists:ukeysort(1, KVList0),
|
||||||
?LOOK_SLOTSIZE),
|
?LOOK_SLOTSIZE),
|
||||||
|
@ -3195,6 +3206,25 @@ simple_persisted_slotsize_tester(SSTNewFun) ->
|
||||||
ok = sst_close(Pid),
|
ok = sst_close(Pid),
|
||||||
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
|
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
|
||||||
|
|
||||||
|
|
||||||
|
delete_pending_test_() ->
|
||||||
|
{timeout, 30, fun delete_pending_tester/0}.
|
||||||
|
|
||||||
|
delete_pending_tester() ->
|
||||||
|
% Confirm no race condition between the GC call and the delete timeout
|
||||||
|
{RP, Filename} = {?TEST_AREA, "deletepending_test"},
|
||||||
|
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 32, 1, 20),
|
||||||
|
KVList1 = lists:ukeysort(1, KVList0),
|
||||||
|
[{FirstKey, _FV}|_Rest] = KVList1,
|
||||||
|
{LastKey, _LV} = lists:last(KVList1),
|
||||||
|
{ok, Pid, {FirstKey, LastKey}, _Bloom} =
|
||||||
|
testsst_new(RP, Filename, 1, KVList1, length(KVList1), native),
|
||||||
|
timer:sleep(2000),
|
||||||
|
leveled_sst:sst_setfordelete(Pid, false),
|
||||||
|
timer:sleep(?DELETE_TIMEOUT + 1000),
|
||||||
|
?assertMatch(false, is_process_alive(Pid)).
|
||||||
|
|
||||||
|
|
||||||
simple_persisted_test_() ->
|
simple_persisted_test_() ->
|
||||||
{timeout, 60, fun simple_persisted_test_bothformats/0}.
|
{timeout, 60, fun simple_persisted_test_bothformats/0}.
|
||||||
|
|
||||||
|
@ -3202,7 +3232,7 @@ simple_persisted_test_bothformats() ->
|
||||||
simple_persisted_tester(fun testsst_new/6).
|
simple_persisted_tester(fun testsst_new/6).
|
||||||
|
|
||||||
simple_persisted_tester(SSTNewFun) ->
|
simple_persisted_tester(SSTNewFun) ->
|
||||||
{RP, Filename} = {"test/test_area/", "simple_test"},
|
{RP, Filename} = {?TEST_AREA, "simple_test"},
|
||||||
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 32, 1, 20),
|
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 32, 1, 20),
|
||||||
KVList1 = lists:ukeysort(1, KVList0),
|
KVList1 = lists:ukeysort(1, KVList0),
|
||||||
[{FirstKey, _FV}|_Rest] = KVList1,
|
[{FirstKey, _FV}|_Rest] = KVList1,
|
||||||
|
@ -3471,7 +3501,7 @@ start_sst_fun(ProcessToInform) ->
|
||||||
#sst_options{press_method=native,
|
#sst_options{press_method=native,
|
||||||
log_options=leveled_log:get_opts()},
|
log_options=leveled_log:get_opts()},
|
||||||
{ok, P1, {_FK1, _LK1}, _Bloom1} =
|
{ok, P1, {_FK1, _LK1}, _Bloom1} =
|
||||||
sst_new("test/test_area/", "level1_src", 1, KVL1, 6000, OptsSST),
|
sst_new(?TEST_AREA, "level1_src", 1, KVL1, 6000, OptsSST),
|
||||||
ProcessToInform ! {sst_pid, P1}.
|
ProcessToInform ! {sst_pid, P1}.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue