From 8f29a6c40f57b23090cb819832fbab3c85aadd27 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Tue, 18 Oct 2016 19:41:33 +0100
Subject: [PATCH] Complete 2i work - some refactoring

The 2i work now has tests for removals as well as regex etc.

Some initial refactoring has also been tried, to take some tasks off the
critical path of push_mem.  The primary change has been to avoid putting
index keys into the gb_tree, instead building the KeyChanges list in
parallel to the gb_tree (now known as the ObjectTree) within the Ledger
Cache.

Some initial experiments were also done on changing the ETS table in the
Penciller, now that it will be used for iterating - but that has been
reverted for now.

A standalone sketch of the new Ledger Cache shape is appended after the
diff.
---
 src/leveled_bookie.erl             | 51 +++++++++++--------
 src/leveled_codec.erl              | 10 +++-
 src/leveled_inker.erl              |  5 +-
 src/leveled_penciller.erl          | 39 ++++++++++-----
 src/leveled_sft.erl                |  4 +-
 test/end_to_end/iterator_SUITE.erl | 80 ++++++++++++++++++++++++++++--
 6 files changed, 144 insertions(+), 45 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index c607606..3794dab 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -149,7 +149,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
--define(CACHE_SIZE, 1000).
+-define(CACHE_SIZE, 2000).
 -define(JOURNAL_FP, "journal").
 -define(LEDGER_FP, "ledger").
 -define(SHUTDOWN_WAITS, 60).
@@ -160,7 +160,7 @@
                 penciller :: pid(),
                 cache_size :: integer(),
                 back_pressure :: boolean(),
-                ledger_cache :: gb_trees:tree(),
+                ledger_cache :: {gb_trees:tree(), list()},
                 is_snapshot :: boolean()}).
 
 
@@ -242,7 +242,7 @@ init([Opts]) ->
             {ok, #state{inker=Inker,
                         penciller=Penciller,
                         cache_size=CacheSize,
-                        ledger_cache=gb_trees:empty(),
+                        ledger_cache={gb_trees:empty(), []},
                         is_snapshot=false}};
         Bookie ->
             {ok,
@@ -397,16 +397,15 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%============================================================================
 
-bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
+bucket_stats(Penciller, {_ObjTree, ChangeList}, Bucket, Tag) ->
     PCLopts = #penciller_options{start_snapshot=true,
                                     source_penciller=Penciller},
     {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
     Folder = fun() ->
-                    Increment = gb_trees:to_list(LedgerCache),
                     io:format("Length of increment in snapshot is ~w~n",
-                                [length(Increment)]),
+                                [length(ChangeList)]),
                     ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                            {infinity, Increment}),
+                                                            {infinity, ChangeList}),
                     StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                     EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                     Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
@@ -419,7 +418,7 @@ bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
                 end,
     {async, Folder}.
 
-index_query(Penciller, LedgerCache,
+index_query(Penciller, {_ObjTree, ChangeList},
                 Bucket,
                 {IdxField, StartValue, EndValue},
                 {ReturnTerms, TermRegex}) ->
@@ -427,11 +426,10 @@ index_query(Penciller, LedgerCache,
                                     source_penciller=Penciller},
     {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
     Folder = fun() ->
-                    Increment = gb_trees:to_list(LedgerCache),
                     io:format("Length of increment in snapshot is ~w~n",
-                                [length(Increment)]),
+                                [length(ChangeList)]),
                     ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                            {infinity, Increment}),
+                                                            {infinity, ChangeList}),
                     StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
                                                             IdxField, StartValue),
                     EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
@@ -493,8 +491,9 @@ startup(InkerOpts, PencillerOpts) ->
    {Inker, Penciller}.
 
 
-fetch_head(Key, Penciller, Cache) ->
-    case gb_trees:lookup(Key, Cache) of
+fetch_head(Key, Penciller, {ObjTree, _ChangeList}) ->
+
+    case gb_trees:lookup(Key, ObjTree) of
         {value, Head} ->
             Head;
         none ->
@@ -561,8 +560,6 @@ accumulate_index(TermRe, AddFun) ->
     end.
 
-
-
 
 preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
     {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK,
                                                                     SQN,
@@ -572,20 +569,30 @@ preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
     [PrimaryChange] ++ ConvSpecs.
 
 addto_ledgercache(Changes, Cache) ->
-    lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end,
-                    Cache,
-                    Changes).
+    {ObjectTree, ChangeList} = Cache,
+    {lists:foldl(fun({K, V}, Acc) ->
+                        case leveled_codec:is_indexkey(K) of
+                            false ->
+                                gb_trees:enter(K, V, Acc);
+                            true ->
+                                Acc
+                        end
+                        end,
+                    ObjectTree,
+                    Changes),
+        ChangeList ++ Changes}.
 
 maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
-    CacheSize = gb_trees:size(Cache),
+    {_ObjectTree, ChangeList} = Cache,
+    CacheSize = length(ChangeList),
     if
         CacheSize > MaxCacheSize ->
             case leveled_penciller:pcl_pushmem(Penciller,
-                                                gb_trees:to_list(Cache)) of
+                                                ChangeList) of
                 ok ->
-                    {ok, gb_trees:empty()};
+                    {ok, {gb_trees:empty(), []}};
                 pause ->
-                    {pause, gb_trees:empty()};
+                    {pause, {gb_trees:empty(), []}};
                 refused ->
                     {ok, Cache}
             end;
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 370133c..096e48f 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -39,6 +39,7 @@
         strip_to_keyseqstatusonly/1,
         striphead_to_details/1,
         is_active/2,
+        is_indexkey/1,
         endkey_passed/2,
         key_dominates/2,
         print_key/1,
@@ -107,6 +108,11 @@ to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
 to_ledgerkey(Bucket, Key, Tag) ->
     {Tag, Bucket, Key, null}.
 
+is_indexkey({Tag, _, _, _}) when Tag == ?IDX_TAG ->
+    true;
+is_indexkey(_Key) ->
+    false.
+
 hash(Obj) ->
     erlang:phash2(term_to_binary(Obj)).
 
@@ -156,7 +162,7 @@ convert_indexspecs(IndexSpecs, Bucket, Key, SQN) ->
                             {active, infinity};
                         remove ->
                             %% TODO: timestamps for delayed reaping
-                            {tomb, infinity}
+                            tomb
                     end,
             {to_ledgerkey(Bucket, Key, ?IDX_TAG,
                             IdxField, IdxValue),
@@ -260,7 +266,7 @@ indexspecs_test() ->
     ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},
                     {1, {active, infinity}, null}}, lists:nth(2, Changes)),
     ?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"},
-                    {1, {tomb, infinity}, null}}, lists:nth(3, Changes)).
+                    {1, tomb, null}}, lists:nth(3, Changes)).
 
 endkey_passed_test() ->
     TestKey = {i, null, null, null},
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index a876fa8..0ddc6d7 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -724,10 +724,9 @@ manifest_printer(Manifest) ->
 
 initiate_penciller_snapshot(Bookie) ->
     {ok,
-        {LedgerSnap, LedgerCache},
+        {LedgerSnap, {_ObjTree, ChangeList}},
         _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
-    ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap,
-                                            gb_trees:to_list(LedgerCache)),
+    ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, ChangeList),
     MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
     {LedgerSnap, MaxSQN}.
 
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 34a99a4..32c70a3 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -639,6 +639,14 @@ start_from_file(PCLopts) ->
                     M ->
                         M
                 end,
+    % Options (not) chosen here:
+    % - As we pass the ETS table to the sft file when the L0 file is created
+    % then this cannot be private.
+    % - There is no use of iterator, so a set could be used, but then the
+    % output of tab2list would need to be sorted
+    % TODO:
+    % - Test switching to [set, private] and sending the L0 snapshots to the
+    % sft_new cast
     TID = ets:new(?MEMTABLE, [ordered_set]),
     {ok, Clerk} = leveled_pclerk:clerk_new(self()),
     InitState = #state{memtable=TID,
@@ -1371,14 +1379,12 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
 
 assess_sqn([]) ->
     empty;
-assess_sqn(DumpList) ->
-    assess_sqn(DumpList, infinity, 0).
-
-assess_sqn([], MinSQN, MaxSQN) ->
-    {MinSQN, MaxSQN};
-assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) ->
-    {_K, SQN} = leveled_codec:strip_to_keyseqonly(HeadKey),
-    assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)).
+assess_sqn([HeadKV|[]]) ->
+    {leveled_codec:strip_to_seqonly(HeadKV),
+        leveled_codec:strip_to_seqonly(HeadKV)};
+assess_sqn([HeadKV|DumpList]) ->
+    {leveled_codec:strip_to_seqonly(HeadKV),
+        leveled_codec:strip_to_seqonly(lists:last(DumpList))}.
 
 
 %%%============================================================================
@@ -1406,6 +1412,13 @@ clean_subdir(DirPath) ->
             ok
     end.
 
+assess_sqn_test() ->
+    L1 = [{{}, {5, active, {}}}, {{}, {6, active, {}}}],
+    ?assertMatch({5, 6}, assess_sqn(L1)),
+    L2 = [{{}, {5, active, {}}}],
+    ?assertMatch({5, 5}, assess_sqn(L2)),
+    ?assertMatch(empty, assess_sqn([])).
+
 compaction_work_assessment_test() ->
     L0 = [{{o, "B1", "K1", null}, {o, "B3", "K3", null}, dummy_pid}],
     L1 = [{{o, "B1", "K1", null}, {o, "B2", "K2", null}, dummy_pid},
@@ -1462,13 +1475,13 @@ simple_server_test() ->
     {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
                                                 max_inmemory_tablesize=1000}),
     Key1 = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}},
-    KL1 = lists:sort(leveled_sft:generate_randomkeys({1000, 2})),
+    KL1 = leveled_sft:generate_randomkeys({1000, 2}),
     Key2 = {{o,"Bucket0002", "Key0002", null}, {1002, {active, infinity}, null}},
-    KL2 = lists:sort(leveled_sft:generate_randomkeys({1000, 1002})),
+    KL2 = leveled_sft:generate_randomkeys({1000, 1002}),
     Key3 = {{o,"Bucket0003", "Key0003", null}, {2002, {active, infinity}, null}},
-    KL3 = lists:sort(leveled_sft:generate_randomkeys({1000, 2002})),
+    KL3 = leveled_sft:generate_randomkeys({1000, 2002}),
     Key4 = {{o,"Bucket0004", "Key0004", null}, {3002, {active, infinity}, null}},
-    KL4 = lists:sort(leveled_sft:generate_randomkeys({1000, 3002})),
+    KL4 = leveled_sft:generate_randomkeys({1000, 3002}),
     ok = pcl_pushmem(PCL, [Key1]),
     ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
     ok = pcl_pushmem(PCL, KL1),
@@ -1546,7 +1559,7 @@ simple_server_test() ->
     % sees the old version in the previous snapshot, but will see the new version
     % in a new snapshot
     Key1A = {{o,"Bucket0001", "Key0001", null}, {4002, {active, infinity}, null}},
-    KL1A = lists:sort(leveled_sft:generate_randomkeys({4002, 2})),
+    KL1A = leveled_sft:generate_randomkeys({4002, 2}),
     maybe_pause_push(pcl_pushmem(PCLr, [Key1A])),
     maybe_pause_push(pcl_pushmem(PCLr, KL1A)),
     ?assertMatch(true, pcl_checksequencenumber(PclSnap,
diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl
index 000dd45..1e846e2 100644
--- a/src/leveled_sft.erl
+++ b/src/leveled_sft.erl
@@ -1378,7 +1378,7 @@ generate_randomkeys(Count) ->
     generate_randomkeys(Count, 0, []).
 
 generate_randomkeys(0, _SQN, Acc) ->
-    Acc;
+    lists:reverse(Acc);
 generate_randomkeys(Count, SQN, Acc) ->
     RandKey = {{o,
                 lists:concat(["Bucket", random:uniform(1024)]),
@@ -1651,7 +1651,7 @@ initial_create_file_test() ->
 big_create_file_test() ->
     Filename = "../test/bigtest1.sft",
     {KL1, KL2} = {lists:sort(generate_randomkeys(2000)),
-                    lists:sort(generate_randomkeys(50000))},
+                    lists:sort(generate_randomkeys(40000))},
     {InitHandle, InitFileMD} = create_file(Filename),
     {Handle, FileMD, {_KL1Rem, _KL2Rem}} = complete_file(InitHandle,
                                                             InitFileMD,
diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl
index 0e47b59..3b8701f 100644
--- a/test/end_to_end/iterator_SUITE.erl
+++ b/test/end_to_end/iterator_SUITE.erl
@@ -3,6 +3,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include("include/leveled.hrl").
 
+-define(KEY_ONLY, {false, undefined}).
+
 -export([all/0]).
 -export([simple_load_with2i/1,
             simple_querycount/1]).
@@ -73,7 +75,7 @@ simple_querycount(_Config) ->
                                 T = count_termsonindex("Bucket",
                                                         IdxF,
                                                         Book1,
-                                                        {false, undefined}),
+                                                        ?KEY_ONLY),
                                 io:format("~w terms found on index ~s~n",
                                             [T, IdxF]),
                                 Acc + T
@@ -87,13 +89,13 @@ simple_querycount(_Config) ->
     Index1Count = count_termsonindex("Bucket",
                                         "idx1_bin",
                                         Book1,
-                                        {false, undefined}),
+                                        ?KEY_ONLY),
     ok = leveled_bookie:book_close(Book1),
     {ok, Book2} = leveled_bookie:book_start(StartOpts1),
     Index1Count = count_termsonindex("Bucket",
                                         "idx1_bin",
                                         Book2,
-                                        {false, undefined}),
+                                        ?KEY_ONLY),
     NameList = testutil:name_list(),
     TotalNameByName = lists:foldl(fun({_X, Name}, Acc) ->
                                         {ok, Regex} = re:compile("[0-9]+" ++
@@ -165,7 +167,79 @@ simple_querycount(_Config) ->
                                                 {false,
                                                     RxMia2K}}),
     Mia2000Count1 = length(Mia2KFolder3()),
+
+    V9 = testutil:get_compressiblevalue(),
+    Indexes9 = testutil:get_randomindexes_generator(8),
+    [{_RN, Obj9, Spc9}] = testutil:generate_objects(1, uuid, [], V9, Indexes9),
+    ok = leveled_bookie:book_riakput(Book2, Obj9, Spc9),
+    R9 = lists:map(fun({add, IdxF, IdxT}) ->
+                        R = leveled_bookie:book_returnfolder(Book2,
+                                                                {index_query,
+                                                                    "Bucket",
+                                                                    {IdxF,
+                                                                        IdxT,
+                                                                        IdxT},
+                                                                    ?KEY_ONLY}),
+                        {async, Fldr} = R,
+                        case length(Fldr()) of
+                            X when X > 0 ->
+                                {IdxF, IdxT, X}
+                        end
+                        end,
+                    Spc9),
+    Spc9Del = lists:map(fun({add, IdxF, IdxT}) -> {remove, IdxF, IdxT} end,
+                        Spc9),
+    ok = leveled_bookie:book_riakput(Book2, Obj9, Spc9Del),
+    lists:foreach(fun({IdxF, IdxT, X}) ->
+                        R = leveled_bookie:book_returnfolder(Book2,
+                                                                {index_query,
+                                                                    "Bucket",
+                                                                    {IdxF,
+                                                                        IdxT,
+                                                                        IdxT},
+                                                                    ?KEY_ONLY}),
+                        {async, Fldr} = R,
+                        case length(Fldr()) of
+                            Y ->
+                                Y = X - 1
+                        end
+                        end,
+                    R9),
     ok = leveled_bookie:book_close(Book2),
+    {ok, Book3} = leveled_bookie:book_start(StartOpts1),
+    lists:foreach(fun({IdxF, IdxT, X}) ->
+                        R = leveled_bookie:book_returnfolder(Book3,
+                                                                {index_query,
+                                                                    "Bucket",
+                                                                    {IdxF,
+                                                                        IdxT,
+                                                                        IdxT},
+                                                                    ?KEY_ONLY}),
+                        {async, Fldr} = R,
+                        case length(Fldr()) of
+                            Y ->
+                                Y = X - 1
+                        end
+                        end,
+                    R9),
+    ok = leveled_bookie:book_riakput(Book3, Obj9, Spc9),
+    {ok, Book4} = leveled_bookie:book_start(StartOpts1),
+    lists:foreach(fun({IdxF, IdxT, X}) ->
+                        R = leveled_bookie:book_returnfolder(Book4,
+                                                                {index_query,
+                                                                    "Bucket",
+                                                                    {IdxF,
+                                                                        IdxT,
+                                                                        IdxT},
+                                                                    ?KEY_ONLY}),
+                        {async, Fldr} = R,
+                        case length(Fldr()) of
+                            X ->
+                                ok
+                        end
+                        end,
+                    R9),
+    ok = leveled_bookie:book_close(Book4),
     testutil:reset_filestructure().
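
Sketch of the new Ledger Cache shape (a minimal, standalone illustration for
review, not code taken from the patch). The module name and the local
is_index_key/1 helper are assumptions made for the example; the real
implementation is in leveled_bookie.erl above, which uses
leveled_codec:is_indexkey/1 and the ?IDX_TAG macro.

-module(ledger_cache_sketch).
-export([new/0, add/2, object_lookup/2, change_list/1]).

%% The cache is a pair: a gb_tree holding only object (non-index) keys, plus
%% the flat list of all key changes (object and index) awaiting a push.
new() ->
    {gb_trees:empty(), []}.

%% Index keys are kept out of the tree; every change is appended to the list.
add(Changes, {ObjectTree, ChangeList}) ->
    Tree = lists:foldl(fun({K, V}, Acc) ->
                            case is_index_key(K) of
                                true -> Acc;
                                false -> gb_trees:enter(K, V, Acc)
                            end
                        end,
                        ObjectTree,
                        Changes),
    {Tree, ChangeList ++ Changes}.

%% HEAD requests need only the object tree ...
object_lookup(Key, {ObjectTree, _ChangeList}) ->
    gb_trees:lookup(Key, ObjectTree).

%% ... while snapshot loads and pcl_pushmem take the raw change list, so the
%% gb_trees:to_list/1 conversion no longer sits on the push_mem path.
change_list({_ObjectTree, ChangeList}) ->
    ChangeList.

%% Illustrative stand-in for leveled_codec:is_indexkey/1, assuming index
%% ledger keys are 4-tuples tagged with the atom i (?IDX_TAG).
is_index_key({i, _Bucket, _IndexTerm, _Key}) -> true;
is_index_key(_Key) -> false.

The design point is that work proportional to the number of index entries
(growing and later flattening a tree) is taken off the push_mem path; the
tree only carries the keys that fetch_head actually needs to look up.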