Merge pull request #130 from russelldb/rdb/backend_eqc_test

WIP: changes for backend_eqc test
Martin Sumner 2018-04-16 22:24:27 +01:00 committed by GitHub
commit e182f8c95c
4 changed files with 174 additions and 47 deletions

leveled_bookie.erl

@@ -567,9 +567,9 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
    LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
    SW0 = os:timestamp(),
    {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
                                               LedgerKey,
                                               Object,
                                               {IndexSpecs, TTL}),
    {SW1, Timings1} =
        update_timings(SW0, {put, {inker, ObjSize}}, State#state.put_timings),
    Changes = preparefor_ledgercache(no_type_assigned,
@@ -581,12 +581,12 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
                                     State),
    Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
    {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
    {Timings, CountDown} =
        update_statetimings(put, Timings2, State#state.put_countdown),
    % If the previous push to memory was returned then punish this PUT with a
    % delay. If the back-pressure in the Penciller continues, these delays
    % will become more frequent
    case State#state.slow_offer of
        true ->
            gen_server:reply(From, pause);
@@ -595,18 +595,18 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
    end,
    maybe_longrunning(SW0, overall_put),
    case maybepush_ledgercache(State#state.cache_size,
                               Cache0,
                               State#state.penciller) of
        {ok, NewCache} ->
            {noreply, State#state{ledger_cache = NewCache,
                                  put_timings = Timings,
                                  put_countdown = CountDown,
                                  slow_offer = false}};
        {returned, NewCache} ->
            {noreply, State#state{ledger_cache = NewCache,
                                  put_timings = Timings,
                                  put_countdown = CountDown,
                                  slow_offer = true}}
    end;
handle_call({mput, ObjectSpecs, TTL}, From, State)
        when State#state.head_only == true ->
@@ -696,28 +696,28 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
                true ->
                    {SWr, UpdTimingsP} =
                        update_timings(SWp,
                                       {head, pcl},
                                       State#state.head_timings),
                    OMD = leveled_codec:build_metadata_object(LK, MD),
                    {_SW, UpdTimingsR} =
                        update_timings(SWr, {head, rsp}, UpdTimingsP),
                    {UpdTimings, CountDown} =
                        update_statetimings(head,
                                            UpdTimingsR,
                                            State#state.head_countdown),
                    {reply,
                        {ok, OMD},
                        State#state{head_timings = UpdTimings,
                                    head_countdown = CountDown}};
                false ->
                    {reply, not_found, State}
            end
        end
    end;
handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
    % Snapshot the store, specifying if the snapshot should be long running
    % (i.e. will the snapshot be queued or be required for an extended period
    % e.g. many minutes)
    Reply = snapshot_store(State, SnapType, Query, LongRunning),
    {reply, Reply, State};
handle_call({return_runner, QueryType}, _From, State) ->
@@ -732,8 +732,8 @@ handle_call({return_runner, QueryType}, _From, State) ->
handle_call({compact_journal, Timeout}, _From, State)
        when State#state.head_only == false ->
    ok = leveled_inker:ink_compactjournal(State#state.inker,
                                          self(),
                                          Timeout),
    {reply, ok, State};
handle_call(confirm_compact, _From, State)
        when State#state.head_only == false ->
@@ -915,6 +915,9 @@ get_runner(State, {keylist, Tag, FoldAccT}) ->
get_runner(State, {keylist, Tag, Bucket, FoldAccT}) ->
    SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
    leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, FoldAccT);
get_runner(State, {keylist, Tag, Bucket, KeyRange, FoldAccT}) ->
    SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
    leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, KeyRange, FoldAccT);
%% Set of runners for object or metadata folds
get_runner(State,

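The new five-element keylist query carries a key range through get_runner into leveled_runner. A minimal sketch of how a caller might drive it via the public bookie API, assuming an already-started Bookie and borrowing the tuple shape from the dollar_key_index test further down:

    FoldKeysFun = fun(_Bucket, Key, Acc) -> [Key | Acc] end,
    Query = {keylist, ?RIAK_TAG, <<"Bucket1">>,
             {StartKey, EndKey},            % new: restrict the fold to this key range
             {FoldKeysFun, []}},
    {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Query),
    Keys = Folder().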
leveled_runner.erl

@@ -27,6 +27,7 @@
            binary_bucketlist/5,
            index_query/3,
            bucketkey_query/4,
            bucketkey_query/5,
            hashlist_query/3,
            tictactree/5,
            foldheads_allkeys/5,
@@ -41,6 +42,8 @@
-define(CHECKJOURNAL_PROB, 0.2).

-type key_range() :: {StartKey :: any(), EndKey :: any()}.

%%%============================================================================
%%% External functions
%%%============================================================================
@@ -118,26 +121,32 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
        end,
    {async, Runner}.

-spec bucketkey_query(fun(), atom(), any(), key_range(), tuple()) -> {async, fun()}.
%% @doc
%% Fold over all keys in `KeyRange' under tag (restricted to a given bucket)
bucketkey_query(SnapFun, Tag, Bucket, {StartKey, EndKey}, {FoldKeysFun, InitAcc}) ->
    SK = leveled_codec:to_ledgerkey(Bucket, StartKey, Tag),
    EK = leveled_codec:to_ledgerkey(Bucket, EndKey, Tag),
    AccFun = accumulate_keys(FoldKeysFun),
    Runner =
        fun() ->
            {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                  SK,
                                                  EK,
                                                  AccFun,
                                                  InitAcc),
            ok = leveled_penciller:pcl_close(LedgerSnapshot),
            Acc
        end,
    {async, Runner}.

-spec bucketkey_query(fun(), atom(), any(), tuple()) -> {async, fun()}.
%% @doc
%% Fold over all keys under tag (potentially restricted to a given bucket)
bucketkey_query(SnapFun, Tag, Bucket, FunAcc) ->
    bucketkey_query(SnapFun, Tag, Bucket, {null, null}, FunAcc).

-spec hashlist_query(fun(), atom(), boolean()) -> {async, fun()}.
%% @doc
%% Fold over the keys accumulating the hashes
@@ -697,4 +706,4 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->

-endif.

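bucketkey_query/4 keeps its previous behaviour by delegating to the new five-arity function with a {null, null} range, so existing whole-bucket key folds are unchanged. A sketch of that unchanged path, matching the query shape used by the dollar_bucket_index test below (Bookie again assumed to be a started bookie):

    FoldKeysFun = fun(Bucket, Key, Acc) -> [{Bucket, Key} | Acc] end,
    Query = {keylist, ?RIAK_TAG, <<"Bucket2">>, {FoldKeysFun, []}},   % no key range: fold the whole bucket
    {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Query),
    BucketKeys = Folder().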
riak_SUITE.erl

@@ -4,12 +4,16 @@
-export([all/0]).
-export([
            crossbucket_aae/1,
            handoff/1,
            dollar_bucket_index/1,
            dollar_key_index/1
        ]).

all() -> [
            crossbucket_aae,
            handoff,
            dollar_bucket_index,
            dollar_key_index
        ].

-define(MAGIC, 53). % riak_kv -> riak_object
@@ -419,3 +423,87 @@ handoff(_Config) ->
    ok = leveled_bookie:book_close(Bookie2),
    ok = leveled_bookie:book_close(Bookie3),
    ok = leveled_bookie:book_close(Bookie4).

%% @doc test that the riak specific $key index can be iterated using
%% leveled's existing folders
dollar_key_index(_Config) ->
    RootPath = testutil:reset_filestructure(),
    {ok, Bookie1} = leveled_bookie:book_start(RootPath,
                                              2000,
                                              50000000,
                                              testutil:sync_strategy()),
    ObjectGen = testutil:get_compressiblevalue_andinteger(),
    IndexGen = fun() -> [] end,
    ObjL1 = testutil:generate_objects(1300,
                                      {fixed_binary, 1},
                                      [],
                                      ObjectGen,
                                      IndexGen,
                                      <<"Bucket1">>),
    testutil:riakload(Bookie1, ObjL1),

    FoldKeysFun = fun(_B, K, Acc) -> [K | Acc] end,
    StartKey = testutil:fixed_bin_key(123),
    EndKey = testutil:fixed_bin_key(779),
    Query = {keylist, ?RIAK_TAG, <<"Bucket1">>,
             {StartKey, EndKey}, {FoldKeysFun, []}},

    {async, Folder} = leveled_bookie:book_returnfolder(Bookie1, Query),
    ResLen = length(Folder()),
    io:format("Length of Result of folder ~w~n", [ResLen]),
    true = 657 == ResLen,

    ok = leveled_bookie:book_close(Bookie1),
    testutil:reset_filestructure().
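The asserted 657 follows directly from the fixed range: the load creates keys 1 through 1300, the fold is bounded by keys 123 and 779, and the count implies the range is inclusive at both ends, since 779 - 123 + 1 = 657.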

%% @doc test that the riak specific $bucket indexes can be iterated
%% using leveled's existing folders
dollar_bucket_index(_Config) ->
    RootPath = testutil:reset_filestructure(),
    {ok, Bookie1} = leveled_bookie:book_start(RootPath,
                                              2000,
                                              50000000,
                                              testutil:sync_strategy()),
    ObjectGen = testutil:get_compressiblevalue_andinteger(),
    IndexGen = fun() -> [] end,
    ObjL1 = testutil:generate_objects(1300,
                                      uuid,
                                      [],
                                      ObjectGen,
                                      IndexGen,
                                      <<"Bucket1">>),
    testutil:riakload(Bookie1, ObjL1),
    ObjL2 = testutil:generate_objects(1700,
                                      uuid,
                                      [],
                                      ObjectGen,
                                      IndexGen,
                                      <<"Bucket2">>),
    testutil:riakload(Bookie1, ObjL2),
    ObjL3 = testutil:generate_objects(7000,
                                      uuid,
                                      [],
                                      ObjectGen,
                                      IndexGen,
                                      <<"Bucket3">>),
    testutil:riakload(Bookie1, ObjL3),

    FoldKeysFun = fun(B, K, Acc) -> [{B, K} | Acc] end,
    Query = {keylist, ?RIAK_TAG, <<"Bucket2">>, {FoldKeysFun, []}},

    {async, Folder} = leveled_bookie:book_returnfolder(Bookie1, Query),
    ResLen = length(Folder()),
    io:format("Length of Result of folder ~w~n", [ResLen]),
    true = 1700 == ResLen,

    ok = leveled_bookie:book_close(Bookie1),
    testutil:reset_filestructure().
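Here the asserted 1700 is simply the number of objects loaded into <<"Bucket2">>; the 1300 objects in <<"Bucket1">> and the 7000 in <<"Bucket3">> lie outside the queried bucket and are never folded.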

testutil.erl

@@ -48,7 +48,9 @@
            wait_for_compaction/1,
            foldkeysfun/3,
            foldkeysfun_returnbucket/3,
            sync_strategy/0,
            numbered_key/1,
            fixed_bin_key/1]).

-define(RETURN_TERMS, {true, undefined}).
-define(SLOWOFFER_DELAY, 5).
@@ -398,7 +400,7 @@ generate_objects(Count, uuid, ObjL, Value, IndexGen, Bucket) ->
generate_objects(Count, {binary, KeyNumber}, ObjL, Value, IndexGen, Bucket) ->
    {Obj1, Spec1} =
        set_object(list_to_binary(Bucket),
                   list_to_binary(numbered_key(KeyNumber)),
                   Value,
                   IndexGen),
    generate_objects(Count - 1,
@@ -407,9 +409,21 @@ generate_objects(Count, {binary, KeyNumber}, ObjL, Value, IndexGen, Bucket) ->
                     Value,
                     IndexGen,
                     Bucket);
generate_objects(Count, {fixed_binary, KeyNumber}, ObjL, Value, IndexGen, Bucket) ->
    {Obj1, Spec1} =
        set_object(Bucket,
                   fixed_bin_key(KeyNumber),
                   Value,
                   IndexGen),
    generate_objects(Count - 1,
                     {fixed_binary, KeyNumber + 1},
                     ObjL ++ [{leveled_rand:uniform(), Obj1, Spec1}],
                     Value,
                     IndexGen,
                     Bucket);
generate_objects(Count, KeyNumber, ObjL, Value, IndexGen, Bucket) ->
    {Obj1, Spec1} = set_object(Bucket,
                               numbered_key(KeyNumber),
                               Value,
                               IndexGen),
    generate_objects(Count - 1,
@@ -419,6 +433,19 @@ generate_objects(Count, KeyNumber, ObjL, Value, IndexGen, Bucket) ->
                     IndexGen,
                     Bucket).

%% @doc generates a key, exported so tests can use it without copying
%% code
-spec numbered_key(integer()) -> list().
numbered_key(KeyNumber) when is_integer(KeyNumber) ->
    "Key" ++ integer_to_list(KeyNumber).

%% @doc generates a key for `KeyNumber' of a fixed size (64bits),
%% again, exported for tests to generate the same keys as
%% generate_objects/N without peeking.
-spec fixed_bin_key(integer()) -> binary().
fixed_bin_key(KeyNumber) ->
    <<$K, $e, $y, KeyNumber:64/integer>>.
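A note on the design choice behind fixed_bin_key/1: Erlang compares binaries byte-by-byte, so the fixed-width 64-bit suffix keeps these keys in numeric order (fixed_bin_key(123) sorts below fixed_bin_key(779)), which is what the ranged $key fold in the suite above relies on. The list keys produced by numbered_key/1 sort lexicographically instead ("Key1000" < "Key123") and would not suit a numeric range query.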

set_object(Bucket, Key, Value, IndexGen) ->
    set_object(Bucket, Key, Value, IndexGen, []).