Extend journal compaction test
to cover with and without waste retention. Also makes sure that CDB files in a restarted store will respect the wast retention period set.
This commit is contained in:
parent
1d2effc773
commit
7de4dccbd9
2 changed files with 66 additions and 25 deletions
|
@ -105,6 +105,7 @@
|
||||||
|
|
||||||
-type book_state() :: #state{}.
|
-type book_state() :: #state{}.
|
||||||
-type sync_mode() :: sync|none|riak_sync.
|
-type sync_mode() :: sync|none|riak_sync.
|
||||||
|
-type ledger_cache() :: #ledger_cache{}.
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -606,11 +607,15 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%% External functions
|
%%% External functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
%% @doc Empty the ledger cache table following a push
|
-spec empty_ledgercache() -> ledger_cache().
|
||||||
|
%% @doc
|
||||||
|
%% Empty the ledger cache table following a push
|
||||||
empty_ledgercache() ->
|
empty_ledgercache() ->
|
||||||
#ledger_cache{mem = ets:new(empty, [ordered_set])}.
|
#ledger_cache{mem = ets:new(empty, [ordered_set])}.
|
||||||
|
|
||||||
%% @doc push the ledgercache to the Penciller - which should respond ok or
|
-spec push_ledgercache(pid(), ledger_cache()) -> ok|returned.
|
||||||
|
%% @doc
|
||||||
|
%% Push the ledgercache to the Penciller - which should respond ok or
|
||||||
%% returned. If the response is ok the cache can be flushed, but if the
|
%% returned. If the response is ok the cache can be flushed, but if the
|
||||||
%% response is returned the cache should continue to build and it should try
|
%% response is returned the cache should continue to build and it should try
|
||||||
%% to flush at a later date
|
%% to flush at a later date
|
||||||
|
@ -621,8 +626,10 @@ push_ledgercache(Penciller, Cache) ->
|
||||||
Cache#ledger_cache.max_sqn},
|
Cache#ledger_cache.max_sqn},
|
||||||
leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
|
leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
|
||||||
|
|
||||||
%% @doc the ledger cache can be built from a queue, for example when
|
-spec loadqueue_ledgercache(ledger_cache()) -> ledger_cache().
|
||||||
%% loading the ledger from the head of the journal on startup
|
%% @doc
|
||||||
|
%% The ledger cache can be built from a queue, for example when loading the
|
||||||
|
%% ledger from the head of the journal on startup
|
||||||
%%
|
%%
|
||||||
%% The queue should be build using [NewKey|Acc] so that the most recent
|
%% The queue should be build using [NewKey|Acc] so that the most recent
|
||||||
%% key is kept in the sort
|
%% key is kept in the sort
|
||||||
|
@ -631,7 +638,12 @@ loadqueue_ledgercache(Cache) ->
|
||||||
T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
|
T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
|
||||||
Cache#ledger_cache{load_queue = [], loader = T}.
|
Cache#ledger_cache{load_queue = [], loader = T}.
|
||||||
|
|
||||||
%% @doc Allow all a snapshot to be created from part of the store, preferably
|
-spec snapshot_store(ledger_cache(),
|
||||||
|
pid(), null|pid(), store|ledger,
|
||||||
|
undefined|tuple(), undefined|boolean()) ->
|
||||||
|
{ok, pid(), pid()|null}.
|
||||||
|
%% @doc
|
||||||
|
%% Allow all a snapshot to be created from part of the store, preferably
|
||||||
%% passing in a query filter so that all of the LoopState does not need to
|
%% passing in a query filter so that all of the LoopState does not need to
|
||||||
%% be copied from the real actor to the clone
|
%% be copied from the real actor to the clone
|
||||||
%%
|
%%
|
||||||
|
@ -688,6 +700,9 @@ snapshot_store(State, SnapType, Query, LongRunning) ->
|
||||||
Query,
|
Query,
|
||||||
LongRunning).
|
LongRunning).
|
||||||
|
|
||||||
|
-spec fetch_value(pid(), {any(), integer()}) -> not_present|any().
|
||||||
|
%% @doc
|
||||||
|
%% Fetch a value from the Journal
|
||||||
fetch_value(Inker, {Key, SQN}) ->
|
fetch_value(Inker, {Key, SQN}) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
case leveled_inker:ink_fetch(Inker, Key, SQN) of
|
case leveled_inker:ink_fetch(Inker, Key, SQN) of
|
||||||
|
|
|
@ -112,21 +112,27 @@ many_put_fetch_head(_Config) ->
|
||||||
ok = leveled_bookie:book_destroy(Bookie3).
|
ok = leveled_bookie:book_destroy(Bookie3).
|
||||||
|
|
||||||
journal_compaction(_Config) ->
|
journal_compaction(_Config) ->
|
||||||
|
journal_compaction_tester(false, 3600),
|
||||||
|
journal_compaction_tester(false, undefined),
|
||||||
|
journal_compaction_tester(true, 3600).
|
||||||
|
|
||||||
|
journal_compaction_tester(Restart, WRP) ->
|
||||||
RootPath = testutil:reset_filestructure(),
|
RootPath = testutil:reset_filestructure(),
|
||||||
StartOpts1 = [{root_path, RootPath},
|
StartOpts1 = [{root_path, RootPath},
|
||||||
{max_journalsize, 10000000},
|
{max_journalsize, 10000000},
|
||||||
{max_run_length, 1},
|
{max_run_length, 1},
|
||||||
{sync_strategy, testutil:sync_strategy()}],
|
{sync_strategy, testutil:sync_strategy()},
|
||||||
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
{waste_retention_period, WRP}],
|
||||||
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
|
{ok, Bookie0} = leveled_bookie:book_start(StartOpts1),
|
||||||
|
ok = leveled_bookie:book_compactjournal(Bookie0, 30000),
|
||||||
{TestObject, TestSpec} = testutil:generate_testobject(),
|
{TestObject, TestSpec} = testutil:generate_testobject(),
|
||||||
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
ok = testutil:book_riakput(Bookie0, TestObject, TestSpec),
|
||||||
testutil:check_forobject(Bookie1, TestObject),
|
testutil:check_forobject(Bookie0, TestObject),
|
||||||
ObjList1 = testutil:generate_objects(20000, 2),
|
ObjList1 = testutil:generate_objects(20000, 2),
|
||||||
testutil:riakload(Bookie1, ObjList1),
|
testutil:riakload(Bookie0, ObjList1),
|
||||||
ChkList1 = lists:sublist(lists:sort(ObjList1), 10000),
|
ChkList1 = lists:sublist(lists:sort(ObjList1), 10000),
|
||||||
testutil:check_forlist(Bookie1, ChkList1),
|
testutil:check_forlist(Bookie0, ChkList1),
|
||||||
testutil:check_forobject(Bookie1, TestObject),
|
testutil:check_forobject(Bookie0, TestObject),
|
||||||
{B2, K2, V2, Spec2, MD} = {"Bucket2",
|
{B2, K2, V2, Spec2, MD} = {"Bucket2",
|
||||||
"Key2",
|
"Key2",
|
||||||
"Value2",
|
"Value2",
|
||||||
|
@ -134,18 +140,18 @@ journal_compaction(_Config) ->
|
||||||
[{"MDK2", "MDV2"}]},
|
[{"MDK2", "MDV2"}]},
|
||||||
{TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2,
|
{TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2,
|
||||||
V2, Spec2, MD),
|
V2, Spec2, MD),
|
||||||
ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2),
|
ok = testutil:book_riakput(Bookie0, TestObject2, TestSpec2),
|
||||||
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
|
ok = leveled_bookie:book_compactjournal(Bookie0, 30000),
|
||||||
testutil:check_forlist(Bookie1, ChkList1),
|
testutil:check_forlist(Bookie0, ChkList1),
|
||||||
testutil:check_forobject(Bookie1, TestObject),
|
testutil:check_forobject(Bookie0, TestObject),
|
||||||
testutil:check_forobject(Bookie1, TestObject2),
|
testutil:check_forobject(Bookie0, TestObject2),
|
||||||
testutil:check_forlist(Bookie1, ChkList1),
|
testutil:check_forlist(Bookie0, ChkList1),
|
||||||
testutil:check_forobject(Bookie1, TestObject),
|
testutil:check_forobject(Bookie0, TestObject),
|
||||||
testutil:check_forobject(Bookie1, TestObject2),
|
testutil:check_forobject(Bookie0, TestObject2),
|
||||||
%% Delete some of the objects
|
%% Delete some of the objects
|
||||||
ObjListD = testutil:generate_objects(10000, 2),
|
ObjListD = testutil:generate_objects(10000, 2),
|
||||||
lists:foreach(fun({_R, O, _S}) ->
|
lists:foreach(fun({_R, O, _S}) ->
|
||||||
testutil:book_riakdelete(Bookie1,
|
testutil:book_riakdelete(Bookie0,
|
||||||
O#r_object.bucket,
|
O#r_object.bucket,
|
||||||
O#r_object.key,
|
O#r_object.key,
|
||||||
[])
|
[])
|
||||||
|
@ -154,7 +160,17 @@ journal_compaction(_Config) ->
|
||||||
|
|
||||||
%% Now replace all the other objects
|
%% Now replace all the other objects
|
||||||
ObjList2 = testutil:generate_objects(40000, 10002),
|
ObjList2 = testutil:generate_objects(40000, 10002),
|
||||||
testutil:riakload(Bookie1, ObjList2),
|
testutil:riakload(Bookie0, ObjList2),
|
||||||
|
|
||||||
|
Bookie1 =
|
||||||
|
case Restart of
|
||||||
|
true ->
|
||||||
|
ok = leveled_bookie:book_close(Bookie0),
|
||||||
|
{ok, RestartedB} = leveled_bookie:book_start(StartOpts1),
|
||||||
|
RestartedB;
|
||||||
|
false ->
|
||||||
|
Bookie0
|
||||||
|
end,
|
||||||
|
|
||||||
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
|
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
|
||||||
|
|
||||||
|
@ -184,7 +200,12 @@ journal_compaction(_Config) ->
|
||||||
[2000,2000,2000,2000,2000,2000]),
|
[2000,2000,2000,2000,2000,2000]),
|
||||||
{ok, ClearedJournals} = file:list_dir(WasteFP),
|
{ok, ClearedJournals} = file:list_dir(WasteFP),
|
||||||
io:format("~w ClearedJournals found~n", [length(ClearedJournals)]),
|
io:format("~w ClearedJournals found~n", [length(ClearedJournals)]),
|
||||||
true = length(ClearedJournals) > 0,
|
case is_integer(WRP) of
|
||||||
|
true ->
|
||||||
|
true = length(ClearedJournals) > 0;
|
||||||
|
false ->
|
||||||
|
true = length(ClearedJournals) == 0
|
||||||
|
end,
|
||||||
|
|
||||||
ChkList3 = lists:sublist(lists:sort(ObjList2), 500),
|
ChkList3 = lists:sublist(lists:sort(ObjList2), 500),
|
||||||
testutil:check_forlist(Bookie1, ChkList3),
|
testutil:check_forlist(Bookie1, ChkList3),
|
||||||
|
@ -212,7 +233,12 @@ journal_compaction(_Config) ->
|
||||||
|
|
||||||
{ok, ClearedJournalsPC} = file:list_dir(WasteFP),
|
{ok, ClearedJournalsPC} = file:list_dir(WasteFP),
|
||||||
io:format("~w ClearedJournals found~n", [length(ClearedJournalsPC)]),
|
io:format("~w ClearedJournals found~n", [length(ClearedJournalsPC)]),
|
||||||
true = length(ClearedJournalsPC) == 0,
|
case is_integer(WRP) of
|
||||||
|
true ->
|
||||||
|
true = length(ClearedJournals) > 0;
|
||||||
|
false ->
|
||||||
|
true = length(ClearedJournals) == 0
|
||||||
|
end,
|
||||||
|
|
||||||
testutil:reset_filestructure(10000).
|
testutil:reset_filestructure(10000).
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue