From 92582de4dd1990858fb2c3044350445f429ce697 Mon Sep 17 00:00:00 2001
From: Russell Brown
Date: Mon, 19 Mar 2018 19:47:19 +0000
Subject: [PATCH 1/2] WIP: changes for backend_eqc test

Enable support for $key and $bucket index
---
 src/leveled_bookie.erl | 57 ++++++++++++++++++++++--------------------
 src/leveled_runner.erl | 39 ++++++++++++++++++-----------
 2 files changed, 54 insertions(+), 42 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 95656a2..0f69635 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -567,9 +567,9 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SW0 = os:timestamp(),
     {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
-                                                LedgerKey,
-                                                Object,
-                                                {IndexSpecs, TTL}),
+                                               LedgerKey,
+                                               Object,
+                                               {IndexSpecs, TTL}),
     {SW1, Timings1} =
         update_timings(SW0, {put, {inker, ObjSize}}, State#state.put_timings),
     Changes = preparefor_ledgercache(no_type_assigned,
@@ -581,12 +581,12 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
                                      State),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
     {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
-    
+
     {Timings, CountDown} =
        update_statetimings(put, Timings2, State#state.put_countdown),
-    % If the previous push to memory was returned then punish this PUT with a
-    % delay. If the back-pressure in the Penciller continues, these delays
-    % will beocme more frequent
+    % If the previous push to memory was returned then punish this PUT with a
+    % delay. If the back-pressure in the Penciller continues, these delays
+    % will become more frequent
     case State#state.slow_offer of
         true ->
             gen_server:reply(From, pause);
@@ -595,18 +595,18 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
         false ->
             gen_server:reply(From, ok)
     end,
     maybe_longrunning(SW0, overall_put),
     case maybepush_ledgercache(State#state.cache_size,
-                                Cache0,
-                                State#state.penciller) of
+                               Cache0,
+                               State#state.penciller) of
         {ok, NewCache} ->
             {noreply, State#state{ledger_cache = NewCache,
-                                    put_timings = Timings,
-                                    put_countdown = CountDown,
-                                    slow_offer = false}};
+                                  put_timings = Timings,
+                                  put_countdown = CountDown,
+                                  slow_offer = false}};
         {returned, NewCache} ->
             {noreply, State#state{ledger_cache = NewCache,
-                                    put_timings = Timings,
-                                    put_countdown = CountDown,
-                                    slow_offer = true}}
+                                  put_timings = Timings,
+                                  put_countdown = CountDown,
+                                  slow_offer = true}}
     end;
 handle_call({mput, ObjectSpecs, TTL}, From, State)
     when State#state.head_only == true ->
@@ -696,28 +696,28 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
                 true ->
                     {SWr, UpdTimingsP} =
                         update_timings(SWp,
-                                        {head, pcl},
-                                        State#state.head_timings),
+                                       {head, pcl},
+                                       State#state.head_timings),
                     OMD = leveled_codec:build_metadata_object(LK, MD),
                     {_SW, UpdTimingsR} =
                         update_timings(SWr, {head, rsp}, UpdTimingsP),
                     {UpdTimings, CountDown} =
                         update_statetimings(head,
-                                            UpdTimingsR,
-                                            State#state.head_countdown),
+                                            UpdTimingsR,
+                                            State#state.head_countdown),
                     {reply,
-                        {ok, OMD},
-                        State#state{head_timings = UpdTimings,
-                                    head_countdown = CountDown}};
+                     {ok, OMD},
+                     State#state{head_timings = UpdTimings,
+                                 head_countdown = CountDown}};
                 false ->
                     {reply, not_found, State}
             end
         end
     end;
 handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
-    % Snapshot the store, specifying if the snapshot should be long running
-    % (i.e. will the snapshot be queued or be required for an extended period
-    % e.g. many minutes)
+    % Snapshot the store, specifying if the snapshot should be long running
+    % (i.e. will the snapshot be queued or be required for an extended period
+    % e.g. many minutes)
     Reply = snapshot_store(State, SnapType, Query, LongRunning),
     {reply, Reply, State};
 handle_call({return_runner, QueryType}, _From, State) ->
@@ -732,8 +732,8 @@ handle_call({return_runner, QueryType}, _From, State) ->
 handle_call({compact_journal, Timeout}, _From, State)
     when State#state.head_only == false ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
-                                            self(),
-                                            Timeout),
+                                          self(),
+                                          Timeout),
     {reply, ok, State};
 handle_call(confirm_compact, _From, State)
     when State#state.head_only == false ->
@@ -915,6 +915,9 @@ get_runner(State, {keylist, Tag, FoldAccT}) ->
 get_runner(State, {keylist, Tag, Bucket, FoldAccT}) ->
     SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
     leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, FoldAccT);
+get_runner(State, {keylist, Tag, Bucket, KeyRange, FoldAccT}) ->
+    SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
+    leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, KeyRange, FoldAccT);
 %% Set of runners for object or metadata folds
 get_runner(State,
diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl
index fd015e3..74ef45a 100644
--- a/src/leveled_runner.erl
+++ b/src/leveled_runner.erl
@@ -27,6 +27,7 @@
          binary_bucketlist/5,
          index_query/3,
          bucketkey_query/4,
+         bucketkey_query/5,
          hashlist_query/3,
          tictactree/5,
          foldheads_allkeys/5,
@@ -41,6 +42,8 @@
 
 -define(CHECKJOURNAL_PROB, 0.2).
 
+-type key_range() :: {StartKey :: any(), EndKey :: any()}.
+
 %%%============================================================================
 %%% External functions
 %%%============================================================================
@@ -118,26 +121,32 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
         end,
     {async, Runner}.
 
--spec bucketkey_query(fun(), atom(), any(), tuple()) -> {async, fun()}.
+-spec bucketkey_query(fun(), atom(), any(), key_range(), tuple()) -> {async, fun()}.
 %% @doc
-%% Fold over all keys under tak (potentially restricted to a given bucket)
-bucketkey_query(SnapFun, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
-    SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
-    EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
+%% Fold over all keys in `KeyRange' under tag (restricted to a given bucket)
+bucketkey_query(SnapFun, Tag, Bucket, {StartKey, EndKey}, {FoldKeysFun, InitAcc}) ->
+    SK = leveled_codec:to_ledgerkey(Bucket, StartKey, Tag),
+    EK = leveled_codec:to_ledgerkey(Bucket, EndKey, Tag),
     AccFun = accumulate_keys(FoldKeysFun),
-    Runner = 
+    Runner =
         fun() ->
-            {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
-            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
-                                                    SK,
-                                                    EK,
-                                                    AccFun,
-                                                    InitAcc),
-            ok = leveled_penciller:pcl_close(LedgerSnapshot),
-            Acc
+            {ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
+            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                  SK,
+                                                  EK,
+                                                  AccFun,
+                                                  InitAcc),
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            Acc
         end,
     {async, Runner}.
 
+-spec bucketkey_query(fun(), atom(), any(), tuple()) -> {async, fun()}.
+%% @doc
+%% Fold over all keys under tag (potentially restricted to a given bucket)
+bucketkey_query(SnapFun, Tag, Bucket, FunAcc) ->
+    bucketkey_query(SnapFun, Tag, Bucket, {null, null}, FunAcc).
+
 -spec hashlist_query(fun(), atom(), boolean()) -> {async, fun()}.
 %% @doc
 %% Fold over the keys, accumulating the hashes
@@ -697,4 +706,4 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
 
 -endif.
 
-    
\ No newline at end of file
+    

From 10659bfbd5fe804080f79425d7a5916ed975e783 Mon Sep 17 00:00:00 2001
From: Russell Brown
Date: Mon, 16 Apr 2018 17:19:20 +0100
Subject: [PATCH 2/2] Add CT tests for riak $ indexes

The project has been at 100% coverage; don't ruin it now!
---
 test/end_to_end/riak_SUITE.erl | 92 +++++++++++++++++++++++++++++++++-
 test/end_to_end/testutil.erl   | 33 ++++++++++--
 2 files changed, 120 insertions(+), 5 deletions(-)

diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl
index 42a233e..1891412 100644
--- a/test/end_to_end/riak_SUITE.erl
+++ b/test/end_to_end/riak_SUITE.erl
@@ -4,12 +4,16 @@
 -export([all/0]).
 -export([
             crossbucket_aae/1,
-            handoff/1
+            handoff/1,
+            dollar_bucket_index/1,
+            dollar_key_index/1
         ]).
 
 all() -> [
             crossbucket_aae,
-            handoff
+            handoff,
+            dollar_bucket_index,
+            dollar_key_index
          ].
 
 -define(MAGIC, 53). % riak_kv -> riak_object
@@ -419,3 +423,87 @@ handoff(_Config) ->
     ok = leveled_bookie:book_close(Bookie2),
     ok = leveled_bookie:book_close(Bookie3),
     ok = leveled_bookie:book_close(Bookie4).
+
+%% @doc Test that the riak-specific $key index can be iterated using
+%% leveled's existing folders
+dollar_key_index(_Config) ->
+    RootPath = testutil:reset_filestructure(),
+    {ok, Bookie1} = leveled_bookie:book_start(RootPath,
+                                              2000,
+                                              50000000,
+                                              testutil:sync_strategy()),
+    ObjectGen = testutil:get_compressiblevalue_andinteger(),
+    IndexGen = fun() -> [] end,
+    ObjL1 = testutil:generate_objects(1300,
+                                      {fixed_binary, 1},
+                                      [],
+                                      ObjectGen,
+                                      IndexGen,
+                                      <<"Bucket1">>),
+    testutil:riakload(Bookie1, ObjL1),
+
+    FoldKeysFun = fun(_B, K, Acc) ->
+                      [K | Acc]
+                  end,
+
+    StartKey = testutil:fixed_bin_key(123),
+    EndKey = testutil:fixed_bin_key(779),
+
+    Query = {keylist, ?RIAK_TAG, <<"Bucket1">>, {StartKey, EndKey}, {FoldKeysFun, []}},
+
+    {async, Folder} = leveled_bookie:book_returnfolder(Bookie1, Query),
+    ResLen = length(Folder()),
+    io:format("Length of Result of folder ~w~n", [ResLen]),
+    true = 657 == ResLen,
+
+    ok = leveled_bookie:book_close(Bookie1),
+    testutil:reset_filestructure().
+
+%% @doc Test that the riak-specific $bucket indexes can be iterated
+%% using leveled's existing folders
+dollar_bucket_index(_Config) ->
+    RootPath = testutil:reset_filestructure(),
+    {ok, Bookie1} = leveled_bookie:book_start(RootPath,
+                                              2000,
+                                              50000000,
+                                              testutil:sync_strategy()),
+    ObjectGen = testutil:get_compressiblevalue_andinteger(),
+    IndexGen = fun() -> [] end,
+    ObjL1 = testutil:generate_objects(1300,
+                                      uuid,
+                                      [],
+                                      ObjectGen,
+                                      IndexGen,
+                                      <<"Bucket1">>),
+    testutil:riakload(Bookie1, ObjL1),
+    ObjL2 = testutil:generate_objects(1700,
+                                      uuid,
+                                      [],
+                                      ObjectGen,
+                                      IndexGen,
+                                      <<"Bucket2">>),
+    testutil:riakload(Bookie1, ObjL2),
+    ObjL3 = testutil:generate_objects(7000,
+                                      uuid,
+                                      [],
+                                      ObjectGen,
+                                      IndexGen,
+                                      <<"Bucket3">>),
+
+    testutil:riakload(Bookie1, ObjL3),
+
+    FoldKeysFun = fun(B, K, Acc) ->
+                      [{B, K} | Acc]
+                  end,
+
+    Query = {keylist, ?RIAK_TAG, <<"Bucket2">>, {FoldKeysFun, []}},
+
+    {async, Folder} = leveled_bookie:book_returnfolder(Bookie1, Query),
+    ResLen = length(Folder()),
+
+    io:format("Length of Result of folder ~w~n", [ResLen]),
+
+    true = 1700 == ResLen,
+
+    ok = leveled_bookie:book_close(Bookie1),
+    testutil:reset_filestructure().
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index 93b5166..492b161 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -48,7 +48,9 @@
          wait_for_compaction/1,
          foldkeysfun/3,
          foldkeysfun_returnbucket/3,
-         sync_strategy/0]).
+         sync_strategy/0,
+         numbered_key/1,
+         fixed_bin_key/1]).
 
 -define(RETURN_TERMS, {true, undefined}).
 -define(SLOWOFFER_DELAY, 5).
@@ -398,7 +400,7 @@ generate_objects(Count, uuid, ObjL, Value, IndexGen, Bucket) ->
 generate_objects(Count, {binary, KeyNumber}, ObjL, Value, IndexGen, Bucket) ->
     {Obj1, Spec1} =
         set_object(list_to_binary(Bucket),
-                   list_to_binary("Key" ++ integer_to_list(KeyNumber)),
+                   list_to_binary(numbered_key(KeyNumber)),
                    Value,
                    IndexGen),
     generate_objects(Count - 1,
@@ -407,9 +409,21 @@ generate_objects(Count, {binary, KeyNumber}, ObjL, Value, IndexGen, Bucket) ->
                      Value,
                      IndexGen,
                      Bucket);
+generate_objects(Count, {fixed_binary, KeyNumber}, ObjL, Value, IndexGen, Bucket) ->
+    {Obj1, Spec1} =
+        set_object(Bucket,
+                   fixed_bin_key(KeyNumber),
+                   Value,
+                   IndexGen),
+    generate_objects(Count - 1,
+                     {fixed_binary, KeyNumber + 1},
+                     ObjL ++ [{leveled_rand:uniform(), Obj1, Spec1}],
+                     Value,
+                     IndexGen,
+                     Bucket);
 generate_objects(Count, KeyNumber, ObjL, Value, IndexGen, Bucket) ->
     {Obj1, Spec1} =
         set_object(Bucket,
-                   "Key" ++ integer_to_list(KeyNumber),
+                   numbered_key(KeyNumber),
                    Value,
                    IndexGen),
     generate_objects(Count - 1,
@@ -419,6 +433,19 @@ generate_objects(Count, KeyNumber, ObjL, Value, IndexGen, Bucket) ->
                      IndexGen,
                      Bucket).
 
+%% @doc Generates a numbered key; exported so tests can use it without
+%% copying code.
+-spec numbered_key(integer()) -> list().
+numbered_key(KeyNumber) when is_integer(KeyNumber) ->
+    "Key" ++ integer_to_list(KeyNumber).
+
+%% @doc Generates a key of fixed size (64 bits) for `KeyNumber'; again
+%% exported so tests can generate the same keys as generate_objects/N
+%% without peeking.
+-spec fixed_bin_key(integer()) -> binary().
+fixed_bin_key(KeyNumber) ->
+    <<$K, $e, $y, KeyNumber:64/integer>>.
+
 set_object(Bucket, Key, Value, IndexGen) ->
     set_object(Bucket, Key, Value, IndexGen, []).
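
Reviewer note (not part of the patches): the sketch below shows how a client would drive the new range-limited key fold end to end, tying together the {keylist, Tag, Bucket, KeyRange, FoldAccT} runner from patch 1 and the fixed-size binary keys from patch 2. It is a minimal sketch under assumptions: the module name and root path are illustrative, the sync strategy `none' stands in for testutil:sync_strategy(), and ?RIAK_TAG is assumed to come from leveled's include file as it does in riak_SUITE.

-module(keylist_range_example).
-export([range_keys/1]).

%% Assumed include path for ?RIAK_TAG; adjust to the local checkout.
-include_lib("leveled/include/leveled.hrl").

range_keys(RootPath) ->
    %% Cache and journal sizing mirror the CT tests above; `none' is
    %% a stand-in for testutil:sync_strategy().
    {ok, Bookie} =
        leveled_bookie:book_start(RootPath, 2000, 50000000, none),
    %% Accumulate bare keys, as FoldKeysFun does in dollar_key_index/1.
    FoldKeysFun = fun(_Bucket, Key, Acc) -> [Key | Acc] end,
    %% Fixed-size binary keys, equivalent to testutil:fixed_bin_key/1.
    StartKey = <<$K, $e, $y, 123:64/integer>>,
    EndKey = <<$K, $e, $y, 779:64/integer>>,
    %% The new 5-element keylist query: the fold is bounded by the
    %% {StartKey, EndKey} range instead of covering the whole bucket.
    Query = {keylist,
             ?RIAK_TAG,
             <<"Bucket1">>,
             {StartKey, EndKey},
             {FoldKeysFun, []}},
    {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Query),
    Keys = Folder(),
    ok = leveled_bookie:book_close(Bookie),
    Keys.

Against a store loaded as in dollar_key_index/1 above, Folder() should return the 657 keys from Key123 to Key779 inclusive.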