diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 0958ee9..1963c94 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -6,7 +6,7 @@
 %% only in a sequential Journal.
 %% - Different file formats are used for Journal (based on constant
 %% database), and the ledger (sft, based on sst)
-%% - It is not intended to be general purpose, but be specifically suited for
+%% - It is not intended to be general purpose, but to be primarily suited for
 %% use as a Riak backend in specific circumstances (relatively large values,
 %% and frequent use of iterators)
 %% - The Journal is an extended nursery log in leveldb terms. It is keyed
@@ -15,7 +15,7 @@
 %% the value is the metadata of the object including the sequence number
 %%
 %%
-%% -------- The actors ---------
+%% -------- Actors ---------
 %%
 %% The store is fronted by a Bookie, who takes support from different actors:
 %% - An Inker who persists new data into the journal, and returns items from
@@ -39,28 +39,17 @@
 %% - IndexSpecs - a set of secondary key changes associated with the
 %% transaction
 %%
-%% The Bookie takes the place request and passes it first to the Inker to add
-%% the request to the ledger.
+%% The Bookie takes the request and passes it first to the Inker to add the
+%% request to the journal.
 %%
 %% The inker will pass the PK/Value/IndexSpecs to the current (append only)
 %% CDB journal file to persist the change. The call should return either 'ok'
 %% or 'roll'. -'roll' indicates that the CDB file has insufficient capacity for
-%% this write.
+%% this write, and a new journal file should be created (with appropriate
+%% manifest changes to be made).
 %%
-%% (Note that storing the IndexSpecs will create some duplication with the
-%% Metadata wrapped up within the Object value. This Value and the IndexSpecs
-%% are compressed before storage, so this should provide some mitigation for
-%% the duplication).
-%%
-%% In resonse to a 'roll', the inker should:
-%% - start a new active journal file with an open_write_request, and then;
-%% - call to PUT the object in this file;
-%% - reply to the bookie, but then in the background
-%% - close the previously active journal file (writing the hashtree), and move
-%% it to the historic journal
-%%
-%% The inker will also return the SQN which the change has been made at, as
-%% well as the object size on disk within the Journal.
+%% The inker will return the SQN at which the change has been made, as well as
+%% the object size on disk within the Journal.
 %%
 %% Once the object has been persisted to the Journal, the Ledger can be updated.
 %% The Ledger is updated by the Bookie applying a function (extract_metadata/4)
@@ -68,30 +57,32 @@
 %% of the Value and also taking the Primary Key, the IndexSpecs, the Sequence
 %% Number in the Journal and the Object Size (returned from the Inker).
 %%
-%% The Bookie should generate a series of ledger key changes from this
-%% information, using a function passed in at startup. For Riak this will be
-%% of the form:
-%% {{o_rkv, Bucket, Key, SubKey|null},
-%% SQN,
-%% {Hash, Size, {Riak_Metadata}},
-%% {active, TS}|{tomb, TS}} or
-%% {{i, Bucket, {IndexTerm, IndexField}, Key},
-%% SQN,
-%% null,
-%% {active, TS}|{tomb, TS}}
+%% A set of Ledger Key changes is then generated and placed in the Bookie's
+%% Ledger Key cache (a gb_tree).
 %%
-%% Recent Ledger changes are retained initially in the Bookies' memory (in a
-%% small generally balanced tree). Periodically, the current table is pushed to
-%% the Penciller for eventual persistence, and a new table is started.
+%% The PUT can now be acknowledged. In the background the Bookie may then
+%% choose to push the cache to the Penciller for eventual persistence within
+%% the ledger. This push will either be accepted or returned (if the
+%% Penciller has a backlog of key changes). The back-pressure should lead to
+%% the Bookie entering a slow-offer status, whereby the next PUT will be
+%% acknowledged by a PAUSE signal - with the expectation that this will
+%% lead to a back-off behaviour.
 %%
-%% This completes the non-deferrable work associated with a PUT
+%% -------- GET, HEAD --------
 %%
-%% -------- Snapshots (Key & Metadata Only) --------
+%% The Bookie supports both GET and HEAD requests, with the HEAD request
+%% returning only the metadata and not the actual object value. The HEAD
+%% request can be serviced by reference to the Ledger Cache and the Penciller.
+%%
+%% GET requests first follow the path of a HEAD request, and if an object is
+%% found, the value is then fetched from the Journal via the Inker.
+%%
+%% -------- Snapshots/Clones --------
 %%
 %% If there is a snapshot request (e.g. to iterate over the keys) the Bookie
 %% may request a clone of the Penciller, or the Penciller and the Inker.
 %%
-%% The clone is seeded with the manifest. Teh clone should be registered with
+%% The clone is seeded with the manifest. The clone should be registered with
 %% the real Inker/Penciller, so that the real Inker/Penciller may prevent the
 %% deletion of files still in use by a snapshot clone.
 %%
@@ -101,12 +92,6 @@
 %% there are no registered iterators from before the point the file was
 %% removed from the manifest.
 %%
-%% -------- Special Ops --------
-%%
-%% e.g. Get all for SegmentID/Partition
-%%
-%%
-%%
 %% -------- On Startup --------
 %%
 %% On startup the Bookie must restart both the Inker to load the Journal, and
@@ -134,16 +119,14 @@
         code_change/3,
         book_start/1,
         book_start/3,
-        book_riakput/3,
-        book_riakdelete/4,
-        book_riakget/3,
-        book_riakhead/3,
         book_put/5,
         book_put/6,
         book_tempput/7,
         book_delete/4,
         book_get/3,
+        book_get/4,
         book_head/3,
+        book_head/4,
         book_returnfolder/2,
         book_snapshotstore/3,
         book_snapshotledger/3,
@@ -161,7 +144,6 @@
 -define(LEDGER_FP, "ledger").
 -define(SNAPSHOT_TIMEOUT, 300000).
 -define(CHECKJOURNAL_PROB, 0.2).
--define(SLOWOFFER_DELAY, 5).
 
 -record(state, {inker :: pid(),
                 penciller :: pid(),
@@ -184,9 +166,6 @@ book_start(RootPath, LedgerCacheSize, JournalSize) ->
 book_start(Opts) ->
     gen_server:start(?MODULE, [Opts], []).
 
-book_riakput(Pid, RiakObject, IndexSpecs) ->
-    {Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
-    book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG).
 
 book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_integer(TTL) ->
     book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL).
@@ -197,21 +176,12 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
 book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
     book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).
 
-book_riakdelete(Pid, Bucket, Key, IndexSpecs) ->
-    book_put(Pid, Bucket, Key, delete, IndexSpecs, ?RIAK_TAG).
-
 book_delete(Pid, Bucket, Key, IndexSpecs) ->
     book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).
 
-book_riakget(Pid, Bucket, Key) ->
-    book_get(Pid, Bucket, Key, ?RIAK_TAG).
-
 book_get(Pid, Bucket, Key) ->
     book_get(Pid, Bucket, Key, ?STD_TAG).
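+%% A minimal usage sketch of the tagged API (bucket, key and object values
+%% hypothetical); note that under slow-offer conditions book_put/6 may reply
+%% pause rather than ok:
+%%
+%%   R = leveled_bookie:book_put(Bookie, B, K, Obj, Specs, ?RIAK_TAG),
+%%   true = lists:member(R, [ok, pause]),
+%%   {ok, Obj} = leveled_bookie:book_get(Bookie, B, K, ?RIAK_TAG),
+%%   {ok, _Head} = leveled_bookie:book_head(Bookie, B, K, ?RIAK_TAG).
+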
-book_riakhead(Pid, Bucket, Key) ->
-    book_head(Pid, Bucket, Key, ?RIAK_TAG).
-
 book_head(Pid, Bucket, Key) ->
     book_head(Pid, Bucket, Key, ?STD_TAG).
 
@@ -293,11 +263,10 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
     % will beocme more frequent
     case State#state.slow_offer of
         true ->
-            timer:sleep(?SLOWOFFER_DELAY);
+            gen_server:reply(From, pause);
         false ->
-            ok
+            gen_server:reply(From, ok)
     end,
-    gen_server:reply(From, ok),
     case maybepush_ledgercache(State#state.cache_size,
                                 Cache0,
                                 State#state.penciller) of
@@ -933,12 +902,12 @@ single_key_test() ->
                 {"MDK1", "MDV1"}},
     Content = #r_content{metadata=MD, value=V1},
     Object = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
-    ok = book_riakput(Bookie1, Object, Spec1),
-    {ok, F1} = book_riakget(Bookie1, B1, K1),
+    ok = book_put(Bookie1, B1, K1, Object, Spec1, ?RIAK_TAG),
+    {ok, F1} = book_get(Bookie1, B1, K1, ?RIAK_TAG),
     ?assertMatch(F1, Object),
     ok = book_close(Bookie1),
     {ok, Bookie2} = book_start([{root_path, RootPath}]),
-    {ok, F2} = book_riakget(Bookie2, B1, K1),
+    {ok, F2} = book_get(Bookie2, B1, K1, ?RIAK_TAG),
     ?assertMatch(F2, Object),
     ok = book_close(Bookie2),
     reset_filestructure().
@@ -960,41 +929,53 @@ multi_key_test() ->
                 {"MDK2", "MDV2"}},
     C2 = #r_content{metadata=MD2, value=V2},
     Obj2 = #r_object{bucket=B2, key=K2, contents=[C2], vclock=[{'a',1}]},
-    ok = book_riakput(Bookie1, Obj1, Spec1),
+    ok = book_put(Bookie1, B1, K1, Obj1, Spec1, ?RIAK_TAG),
     ObjL1 = generate_multiple_robjects(100, 3),
     SW1 = os:timestamp(),
-    lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie1, O, S) end, ObjL1),
+    lists:foreach(fun({O, S}) ->
+                        {B, K} = leveled_codec:riakto_keydetails(O),
+                        ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG)
+                        end,
+                    ObjL1),
     io:format("PUT of 100 objects completed in ~w microseconds~n",
                 [timer:now_diff(os:timestamp(),SW1)]),
-    ok = book_riakput(Bookie1, Obj2, Spec2),
-    {ok, F1A} = book_riakget(Bookie1, B1, K1),
+    ok = book_put(Bookie1, B2, K2, Obj2, Spec2, ?RIAK_TAG),
+    {ok, F1A} = book_get(Bookie1, B1, K1, ?RIAK_TAG),
     ?assertMatch(F1A, Obj1),
-    {ok, F2A} = book_riakget(Bookie1, B2, K2),
+    {ok, F2A} = book_get(Bookie1, B2, K2, ?RIAK_TAG),
     ?assertMatch(F2A, Obj2),
     ObjL2 = generate_multiple_robjects(100, 103),
     SW2 = os:timestamp(),
-    lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie1, O, S) end, ObjL2),
+    lists:foreach(fun({O, S}) ->
+                        {B, K} = leveled_codec:riakto_keydetails(O),
+                        ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG)
+                        end,
+                    ObjL2),
     io:format("PUT of 100 objects completed in ~w microseconds~n",
                 [timer:now_diff(os:timestamp(),SW2)]),
-    {ok, F1B} = book_riakget(Bookie1, B1, K1),
+    {ok, F1B} = book_get(Bookie1, B1, K1, ?RIAK_TAG),
     ?assertMatch(F1B, Obj1),
-    {ok, F2B} = book_riakget(Bookie1, B2, K2),
+    {ok, F2B} = book_get(Bookie1, B2, K2, ?RIAK_TAG),
     ?assertMatch(F2B, Obj2),
     ok = book_close(Bookie1),
     % Now reopen the file, and confirm that a fetch is still possible
     {ok, Bookie2} = book_start([{root_path, RootPath}]),
-    {ok, F1C} = book_riakget(Bookie2, B1, K1),
+    {ok, F1C} = book_get(Bookie2, B1, K1, ?RIAK_TAG),
     ?assertMatch(F1C, Obj1),
-    {ok, F2C} = book_riakget(Bookie2, B2, K2),
+    {ok, F2C} = book_get(Bookie2, B2, K2, ?RIAK_TAG),
     ?assertMatch(F2C, Obj2),
     ObjL3 = generate_multiple_robjects(100, 203),
    SW3 = os:timestamp(),
-    lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie2, O, S) end, ObjL3),
+    lists:foreach(fun({O, S}) ->
+                        {B, K} = leveled_codec:riakto_keydetails(O),
+                        ok = book_put(Bookie2, B, K, O, S, ?RIAK_TAG)
+                        end,
+                    ObjL3),
     io:format("PUT of 100 objects completed in ~w microseconds~n",
                 [timer:now_diff(os:timestamp(),SW3)]),
-    {ok, F1D} = book_riakget(Bookie2, B1, K1),
+    {ok, F1D} = book_get(Bookie2, B1, K1, ?RIAK_TAG),
     ?assertMatch(F1D, Obj1),
-    {ok, F2D} = book_riakget(Bookie2, B2, K2),
+    {ok, F2D} = book_get(Bookie2, B2, K2, ?RIAK_TAG),
     ?assertMatch(F2D, Obj2),
     ok = book_close(Bookie2),
     reset_filestructure().
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 287f122..eeff696 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -35,10 +35,6 @@
 %% - L1 TO L7: May contain multiple processes managing non-overlapping sft
 %% files. Compaction work should be sheduled if the number of files exceeds
 %% the target size of the level, where the target size is 8 ^ n.
-%% - L Minus 1: Used to cache the last ledger cache push for use in queries
-%% whilst the Penciller awaits a callback from the roll_clerk with the new
-%% merged L0 file containing the L-1 updates.
-%%
 %%
 %% The most recent revision of a Key can be found by checking each level until
 %% the key is found. To check a level the correct file must be sought from the
@@ -59,33 +55,28 @@
 %%
 %% When the clerk picks work it will take the current manifest, and the
 %% Penciller assumes the manifest sequence number is to be incremented.
-%% When the clerk has completed the work it cna request that the manifest
+%% When the clerk has completed the work it can request that the manifest
 %% change be committed by the Penciller. The commit is made through changing
 %% the filename of the new manifest - so the Penciller is not held up by the
 %% process of wiritng a file, just altering file system metadata.
 %%
-%% The manifest is locked by a clerk taking work, or by there being a need to
-%% write a file to Level 0. If the manifest is locked, then new keys can still
-%% be added in memory - however, the response to that push will be to "pause",
-%% that is to say the Penciller will ask the Bookie to slowdown.
-%%
 %% ---------- PUSH ----------
 %%
 %% The Penciller must support the PUSH of a dump of keys from the Bookie. The
 %% call to PUSH should be immediately acknowledged, and then work should be
-%% completed to merge the tree into the L0 tree (with the tree being cached as
-%% a Level -1 tree so as not to block reads whilst it waits.
+%% completed to merge the tree into the L0 tree.
 %%
 %% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the
-%% conversion of the current ETS table into a SFT file, but not completed this
+%% conversion of the current L0 tree into a SFT file, but not completed this
 %% change. The Penciller in this case returns the push, and the Bookie should
-%% continue to gorw the cache before trying again.
+%% continue to grow the cache before trying again.
 %%
 %% ---------- FETCH ----------
 %%
-%% On request to fetch a key the Penciller should look first in the L0 ETS
-%% table, and then look in the SFT files Level by Level, consulting the
-%% Manifest to determine which file should be checked at each level.
+%% On request to fetch a key the Penciller should look first in the in-memory
+%% L0 tree, then look in the SFT files Level by Level (including level 0),
+%% consulting the Manifest to determine which file should be checked at each
+%% level.
 %%
 %% ---------- SNAPSHOT ----------
 %%
@@ -153,61 +144,13 @@
 %%
 %% The writing of L0 files do not require the involvement of the clerk.
 %% The L0 files are prompted directly by the penciller when the in-memory tree
-%% has reached capacity. When there is a next push into memory the Penciller
-%% calls to check that the file is now active (which may pause if the write is
-%% ongoing the acceptence of the push), and if so it can clear the L0 tree
-%% and build a new tree from an empty tree and the keys from the latest push.
+%% has reached capacity. This places the penciller in a levelzero_pending
+%% state, and in this state it must return new pushes. Once the SFT file has
+%% been completed it will confirm completion to the penciller which can then
+%% revert the levelzero_pending state, add the file to the manifest and clear
+%% the current level zero in-memory view.
 %%
-%% Only a single L0 file may exist at any one moment in time. If pushes are
-%% received when memory is over the maximum size, the pushes must be kept into
-%% memory.
-%%
-%% 1 - A L0 file is prompted to be created at ManifestSQN n
-%% 2 - The next push to memory will be stalled until the L0 write is reported
-%% as completed (as the memory needs to be flushed)
-%% 3 - The completion of the L0 file will cause a prompt to be cast to the
-%% clerk for them to look for work
-%% 4 - On completion of the merge (of the L0 file into L1, as this will be the
-%% highest priority work), the clerk will create a new manifest file at
-%% manifest SQN n+1
-%% 5 - The clerk will prompt the penciller about the change, and the Penciller
-%% will then commit the change (by renaming the manifest file to be active, and
-%% advancing the in-memory state of the manifest and manifest SQN)
-%% 6 - The Penciller having committed the change will cast back to the Clerk
-%% to inform the Clerk that the chnage has been committed, and so it can carry
-%% on requetsing new work
-%% 7 - If the Penciller now receives a Push to over the max size, a new L0 file
-%% can now be created with the ManifestSQN of n+1
-%%
-%% ---------- NOTES ON THE (NON) USE OF ETS ----------
-%%
-%% Insertion into ETS is very fast, and so using ETS does not slow the PUT
-%% path. However, an ETS table is mutable, so it does complicate the
-%% snapshotting of the Ledger.
-%%
-%% Originally the solution had used an ETS table for insertion speed as the L0
-%% cache. Insertion speed was an order or magnitude faster than gb_trees. To
-%% resolving issues of trying to have fast start-up snapshots though led to
-%% keeping a seperate set of trees alongside the ETS table to be used by
-%% snapshots.
-%%
-%% The next strategy was to perform the expensive operation (merging the
-%% Ledger cache into the Level0 cache), within a dedicated Penciller's clerk,
-%% known as the roll_clerk. This may take 30-40ms, but during this period
-%% the Penciller will keep a Level -1 cache of the unmerged elements which
-%% it will wipe once the roll_clerk returns with an updated L0 cache.
-%%
-%% This was still a bit complicated, and did a lot of processing to
-%% making updates to the large L0 cache - which will have created a lot of GC
-%% effort required. The processing was inefficient
-%%
-%% The current paproach is to simply append each new tree pushed to a list, and
-%% use an array of hashes to index for the presence of objects in the list.
-%% When attempting to iterate, the caches are all merged for the range relevant
-%% to the given iterator only. The main downside to the approahc is that the
-%% Penciller cna no longer accurately measure the size of the L0 cache (as it
-%% cannot determine how many replacements there are in the Cache - so it may
-%% prematurely write a smaller than necessary L0 file.
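+%%
+%% An illustrative sketch only (not the actual handle_call/3 clauses, and
+%% update_levelzero/2 is a stand-in name) of the push handling described
+%% above:
+%%
+%%   handle_call({push_mem, DumpList}, _From, State) ->
+%%       case State#state.levelzero_pending of
+%%           true ->
+%%               % L0 write in flight - return the push, so the Bookie
+%%               % retains and continues to grow its ledger cache
+%%               {reply, returned, State};
+%%           false ->
+%%               % accept the dump into the in-memory level zero view
+%%               {reply, ok, update_levelzero(DumpList, State)}
+%%       end.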
+
 
 -module(leveled_penciller).
 
@@ -1298,13 +1241,18 @@ simple_server_test() ->
     clean_testdir(RootPath),
     {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
                                                 max_inmemory_tablesize=1000}),
-    Key1 = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}},
+    Key1 = {{o,"Bucket0001", "Key0001", null},
+            {1, {active, infinity}, null}},
     KL1 = leveled_sft:generate_randomkeys({1000, 2}),
-    Key2 = {{o,"Bucket0002", "Key0002", null}, {1002, {active, infinity}, null}},
-    KL2 = leveled_sft:generate_randomkeys({1000, 1003}),
-    Key3 = {{o,"Bucket0003", "Key0003", null}, {2003, {active, infinity}, null}},
-    KL3 = leveled_sft:generate_randomkeys({1000, 2004}),
-    Key4 = {{o,"Bucket0004", "Key0004", null}, {3004, {active, infinity}, null}},
+    Key2 = {{o,"Bucket0002", "Key0002", null},
+            {1002, {active, infinity}, null}},
+    KL2 = leveled_sft:generate_randomkeys({900, 1003}),
+    % Keep below the max table size by having 900 not 1000
+    Key3 = {{o,"Bucket0003", "Key0003", null},
+            {2003, {active, infinity}, null}},
+    KL3 = leveled_sft:generate_randomkeys({1000, 2004}),
+    Key4 = {{o,"Bucket0004", "Key0004", null},
+            {3004, {active, infinity}, null}},
     KL4 = leveled_sft:generate_randomkeys({1000, 3005}),
     ok = maybe_pause_push(PCL, [Key1]),
     ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
@@ -1321,13 +1269,15 @@
     ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
     ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})),
     ?assertMatch(Key3, pcl_fetch(PCL, {o,"Bucket0003", "Key0003", null})),
+    timer:sleep(200),
+    % This sleep should make sure that the merge to L1 has occurred
+    % This will free up the L0 slot for the remainder to be written in shutdown
     ok = pcl_close(PCL),
     {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
                                                 max_inmemory_tablesize=1000}),
-    ?assertMatch(1001, pcl_getstartupsequencenumber(PCLr)),
-    ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]),
-    io:format("Back to starting position with lost data recovered~n"),
+    ?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)),
+    % ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]),
     ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001", null})),
     ?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002", null})),
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index 23273de..0c1deae 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -27,7 +27,7 @@ simple_put_fetch_head_delete(_Config) ->
     StartOpts1 = [{root_path, RootPath}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
     testutil:check_forobject(Bookie1, TestObject),
     testutil:check_formissingobject(Bookie1, "Bucket1", "Key2"),
     ok = leveled_bookie:book_close(Bookie1),
@@ -36,9 +36,7 @@
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
     testutil:check_forobject(Bookie2, TestObject),
     ObjList1 = testutil:generate_objects(5000, 2),
-    lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
-                    ObjList1),
+    testutil:riakload(Bookie2, ObjList1),
     ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
     testutil:check_forlist(Bookie2, ChkList1),
     testutil:check_forobject(Bookie2, TestObject),
@@ -72,7 +70,7 @@ many_put_fetch_head(_Config) ->
     StartOpts1 = [{root_path, RootPath}, {max_pencillercachesize, 16000}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
     testutil:check_forobject(Bookie1, TestObject),
     ok = leveled_bookie:book_close(Bookie1),
     StartOpts2 = [{root_path, RootPath},
@@ -88,9 +86,7 @@
     ChkListFixed = lists:nth(length(CLs), CLs),
     testutil:check_forlist(Bookie2, CL1A),
     ObjList2A = testutil:generate_objects(5000, 2),
-    lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
-                    ObjList2A),
+    testutil:riakload(Bookie2, ObjList2A),
     ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000),
     testutil:check_forlist(Bookie2, ChkList2A),
     testutil:check_forlist(Bookie2, ChkListFixed),
@@ -113,12 +109,10 @@ journal_compaction(_Config) ->
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
     testutil:check_forobject(Bookie1, TestObject),
     ObjList1 = testutil:generate_objects(20000, 2),
-    lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
-                    ObjList1),
+    testutil:riakload(Bookie1, ObjList1),
     ChkList1 = lists:sublist(lists:sort(ObjList1), 10000),
     testutil:check_forlist(Bookie1, ChkList1),
     testutil:check_forobject(Bookie1, TestObject),
@@ -129,7 +123,7 @@
                 {"MDK1", "MDV1"}},
     {TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2,
                                                             V2, Spec2, MD),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject2, TestSpec2),
+    ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2),
     ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
     testutil:check_forlist(Bookie1, ChkList1),
     testutil:check_forobject(Bookie1, TestObject),
@@ -140,18 +134,16 @@
     %% Delete some of the objects
     ObjListD = testutil:generate_objects(10000, 2),
     lists:foreach(fun({_R, O, _S}) ->
-                        ok = leveled_bookie:book_riakdelete(Bookie1,
-                                                            O#r_object.bucket,
-                                                            O#r_object.key,
-                                                            [])
+                        testutil:book_riakdelete(Bookie1,
+                                                    O#r_object.bucket,
+                                                    O#r_object.key,
+                                                    [])
                     end,
                     ObjListD),
 
     %% Now replace all the other objects
     ObjList2 = testutil:generate_objects(40000, 10002),
-    lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
-                    ObjList2),
+    testutil:riakload(Bookie1, ObjList2),
 
     ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
     F = fun leveled_bookie:book_islastcompactionpending/1,
@@ -184,11 +176,9 @@ fetchput_snapshot(_Config) ->
     StartOpts1 = [{root_path, RootPath}, {max_journalsize, 30000000}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
     ObjList1 = testutil:generate_objects(5000, 2),
-    lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
-                    ObjList1),
+    testutil:riakload(Bookie1, ObjList1),
     SnapOpts1 = [{snapshot_bookie, Bookie1}],
     {ok, SnapBookie1} = leveled_bookie:book_start(SnapOpts1),
     ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
@@ -211,9 +201,7 @@
     ObjList2 = testutil:generate_objects(5000, 2),
-    lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
-                    ObjList2),
+    testutil:riakload(Bookie2, ObjList2),
     io:format("Replacement objects put~n"),
 
     ChkList2 = lists:sublist(lists:sort(ObjList2), 100),
@@ -225,9 +213,7 @@
     ok = filelib:ensure_dir(RootPath ++ "/ledger/ledger_files"),
     {ok, FNsA} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
     ObjList3 = testutil:generate_objects(15000, 5002),
-    lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
-                    ObjList3),
+    testutil:riakload(Bookie2, ObjList3),
     ChkList3 = lists:sublist(lists:sort(ObjList3), 100),
     testutil:check_forlist(Bookie2, ChkList3),
     testutil:check_formissinglist(SnapBookie2, ChkList3),
@@ -291,7 +277,7 @@ load_and_count(_Config) ->
     StartOpts1 = [{root_path, RootPath}, {max_journalsize, 50000000}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
     testutil:check_forobject(Bookie1, TestObject),
     io:format("Loading initial small objects~n"),
     G1 = fun testutil:generate_smallobjects/2,
@@ -374,7 +360,7 @@ load_and_count_withdelete(_Config) ->
     StartOpts1 = [{root_path, RootPath}, {max_journalsize, 50000000}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
     testutil:check_forobject(Bookie1, TestObject),
     io:format("Loading initial small objects~n"),
     G1 = fun testutil:generate_smallobjects/2,
@@ -396,8 +382,8 @@
     testutil:check_forobject(Bookie1, TestObject),
     {BucketD, KeyD} = leveled_codec:riakto_keydetails(TestObject),
     {_, 1} = testutil:check_bucket_stats(Bookie1, BucketD),
-    ok = leveled_bookie:book_riakdelete(Bookie1, BucketD, KeyD, []),
-    not_found = leveled_bookie:book_riakget(Bookie1, BucketD, KeyD),
+    ok = testutil:book_riakdelete(Bookie1, BucketD, KeyD, []),
+    not_found = testutil:book_riakget(Bookie1, BucketD, KeyD),
     {_, 0} = testutil:check_bucket_stats(Bookie1, BucketD),
     io:format("Loading larger compressible objects~n"),
     G2 = fun testutil:generate_compressibleobjects/2,
@@ -416,7 +402,7 @@
                             Acc + 5000 end,
                         100000,
                         lists:seq(1, 20)),
-    not_found = leveled_bookie:book_riakget(Bookie1, BucketD, KeyD),
+    not_found = testutil:book_riakget(Bookie1, BucketD, KeyD),
     ok = leveled_bookie:book_close(Bookie1),
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
     testutil:check_formissingobject(Bookie2, BucketD, KeyD),
@@ -461,10 +447,10 @@ space_clear_ondelete(_Config) ->
     % Delete the keys
     SW2 = os:timestamp(),
     lists:foreach(fun({Bucket, Key}) ->
-                        ok = leveled_bookie:book_riakdelete(Book1,
-                                                            Bucket,
-                                                            Key,
-                                                            [])
+                        testutil:book_riakdelete(Book1,
+                                                    Bucket,
+                                                    Key,
+                                                    [])
                     end,
                     KL1),
     io:format("Deletion took ~w microseconds for 80K keys~n",
diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl
index c52cee9..538f37a 100644
--- a/test/end_to_end/iterator_SUITE.erl
+++ b/test/end_to_end/iterator_SUITE.erl
@@ -24,7 +24,7 @@ small_load_with2i(_Config) ->
     % low journal size to make sure > 1 created
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
     testutil:check_forobject(Bookie1, TestObject),
     testutil:check_formissingobject(Bookie1, "Bucket1", "Key2"),
     testutil:check_forobject(Bookie1, TestObject),
@@ -35,9 +35,7 @@
                                             [],
                                             ObjectGen,
                                             IndexGen),
-    lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
-                    ObjL1),
+    testutil:riakload(Bookie1, ObjL1),
     ChkList1 = lists:sublist(lists:sort(ObjL1), 100),
     testutil:check_forlist(Bookie1, ChkList1),
     testutil:check_forobject(Bookie1, TestObject),
@@ -48,7 +46,7 @@
                                 end,
                                 Spc),
                         {B, K} = leveled_codec:riakto_keydetails(Obj),
-                        leveled_bookie:book_riakdelete(Bookie1, B, K, DSpc)
+                        testutil:book_riakdelete(Bookie1, B, K, DSpc)
                     end,
                     ChkList1),
     %% Get the Buckets Keys and Hashes for the whole bucket
@@ -116,7 +114,7 @@ query_count(_Config) ->
                                                         "Value1",
                                                         [],
                                                         {"MDK1", "MDV1"}),
-    ok = leveled_bookie:book_riakput(Book1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Book1, TestObject, TestSpec),
     testutil:check_forobject(Book1, TestObject),
     testutil:check_formissingobject(Book1, "Bucket1", "Key2"),
     testutil:check_forobject(Book1, TestObject),
@@ -129,12 +127,7 @@
                                                     [],
                                                     V,
                                                     Indexes),
-            lists:foreach(fun({_RN, Obj, Spc}) ->
-                                leveled_bookie:book_riakput(Book1,
-                                                            Obj,
-                                                            Spc)
-                            end,
-                            ObjL1),
+            testutil:riakload(Book1, ObjL1),
             io:format("Put of 10000 objects with 8 index entries "
                         ++
                         "each completed in ~w microseconds~n",
@@ -243,7 +236,7 @@
     V9 = testutil:get_compressiblevalue(),
     Indexes9 = testutil:get_randomindexes_generator(8),
     [{_RN, Obj9, Spc9}] = testutil:generate_objects(1, uuid, [], V9, Indexes9),
-    ok = leveled_bookie:book_riakput(Book2, Obj9, Spc9),
+    ok = testutil:book_riakput(Book2, Obj9, Spc9),
     R9 = lists:map(fun({add, IdxF, IdxT}) ->
                         R = leveled_bookie:book_returnfolder(Book2,
                                                                 {index_query,
@@ -261,7 +254,7 @@
                     Spc9),
     Spc9Del = lists:map(fun({add, IdxF, IdxT}) -> {remove, IdxF, IdxT} end,
                         Spc9),
-    ok = leveled_bookie:book_riakput(Book2, Obj9, Spc9Del),
+    ok = testutil:book_riakput(Book2, Obj9, Spc9Del),
     lists:foreach(fun({IdxF, IdxT, X}) ->
                         R = leveled_bookie:book_returnfolder(Book2,
                                                                 {index_query,
@@ -294,7 +287,7 @@
                         end
                     end,
                     R9),
-    ok = leveled_bookie:book_riakput(Book3, Obj9, Spc9),
+    ok = testutil:book_riakput(Book3, Obj9, Spc9),
     ok = leveled_bookie:book_close(Book3),
     {ok, Book4} = leveled_bookie:book_start(RootPath, 2000, 50000000),
     lists:foreach(fun({IdxF, IdxT, X}) ->
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index de58e4c..e7b924c 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -47,7 +47,7 @@ aae_bustedjournal(_Config) ->
                     {max_journalsize, 20000000}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
     testutil:check_forobject(Bookie1, TestObject),
     GenList = [2],
     _CLs = testutil:load_objects(20000, GenList, Bookie1, TestObject,
@@ -64,7 +64,7 @@
     KeyList = KeyF(),
     20001 = length(KeyList),
     HeadCount = lists:foldl(fun({B, K}, Acc) ->
-                                case leveled_bookie:book_riakhead(Bookie2,
+                                case testutil:book_riakhead(Bookie2,
                                                                   B,
                                                                   K) of
                                     {ok, _} -> Acc + 1;
@@ -75,7 +75,7 @@
                                 KeyList),
     20001 = HeadCount,
     GetCount = lists:foldl(fun({B, K}, Acc) ->
-                                case leveled_bookie:book_riakget(Bookie2,
+                                case testutil:book_riakget(Bookie2,
                                                                 B,
                                                                 K) of
                                     {ok, _} -> Acc + 1;
@@ -199,16 +199,16 @@ journal_compaction_bustedjournal(_Config) ->
                     {max_run_length, 10}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
     testutil:check_forobject(Bookie1, TestObject),
     ObjList1 = testutil:generate_objects(50000, 2),
     lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
+                        testutil:book_riakput(Bookie1, Obj, Spc) end,
                     ObjList1),
     %% Now replace all the objects
     ObjList2 = testutil:generate_objects(50000, 2),
     lists:foreach(fun({_RN, Obj, Spc}) ->
-                        leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
+                        testutil:book_riakput(Bookie1, Obj, Spc) end,
                     ObjList2),
     ok = leveled_bookie:book_close(Bookie1),
 
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index 755cf88..b49ab21 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -2,7 +2,12 @@
 
 -include("../include/leveled.hrl").
 
--export([reset_filestructure/0,
+-export([book_riakput/3,
+        book_riakdelete/4,
+        book_riakget/3,
+        book_riakhead/3,
+        riakload/2,
+        reset_filestructure/0,
         reset_filestructure/1,
         check_bucket_stats/2,
         check_forlist/2,
@@ -37,6 +42,33 @@
         riak_hash/1]).
 
 -define(RETURN_TERMS, {true, undefined}).
+-define(SLOWOFFER_DELAY, 5).
+
+
+
+book_riakput(Pid, RiakObject, IndexSpecs) ->
+    {Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
+    leveled_bookie:book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG).
+
+book_riakdelete(Pid, Bucket, Key, IndexSpecs) ->
+    leveled_bookie:book_put(Pid, Bucket, Key, delete, IndexSpecs, ?RIAK_TAG).
+
+book_riakget(Pid, Bucket, Key) ->
+    leveled_bookie:book_get(Pid, Bucket, Key, ?RIAK_TAG).
+
+book_riakhead(Pid, Bucket, Key) ->
+    leveled_bookie:book_head(Pid, Bucket, Key, ?RIAK_TAG).
+
+
+riakload(Bookie, ObjectList) ->
+    lists:foreach(fun({_RN, Obj, Spc}) ->
+                        R = book_riakput(Bookie, Obj, Spc),
+                        case R of
+                            ok -> ok;
+                            pause -> timer:sleep(?SLOWOFFER_DELAY)
+                        end
+                    end,
+                    ObjectList).
 
 
 reset_filestructure() ->
@@ -81,9 +113,9 @@ check_forlist(Bookie, ChkList, Log) ->
                             true ->
                                 ok
                         end,
-                        R = leveled_bookie:book_riakget(Bookie,
-                                                        Obj#r_object.bucket,
-                                                        Obj#r_object.key),
+                        R = book_riakget(Bookie,
+                                            Obj#r_object.bucket,
+                                            Obj#r_object.key),
                         ok = case R of
                                 {ok, Obj} ->
                                     ok;
@@ -100,21 +132,21 @@ check_formissinglist(Bookie, ChkList) ->
     SW = os:timestamp(),
     lists:foreach(fun({_RN, Obj, _Spc}) ->
-                        R = leveled_bookie:book_riakget(Bookie,
-                                                        Obj#r_object.bucket,
-                                                        Obj#r_object.key),
+                        R = book_riakget(Bookie,
+                                            Obj#r_object.bucket,
+                                            Obj#r_object.key),
                         R = not_found
                     end,
                     ChkList),
     io:format("Miss check took ~w microseconds checking list of length ~w~n",
                     [timer:now_diff(os:timestamp(), SW), length(ChkList)]).
 check_forobject(Bookie, TestObject) ->
-    {ok, TestObject} = leveled_bookie:book_riakget(Bookie,
-                                                    TestObject#r_object.bucket,
-                                                    TestObject#r_object.key),
-    {ok, HeadObject} = leveled_bookie:book_riakhead(Bookie,
-                                                    TestObject#r_object.bucket,
-                                                    TestObject#r_object.key),
+    {ok, TestObject} = book_riakget(Bookie,
+                                        TestObject#r_object.bucket,
+                                        TestObject#r_object.key),
+    {ok, HeadObject} = book_riakhead(Bookie,
+                                        TestObject#r_object.bucket,
+                                        TestObject#r_object.key),
     ok = case {HeadObject#r_object.bucket,
                     HeadObject#r_object.key,
                     HeadObject#r_object.vclock} of
@@ -125,8 +157,8 @@ check_forobject(Bookie, TestObject) ->
     end.
 
 check_formissingobject(Bookie, Bucket, Key) ->
-    not_found = leveled_bookie:book_riakget(Bookie, Bucket, Key),
-    not_found = leveled_bookie:book_riakhead(Bookie, Bucket, Key).
+    not_found = book_riakget(Bookie, Bucket, Key),
+    not_found = book_riakhead(Bookie, Bucket, Key).
 
 
 generate_testobject() ->
@@ -235,10 +267,7 @@ load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
     lists:map(fun(KN) ->
                     ObjListA = Generator(ChunkSize, KN),
                     StartWatchA = os:timestamp(),
-                    lists:foreach(fun({_RN, Obj, Spc}) ->
-                                        leveled_bookie:book_riakput(Bookie, Obj, Spc)
-                                    end,
-                                    ObjListA),
+                    riakload(Bookie, ObjListA),
                     Time = timer:now_diff(os:timestamp(), StartWatchA),
                     io:format("~w objects loaded in ~w seconds~n",
                                 [ChunkSize, Time/1000000]),
@@ -289,7 +318,7 @@ check_indexed_objects(Book, B, KSpecL, V) ->
     % Check all objects match, return what should be the results of an all
     % index query
     IdxR = lists:map(fun({K, Spc}) ->
-                        {ok, O} = leveled_bookie:book_riakget(Book, B, K),
+                        {ok, O} = book_riakget(Book, B, K),
                         V = testutil:get_value(O),
                         {add,
                             "idx1_bin",
@@ -333,9 +362,7 @@ put_indexed_objects(Book, Bucket, Count) ->
                                                 IndexGen,
                                                 Bucket),
     KSpecL = lists:map(fun({_RN, Obj, Spc}) ->
-                            leveled_bookie:book_riakput(Book,
-                                                        Obj,
-                                                        Spc),
+                            book_riakput(Book, Obj, Spc),
                             {testutil:get_key(Obj), Spc}
                             end,
                         ObjL1),
@@ -364,9 +391,7 @@ put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) ->
                                                     V,
                                                     IndexGen,
                                                     AddSpc),
-                        ok = leveled_bookie:book_riakput(Book,
-                                                            O,
-                                                            AltSpc),
+                        ok = book_riakput(Book, O, AltSpc),
                         {K, AltSpc}
                         end,
                     KSpecL),
     {RplKSpecL, V}.
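
Note on the back-pressure change above: leveled_bookie:book_put/6 now replies
pause rather than ok when the Bookie is in slow-offer status, so any direct
caller needs the same handling that testutil:riakload/2 provides. A minimal
sketch of such a caller follows; the function name put_with_backoff is
hypothetical, and the 5ms sleep simply mirrors the ?SLOWOFFER_DELAY now
defined in testutil:

    put_with_backoff(Bookie, Bucket, Key, Obj, Specs, Tag) ->
        case leveled_bookie:book_put(Bookie, Bucket, Key, Obj, Specs, Tag) of
            ok ->
                ok;
            pause ->
                % pause received - back off before the next write, to
                % relieve pressure on the Penciller
                timer:sleep(5),
                ok
        end.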