Complete 2i work - some refactoring

The 2i work now has tests for removals as well as regex queries, etc.

Some initial refactoring work has also been attempted, to take some tasks
off the critical path of push_mem.  The primary change has been to avoid
putting index keys into the gb_tree, and to build the KeyChanges list in
parallel to the gb_tree (now known as the ObjectTree) within the Ledger
Cache.
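
A minimal sketch of the reshaped Ledger Cache (illustrative only - the
module and function names below are not part of leveled_bookie, and
is_indexkey/1 stands in for the helper added to leveled_codec in this
commit):

-module(ledgercache_sketch).
-export([empty/0, add/2, changes/1]).

%% The cache is now a pair: a gb_tree (the ObjectTree) holding only
%% object heads for lookups, and a plain KeyChanges list holding every
%% change, which is what eventually gets pushed to the Penciller.
empty() ->
    {gb_trees:empty(), []}.

%% Index keys never need to be looked up individually, so they are kept
%% out of the tree - but every change is still appended to KeyChanges.
add(Changes, {ObjectTree, KeyChanges}) ->
    Tree = lists:foldl(fun({K, V}, Acc) ->
                            case is_indexkey(K) of
                                true -> Acc;
                                false -> gb_trees:enter(K, V, Acc)
                            end
                        end,
                        ObjectTree,
                        Changes),
    {Tree, KeyChanges ++ Changes}.

%% pcl_pushmem and snapshot loads take the list directly, avoiding a
%% gb_trees:to_list/1 conversion on the critical path.
changes({_ObjectTree, KeyChanges}) ->
    KeyChanges.

%% Assumed index key shape, mirroring leveled_codec:is_indexkey/1.
is_indexkey({i, _Bucket, _IdxTerm, _Key}) -> true;
is_indexkey(_Key) -> false.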

Some initial experiments have also been done on changing the ETS table in
the Penciller, now that it will no longer be used for iterating - but that
change has been reverted for now.
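
For reference, a rough sketch of the options weighed for the Penciller's
memtable (see the comment added in leveled_penciller below; the module
name and functions here are illustrative, not the real API):

-module(memtable_sketch).
-export([current/0, considered/0, dump_sorted/1]).

%% Current choice: ordered_set, and not private, because the table is
%% handed to the sft file when the L0 file is created.
current() ->
    ets:new(memtable, [ordered_set]).

%% Considered (and reverted) alternative: no iterator is used, so a
%% plain set could work, but it would have to be private, with the L0
%% snapshot sent to the sft_new cast instead of sharing the table.
considered() ->
    ets:new(memtable, [set, private]).

%% With a plain set the tab2list/1 output is unordered, so it would
%% need sorting before being used to build an L0 file.
dump_sorted(TID) ->
    lists:sort(ets:tab2list(TID)).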
martinsumner 2016-10-18 19:41:33 +01:00
parent 905b712764
commit 8f29a6c40f
6 changed files with 144 additions and 45 deletions


@@ -149,7 +149,7 @@
-include_lib("eunit/include/eunit.hrl").
-define(CACHE_SIZE, 1000).
-define(CACHE_SIZE, 2000).
-define(JOURNAL_FP, "journal").
-define(LEDGER_FP, "ledger").
-define(SHUTDOWN_WAITS, 60).
@@ -160,7 +160,7 @@
penciller :: pid(),
cache_size :: integer(),
back_pressure :: boolean(),
ledger_cache :: gb_trees:tree(),
ledger_cache :: {gb_trees:tree(), list()},
is_snapshot :: boolean()}).
@@ -242,7 +242,7 @@ init([Opts]) ->
{ok, #state{inker=Inker,
penciller=Penciller,
cache_size=CacheSize,
ledger_cache=gb_trees:empty(),
ledger_cache={gb_trees:empty(), []},
is_snapshot=false}};
Bookie ->
{ok,
@@ -397,16 +397,15 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%============================================================================
bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
bucket_stats(Penciller, {_ObjTree, ChangeList}, Bucket, Tag) ->
PCLopts = #penciller_options{start_snapshot=true,
source_penciller=Penciller},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
Folder = fun() ->
Increment = gb_trees:to_list(LedgerCache),
io:format("Length of increment in snapshot is ~w~n",
[length(Increment)]),
[length(ChangeList)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
{infinity, Increment}),
{infinity, ChangeList}),
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
@@ -419,7 +418,7 @@ bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
end,
{async, Folder}.
index_query(Penciller, LedgerCache,
index_query(Penciller, {_ObjTree, ChangeList},
Bucket,
{IdxField, StartValue, EndValue},
{ReturnTerms, TermRegex}) ->
@@ -427,11 +426,10 @@ index_query(Penciller, LedgerCache,
source_penciller=Penciller},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
Folder = fun() ->
Increment = gb_trees:to_list(LedgerCache),
io:format("Length of increment in snapshot is ~w~n",
[length(Increment)]),
[length(ChangeList)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
{infinity, Increment}),
{infinity, ChangeList}),
StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
IdxField, StartValue),
EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
@@ -493,8 +491,9 @@ startup(InkerOpts, PencillerOpts) ->
{Inker, Penciller}.
fetch_head(Key, Penciller, Cache) ->
case gb_trees:lookup(Key, Cache) of
fetch_head(Key, Penciller, {ObjTree, _ChangeList}) ->
case gb_trees:lookup(Key, ObjTree) of
{value, Head} ->
Head;
none ->
@@ -561,8 +560,6 @@ accumulate_index(TermRe, AddFun) ->
end.
preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
{Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK,
SQN,
@@ -572,20 +569,30 @@ preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
[PrimaryChange] ++ ConvSpecs.
addto_ledgercache(Changes, Cache) ->
lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end,
Cache,
Changes).
{ObjectTree, ChangeList} = Cache,
{lists:foldl(fun({K, V}, Acc) ->
case leveled_codec:is_indexkey(K) of
false ->
gb_trees:enter(K, V, Acc);
true ->
Acc
end
end,
ObjectTree,
Changes),
ChangeList ++ Changes}.
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
CacheSize = gb_trees:size(Cache),
{_ObjectTree, ChangeList} = Cache,
CacheSize = length(ChangeList),
if
CacheSize > MaxCacheSize ->
case leveled_penciller:pcl_pushmem(Penciller,
gb_trees:to_list(Cache)) of
ChangeList) of
ok ->
{ok, gb_trees:empty()};
{ok, {gb_trees:empty(), []}};
pause ->
{pause, gb_trees:empty()};
{pause, {gb_trees:empty(), []}};
refused ->
{ok, Cache}
end;


@@ -39,6 +39,7 @@
strip_to_keyseqstatusonly/1,
striphead_to_details/1,
is_active/2,
is_indexkey/1,
endkey_passed/2,
key_dominates/2,
print_key/1,
@@ -107,6 +108,11 @@ to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
to_ledgerkey(Bucket, Key, Tag) ->
{Tag, Bucket, Key, null}.
is_indexkey({Tag, _, _, _}) when Tag == ?IDX_TAG ->
true;
is_indexkey(_Key) ->
false.
hash(Obj) ->
erlang:phash2(term_to_binary(Obj)).
@@ -156,7 +162,7 @@ convert_indexspecs(IndexSpecs, Bucket, Key, SQN) ->
{active, infinity};
remove ->
%% TODO: timestamps for delayed reaping
{tomb, infinity}
tomb
end,
{to_ledgerkey(Bucket, Key, ?IDX_TAG,
IdxField, IdxValue),
@@ -260,7 +266,7 @@ indexspecs_test() ->
?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},
{1, {active, infinity}, null}}, lists:nth(2, Changes)),
?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"},
{1, {tomb, infinity}, null}}, lists:nth(3, Changes)).
{1, tomb, null}}, lists:nth(3, Changes)).
endkey_passed_test() ->
TestKey = {i, null, null, null},


@@ -724,10 +724,9 @@ manifest_printer(Manifest) ->
initiate_penciller_snapshot(Bookie) ->
{ok,
{LedgerSnap, LedgerCache},
{LedgerSnap, {_ObjTree, ChangeList}},
_} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap,
gb_trees:to_list(LedgerCache)),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, ChangeList),
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
{LedgerSnap, MaxSQN}.


@@ -639,6 +639,14 @@ start_from_file(PCLopts) ->
M ->
M
end,
% Options (not) chosen here:
% - As we pass the ETS table to the sft file when the L0 file is created
% then this cannot be private.
% - There is no use of iterator, so a set could be used, but then the
% output of tab2list would need to be sorted
% TODO:
% - Test switching to [set, private] and sending the L0 snapshots to the
% sft_new cast
TID = ets:new(?MEMTABLE, [ordered_set]),
{ok, Clerk} = leveled_pclerk:clerk_new(self()),
InitState = #state{memtable=TID,
@@ -1371,14 +1379,12 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
assess_sqn([]) ->
empty;
assess_sqn(DumpList) ->
assess_sqn(DumpList, infinity, 0).
assess_sqn([], MinSQN, MaxSQN) ->
{MinSQN, MaxSQN};
assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) ->
{_K, SQN} = leveled_codec:strip_to_keyseqonly(HeadKey),
assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)).
assess_sqn([HeadKV|[]]) ->
{leveled_codec:strip_to_seqonly(HeadKV),
leveled_codec:strip_to_seqonly(HeadKV)};
assess_sqn([HeadKV|DumpList]) ->
{leveled_codec:strip_to_seqonly(HeadKV),
leveled_codec:strip_to_seqonly(lists:last(DumpList))}.
%%%============================================================================
@@ -1406,6 +1412,13 @@ clean_subdir(DirPath) ->
ok
end.
assess_sqn_test() ->
L1 = [{{}, {5, active, {}}}, {{}, {6, active, {}}}],
?assertMatch({5, 6}, assess_sqn(L1)),
L2 = [{{}, {5, active, {}}}],
?assertMatch({5, 5}, assess_sqn(L2)),
?assertMatch(empty, assess_sqn([])).
compaction_work_assessment_test() ->
L0 = [{{o, "B1", "K1", null}, {o, "B3", "K3", null}, dummy_pid}],
L1 = [{{o, "B1", "K1", null}, {o, "B2", "K2", null}, dummy_pid},
@@ -1462,13 +1475,13 @@ simple_server_test() ->
{ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000}),
Key1 = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}},
KL1 = lists:sort(leveled_sft:generate_randomkeys({1000, 2})),
KL1 = leveled_sft:generate_randomkeys({1000, 2}),
Key2 = {{o,"Bucket0002", "Key0002", null}, {1002, {active, infinity}, null}},
KL2 = lists:sort(leveled_sft:generate_randomkeys({1000, 1002})),
KL2 = leveled_sft:generate_randomkeys({1000, 1002}),
Key3 = {{o,"Bucket0003", "Key0003", null}, {2002, {active, infinity}, null}},
KL3 = lists:sort(leveled_sft:generate_randomkeys({1000, 2002})),
KL3 = leveled_sft:generate_randomkeys({1000, 2002}),
Key4 = {{o,"Bucket0004", "Key0004", null}, {3002, {active, infinity}, null}},
KL4 = lists:sort(leveled_sft:generate_randomkeys({1000, 3002})),
KL4 = leveled_sft:generate_randomkeys({1000, 3002}),
ok = pcl_pushmem(PCL, [Key1]),
?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
ok = pcl_pushmem(PCL, KL1),
@@ -1546,7 +1559,7 @@ simple_server_test() ->
% sees the old version in the previous snapshot, but will see the new version
% in a new snapshot
Key1A = {{o,"Bucket0001", "Key0001", null}, {4002, {active, infinity}, null}},
KL1A = lists:sort(leveled_sft:generate_randomkeys({4002, 2})),
KL1A = leveled_sft:generate_randomkeys({4002, 2}),
maybe_pause_push(pcl_pushmem(PCLr, [Key1A])),
maybe_pause_push(pcl_pushmem(PCLr, KL1A)),
?assertMatch(true, pcl_checksequencenumber(PclSnap,


@@ -1378,7 +1378,7 @@ generate_randomkeys(Count) ->
generate_randomkeys(Count, 0, []).
generate_randomkeys(0, _SQN, Acc) ->
Acc;
lists:reverse(Acc);
generate_randomkeys(Count, SQN, Acc) ->
RandKey = {{o,
lists:concat(["Bucket", random:uniform(1024)]),
@@ -1651,7 +1651,7 @@ initial_create_file_test() ->
big_create_file_test() ->
Filename = "../test/bigtest1.sft",
{KL1, KL2} = {lists:sort(generate_randomkeys(2000)),
lists:sort(generate_randomkeys(50000))},
lists:sort(generate_randomkeys(40000))},
{InitHandle, InitFileMD} = create_file(Filename),
{Handle, FileMD, {_KL1Rem, _KL2Rem}} = complete_file(InitHandle,
InitFileMD,


@@ -3,6 +3,8 @@
-include_lib("common_test/include/ct.hrl").
-include("include/leveled.hrl").
-define(KEY_ONLY, {false, undefined}).
-export([all/0]).
-export([simple_load_with2i/1,
simple_querycount/1]).
@@ -73,7 +75,7 @@ simple_querycount(_Config) ->
T = count_termsonindex("Bucket",
IdxF,
Book1,
{false, undefined}),
?KEY_ONLY),
io:format("~w terms found on index ~s~n",
[T, IdxF]),
Acc + T
@@ -87,13 +89,13 @@ simple_querycount(_Config) ->
Index1Count = count_termsonindex("Bucket",
"idx1_bin",
Book1,
{false, undefined}),
?KEY_ONLY),
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(StartOpts1),
Index1Count = count_termsonindex("Bucket",
"idx1_bin",
Book2,
{false, undefined}),
?KEY_ONLY),
NameList = testutil:name_list(),
TotalNameByName = lists:foldl(fun({_X, Name}, Acc) ->
{ok, Regex} = re:compile("[0-9]+" ++
@@ -165,7 +167,79 @@ simple_querycount(_Config) ->
{false,
RxMia2K}}),
Mia2000Count1 = length(Mia2KFolder3()),
V9 = testutil:get_compressiblevalue(),
Indexes9 = testutil:get_randomindexes_generator(8),
[{_RN, Obj9, Spc9}] = testutil:generate_objects(1, uuid, [], V9, Indexes9),
ok = leveled_bookie:book_riakput(Book2, Obj9, Spc9),
R9 = lists:map(fun({add, IdxF, IdxT}) ->
R = leveled_bookie:book_returnfolder(Book2,
{index_query,
"Bucket",
{IdxF,
IdxT,
IdxT},
?KEY_ONLY}),
{async, Fldr} = R,
case length(Fldr()) of
X when X > 0 ->
{IdxF, IdxT, X}
end
end,
Spc9),
Spc9Del = lists:map(fun({add, IdxF, IdxT}) -> {remove, IdxF, IdxT} end,
Spc9),
ok = leveled_bookie:book_riakput(Book2, Obj9, Spc9Del),
lists:foreach(fun({IdxF, IdxT, X}) ->
R = leveled_bookie:book_returnfolder(Book2,
{index_query,
"Bucket",
{IdxF,
IdxT,
IdxT},
?KEY_ONLY}),
{async, Fldr} = R,
case length(Fldr()) of
Y ->
Y = X - 1
end
end,
R9),
ok = leveled_bookie:book_close(Book2),
{ok, Book3} = leveled_bookie:book_start(StartOpts1),
lists:foreach(fun({IdxF, IdxT, X}) ->
R = leveled_bookie:book_returnfolder(Book3,
{index_query,
"Bucket",
{IdxF,
IdxT,
IdxT},
?KEY_ONLY}),
{async, Fldr} = R,
case length(Fldr()) of
Y ->
Y = X - 1
end
end,
R9),
ok = leveled_bookie:book_riakput(Book3, Obj9, Spc9),
{ok, Book4} = leveled_bookie:book_start(StartOpts1),
lists:foreach(fun({IdxF, IdxT, X}) ->
R = leveled_bookie:book_returnfolder(Book4,
{index_query,
"Bucket",
{IdxF,
IdxT,
IdxT},
?KEY_ONLY}),
{async, Fldr} = R,
case length(Fldr()) of
X ->
ok
end
end,
R9),
ok = leveled_bookie:book_close(Book4),
testutil:reset_filestructure().