Shift pause out of leveled
Leveled will now signal the need for a pause due to back-pressure, but not actually pause itself. The hope is that in a riak implementation this pause can be managed by the put_fsm, and so not lock the store.
This commit is contained in:
parent
4583460328
commit
37c23a5b38
4 changed files with 28 additions and 44 deletions
|
@ -157,7 +157,6 @@
|
||||||
-define(LEDGER_FP, "ledger").
|
-define(LEDGER_FP, "ledger").
|
||||||
-define(SNAPSHOT_TIMEOUT, 300000).
|
-define(SNAPSHOT_TIMEOUT, 300000).
|
||||||
-define(CHECKJOURNAL_PROB, 0.2).
|
-define(CHECKJOURNAL_PROB, 0.2).
|
||||||
-define(SLOWOFFER_DELAY, 5).
|
|
||||||
|
|
||||||
-record(state, {inker :: pid(),
|
-record(state, {inker :: pid(),
|
||||||
penciller :: pid(),
|
penciller :: pid(),
|
||||||
|
@ -277,11 +276,10 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
|
||||||
% will beocme more frequent
|
% will beocme more frequent
|
||||||
case State#state.slow_offer of
|
case State#state.slow_offer of
|
||||||
true ->
|
true ->
|
||||||
timer:sleep(?SLOWOFFER_DELAY);
|
gen_server:reply(From, pause);
|
||||||
false ->
|
false ->
|
||||||
ok
|
gen_server:reply(From, ok)
|
||||||
end,
|
end,
|
||||||
gen_server:reply(From, ok),
|
|
||||||
case maybepush_ledgercache(State#state.cache_size,
|
case maybepush_ledgercache(State#state.cache_size,
|
||||||
Cache0,
|
Cache0,
|
||||||
State#state.penciller) of
|
State#state.penciller) of
|
||||||
|
|
|
@ -36,9 +36,7 @@ simple_put_fetch_head_delete(_Config) ->
|
||||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
|
||||||
testutil:check_forobject(Bookie2, TestObject),
|
testutil:check_forobject(Bookie2, TestObject),
|
||||||
ObjList1 = testutil:generate_objects(5000, 2),
|
ObjList1 = testutil:generate_objects(5000, 2),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
testutil:riakload(Bookie2, ObjList1),
|
||||||
testutil:book_riakput(Bookie2, Obj, Spc) end,
|
|
||||||
ObjList1),
|
|
||||||
ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
|
ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
|
||||||
testutil:check_forlist(Bookie2, ChkList1),
|
testutil:check_forlist(Bookie2, ChkList1),
|
||||||
testutil:check_forobject(Bookie2, TestObject),
|
testutil:check_forobject(Bookie2, TestObject),
|
||||||
|
@ -88,9 +86,7 @@ many_put_fetch_head(_Config) ->
|
||||||
ChkListFixed = lists:nth(length(CLs), CLs),
|
ChkListFixed = lists:nth(length(CLs), CLs),
|
||||||
testutil:check_forlist(Bookie2, CL1A),
|
testutil:check_forlist(Bookie2, CL1A),
|
||||||
ObjList2A = testutil:generate_objects(5000, 2),
|
ObjList2A = testutil:generate_objects(5000, 2),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
testutil:riakload(Bookie2, ObjList2A),
|
||||||
testutil:book_riakput(Bookie2, Obj, Spc) end,
|
|
||||||
ObjList2A),
|
|
||||||
ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000),
|
ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000),
|
||||||
testutil:check_forlist(Bookie2, ChkList2A),
|
testutil:check_forlist(Bookie2, ChkList2A),
|
||||||
testutil:check_forlist(Bookie2, ChkListFixed),
|
testutil:check_forlist(Bookie2, ChkListFixed),
|
||||||
|
@ -116,9 +112,7 @@ journal_compaction(_Config) ->
|
||||||
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
||||||
testutil:check_forobject(Bookie1, TestObject),
|
testutil:check_forobject(Bookie1, TestObject),
|
||||||
ObjList1 = testutil:generate_objects(20000, 2),
|
ObjList1 = testutil:generate_objects(20000, 2),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
testutil:riakload(Bookie1, ObjList1),
|
||||||
testutil:book_riakput(Bookie1, Obj, Spc) end,
|
|
||||||
ObjList1),
|
|
||||||
ChkList1 = lists:sublist(lists:sort(ObjList1), 10000),
|
ChkList1 = lists:sublist(lists:sort(ObjList1), 10000),
|
||||||
testutil:check_forlist(Bookie1, ChkList1),
|
testutil:check_forlist(Bookie1, ChkList1),
|
||||||
testutil:check_forobject(Bookie1, TestObject),
|
testutil:check_forobject(Bookie1, TestObject),
|
||||||
|
@ -140,18 +134,16 @@ journal_compaction(_Config) ->
|
||||||
%% Delete some of the objects
|
%% Delete some of the objects
|
||||||
ObjListD = testutil:generate_objects(10000, 2),
|
ObjListD = testutil:generate_objects(10000, 2),
|
||||||
lists:foreach(fun({_R, O, _S}) ->
|
lists:foreach(fun({_R, O, _S}) ->
|
||||||
ok = testutil:book_riakdelete(Bookie1,
|
testutil:book_riakdelete(Bookie1,
|
||||||
O#r_object.bucket,
|
O#r_object.bucket,
|
||||||
O#r_object.key,
|
O#r_object.key,
|
||||||
[])
|
[])
|
||||||
end,
|
end,
|
||||||
ObjListD),
|
ObjListD),
|
||||||
|
|
||||||
%% Now replace all the other objects
|
%% Now replace all the other objects
|
||||||
ObjList2 = testutil:generate_objects(40000, 10002),
|
ObjList2 = testutil:generate_objects(40000, 10002),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
testutil:riakload(Bookie1, ObjList2),
|
||||||
testutil:book_riakput(Bookie1, Obj, Spc) end,
|
|
||||||
ObjList2),
|
|
||||||
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
|
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
|
||||||
|
|
||||||
F = fun leveled_bookie:book_islastcompactionpending/1,
|
F = fun leveled_bookie:book_islastcompactionpending/1,
|
||||||
|
@ -186,9 +178,7 @@ fetchput_snapshot(_Config) ->
|
||||||
{TestObject, TestSpec} = testutil:generate_testobject(),
|
{TestObject, TestSpec} = testutil:generate_testobject(),
|
||||||
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
||||||
ObjList1 = testutil:generate_objects(5000, 2),
|
ObjList1 = testutil:generate_objects(5000, 2),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
testutil:riakload(Bookie1, ObjList1),
|
||||||
testutil:book_riakput(Bookie1, Obj, Spc) end,
|
|
||||||
ObjList1),
|
|
||||||
SnapOpts1 = [{snapshot_bookie, Bookie1}],
|
SnapOpts1 = [{snapshot_bookie, Bookie1}],
|
||||||
{ok, SnapBookie1} = leveled_bookie:book_start(SnapOpts1),
|
{ok, SnapBookie1} = leveled_bookie:book_start(SnapOpts1),
|
||||||
ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
|
ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
|
||||||
|
@ -211,9 +201,7 @@ fetchput_snapshot(_Config) ->
|
||||||
|
|
||||||
|
|
||||||
ObjList2 = testutil:generate_objects(5000, 2),
|
ObjList2 = testutil:generate_objects(5000, 2),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
testutil:riakload(Bookie2, ObjList2),
|
||||||
testutil:book_riakput(Bookie2, Obj, Spc) end,
|
|
||||||
ObjList2),
|
|
||||||
io:format("Replacement objects put~n"),
|
io:format("Replacement objects put~n"),
|
||||||
|
|
||||||
ChkList2 = lists:sublist(lists:sort(ObjList2), 100),
|
ChkList2 = lists:sublist(lists:sort(ObjList2), 100),
|
||||||
|
@ -225,9 +213,7 @@ fetchput_snapshot(_Config) ->
|
||||||
ok = filelib:ensure_dir(RootPath ++ "/ledger/ledger_files"),
|
ok = filelib:ensure_dir(RootPath ++ "/ledger/ledger_files"),
|
||||||
{ok, FNsA} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
{ok, FNsA} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
||||||
ObjList3 = testutil:generate_objects(15000, 5002),
|
ObjList3 = testutil:generate_objects(15000, 5002),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
testutil:riakload(Bookie2, ObjList3),
|
||||||
testutil:book_riakput(Bookie2, Obj, Spc) end,
|
|
||||||
ObjList3),
|
|
||||||
ChkList3 = lists:sublist(lists:sort(ObjList3), 100),
|
ChkList3 = lists:sublist(lists:sort(ObjList3), 100),
|
||||||
testutil:check_forlist(Bookie2, ChkList3),
|
testutil:check_forlist(Bookie2, ChkList3),
|
||||||
testutil:check_formissinglist(SnapBookie2, ChkList3),
|
testutil:check_formissinglist(SnapBookie2, ChkList3),
|
||||||
|
@ -461,7 +447,7 @@ space_clear_ondelete(_Config) ->
|
||||||
% Delete the keys
|
% Delete the keys
|
||||||
SW2 = os:timestamp(),
|
SW2 = os:timestamp(),
|
||||||
lists:foreach(fun({Bucket, Key}) ->
|
lists:foreach(fun({Bucket, Key}) ->
|
||||||
ok = testutil:book_riakdelete(Book1,
|
testutil:book_riakdelete(Book1,
|
||||||
Bucket,
|
Bucket,
|
||||||
Key,
|
Key,
|
||||||
[])
|
[])
|
||||||
|
|
|
@ -35,9 +35,7 @@ small_load_with2i(_Config) ->
|
||||||
[],
|
[],
|
||||||
ObjectGen,
|
ObjectGen,
|
||||||
IndexGen),
|
IndexGen),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
testutil:riakload(Bookie1, ObjL1),
|
||||||
testutil:book_riakput(Bookie1, Obj, Spc) end,
|
|
||||||
ObjL1),
|
|
||||||
ChkList1 = lists:sublist(lists:sort(ObjL1), 100),
|
ChkList1 = lists:sublist(lists:sort(ObjL1), 100),
|
||||||
testutil:check_forlist(Bookie1, ChkList1),
|
testutil:check_forlist(Bookie1, ChkList1),
|
||||||
testutil:check_forobject(Bookie1, TestObject),
|
testutil:check_forobject(Bookie1, TestObject),
|
||||||
|
@ -129,12 +127,7 @@ query_count(_Config) ->
|
||||||
[],
|
[],
|
||||||
V,
|
V,
|
||||||
Indexes),
|
Indexes),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
testutil:riakload(Book1, ObjL1),
|
||||||
testutil:book_riakput(Book1,
|
|
||||||
Obj,
|
|
||||||
Spc)
|
|
||||||
end,
|
|
||||||
ObjL1),
|
|
||||||
io:format("Put of 10000 objects with 8 index entries "
|
io:format("Put of 10000 objects with 8 index entries "
|
||||||
++
|
++
|
||||||
"each completed in ~w microseconds~n",
|
"each completed in ~w microseconds~n",
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
book_riakdelete/4,
|
book_riakdelete/4,
|
||||||
book_riakget/3,
|
book_riakget/3,
|
||||||
book_riakhead/3,
|
book_riakhead/3,
|
||||||
|
riakload/2,
|
||||||
reset_filestructure/0,
|
reset_filestructure/0,
|
||||||
reset_filestructure/1,
|
reset_filestructure/1,
|
||||||
check_bucket_stats/2,
|
check_bucket_stats/2,
|
||||||
|
@ -41,6 +42,7 @@
|
||||||
riak_hash/1]).
|
riak_hash/1]).
|
||||||
|
|
||||||
-define(RETURN_TERMS, {true, undefined}).
|
-define(RETURN_TERMS, {true, undefined}).
|
||||||
|
-define(SLOWOFFER_DELAY, 5).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -58,7 +60,15 @@ book_riakhead(Pid, Bucket, Key) ->
|
||||||
leveled_book:book_head(Pid, Bucket, Key, ?RIAK_TAG).
|
leveled_book:book_head(Pid, Bucket, Key, ?RIAK_TAG).
|
||||||
|
|
||||||
|
|
||||||
|
riakload(Bookie, ObjectList) ->
|
||||||
|
lists:foreach(fun({_RN, Obj, Spc}) ->
|
||||||
|
R = book_riakput(Bookie,Obj, Spc),
|
||||||
|
case R of
|
||||||
|
ok -> ok;
|
||||||
|
pause -> timer:sleep(?SLOWOFFER_DELAY)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
ObjectList).
|
||||||
|
|
||||||
|
|
||||||
reset_filestructure() ->
|
reset_filestructure() ->
|
||||||
|
@ -257,10 +267,7 @@ load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
|
||||||
lists:map(fun(KN) ->
|
lists:map(fun(KN) ->
|
||||||
ObjListA = Generator(ChunkSize, KN),
|
ObjListA = Generator(ChunkSize, KN),
|
||||||
StartWatchA = os:timestamp(),
|
StartWatchA = os:timestamp(),
|
||||||
lists:foreach(fun({_RN, Obj, Spc}) ->
|
riakload(Bookie, ObjListA),
|
||||||
book_riakput(Bookie, Obj, Spc)
|
|
||||||
end,
|
|
||||||
ObjListA),
|
|
||||||
Time = timer:now_diff(os:timestamp(), StartWatchA),
|
Time = timer:now_diff(os:timestamp(), StartWatchA),
|
||||||
io:format("~w objects loaded in ~w seconds~n",
|
io:format("~w objects loaded in ~w seconds~n",
|
||||||
[ChunkSize, Time/1000000]),
|
[ChunkSize, Time/1000000]),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue