Merge remote-tracking branch 'refs/remotes/origin/master' into mas-specs-i61b
This commit is contained in:
commit
f23072fb54
5 changed files with 112 additions and 32 deletions
|
@ -729,6 +729,7 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
|
||||||
no_lookup),
|
no_lookup),
|
||||||
Folder = fun() ->
|
Folder = fun() ->
|
||||||
BucketAcc = get_nextbucket(null,
|
BucketAcc = get_nextbucket(null,
|
||||||
|
null,
|
||||||
Tag,
|
Tag,
|
||||||
LedgerSnapshot,
|
LedgerSnapshot,
|
||||||
[]),
|
[]),
|
||||||
|
@ -739,26 +740,40 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
|
||||||
end,
|
end,
|
||||||
{async, Folder}.
|
{async, Folder}.
|
||||||
|
|
||||||
get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) ->
|
get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
|
||||||
StartKey = leveled_codec:to_ledgerkey(NextBucket, null, Tag),
|
Now = leveled_codec:integer_now(),
|
||||||
|
StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag),
|
||||||
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
||||||
ExtractFun = fun(LK, _V, _Acc) -> leveled_codec:from_ledgerkey(LK) end,
|
ExtractFun =
|
||||||
BK = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
|
fun(LK, V, _Acc) ->
|
||||||
StartKey,
|
{leveled_codec:from_ledgerkey(LK), V}
|
||||||
EndKey,
|
end,
|
||||||
ExtractFun,
|
R = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
|
||||||
null),
|
StartKey,
|
||||||
case BK of
|
EndKey,
|
||||||
|
ExtractFun,
|
||||||
|
null),
|
||||||
|
case R of
|
||||||
null ->
|
null ->
|
||||||
leveled_log:log("B0008",[]),
|
leveled_log:log("B0008",[]),
|
||||||
BKList;
|
BKList;
|
||||||
{B, K} when is_binary(B) ->
|
{{B, K}, V} when is_binary(B), is_binary(K) ->
|
||||||
leveled_log:log("B0009",[B]),
|
case leveled_codec:is_active({B, K}, V, Now) of
|
||||||
get_nextbucket(<<B/binary, 0>>,
|
true ->
|
||||||
Tag,
|
leveled_log:log("B0009",[B]),
|
||||||
LedgerSnapshot,
|
get_nextbucket(<<B/binary, 0>>,
|
||||||
[{B, K}|BKList]);
|
null,
|
||||||
NB ->
|
Tag,
|
||||||
|
LedgerSnapshot,
|
||||||
|
[{B, K}|BKList]);
|
||||||
|
false ->
|
||||||
|
get_nextbucket(B,
|
||||||
|
<<K/binary, 0>>,
|
||||||
|
Tag,
|
||||||
|
LedgerSnapshot,
|
||||||
|
BKList)
|
||||||
|
end;
|
||||||
|
{NB, _V} ->
|
||||||
leveled_log:log("B0010",[NB]),
|
leveled_log:log("B0010",[NB]),
|
||||||
[]
|
[]
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -80,7 +80,7 @@
|
||||||
handle_info/2,
|
handle_info/2,
|
||||||
terminate/2,
|
terminate/2,
|
||||||
clerk_new/1,
|
clerk_new/1,
|
||||||
clerk_compact/6,
|
clerk_compact/7,
|
||||||
clerk_hashtablecalc/3,
|
clerk_hashtablecalc/3,
|
||||||
clerk_stop/1,
|
clerk_stop/1,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
@ -124,14 +124,15 @@
|
||||||
clerk_new(InkerClerkOpts) ->
|
clerk_new(InkerClerkOpts) ->
|
||||||
gen_server:start(?MODULE, [InkerClerkOpts], []).
|
gen_server:start(?MODULE, [InkerClerkOpts], []).
|
||||||
|
|
||||||
clerk_compact(Pid, Checker, InitiateFun, FilterFun, Inker, Timeout) ->
|
clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) ->
|
||||||
gen_server:cast(Pid,
|
gen_server:cast(Pid,
|
||||||
{compact,
|
{compact,
|
||||||
Checker,
|
Checker,
|
||||||
InitiateFun,
|
InitiateFun,
|
||||||
|
CloseFun,
|
||||||
FilterFun,
|
FilterFun,
|
||||||
Inker,
|
Inker,
|
||||||
Timeout}).
|
TimeO}).
|
||||||
|
|
||||||
clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
|
clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
|
||||||
{ok, Clerk} = gen_server:start(?MODULE, [#iclerk_options{}], []),
|
{ok, Clerk} = gen_server:start(?MODULE, [#iclerk_options{}], []),
|
||||||
|
@ -171,7 +172,7 @@ init([IClerkOpts]) ->
|
||||||
handle_call(_Msg, _From, State) ->
|
handle_call(_Msg, _From, State) ->
|
||||||
{reply, not_supported, State}.
|
{reply, not_supported, State}.
|
||||||
|
|
||||||
handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
|
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
|
||||||
State) ->
|
State) ->
|
||||||
% Empty the waste folder
|
% Empty the waste folder
|
||||||
clear_waste(State),
|
clear_waste(State),
|
||||||
|
@ -207,11 +208,13 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
|
||||||
update_inker(Inker,
|
update_inker(Inker,
|
||||||
ManifestSlice,
|
ManifestSlice,
|
||||||
FilesToDelete),
|
FilesToDelete),
|
||||||
|
ok = CloseFun(FilterServer),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
Score ->
|
Score ->
|
||||||
leveled_log:log("IC003", [Score]),
|
leveled_log:log("IC003", [Score]),
|
||||||
ok = leveled_inker:ink_compactioncomplete(Inker),
|
ok = leveled_inker:ink_compactioncomplete(Inker),
|
||||||
|
ok = CloseFun(FilterServer),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
|
handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
|
||||||
|
|
|
@ -275,22 +275,25 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
|
||||||
%% required to reload the Ledger on startup).
|
%% required to reload the Ledger on startup).
|
||||||
ink_compactjournal(Pid, Bookie, Timeout) ->
|
ink_compactjournal(Pid, Bookie, Timeout) ->
|
||||||
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
|
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
|
||||||
|
CheckerCloseFun = fun leveled_penciller:pcl_close/1,
|
||||||
CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3,
|
CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3,
|
||||||
gen_server:call(Pid,
|
gen_server:call(Pid,
|
||||||
{compact,
|
{compact,
|
||||||
Bookie,
|
Bookie,
|
||||||
CheckerInitiateFun,
|
CheckerInitiateFun,
|
||||||
|
CheckerCloseFun,
|
||||||
CheckerFilterFun,
|
CheckerFilterFun,
|
||||||
Timeout},
|
Timeout},
|
||||||
infinity).
|
infinity).
|
||||||
|
|
||||||
%% Allows the Checker to be overriden in test, use something other than a
|
%% Allows the Checker to be overriden in test, use something other than a
|
||||||
%% penciller
|
%% penciller
|
||||||
ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) ->
|
ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) ->
|
||||||
gen_server:call(Pid,
|
gen_server:call(Pid,
|
||||||
{compact,
|
{compact,
|
||||||
Checker,
|
Checker,
|
||||||
InitiateFun,
|
InitiateFun,
|
||||||
|
CloseFun,
|
||||||
FilterFun,
|
FilterFun,
|
||||||
Timeout},
|
Timeout},
|
||||||
infinity).
|
infinity).
|
||||||
|
@ -427,12 +430,14 @@ handle_call(print_manifest, _From, State) ->
|
||||||
handle_call({compact,
|
handle_call({compact,
|
||||||
Checker,
|
Checker,
|
||||||
InitiateFun,
|
InitiateFun,
|
||||||
|
CloseFun,
|
||||||
FilterFun,
|
FilterFun,
|
||||||
Timeout},
|
Timeout},
|
||||||
_From, State) ->
|
_From, State) ->
|
||||||
leveled_iclerk:clerk_compact(State#state.clerk,
|
leveled_iclerk:clerk_compact(State#state.clerk,
|
||||||
Checker,
|
Checker,
|
||||||
InitiateFun,
|
InitiateFun,
|
||||||
|
CloseFun,
|
||||||
FilterFun,
|
FilterFun,
|
||||||
self(),
|
self(),
|
||||||
Timeout),
|
Timeout),
|
||||||
|
@ -839,7 +844,6 @@ initiate_penciller_snapshot(Bookie) ->
|
||||||
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
|
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
|
||||||
{LedgerSnap, MaxSQN}.
|
{LedgerSnap, MaxSQN}.
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Test
|
%%% Test
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -1001,6 +1005,7 @@ compact_journal_test() ->
|
||||||
ok = ink_compactjournal(Ink1,
|
ok = ink_compactjournal(Ink1,
|
||||||
Checker,
|
Checker,
|
||||||
fun(X) -> {X, 55} end,
|
fun(X) -> {X, 55} end,
|
||||||
|
fun(_F) -> ok end,
|
||||||
fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
|
fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
|
||||||
5000),
|
5000),
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
|
@ -1010,6 +1015,7 @@ compact_journal_test() ->
|
||||||
ok = ink_compactjournal(Ink1,
|
ok = ink_compactjournal(Ink1,
|
||||||
Checker2,
|
Checker2,
|
||||||
fun(X) -> {X, 55} end,
|
fun(X) -> {X, 55} end,
|
||||||
|
fun(_F) -> ok end,
|
||||||
fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
|
fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
|
||||||
5000),
|
5000),
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
|
@ -1041,6 +1047,7 @@ empty_manifest_test() ->
|
||||||
ok = ink_compactjournal(Ink1,
|
ok = ink_compactjournal(Ink1,
|
||||||
[],
|
[],
|
||||||
fun(X) -> {X, 55} end,
|
fun(X) -> {X, 55} end,
|
||||||
|
fun(_F) -> ok end,
|
||||||
CheckFun,
|
CheckFun,
|
||||||
5000),
|
5000),
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
|
|
|
@ -8,7 +8,8 @@
|
||||||
fetchput_snapshot/1,
|
fetchput_snapshot/1,
|
||||||
load_and_count/1,
|
load_and_count/1,
|
||||||
load_and_count_withdelete/1,
|
load_and_count_withdelete/1,
|
||||||
space_clear_ondelete/1
|
space_clear_ondelete/1,
|
||||||
|
is_empty_test/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
all() -> [
|
all() -> [
|
||||||
|
@ -18,7 +19,8 @@ all() -> [
|
||||||
fetchput_snapshot,
|
fetchput_snapshot,
|
||||||
load_and_count,
|
load_and_count,
|
||||||
load_and_count_withdelete,
|
load_and_count_withdelete,
|
||||||
space_clear_ondelete
|
space_clear_ondelete,
|
||||||
|
is_empty_test
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
||||||
|
@ -591,4 +593,58 @@ space_clear_ondelete(_Config) ->
|
||||||
true = length(FNsD_L) < length(FNsA_L),
|
true = length(FNsD_L) < length(FNsA_L),
|
||||||
true = length(FNsD_L) < length(FNsB_L),
|
true = length(FNsD_L) < length(FNsB_L),
|
||||||
true = length(FNsD_L) < length(FNsC_L),
|
true = length(FNsD_L) < length(FNsC_L),
|
||||||
true = length(FNsD_L) == 0.
|
true = length(FNsD_L) == 0.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
is_empty_test(_Config) ->
|
||||||
|
RootPath = testutil:reset_filestructure(),
|
||||||
|
StartOpts1 = [{root_path, RootPath},
|
||||||
|
{sync_strategy, testutil:sync_strategy()}],
|
||||||
|
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
||||||
|
|
||||||
|
{B1, K1, V1, Spec, MD} = {term_to_binary("Bucket1"),
|
||||||
|
term_to_binary("Key1"),
|
||||||
|
"Value1",
|
||||||
|
[],
|
||||||
|
[{"MDK1", "MDV1"}]},
|
||||||
|
{TestObject1, TestSpec1} =
|
||||||
|
testutil:generate_testobject(B1, K1, V1, Spec, MD),
|
||||||
|
{B1, K2, V2, Spec, MD} = {term_to_binary("Bucket1"),
|
||||||
|
term_to_binary("Key2"),
|
||||||
|
"Value2",
|
||||||
|
[],
|
||||||
|
[{"MDK1", "MDV1"}]},
|
||||||
|
{TestObject2, TestSpec2} =
|
||||||
|
testutil:generate_testobject(B1, K2, V2, Spec, MD),
|
||||||
|
{B2, K3, V3, Spec, MD} = {term_to_binary("Bucket2"),
|
||||||
|
term_to_binary("Key3"),
|
||||||
|
"Value3",
|
||||||
|
[],
|
||||||
|
[{"MDK1", "MDV1"}]},
|
||||||
|
{TestObject3, TestSpec3} =
|
||||||
|
testutil:generate_testobject(B2, K3, V3, Spec, MD),
|
||||||
|
ok = testutil:book_riakput(Bookie1, TestObject1, TestSpec1),
|
||||||
|
ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2),
|
||||||
|
ok = testutil:book_riakput(Bookie1, TestObject3, TestSpec3),
|
||||||
|
|
||||||
|
FoldBucketsFun = fun(B, Acc) -> sets:add_element(B, Acc) end,
|
||||||
|
BucketListQuery = {binary_bucketlist,
|
||||||
|
?RIAK_TAG,
|
||||||
|
{FoldBucketsFun, sets:new()}},
|
||||||
|
{async, BL} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery),
|
||||||
|
true = sets:size(BL()) == 2,
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_put(Bookie1, B2, K3, delete, [], ?RIAK_TAG),
|
||||||
|
{async, BLpd1} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery),
|
||||||
|
true = sets:size(BLpd1()) == 1,
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_put(Bookie1, B1, K2, delete, [], ?RIAK_TAG),
|
||||||
|
{async, BLpd2} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery),
|
||||||
|
true = sets:size(BLpd2()) == 1,
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_put(Bookie1, B1, K1, delete, [], ?RIAK_TAG),
|
||||||
|
{async, BLpd3} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery),
|
||||||
|
true = sets:size(BLpd3()) == 0,
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_close(Bookie1).
|
||||||
|
|
|
@ -97,7 +97,7 @@ small_load_with2i(_Config) ->
|
||||||
IdxQ1 = {index_query,
|
IdxQ1 = {index_query,
|
||||||
"Bucket",
|
"Bucket",
|
||||||
{fun testutil:foldkeysfun/3, []},
|
{fun testutil:foldkeysfun/3, []},
|
||||||
{"idx1_bin", "#", "~"},
|
{"idx1_bin", "#", "|"},
|
||||||
{true, undefined}},
|
{true, undefined}},
|
||||||
{async, IdxFolder} = leveled_bookie:book_returnfolder(Bookie1, IdxQ1),
|
{async, IdxFolder} = leveled_bookie:book_returnfolder(Bookie1, IdxQ1),
|
||||||
KeyList1 = lists:usort(IdxFolder()),
|
KeyList1 = lists:usort(IdxFolder()),
|
||||||
|
@ -106,7 +106,7 @@ small_load_with2i(_Config) ->
|
||||||
IdxQ2 = {index_query,
|
IdxQ2 = {index_query,
|
||||||
{"Bucket", LastKey},
|
{"Bucket", LastKey},
|
||||||
{fun testutil:foldkeysfun/3, []},
|
{fun testutil:foldkeysfun/3, []},
|
||||||
{"idx1_bin", LastTerm, "~"},
|
{"idx1_bin", LastTerm, "|"},
|
||||||
{false, undefined}},
|
{false, undefined}},
|
||||||
{async, IdxFolderLK} = leveled_bookie:book_returnfolder(Bookie1, IdxQ2),
|
{async, IdxFolderLK} = leveled_bookie:book_returnfolder(Bookie1, IdxQ2),
|
||||||
KeyList2 = lists:usort(IdxFolderLK()),
|
KeyList2 = lists:usort(IdxFolderLK()),
|
||||||
|
@ -141,7 +141,7 @@ small_load_with2i(_Config) ->
|
||||||
?RIAK_TAG,
|
?RIAK_TAG,
|
||||||
"Bucket",
|
"Bucket",
|
||||||
{"idx1_bin",
|
{"idx1_bin",
|
||||||
"#", "~"},
|
"#", "|"},
|
||||||
FoldObjectsFun}),
|
FoldObjectsFun}),
|
||||||
KeyHashList3 = HTreeF3(),
|
KeyHashList3 = HTreeF3(),
|
||||||
true = 9901 == length(KeyHashList1), % also includes the test object
|
true = 9901 == length(KeyHashList1), % also includes the test object
|
||||||
|
@ -186,7 +186,7 @@ query_count(_Config) ->
|
||||||
testutil:sync_strategy()),
|
testutil:sync_strategy()),
|
||||||
BucketBin = list_to_binary("Bucket"),
|
BucketBin = list_to_binary("Bucket"),
|
||||||
{TestObject, TestSpec} = testutil:generate_testobject(BucketBin,
|
{TestObject, TestSpec} = testutil:generate_testobject(BucketBin,
|
||||||
"Key1",
|
term_to_binary("Key1"),
|
||||||
"Value1",
|
"Value1",
|
||||||
[],
|
[],
|
||||||
[{"MDK1", "MDV1"}]),
|
[{"MDK1", "MDV1"}]),
|
||||||
|
@ -269,7 +269,7 @@ query_count(_Config) ->
|
||||||
Query1 = {index_query,
|
Query1 = {index_query,
|
||||||
BucketBin,
|
BucketBin,
|
||||||
{fun testutil:foldkeysfun/3, []},
|
{fun testutil:foldkeysfun/3, []},
|
||||||
{"idx2_bin", "2000", "2000~"},
|
{"idx2_bin", "2000", "2000|"},
|
||||||
{false, RegMia}},
|
{false, RegMia}},
|
||||||
{async,
|
{async,
|
||||||
Mia2KFolder1} = leveled_bookie:book_returnfolder(Book2, Query1),
|
Mia2KFolder1} = leveled_bookie:book_returnfolder(Book2, Query1),
|
||||||
|
@ -396,8 +396,7 @@ query_count(_Config) ->
|
||||||
io:format("Bucket set returned in ~w microseconds",
|
io:format("Bucket set returned in ~w microseconds",
|
||||||
[timer:now_diff(os:timestamp(), SW_QA)]),
|
[timer:now_diff(os:timestamp(), SW_QA)]),
|
||||||
|
|
||||||
true = sets:size(BucketSet1) == 1,
|
true = sets:size(BucketSet1) == 1,
|
||||||
true = sets:is_element(list_to_binary("Bucket"), BucketSet1),
|
|
||||||
|
|
||||||
ObjList10A = testutil:generate_objects(5000,
|
ObjList10A = testutil:generate_objects(5000,
|
||||||
binary_uuid,
|
binary_uuid,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue