diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index c607606..3794dab 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -149,7 +149,7 @@
 -include_lib("eunit/include/eunit.hrl").
 
--define(CACHE_SIZE, 1000).
+-define(CACHE_SIZE, 2000).
 -define(JOURNAL_FP, "journal").
 -define(LEDGER_FP, "ledger").
 -define(SHUTDOWN_WAITS, 60).
@@ -160,7 +160,7 @@
                 penciller :: pid(),
                 cache_size :: integer(),
                 back_pressure :: boolean(),
-                ledger_cache :: gb_trees:tree(),
+                ledger_cache :: {gb_trees:tree(), list()},
                 is_snapshot :: boolean()}).
@@ -242,7 +242,7 @@ init([Opts]) ->
             {ok, #state{inker=Inker,
                         penciller=Penciller,
                         cache_size=CacheSize,
-                        ledger_cache=gb_trees:empty(),
+                        ledger_cache={gb_trees:empty(), []},
                         is_snapshot=false}};
         Bookie ->
             {ok,
@@ -397,16 +397,15 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%============================================================================
 
-bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
+bucket_stats(Penciller, {_ObjTree, ChangeList}, Bucket, Tag) ->
     PCLopts = #penciller_options{start_snapshot=true,
                                     source_penciller=Penciller},
     {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
     Folder = fun() ->
-                    Increment = gb_trees:to_list(LedgerCache),
                     io:format("Length of increment in snapshot is ~w~n",
-                                [length(Increment)]),
+                                [length(ChangeList)]),
                     ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                            {infinity, Increment}),
+                                                            {infinity, ChangeList}),
                     StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                     EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                     Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
@@ -419,7 +418,7 @@ bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
                 end,
     {async, Folder}.
 
-index_query(Penciller, LedgerCache,
+index_query(Penciller, {_ObjTree, ChangeList},
                 Bucket,
                 {IdxField, StartValue, EndValue},
                 {ReturnTerms, TermRegex}) ->
@@ -427,11 +426,10 @@ index_query(Penciller, LedgerCache,
                                     source_penciller=Penciller},
     {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
     Folder = fun() ->
-                    Increment = gb_trees:to_list(LedgerCache),
                     io:format("Length of increment in snapshot is ~w~n",
-                                [length(Increment)]),
+                                [length(ChangeList)]),
                     ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                            {infinity, Increment}),
+                                                            {infinity, ChangeList}),
                     StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
                                                             IdxField, StartValue),
                     EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
@@ -493,8 +491,9 @@ startup(InkerOpts, PencillerOpts) ->
     {Inker, Penciller}.
 
 
-fetch_head(Key, Penciller, Cache) ->
-    case gb_trees:lookup(Key, Cache) of
+fetch_head(Key, Penciller, {ObjTree, _ChangeList}) ->
+
+    case gb_trees:lookup(Key, ObjTree) of
         {value, Head} ->
             Head;
         none ->
@@ -561,8 +560,6 @@ accumulate_index(TermRe, AddFun) ->
     end.
 
 
-
-
 preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
     {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK,
                                                                     SQN,
@@ -572,20 +569,30 @@ preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
     [PrimaryChange] ++ ConvSpecs.
 
 addto_ledgercache(Changes, Cache) ->
-    lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end,
-                Cache,
-                Changes).
+    {ObjectTree, ChangeList} = Cache,
+    {lists:foldl(fun({K, V}, Acc) ->
+                        case leveled_codec:is_indexkey(K) of
+                            false ->
+                                gb_trees:enter(K, V, Acc);
+                            true ->
+                                Acc
+                        end
+                        end,
+                    ObjectTree,
+                    Changes),
+        ChangeList ++ Changes}.
 
 maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
-    CacheSize = gb_trees:size(Cache),
+    {_ObjectTree, ChangeList} = Cache,
+    CacheSize = length(ChangeList),
     if
         CacheSize > MaxCacheSize ->
             case leveled_penciller:pcl_pushmem(Penciller,
-                                                gb_trees:to_list(Cache)) of
+                                                ChangeList) of
                 ok ->
-                    {ok, gb_trees:empty()};
+                    {ok, {gb_trees:empty(), []}};
                 pause ->
-                    {pause, gb_trees:empty()};
+                    {pause, {gb_trees:empty(), []}};
                 refused ->
                     {ok, Cache}
             end;
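[Note on the two-part ledger cache: the bookie's cache becomes a pair {ObjectTree, ChangeList}. The gb_tree now holds object heads only, serving fetch_head/3 lookups, while every change (object and index alike) is appended to the list handed to the penciller via pcl_pushmem. A minimal sketch of that invariant, assuming simplified key shapes ({o, ...} objects, {i, ...} index entries) and inlining the filter rather than calling leveled_codec:is_indexkey/1:]

    %% Sketch only: mirrors the filtering rule of addto_ledgercache/2
    %% above, without depending on leveled_codec. Key shapes are
    %% illustrative, not the full leveled key format.
    -module(cache_sketch).
    -export([demo/0]).

    add_changes(Changes, {ObjTree0, ChangeList0}) ->
        ObjTree = lists:foldl(fun({{i, _B, _IdxKV, _K}, _V}, Acc) ->
                                    Acc;  % index keys skip the tree
                                 ({K, V}, Acc) ->
                                    gb_trees:enter(K, V, Acc)
                              end,
                              ObjTree0,
                              Changes),
        {ObjTree, ChangeList0 ++ Changes}.

    demo() ->
        ObjChange = {{o, "B", "K1", null}, {1, {active, infinity}, null}},
        IdxChange = {{i, "B", {"idx1_bin", "t1"}, "K1"},
                        {1, {active, infinity}, null}},
        {Tree, List} = add_changes([ObjChange, IdxChange],
                                    {gb_trees:empty(), []}),
        1 = gb_trees:size(Tree),   % only the object head is cached for lookups
        2 = length(List),          % both changes still go to the penciller
        ok.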
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 370133c..096e48f 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -39,6 +39,7 @@
         strip_to_keyseqstatusonly/1,
         striphead_to_details/1,
         is_active/2,
+        is_indexkey/1,
         endkey_passed/2,
         key_dominates/2,
         print_key/1,
@@ -107,6 +108,11 @@ to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
 to_ledgerkey(Bucket, Key, Tag) ->
     {Tag, Bucket, Key, null}.
 
+is_indexkey({Tag, _, _, _}) when Tag == ?IDX_TAG ->
+    true;
+is_indexkey(_Key) ->
+    false.
+
 hash(Obj) ->
     erlang:phash2(term_to_binary(Obj)).
 
@@ -156,7 +162,7 @@ convert_indexspecs(IndexSpecs, Bucket, Key, SQN) ->
                                 {active, infinity};
                             remove ->
                                 %% TODO: timestamps for delayed reaping
-                                {tomb, infinity}
+                                tomb
                         end,
                     {to_ledgerkey(Bucket, Key, ?IDX_TAG,
                                     IdxField, IdxValue),
@@ -260,7 +266,7 @@ indexspecs_test() ->
     ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},
                     {1, {active, infinity}, null}}, lists:nth(2, Changes)),
     ?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"},
-                    {1, {tomb, infinity}, null}}, lists:nth(3, Changes)).
+                    {1, tomb, null}}, lists:nth(3, Changes)).
 
 endkey_passed_test() ->
     TestKey = {i, null, null, null},
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index a876fa8..0ddc6d7 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -724,10 +724,9 @@ manifest_printer(Manifest) ->
 
 initiate_penciller_snapshot(Bookie) ->
     {ok,
-        {LedgerSnap, LedgerCache},
+        {LedgerSnap, {_ObjTree, ChangeList}},
         _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
-    ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap,
-                                            gb_trees:to_list(LedgerCache)),
+    ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, ChangeList),
     MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
     {LedgerSnap, MaxSQN}.
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 34a99a4..32c70a3 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -639,6 +639,14 @@ start_from_file(PCLopts) ->
                     M ->
                         M
                 end,
+    % Options (not) chosen here:
+    % - As we pass the ETS table to the sft file when the L0 file is created
+    % then this cannot be private.
+    % - There is no use of iterator, so a set could be used, but then the
+    % output of tab2list would need to be sorted
+    % TODO:
+    % - Test switching to [set, private] and sending the L0 snapshots to the
+    % sft_new cast
     TID = ets:new(?MEMTABLE, [ordered_set]),
     {ok, Clerk} = leveled_pclerk:clerk_new(self()),
     InitState = #state{memtable=TID,
@@ -1371,14 +1379,12 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
 
 assess_sqn([]) ->
     empty;
-assess_sqn(DumpList) ->
-    assess_sqn(DumpList, infinity, 0).
-
-assess_sqn([], MinSQN, MaxSQN) ->
-    {MinSQN, MaxSQN};
-assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) ->
-    {_K, SQN} = leveled_codec:strip_to_keyseqonly(HeadKey),
-    assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)).
+assess_sqn([HeadKV|[]]) ->
+    {leveled_codec:strip_to_seqonly(HeadKV),
+        leveled_codec:strip_to_seqonly(HeadKV)};
+assess_sqn([HeadKV|DumpList]) ->
+    {leveled_codec:strip_to_seqonly(HeadKV),
+        leveled_codec:strip_to_seqonly(lists:last(DumpList))}.
 
 %%%============================================================================
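[Note on assess_sqn/1: the rewrite reads only the head and the last element of the dump, so it returns the true {MinSQN, MaxSQN} only when the pushed list already ascends by SQN. The ledger cache preserves arrival order (ChangeList ++ Changes), and this also appears to be why generate_randomkeys/1 now reverses its accumulator and the lists:sort/1 wrappers are dropped from the penciller tests below. A shell-sized sketch of the assumed property, with hypothetical keys in the {SQN, Status, MD} value layout used by assess_sqn_test():]

    %% Hypothetical dump in arrival order, so SQNs ascend.
    Dump = [{{o, "B", "K1", null}, {5, {active, infinity}, null}},
            {{o, "B", "K2", null}, {6, {active, infinity}, null}},
            {{o, "B", "K3", null}, {9, {active, infinity}, null}}],
    GetSQN = fun({_K, V}) -> element(1, V) end,
    %% The head carries the minimum SQN and the last element the maximum.
    {5, 9} = {GetSQN(hd(Dump)), GetSQN(lists:last(Dump))}.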
@@ -1406,6 +1412,13 @@ clean_subdir(DirPath) ->
             ok
     end.
 
+assess_sqn_test() ->
+    L1 = [{{}, {5, active, {}}}, {{}, {6, active, {}}}],
+    ?assertMatch({5, 6}, assess_sqn(L1)),
+    L2 = [{{}, {5, active, {}}}],
+    ?assertMatch({5, 5}, assess_sqn(L2)),
+    ?assertMatch(empty, assess_sqn([])).
+
 compaction_work_assessment_test() ->
     L0 = [{{o, "B1", "K1", null}, {o, "B3", "K3", null}, dummy_pid}],
     L1 = [{{o, "B1", "K1", null}, {o, "B2", "K2", null}, dummy_pid},
@@ -1462,13 +1475,13 @@ simple_server_test() ->
     {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
                                                 max_inmemory_tablesize=1000}),
     Key1 = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}},
-    KL1 = lists:sort(leveled_sft:generate_randomkeys({1000, 2})),
+    KL1 = leveled_sft:generate_randomkeys({1000, 2}),
     Key2 = {{o,"Bucket0002", "Key0002", null}, {1002, {active, infinity}, null}},
-    KL2 = lists:sort(leveled_sft:generate_randomkeys({1000, 1002})),
+    KL2 = leveled_sft:generate_randomkeys({1000, 1002}),
     Key3 = {{o,"Bucket0003", "Key0003", null}, {2002, {active, infinity}, null}},
-    KL3 = lists:sort(leveled_sft:generate_randomkeys({1000, 2002})),
+    KL3 = leveled_sft:generate_randomkeys({1000, 2002}),
     Key4 = {{o,"Bucket0004", "Key0004", null}, {3002, {active, infinity}, null}},
-    KL4 = lists:sort(leveled_sft:generate_randomkeys({1000, 3002})),
+    KL4 = leveled_sft:generate_randomkeys({1000, 3002}),
     ok = pcl_pushmem(PCL, [Key1]),
     ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
     ok = pcl_pushmem(PCL, KL1),
@@ -1546,7 +1559,7 @@ simple_server_test() ->
 % sees the old version in the previous snapshot, but will see the new version
 % in a new snapshot
     Key1A = {{o,"Bucket0001", "Key0001", null}, {4002, {active, infinity}, null}},
-    KL1A = lists:sort(leveled_sft:generate_randomkeys({4002, 2})),
+    KL1A = leveled_sft:generate_randomkeys({4002, 2}),
     maybe_pause_push(pcl_pushmem(PCLr, [Key1A])),
     maybe_pause_push(pcl_pushmem(PCLr, KL1A)),
     ?assertMatch(true, pcl_checksequencenumber(PclSnap,
diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl
index 000dd45..1e846e2 100644
--- a/src/leveled_sft.erl
+++ b/src/leveled_sft.erl
@@ -1378,7 +1378,7 @@ generate_randomkeys(Count) ->
     generate_randomkeys(Count, 0, []).
 
 generate_randomkeys(0, _SQN, Acc) ->
-    Acc;
+    lists:reverse(Acc);
 generate_randomkeys(Count, SQN, Acc) ->
     RandKey = {{o,
                 lists:concat(["Bucket", random:uniform(1024)]),
@@ -1651,7 +1651,7 @@ initial_create_file_test() ->
 big_create_file_test() ->
     Filename = "../test/bigtest1.sft",
     {KL1, KL2} = {lists:sort(generate_randomkeys(2000)),
-                    lists:sort(generate_randomkeys(50000))},
+                    lists:sort(generate_randomkeys(40000))},
     {InitHandle, InitFileMD} = create_file(Filename),
     {Handle, FileMD, {_KL1Rem, _KL2Rem}} = complete_file(InitHandle,
                                                             InitFileMD,
diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl
index 0e47b59..3b8701f 100644
--- a/test/end_to_end/iterator_SUITE.erl
+++ b/test/end_to_end/iterator_SUITE.erl
@@ -3,6 +3,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include("include/leveled.hrl").
 
+-define(KEY_ONLY, {false, undefined}).
+
 -export([all/0]).
 -export([simple_load_with2i/1,
             simple_querycount/1]).
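[Note on ?KEY_ONLY: the macro names the {ReturnTerms, TermRegex} argument of an index query, i.e. return keys rather than {term, key} pairs and apply no term regex. A sketch of the call shape it abbreviates (Bookie, the field and the term range are hypothetical):]

    {async, Folder} =
        leveled_bookie:book_returnfolder(Bookie,
                                            {index_query,
                                                "Bucket",
                                                {"idx1_bin", "term0", "term9"},
                                                ?KEY_ONLY}),
    KeyList = Folder().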
@@ -73,7 +75,7 @@ simple_querycount(_Config) ->
                                 T = count_termsonindex("Bucket",
                                                         IdxF,
                                                         Book1,
-                                                        {false, undefined}),
+                                                        ?KEY_ONLY),
                                 io:format("~w terms found on index ~s~n",
                                             [T, IdxF]),
                                 Acc + T
@@ -87,13 +89,13 @@ simple_querycount(_Config) ->
     Index1Count = count_termsonindex("Bucket",
                                         "idx1_bin",
                                         Book1,
-                                        {false, undefined}),
+                                        ?KEY_ONLY),
     ok = leveled_bookie:book_close(Book1),
     {ok, Book2} = leveled_bookie:book_start(StartOpts1),
     Index1Count = count_termsonindex("Bucket",
                                         "idx1_bin",
                                         Book2,
-                                        {false, undefined}),
+                                        ?KEY_ONLY),
     NameList = testutil:name_list(),
     TotalNameByName = lists:foldl(fun({_X, Name}, Acc) ->
                                         {ok, Regex} = re:compile("[0-9]+" ++
@@ -165,7 +167,79 @@ simple_querycount(_Config) ->
                                                     {false,
                                                         RxMia2K}}),
     Mia2000Count1 = length(Mia2KFolder3()),
+
+    V9 = testutil:get_compressiblevalue(),
+    Indexes9 = testutil:get_randomindexes_generator(8),
+    [{_RN, Obj9, Spc9}] = testutil:generate_objects(1, uuid, [], V9, Indexes9),
+    ok = leveled_bookie:book_riakput(Book2, Obj9, Spc9),
+    R9 = lists:map(fun({add, IdxF, IdxT}) ->
+                        R = leveled_bookie:book_returnfolder(Book2,
+                                                                {index_query,
+                                                                    "Bucket",
+                                                                    {IdxF,
+                                                                        IdxT,
+                                                                        IdxT},
+                                                                    ?KEY_ONLY}),
+                        {async, Fldr} = R,
+                        case length(Fldr()) of
+                            X when X > 0 ->
+                                {IdxF, IdxT, X}
+                        end
+                        end,
+                    Spc9),
+    Spc9Del = lists:map(fun({add, IdxF, IdxT}) -> {remove, IdxF, IdxT} end,
+                        Spc9),
+    ok = leveled_bookie:book_riakput(Book2, Obj9, Spc9Del),
+    lists:foreach(fun({IdxF, IdxT, X}) ->
+                        R = leveled_bookie:book_returnfolder(Book2,
+                                                                {index_query,
+                                                                    "Bucket",
+                                                                    {IdxF,
+                                                                        IdxT,
+                                                                        IdxT},
+                                                                    ?KEY_ONLY}),
+                        {async, Fldr} = R,
+                        case length(Fldr()) of
+                            Y ->
+                                Y = X - 1
+                        end
+                        end,
+                    R9),
     ok = leveled_bookie:book_close(Book2),
+    {ok, Book3} = leveled_bookie:book_start(StartOpts1),
+    lists:foreach(fun({IdxF, IdxT, X}) ->
+                        R = leveled_bookie:book_returnfolder(Book3,
+                                                                {index_query,
+                                                                    "Bucket",
+                                                                    {IdxF,
+                                                                        IdxT,
+                                                                        IdxT},
+                                                                    ?KEY_ONLY}),
+                        {async, Fldr} = R,
+                        case length(Fldr()) of
+                            Y ->
+                                Y = X - 1
+                        end
+                        end,
+                    R9),
+    ok = leveled_bookie:book_riakput(Book3, Obj9, Spc9),
+    {ok, Book4} = leveled_bookie:book_start(StartOpts1),
+    lists:foreach(fun({IdxF, IdxT, X}) ->
+                        R = leveled_bookie:book_returnfolder(Book4,
+                                                                {index_query,
+                                                                    "Bucket",
+                                                                    {IdxF,
+                                                                        IdxT,
+                                                                        IdxT},
+                                                                    ?KEY_ONLY}),
+                        {async, Fldr} = R,
+                        case length(Fldr()) of
+                            X ->
+                                ok
+                        end
+                        end,
+                    R9),
+    ok = leveled_bookie:book_close(Book4),
     testutil:reset_filestructure().
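[Note on the added 2i coverage: the new block writes one object with eight random index entries, verifies each term is queryable, removes the entries (whose ledger changes now carry the bare tomb status from convert_indexspecs), checks each count drops by one, and confirms the removals persist across a restart (Book3) while re-adding the entries restores the original counts after a further restart (Book4). A sketch of the spec shapes exercised, with hypothetical values, mirroring Spc9/Spc9Del:]

    %% An add spec and the remove spec derived from it.
    Spc = [{add, "idx1_bin", "term1"}],
    SpcDel = [{remove, IdxF, IdxT} || {add, IdxF, IdxT} <- Spc],
    [{remove, "idx1_bin", "term1"}] = SpcDel.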