Merge pull request #60 from martinsumner/mas-sweeperfold-i59
Mas sweeperfold i59
This commit is contained in:
commit
b0c91172f4
10 changed files with 411 additions and 82 deletions
|
@ -12,7 +12,7 @@ The store supports all the required Riak backend capabilities. A number of furt
|
||||||
|
|
||||||
- A bucket size query, which requires traversal only of the Ledger and counts the keys and sums the total on-disk size of the objects within the bucket.
|
- A bucket size query, which requires traversal only of the Ledger and counts the keys and sums the total on-disk size of the objects within the bucket.
|
||||||
|
|
||||||
- A new SW count to operate in parallel to DW, where SW is the number of nodes required to have flushed to disk (not just written to buffer); with SW being configurable to 0 (no vnodes will need to sync this write), 1 (the PUT coordinator will sync, but no other vnodes) or all (all vnodes will be required to sync this write before providing a DW ack). This will allow the expensive disk sync operation to be constrained to the writes for which it is most needed.
|
- A new SW count to operate in parallel to DW, where SW is the number of nodes required to have flushed to disk (not just written to buffer); with SW being configurable to 0 (no vnodes will need to sync this write), 1 (the PUT coordinator will sync, but no other vnodes) or all (all vnodes will be required to sync this write before providing a DW ack). This will allow the expensive disk sync operation to be constrained to the writes for which it is most needed.
|
||||||
|
|
||||||
- Support for a specific Riak tombstone tag where reaping of tombstones can be deferred (by many days) i.e. so that a 'sort-of-keep' deletion strategy can be followed that will eventually garbage collect without the need to hold pending full deletion state in memory.
|
- Support for a specific Riak tombstone tag where reaping of tombstones can be deferred (by many days) i.e. so that a 'sort-of-keep' deletion strategy can be followed that will eventually garbage collect without the need to hold pending full deletion state in memory.
|
||||||
|
|
||||||
|
@ -29,8 +29,6 @@ There is some work required before LevelEd could be considered production ready:
|
||||||
|
|
||||||
- Introduction of property-based testing.
|
- Introduction of property-based testing.
|
||||||
|
|
||||||
- Riak modifications to support the optimised Key/Clock scanning for hashtree rebuilds.
|
|
||||||
|
|
||||||
- Amend compaction scheduling to ensure that all vnodes do not try to concurrently compact during a single window.
|
- Amend compaction scheduling to ensure that all vnodes do not try to concurrently compact during a single window.
|
||||||
|
|
||||||
- Improved handling of corrupted files.
|
- Improved handling of corrupted files.
|
||||||
|
@ -48,13 +46,13 @@ Branch: [mas-leveleddb](https://github.com/martinsumner/riak_kv/tree/mas-leveled
|
||||||
|
|
||||||
Branched-From: [Basho/develop](https://github.com/basho/riak_kv)
|
Branched-From: [Basho/develop](https://github.com/basho/riak_kv)
|
||||||
|
|
||||||
Description:
|
Description:
|
||||||
|
|
||||||
The leveled backend has been implemented with some basic manual functional tests. The backend has the following capabilities:
|
The leveled backend has been implemented with some basic manual functional tests. The backend has the following capabilities:
|
||||||
|
|
||||||
- async_fold - can return a Folder() function in response to 2i queries;
|
- async_fold - can return a Folder() function in response to 2i queries;
|
||||||
- indexes - support for secondary indexes (currently being returned in the wrong order);
|
- indexes - support for secondary indexes (currently being returned in the wrong order);
|
||||||
- head - supports a HEAD as well as a GET request;
|
- head - supports a HEAD as well as a GET request;
|
||||||
- hash_query - can return keys/hashes when rebuilding hashtrees, so implementation of hashtree can call this rather than folding objects and hashing the objects returning from the fold.
|
- hash_query - can return keys/hashes when rebuilding hashtrees, so implementation of hashtree can call this rather than folding objects and hashing the objects returning from the fold.
|
||||||
|
|
||||||
All standard features should (in theory) be supportable (e.g. TTL, M/R, term_regex etc) but aren't subject to end-to_end testing at present.
|
All standard features should (in theory) be supportable (e.g. TTL, M/R, term_regex etc) but aren't subject to end-to_end testing at present.
|
||||||
|
@ -117,3 +115,15 @@ This branch changes the behaviour slightly at the non-coordinating vnodes. Thes
|
||||||
This should save two object fetches (where n=3) in most circumstances.
|
This should save two object fetches (where n=3) in most circumstances.
|
||||||
|
|
||||||
Note, although the branch name refers to the put fsm - the actual fsm is unchanged by this, all of the changes are within vnode_put
|
Note, although the branch name refers to the put fsm - the actual fsm is unchanged by this, all of the changes are within vnode_put
|
||||||
|
|
||||||
|
### AAE Rebuild Acceleration
|
||||||
|
|
||||||
|
Branch: [mas-leveled-scanner-i649](https://github.com/martinsumner/riak_kv/tree/mas-leveled-scanner-i649)
|
||||||
|
|
||||||
|
Branched-From: [mas-leveled-putfsm](https://github.com/martinsumner/riak_kv/tree/mas-leveled-putfsm)
|
||||||
|
|
||||||
|
Description:
|
||||||
|
|
||||||
|
The riak_kv_sweeper which is part of the post-2.2 develop branch controls folds over objects so that multiple functions can be applied to a single fold. The only aspect of the Riak system that uses this feature at present is AAE hashtree rebuilds.
|
||||||
|
|
||||||
|
This branch modifies the kv_sweeper so that if the capability exists, and unless a sweeper has explicitly stated a requirement not to allow this feature, the sweeper can defer the fetching of the objects. This means that the sweeper will fold over the "heads" of the objects returning a specially crafter Riak Object which contains a reference to the body rather than the actual body - so that the object body can be fetched if and only if access to the object contents is requested via the riak_object module.
|
||||||
|
|
|
@ -142,19 +142,81 @@ As a general rule, looking at the resource utilisation during the tests, the fol
|
||||||
- leveldb throughput would be increased by having improved disk i/o.
|
- leveldb throughput would be increased by having improved disk i/o.
|
||||||
|
|
||||||
|
|
||||||
## Riak Cluster Test - Phase 2
|
## Riak Cluster Test - Phase 2 - AAE Rebuild
|
||||||
|
|
||||||
to be completed ..
|
These tests have been completed using the following static characteristics which are designed to be a balanced comparison between leveled and leveldb:
|
||||||
|
|
||||||
Testing with changed hashtree logic in Riak so key/clock scan is effective
|
- 8KB value,
|
||||||
|
- 100 workers,
|
||||||
|
- no sync on write,
|
||||||
|
- 5 x i2.2x nodes,
|
||||||
|
- 6 hour duration.
|
||||||
|
|
||||||
## Riak Cluster Test - Phase 3
|
This is the test used in Phase 1. Note that since Phase 1 was completed a number of performance improvements have been made in leveled, so that the starting gap between Riak/leveled and Riak/leveldb has widened.
|
||||||
|
|
||||||
|
The tests have been run using the new riak_kv_sweeper facility within develop. This feature is an alternative approach to controlling and scheduling rebuilds, allowing for other work to be scheduled into the same fold. As the test is focused on hashtree rebuilds, the test was run with:
|
||||||
|
|
||||||
|
- active anti-entropy enabled,
|
||||||
|
- a 3 hour rebuild timer.
|
||||||
|
|
||||||
|
The 3-hour rebuild timer is not a recommended configuration, it is an artificial scenario to support testing in a short time window.
|
||||||
|
|
||||||
|
In the current Riak develop branch all sweeps use the Mod:fold_objects/4 function in the backend behaviour. In the testing of Riak/leveled this was changed to allow use of the new Mod:fold_heads/4 function available in the leveled backend (which can be used if the backend supports the fold_heads capability).
|
||||||
|
|
||||||
|
In clusters which have fully migrated to Riak 2.2, the hashtrees are built from a hash of the vector clock, not the object - handling the issue of consistent hashing without canonicalisation of riak objects. This means that hashtrees can be rebuilt without knowledge of the object itself. However, the purpose of rebuilding the hashtree is to ensure that the hashtree represents the data that is still present in the store, as opposed to the assumed state of the store based on the history of changes. Rebuilding hashtrees is part of the defence against accidental deletion (e.g. through user error), and data corruption within the store where no read-repair is other wise triggered. So although leveled can make hashtree rebuilds faster by only folding over the heads, this only answers part of the problem. A rebuild based on heads only proves deletion/corruption has not occurred in the Ledger, but doesn't rule out the possibility that deletion/corruption has occurred in the Journal.
|
||||||
|
|
||||||
|
The fold_heads/4 implementation in leveled partially answers the challenge of Journal deletion or corruption by checking for presence in the Journal as part of the fold. Presence checking means that the object sequence number is in the Journal manifest, and the hash of the Key & sequence number is the lookup tree for that Journal. It is expected that corruption of blocks within journal files will be handled as part of the compaction work to be tested in Phase 3.
|
||||||
|
|
||||||
|
### Leveled AAE rebuild with journal check
|
||||||
|
|
||||||
|
The comparison between leveled and leveldb shows a marked difference in throughput volatility and tail latency (especially with updates).
|
||||||
|
|
||||||
|
Riak + leveled | Riak + leveldb
|
||||||
|
:-------------------------:|:-------------------------:
|
||||||
|
 | 
|
||||||
|
|
||||||
|
The differences between the two tests are:
|
||||||
|
|
||||||
|
- <strong>47.7%</strong> more requests handled with leveled across the whole test window
|
||||||
|
- <strong>54.5%</strong> more requests handled with leveled in the last hour of the test
|
||||||
|
- <b>7</b> more hashtree rebuilds completed in the leveled test
|
||||||
|
|
||||||
|
As with other tests the leveled test had a <strong>slower mean GET time</strong> (<b>6.0ms</b> compared to <b>3.7ms</b>) reflecting the extra cycle caused by deferring the GET request until quorum has been reached through HEAD requests. The leveled test by contrast though had a substantially <strong>more predictable and lower mean UPDATE time</strong> (<b>14.7ms</b> compared to <b>52.4ms</b>).
|
||||||
|
|
||||||
|
Throughput in the leveldb test is reduced from a comparative test without active anti-entropy by a significant amount (more than <b>11%</b>), whereas the throughput reduction from enabling/disabling anti-entropy is marginal in leveled comparisons.
|
||||||
|
|
||||||
|
### Comparison without journal check
|
||||||
|
|
||||||
|
One surprising result from the AAE comparison is that although the leveled test shows less variation in throughput because of the rebuilds, the actual rebuilds take a roughly equivalent time in both the leveled and leveldb tests:
|
||||||
|
|
||||||
|
Hour | Riak + leveled rebuild times | Riak + leveldb rebuild times
|
||||||
|
:-----------:|:-------------------------:|:-------------------------:
|
||||||
|
1 | 519.51 | 518.30
|
||||||
|
2 | 793.24 | 719.97
|
||||||
|
3 | 1,130.36 | 1,111.25
|
||||||
|
4 | 1,428.16 | 1,459.03
|
||||||
|
5 | 1,677.24 | 1,668.50
|
||||||
|
6 | 1,818.63 | 1,859.14
|
||||||
|
|
||||||
|
Note that throughput was 50% higher in the Riak + leveled test, so although the times are comparable the database sizes are larger in this test.
|
||||||
|
|
||||||
|
To explore this further, the same test was also run but with the leveled backend only folding heads across the Ledger (and not doing any check for presence in the Journal). In the ledger-only checking version 110 rebuilds were completed in the test as opposed to 80 rebuilds completed without the Journal check. The time for the checks are halved if the Journal check is removed.
|
||||||
|
|
||||||
|
Although the rebuild times are reduced by removing the Journal check, the throughput change for database consumers is negligible (and well within the margin of error between test runs). So removing the Journal check makes rebuilds faster, but doesn't improve overall database performance for users.
|
||||||
|
|
||||||
|
### Comparison with "legacy" rebuild mechanism in leveldb
|
||||||
|
|
||||||
|
The sweeper mechanism is a new facility in the riak_kv develop branch, and has a different system of throttles to the direct fold mechanism previously used. Without sweeper enabled, by default only one concurrent rebuild is allowed per cluster, whereas sweeper restricts to one concurrent rebuild per node. Sweeper has an additional throttle based on the volume of data passed to control the pace of each sweep.
|
||||||
|
|
||||||
|
If the same test is run with a leveldb backend but with the pre-sweeper fold mechanism, then total throughput across the is improved by <b>8.9%</b>. However, this throughput reduction comes at the cost of a <b>90%</b> reduction in the number of rebuilds completed within the test.
|
||||||
|
|
||||||
|
## Riak Cluster Test - Phase 3 - Compaction
|
||||||
|
|
||||||
to be completed ..
|
to be completed ..
|
||||||
|
|
||||||
Testing during a journal compaction window
|
Testing during a journal compaction window
|
||||||
|
|
||||||
## Riak Cluster Test - Phase 4
|
## Riak Cluster Test - Phase 4 - 2i
|
||||||
|
|
||||||
Testing with secondary indexes provides some new challenges:
|
Testing with secondary indexes provides some new challenges:
|
||||||
|
|
||||||
|
@ -199,7 +261,7 @@ As before, increasing the available CPU will increase the potential throughput o
|
||||||
In the 2i query test there is the same significant reduction in tail latency when comparing leveled with leveldb. For the 2i queries there is also a significant reduction in mean latency - a <b> 57.96%</b> reduction over the course of the test. However this reduction is not a sign that leveled can resolve 2i queries faster than leveldb - it is related to the problem of tail latency. The minimum response time for a 2i query in leveldb drifts from 4ms to 14ms as the test progresses - whereas the minimum response time for Riak and leveled fluctuates from 5ms through to 20ms at the end of the test. Outside of resource pressure leveldb will give a faster response - but resource pressure is much more likely with leveldb.
|
In the 2i query test there is the same significant reduction in tail latency when comparing leveled with leveldb. For the 2i queries there is also a significant reduction in mean latency - a <b> 57.96%</b> reduction over the course of the test. However this reduction is not a sign that leveled can resolve 2i queries faster than leveldb - it is related to the problem of tail latency. The minimum response time for a 2i query in leveldb drifts from 4ms to 14ms as the test progresses - whereas the minimum response time for Riak and leveled fluctuates from 5ms through to 20ms at the end of the test. Outside of resource pressure leveldb will give a faster response - but resource pressure is much more likely with leveldb.
|
||||||
|
|
||||||
|
|
||||||
## Riak Cluster Test - Phase 5
|
## Riak Cluster Test - Phase 5 - Bitcask compare
|
||||||
|
|
||||||
For larger objects, it is interesting to compare performance with between Bitcask and leveled. At the start of a pure push test it is not possible for leveled to operate at the same rate as bitcask - as bitcask is finding/updating pointer s through a NIF'd memory operation in C (whereas leveled needs to has two levels of indirect pointers which are being lazily persisted to disk).
|
For larger objects, it is interesting to compare performance with between Bitcask and leveled. At the start of a pure push test it is not possible for leveled to operate at the same rate as bitcask - as bitcask is finding/updating pointer s through a NIF'd memory operation in C (whereas leveled needs to has two levels of indirect pointers which are being lazily persisted to disk).
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,8 @@
|
||||||
empty_ledgercache/0,
|
empty_ledgercache/0,
|
||||||
loadqueue_ledgercache/1,
|
loadqueue_ledgercache/1,
|
||||||
push_ledgercache/2,
|
push_ledgercache/2,
|
||||||
snapshot_store/5]).
|
snapshot_store/5,
|
||||||
|
fetch_value/2]).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
@ -447,7 +448,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
|
||||||
{reply, not_found, State};
|
{reply, not_found, State};
|
||||||
{active, TS} ->
|
{active, TS} ->
|
||||||
Active = TS >= leveled_codec:integer_now(),
|
Active = TS >= leveled_codec:integer_now(),
|
||||||
Object = fetch_value(LedgerKey, Seqn, State#state.inker),
|
Object = fetch_value(State#state.inker, {LedgerKey, Seqn}),
|
||||||
GT1 = leveled_log:get_timing(GT0, SWg, fetch),
|
GT1 = leveled_log:get_timing(GT0, SWg, fetch),
|
||||||
case {Active, Object} of
|
case {Active, Object} of
|
||||||
{_, not_present} ->
|
{_, not_present} ->
|
||||||
|
@ -483,6 +484,9 @@ handle_call({head, Bucket, Key, Tag}, _From, State) ->
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
|
handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
|
||||||
|
% TODO: clean-up passing of Requestor (which was previously just used in
|
||||||
|
% logs) and so can now be ignored, and timeout which is ignored - but
|
||||||
|
% probably shouldn't be.
|
||||||
Reply = snapshot_store(State, SnapType),
|
Reply = snapshot_store(State, SnapType),
|
||||||
{reply, Reply, State};
|
{reply, Reply, State};
|
||||||
handle_call({return_folder, FolderType}, _From, State) ->
|
handle_call({return_folder, FolderType}, _From, State) ->
|
||||||
|
@ -531,6 +535,14 @@ handle_call({return_folder, FolderType}, _From, State) ->
|
||||||
{reply,
|
{reply,
|
||||||
hashtree_query(State, Tag, JournalCheck),
|
hashtree_query(State, Tag, JournalCheck),
|
||||||
State};
|
State};
|
||||||
|
{foldheads_allkeys, Tag, FoldHeadsFun} ->
|
||||||
|
{reply,
|
||||||
|
foldheads_allkeys(State, Tag, FoldHeadsFun),
|
||||||
|
State};
|
||||||
|
{foldheads_bybucket, Tag, Bucket, FoldHeadsFun} ->
|
||||||
|
{reply,
|
||||||
|
foldheads_bybucket(State, Tag, Bucket, FoldHeadsFun),
|
||||||
|
State};
|
||||||
{foldobjects_allkeys, Tag, FoldObjectsFun} ->
|
{foldobjects_allkeys, Tag, FoldObjectsFun} ->
|
||||||
{reply,
|
{reply,
|
||||||
foldobjects_allkeys(State, Tag, FoldObjectsFun),
|
foldobjects_allkeys(State, Tag, FoldObjectsFun),
|
||||||
|
@ -668,6 +680,16 @@ snapshot_store(State, SnapType, Query) ->
|
||||||
SnapType,
|
SnapType,
|
||||||
Query).
|
Query).
|
||||||
|
|
||||||
|
fetch_value(Inker, {Key, SQN}) ->
|
||||||
|
SW = os:timestamp(),
|
||||||
|
case leveled_inker:ink_fetch(Inker, Key, SQN) of
|
||||||
|
{ok, Value} ->
|
||||||
|
maybe_longrunning(SW, inker_fetch),
|
||||||
|
Value;
|
||||||
|
not_present ->
|
||||||
|
not_present
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
|
@ -815,39 +837,70 @@ hashtree_query(State, Tag, JournalCheck) ->
|
||||||
foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
|
foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
|
||||||
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
||||||
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
||||||
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
|
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).
|
||||||
|
|
||||||
|
foldheads_allkeys(State, Tag, FoldHeadsFun) ->
|
||||||
|
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
||||||
|
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
||||||
|
foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, true).
|
||||||
|
|
||||||
foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) ->
|
foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) ->
|
||||||
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
||||||
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
||||||
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
|
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).
|
||||||
|
|
||||||
foldobjects_byindex(State, Tag, Bucket, Field, FromTerm, ToTerm, FoldObjectsFun) ->
|
foldheads_bybucket(State, Tag, Bucket, FoldHeadsFun) ->
|
||||||
StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field,
|
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
||||||
FromTerm),
|
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
||||||
EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field,
|
foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, true).
|
||||||
ToTerm),
|
|
||||||
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
|
|
||||||
|
|
||||||
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
|
foldobjects_byindex(State, Tag, Bucket,
|
||||||
{ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, store),
|
Field, FromTerm, ToTerm, FoldObjectsFun) ->
|
||||||
|
StartKey =
|
||||||
|
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm),
|
||||||
|
EndKey =
|
||||||
|
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
|
||||||
|
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).
|
||||||
|
|
||||||
|
|
||||||
|
foldobjects(_State, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
|
||||||
{FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of
|
{FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of
|
||||||
true ->
|
true ->
|
||||||
FoldObjectsFun;
|
FoldObjectsFun;
|
||||||
false ->
|
false ->
|
||||||
{FoldObjectsFun, []}
|
{FoldObjectsFun, []}
|
||||||
end,
|
end,
|
||||||
Folder = fun() ->
|
% For fold_objects the snapshot has been moved inside of the Folder
|
||||||
AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag),
|
% function.
|
||||||
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
%
|
||||||
StartKey,
|
% fold_objects and fold_heads are called by the riak_kv_sweeper in Riak,
|
||||||
EndKey,
|
% and the sweeper prompts the fold before checking to see if the fold is
|
||||||
AccFun,
|
% ready to be run. This may lead to the fold being called on an old
|
||||||
InitAcc),
|
% snapshot.
|
||||||
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
Self = self(),
|
||||||
ok = leveled_inker:ink_close(JournalSnapshot),
|
Folder =
|
||||||
Acc
|
fun() ->
|
||||||
end,
|
{ok,
|
||||||
|
LedgerSnapshot,
|
||||||
|
JournalSnapshot} = book_snapshotstore(Self, Self, 5400),
|
||||||
|
% Timeout will be ignored, as will Requestor
|
||||||
|
%
|
||||||
|
% This uses the external snapshot - as the snpshot will need
|
||||||
|
% to have consistent state between Bookie and Penciller when
|
||||||
|
% it is made.
|
||||||
|
AccFun = accumulate_objects(FoldFun,
|
||||||
|
JournalSnapshot,
|
||||||
|
Tag,
|
||||||
|
DeferredFetch),
|
||||||
|
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
||||||
|
StartKey,
|
||||||
|
EndKey,
|
||||||
|
AccFun,
|
||||||
|
InitAcc),
|
||||||
|
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
||||||
|
ok = leveled_inker:ink_close(JournalSnapshot),
|
||||||
|
Acc
|
||||||
|
end,
|
||||||
{async, Folder}.
|
{async, Folder}.
|
||||||
|
|
||||||
|
|
||||||
|
@ -955,8 +1008,8 @@ set_options(Opts) ->
|
||||||
|
|
||||||
JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
|
JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
|
||||||
LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
|
LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
|
||||||
ok =filelib:ensure_dir(JournalFP),
|
ok = filelib:ensure_dir(JournalFP),
|
||||||
ok =filelib:ensure_dir(LedgerFP),
|
ok = filelib:ensure_dir(LedgerFP),
|
||||||
|
|
||||||
{#inker_options{root_path = JournalFP,
|
{#inker_options{root_path = JournalFP,
|
||||||
reload_strategy = ReloadStrategy,
|
reload_strategy = ReloadStrategy,
|
||||||
|
@ -1005,16 +1058,6 @@ fetch_head(Key, Penciller, LedgerCache) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
fetch_value(Key, SQN, Inker) ->
|
|
||||||
SW = os:timestamp(),
|
|
||||||
case leveled_inker:ink_fetch(Inker, Key, SQN) of
|
|
||||||
{ok, Value} ->
|
|
||||||
maybe_longrunning(SW, inker_fetch),
|
|
||||||
Value;
|
|
||||||
not_present ->
|
|
||||||
not_present
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
accumulate_size() ->
|
accumulate_size() ->
|
||||||
Now = leveled_codec:integer_now(),
|
Now = leveled_codec:integer_now(),
|
||||||
|
@ -1053,28 +1096,71 @@ accumulate_hashes(JournalCheck, InkerClone) ->
|
||||||
end,
|
end,
|
||||||
AccFun.
|
AccFun.
|
||||||
|
|
||||||
accumulate_objects(FoldObjectsFun, InkerClone, Tag) ->
|
|
||||||
|
accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
||||||
Now = leveled_codec:integer_now(),
|
Now = leveled_codec:integer_now(),
|
||||||
AccFun = fun(LK, V, Acc) ->
|
AccFun =
|
||||||
case leveled_codec:is_active(LK, V, Now) of
|
fun(LK, V, Acc) ->
|
||||||
|
% The function takes the Ledger Key and the value from the
|
||||||
|
% ledger (with the value being the object metadata)
|
||||||
|
%
|
||||||
|
% Need to check if this is an active object (so TTL has not
|
||||||
|
% expired).
|
||||||
|
% If this is a deferred_fetch (i.e. the fold is a fold_heads not
|
||||||
|
% a fold_objects), then a metadata object needs to be built to be
|
||||||
|
% returned - but a quick check that Key is present in the Journal
|
||||||
|
% is made first
|
||||||
|
case leveled_codec:is_active(LK, V, Now) of
|
||||||
|
true ->
|
||||||
|
{SQN, _St, _MH, MD} =
|
||||||
|
leveled_codec:striphead_to_details(V),
|
||||||
|
{B, K} =
|
||||||
|
case leveled_codec:from_ledgerkey(LK) of
|
||||||
|
{B0, K0} ->
|
||||||
|
{B0, K0};
|
||||||
|
{B0, K0, _T0} ->
|
||||||
|
{B0, K0}
|
||||||
|
end,
|
||||||
|
JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
|
||||||
|
case DeferredFetch of
|
||||||
true ->
|
true ->
|
||||||
SQN = leveled_codec:strip_to_seqonly({LK, V}),
|
InJournal =
|
||||||
{B, K} = case leveled_codec:from_ledgerkey(LK) of
|
leveled_inker:ink_keycheck(InkerClone,
|
||||||
{B0, K0} -> {B0, K0};
|
LK,
|
||||||
{B0, K0, _T0} -> {B0, K0}
|
SQN),
|
||||||
end,
|
case InJournal of
|
||||||
QK = leveled_codec:to_ledgerkey(B, K, Tag),
|
probably ->
|
||||||
R = leveled_inker:ink_fetch(InkerClone, QK, SQN),
|
Size = leveled_codec:get_size(LK, V),
|
||||||
case R of
|
MDBin =
|
||||||
{ok, Value} ->
|
leveled_codec:build_metadata_object(LK,
|
||||||
FoldObjectsFun(B, K, Value, Acc);
|
MD),
|
||||||
not_present ->
|
Value = {proxy_object,
|
||||||
|
MDBin,
|
||||||
|
Size,
|
||||||
|
{fun fetch_value/2,
|
||||||
|
InkerClone,
|
||||||
|
JK}},
|
||||||
|
FoldObjectsFun(B,
|
||||||
|
K,
|
||||||
|
term_to_binary(Value),
|
||||||
|
Acc);
|
||||||
|
missing ->
|
||||||
Acc
|
Acc
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
Acc
|
R = fetch_value(InkerClone, JK),
|
||||||
end
|
case R of
|
||||||
end,
|
not_present ->
|
||||||
|
Acc;
|
||||||
|
Value ->
|
||||||
|
FoldObjectsFun(B, K, Value, Acc)
|
||||||
|
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end,
|
||||||
AccFun.
|
AccFun.
|
||||||
|
|
||||||
|
|
||||||
|
@ -1475,19 +1561,123 @@ foldobjects_vs_hashtree_test() ->
|
||||||
?STD_TAG,
|
?STD_TAG,
|
||||||
false}),
|
false}),
|
||||||
KeyHashList1 = lists:usort(HTFolder1()),
|
KeyHashList1 = lists:usort(HTFolder1()),
|
||||||
io:format("First item ~w~n", [lists:nth(1, KeyHashList1)]),
|
|
||||||
FoldObjectsFun = fun(B, K, V, Acc) ->
|
FoldObjectsFun = fun(B, K, V, Acc) ->
|
||||||
[{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
|
[{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
|
||||||
{async, HTFolder2} = book_returnfolder(Bookie1,
|
{async, HTFolder2} =
|
||||||
{foldobjects_allkeys,
|
book_returnfolder(Bookie1,
|
||||||
?STD_TAG,
|
{foldobjects_allkeys, ?STD_TAG, FoldObjectsFun}),
|
||||||
FoldObjectsFun}),
|
|
||||||
KeyHashList2 = HTFolder2(),
|
KeyHashList2 = HTFolder2(),
|
||||||
?assertMatch(KeyHashList1, lists:usort(KeyHashList2)),
|
?assertMatch(KeyHashList1, lists:usort(KeyHashList2)),
|
||||||
|
|
||||||
|
FoldHeadsFun =
|
||||||
|
fun(B, K, ProxyV, Acc) ->
|
||||||
|
{proxy_object,
|
||||||
|
_MDBin,
|
||||||
|
_Size,
|
||||||
|
{FetchFun, Clone, JK}} = binary_to_term(ProxyV),
|
||||||
|
V = FetchFun(Clone, JK),
|
||||||
|
[{B, K, erlang:phash2(term_to_binary(V))}|Acc]
|
||||||
|
end,
|
||||||
|
|
||||||
|
{async, HTFolder3} =
|
||||||
|
book_returnfolder(Bookie1,
|
||||||
|
{foldheads_allkeys, ?STD_TAG, FoldHeadsFun}),
|
||||||
|
KeyHashList3 = HTFolder3(),
|
||||||
|
?assertMatch(KeyHashList1, lists:usort(KeyHashList3)),
|
||||||
|
|
||||||
|
FoldHeadsFun2 =
|
||||||
|
fun(B, K, ProxyV, Acc) ->
|
||||||
|
{proxy_object,
|
||||||
|
MD,
|
||||||
|
_Size,
|
||||||
|
_Fetcher} = binary_to_term(ProxyV),
|
||||||
|
{Hash, _Size} = MD,
|
||||||
|
[{B, K, Hash}|Acc]
|
||||||
|
end,
|
||||||
|
|
||||||
|
{async, HTFolder4} =
|
||||||
|
book_returnfolder(Bookie1,
|
||||||
|
{foldheads_allkeys, ?STD_TAG, FoldHeadsFun2}),
|
||||||
|
KeyHashList4 = HTFolder4(),
|
||||||
|
?assertMatch(KeyHashList1, lists:usort(KeyHashList4)),
|
||||||
|
|
||||||
ok = book_close(Bookie1),
|
ok = book_close(Bookie1),
|
||||||
reset_filestructure().
|
reset_filestructure().
|
||||||
|
|
||||||
|
|
||||||
|
foldobjects_vs_foldheads_bybucket_test() ->
|
||||||
|
RootPath = reset_filestructure(),
|
||||||
|
{ok, Bookie1} = book_start([{root_path, RootPath},
|
||||||
|
{max_journalsize, 1000000},
|
||||||
|
{cache_size, 500}]),
|
||||||
|
ObjL1 = generate_multiple_objects(400, 1),
|
||||||
|
ObjL2 = generate_multiple_objects(400, 1),
|
||||||
|
% Put in all the objects with a TTL in the future
|
||||||
|
Future = leveled_codec:integer_now() + 300,
|
||||||
|
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
|
||||||
|
"BucketA", K, V, S,
|
||||||
|
?STD_TAG,
|
||||||
|
Future) end,
|
||||||
|
ObjL1),
|
||||||
|
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
|
||||||
|
"BucketB", K, V, S,
|
||||||
|
?STD_TAG,
|
||||||
|
Future) end,
|
||||||
|
ObjL2),
|
||||||
|
|
||||||
|
FoldObjectsFun = fun(B, K, V, Acc) ->
|
||||||
|
[{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
|
||||||
|
{async, HTFolder1A} =
|
||||||
|
book_returnfolder(Bookie1,
|
||||||
|
{foldobjects_bybucket,
|
||||||
|
?STD_TAG,
|
||||||
|
"BucketA",
|
||||||
|
FoldObjectsFun}),
|
||||||
|
KeyHashList1A = HTFolder1A(),
|
||||||
|
{async, HTFolder1B} =
|
||||||
|
book_returnfolder(Bookie1,
|
||||||
|
{foldobjects_bybucket,
|
||||||
|
?STD_TAG,
|
||||||
|
"BucketB",
|
||||||
|
FoldObjectsFun}),
|
||||||
|
KeyHashList1B = HTFolder1B(),
|
||||||
|
?assertMatch(false,
|
||||||
|
lists:usort(KeyHashList1A) == lists:usort(KeyHashList1B)),
|
||||||
|
|
||||||
|
FoldHeadsFun =
|
||||||
|
fun(B, K, ProxyV, Acc) ->
|
||||||
|
{proxy_object,
|
||||||
|
_MDBin,
|
||||||
|
_Size,
|
||||||
|
{FetchFun, Clone, JK}} = binary_to_term(ProxyV),
|
||||||
|
V = FetchFun(Clone, JK),
|
||||||
|
[{B, K, erlang:phash2(term_to_binary(V))}|Acc]
|
||||||
|
end,
|
||||||
|
|
||||||
|
{async, HTFolder2A} =
|
||||||
|
book_returnfolder(Bookie1,
|
||||||
|
{foldheads_bybucket,
|
||||||
|
?STD_TAG,
|
||||||
|
"BucketA",
|
||||||
|
FoldHeadsFun}),
|
||||||
|
KeyHashList2A = HTFolder2A(),
|
||||||
|
{async, HTFolder2B} =
|
||||||
|
book_returnfolder(Bookie1,
|
||||||
|
{foldheads_bybucket,
|
||||||
|
?STD_TAG,
|
||||||
|
"BucketB",
|
||||||
|
FoldHeadsFun}),
|
||||||
|
KeyHashList2B = HTFolder2B(),
|
||||||
|
?assertMatch(true,
|
||||||
|
lists:usort(KeyHashList1A) == lists:usort(KeyHashList2A)),
|
||||||
|
?assertMatch(true,
|
||||||
|
lists:usort(KeyHashList1B) == lists:usort(KeyHashList2B)),
|
||||||
|
|
||||||
|
ok = book_close(Bookie1),
|
||||||
|
reset_filestructure().
|
||||||
|
|
||||||
|
|
||||||
scan_table_test() ->
|
scan_table_test() ->
|
||||||
K1 = leveled_codec:to_ledgerkey(<<"B1">>,
|
K1 = leveled_codec:to_ledgerkey(<<"B1">>,
|
||||||
<<"K1">>,
|
<<"K1">>,
|
||||||
|
|
|
@ -139,7 +139,7 @@
|
||||||
{info, "Timeout of snapshot with pid=~w at SQN=~w at TS ~w "
|
{info, "Timeout of snapshot with pid=~w at SQN=~w at TS ~w "
|
||||||
++ "set to timeout=~w"}},
|
++ "set to timeout=~w"}},
|
||||||
{"P0039",
|
{"P0039",
|
||||||
{info, "Failed to relase pid=~w "
|
{info, "Failed to release pid=~w "
|
||||||
++ "leaving SnapshotCount=~w and MinSQN=~w"}},
|
++ "leaving SnapshotCount=~w and MinSQN=~w"}},
|
||||||
|
|
||||||
{"PC001",
|
{"PC001",
|
||||||
|
|
|
@ -331,7 +331,7 @@ init([PCLopts]) ->
|
||||||
PCLopts#penciller_options.start_snapshot,
|
PCLopts#penciller_options.start_snapshot,
|
||||||
PCLopts#penciller_options.snapshot_query,
|
PCLopts#penciller_options.snapshot_query,
|
||||||
PCLopts#penciller_options.bookies_mem} of
|
PCLopts#penciller_options.bookies_mem} of
|
||||||
{undefined, true, Query, BookiesMem} ->
|
{undefined, _Snapshot=true, Query, BookiesMem} ->
|
||||||
SrcPenciller = PCLopts#penciller_options.source_penciller,
|
SrcPenciller = PCLopts#penciller_options.source_penciller,
|
||||||
LongRunning = PCLopts#penciller_options.snapshot_longrunning,
|
LongRunning = PCLopts#penciller_options.snapshot_longrunning,
|
||||||
{ok, State} = pcl_registersnapshot(SrcPenciller,
|
{ok, State} = pcl_registersnapshot(SrcPenciller,
|
||||||
|
@ -342,7 +342,7 @@ init([PCLopts]) ->
|
||||||
leveled_log:log("P0001", [self()]),
|
leveled_log:log("P0001", [self()]),
|
||||||
{ok, State#state{is_snapshot=true,
|
{ok, State#state{is_snapshot=true,
|
||||||
source_penciller=SrcPenciller}};
|
source_penciller=SrcPenciller}};
|
||||||
{_RootPath, false, _Q, _BM} ->
|
{_RootPath, _Snapshot=false, _Q, _BM} ->
|
||||||
start_from_file(PCLopts)
|
start_from_file(PCLopts)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -544,7 +544,9 @@ handle_call(doom, _From, State) ->
|
||||||
handle_cast({manifest_change, NewManifest}, State) ->
|
handle_cast({manifest_change, NewManifest}, State) ->
|
||||||
NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest),
|
NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest),
|
||||||
ok = leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN),
|
ok = leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN),
|
||||||
{noreply, State#state{manifest = NewManifest, work_ongoing=false}};
|
UpdManifest = leveled_pmanifest:merge_snapshot(State#state.manifest,
|
||||||
|
NewManifest),
|
||||||
|
{noreply, State#state{manifest = UpdManifest, work_ongoing=false}};
|
||||||
handle_cast({release_snapshot, Snapshot}, State) ->
|
handle_cast({release_snapshot, Snapshot}, State) ->
|
||||||
Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest,
|
Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest,
|
||||||
Snapshot),
|
Snapshot),
|
||||||
|
|
|
@ -37,6 +37,7 @@
|
||||||
mergefile_selector/2,
|
mergefile_selector/2,
|
||||||
add_snapshot/3,
|
add_snapshot/3,
|
||||||
release_snapshot/2,
|
release_snapshot/2,
|
||||||
|
merge_snapshot/2,
|
||||||
ready_to_delete/2,
|
ready_to_delete/2,
|
||||||
check_for_work/2,
|
check_for_work/2,
|
||||||
is_basement/2,
|
is_basement/2,
|
||||||
|
@ -266,18 +267,26 @@ mergefile_selector(Manifest, LevelIdx) ->
|
||||||
{_SK, ME} = lists:nth(random:uniform(length(Level)), Level),
|
{_SK, ME} = lists:nth(random:uniform(length(Level)), Level),
|
||||||
ME.
|
ME.
|
||||||
|
|
||||||
|
%% When the cllerk returns an update manifest to the penciller, the penciller
|
||||||
|
%% should restore its view of the snapshots to that manifest
|
||||||
|
merge_snapshot(PencillerManifest, ClerkManifest) ->
|
||||||
|
ClerkManifest#manifest{snapshots =
|
||||||
|
PencillerManifest#manifest.snapshots,
|
||||||
|
min_snapshot_sqn =
|
||||||
|
PencillerManifest#manifest.min_snapshot_sqn}.
|
||||||
|
|
||||||
add_snapshot(Manifest, Pid, Timeout) ->
|
add_snapshot(Manifest, Pid, Timeout) ->
|
||||||
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout},
|
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout},
|
||||||
SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
|
SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
|
||||||
ManSQN = Manifest#manifest.manifest_sqn,
|
ManSQN = Manifest#manifest.manifest_sqn,
|
||||||
case Manifest#manifest.min_snapshot_sqn of
|
case Manifest#manifest.min_snapshot_sqn of
|
||||||
0 ->
|
0 ->
|
||||||
|
|
||||||
Manifest#manifest{snapshots = SnapList0,
|
Manifest#manifest{snapshots = SnapList0,
|
||||||
min_snapshot_sqn = ManSQN};
|
min_snapshot_sqn = ManSQN};
|
||||||
N ->
|
N ->
|
||||||
N0 = min(N, ManSQN),
|
N0 = min(N, ManSQN),
|
||||||
Manifest#manifest{snapshots = SnapList0, min_snapshot_sqn = N0}
|
Manifest#manifest{snapshots = SnapList0,
|
||||||
|
min_snapshot_sqn = N0}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
release_snapshot(Manifest, Pid) ->
|
release_snapshot(Manifest, Pid) ->
|
||||||
|
|
|
@ -489,7 +489,7 @@ space_clear_ondelete(_Config) ->
|
||||||
timer:sleep(10000), % Allow for any L0 file to be rolled
|
timer:sleep(10000), % Allow for any L0 file to be rolled
|
||||||
{ok, FNsA_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
{ok, FNsA_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
||||||
{ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
|
{ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
|
||||||
io:format("Bookie created ~w journal files and ~w ledger files~n",
|
io:format("FNsA - Bookie created ~w journal files and ~w ledger files~n",
|
||||||
[length(FNsA_J), length(FNsA_L)]),
|
[length(FNsA_J), length(FNsA_L)]),
|
||||||
|
|
||||||
% Get an iterator to lock the inker during compaction
|
% Get an iterator to lock the inker during compaction
|
||||||
|
@ -499,6 +499,8 @@ space_clear_ondelete(_Config) ->
|
||||||
{foldobjects_allkeys,
|
{foldobjects_allkeys,
|
||||||
?RIAK_TAG,
|
?RIAK_TAG,
|
||||||
FoldObjectsFun}),
|
FoldObjectsFun}),
|
||||||
|
{async, KF1} = leveled_bookie:book_returnfolder(Book1, AllKeyQuery),
|
||||||
|
|
||||||
% Delete the keys
|
% Delete the keys
|
||||||
SW2 = os:timestamp(),
|
SW2 = os:timestamp(),
|
||||||
lists:foreach(fun({Bucket, Key}) ->
|
lists:foreach(fun({Bucket, Key}) ->
|
||||||
|
@ -529,9 +531,12 @@ space_clear_ondelete(_Config) ->
|
||||||
lists:seq(1, 15)),
|
lists:seq(1, 15)),
|
||||||
io:format("Waiting for journal deletes - blocked~n"),
|
io:format("Waiting for journal deletes - blocked~n"),
|
||||||
timer:sleep(20000),
|
timer:sleep(20000),
|
||||||
KeyHashList1 = HTreeF1(),
|
|
||||||
io:format("Key Hash List returned of length ~w~n", [length(KeyHashList1)]),
|
% for this query snapshot is made at fold time
|
||||||
true = length(KeyHashList1) == 80000,
|
true = length(HTreeF1()) == 0,
|
||||||
|
% This query uses a genuine async fold on a snasphot made at request time
|
||||||
|
true = length(KF1()) == 80000,
|
||||||
|
|
||||||
io:format("Waiting for journal deletes - unblocked~n"),
|
io:format("Waiting for journal deletes - unblocked~n"),
|
||||||
timer:sleep(20000),
|
timer:sleep(20000),
|
||||||
{ok, FNsB_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
{ok, FNsB_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
||||||
|
@ -539,7 +544,7 @@ space_clear_ondelete(_Config) ->
|
||||||
{ok, FNsB_PC} = file:list_dir(RootPath
|
{ok, FNsB_PC} = file:list_dir(RootPath
|
||||||
++ "/journal/journal_files/post_compact"),
|
++ "/journal/journal_files/post_compact"),
|
||||||
PointB_Journals = length(FNsB_J) + length(FNsB_PC),
|
PointB_Journals = length(FNsB_J) + length(FNsB_PC),
|
||||||
io:format("Bookie has ~w journal files and ~w ledger files " ++
|
io:format("FNsB - Bookie has ~w journal files and ~w ledger files " ++
|
||||||
"after deletes~n",
|
"after deletes~n",
|
||||||
[PointB_Journals, length(FNsB_L)]),
|
[PointB_Journals, length(FNsB_L)]),
|
||||||
|
|
||||||
|
@ -566,7 +571,7 @@ space_clear_ondelete(_Config) ->
|
||||||
end,
|
end,
|
||||||
ok = leveled_bookie:book_close(Book2),
|
ok = leveled_bookie:book_close(Book2),
|
||||||
{ok, FNsC_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
{ok, FNsC_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
||||||
io:format("Bookie has ~w ledger files " ++
|
io:format("FNsC - Bookie has ~w ledger files " ++
|
||||||
"after close~n", [length(FNsC_L)]),
|
"after close~n", [length(FNsC_L)]),
|
||||||
|
|
||||||
{ok, Book3} = leveled_bookie:book_start(StartOpts1),
|
{ok, Book3} = leveled_bookie:book_start(StartOpts1),
|
||||||
|
@ -576,8 +581,12 @@ space_clear_ondelete(_Config) ->
|
||||||
timer:sleep(12000),
|
timer:sleep(12000),
|
||||||
ok = leveled_bookie:book_close(Book3),
|
ok = leveled_bookie:book_close(Book3),
|
||||||
{ok, FNsD_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
{ok, FNsD_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
||||||
io:format("Bookie has ~w ledger files " ++
|
io:format("FNsD - Bookie has ~w ledger files " ++
|
||||||
"after second close~n", [length(FNsD_L)]),
|
"after second close~n", [length(FNsD_L)]),
|
||||||
|
lists:foreach(fun(FN) ->
|
||||||
|
io:format("FNsD - Ledger file if ~s~n", [FN])
|
||||||
|
end,
|
||||||
|
FNsD_L),
|
||||||
true = PointB_Journals < length(FNsA_J),
|
true = PointB_Journals < length(FNsA_J),
|
||||||
true = length(FNsD_L) < length(FNsA_L),
|
true = length(FNsD_L) < length(FNsA_L),
|
||||||
true = length(FNsD_L) < length(FNsB_L),
|
true = length(FNsD_L) < length(FNsB_L),
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
-export([all/0]).
|
-export([all/0]).
|
||||||
-export([retain_strategy/1,
|
-export([retain_strategy/1,
|
||||||
recovr_strategy/1,
|
recovr_strategy/1,
|
||||||
|
aae_missingjournal/1,
|
||||||
aae_bustedjournal/1,
|
aae_bustedjournal/1,
|
||||||
journal_compaction_bustedjournal/1
|
journal_compaction_bustedjournal/1
|
||||||
]).
|
]).
|
||||||
|
@ -11,6 +12,7 @@
|
||||||
all() -> [
|
all() -> [
|
||||||
retain_strategy,
|
retain_strategy,
|
||||||
recovr_strategy,
|
recovr_strategy,
|
||||||
|
aae_missingjournal,
|
||||||
aae_bustedjournal,
|
aae_bustedjournal,
|
||||||
journal_compaction_bustedjournal
|
journal_compaction_bustedjournal
|
||||||
].
|
].
|
||||||
|
@ -94,6 +96,51 @@ recovr_strategy(_Config) ->
|
||||||
testutil:reset_filestructure().
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
|
|
||||||
|
aae_missingjournal(_Config) ->
|
||||||
|
RootPath = testutil:reset_filestructure(),
|
||||||
|
StartOpts = [{root_path, RootPath},
|
||||||
|
{max_journalsize, 20000000},
|
||||||
|
{sync_strategy, testutil:sync_strategy()}],
|
||||||
|
{ok, Bookie1} = leveled_bookie:book_start(StartOpts),
|
||||||
|
{TestObject, TestSpec} = testutil:generate_testobject(),
|
||||||
|
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
||||||
|
testutil:check_forobject(Bookie1, TestObject),
|
||||||
|
GenList = [2],
|
||||||
|
_CLs = testutil:load_objects(20000, GenList, Bookie1, TestObject,
|
||||||
|
fun testutil:generate_objects/2),
|
||||||
|
|
||||||
|
FoldHeadsFun =
|
||||||
|
fun(B, K, _V, Acc) -> [{B, K}|Acc] end,
|
||||||
|
|
||||||
|
{async, AllHeadF1} =
|
||||||
|
leveled_bookie:book_returnfolder(Bookie1,
|
||||||
|
{foldheads_allkeys,
|
||||||
|
?RIAK_TAG,
|
||||||
|
FoldHeadsFun}),
|
||||||
|
HeadL1 = length(AllHeadF1()),
|
||||||
|
io:format("Fold head returned ~w objects~n", [HeadL1]),
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
|
CDBFiles = testutil:find_journals(RootPath),
|
||||||
|
[HeadF|_Rest] = CDBFiles,
|
||||||
|
io:format("Selected Journal for removal of ~s~n", [HeadF]),
|
||||||
|
ok = file:delete(RootPath ++ "/journal/journal_files/" ++ HeadF),
|
||||||
|
|
||||||
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts),
|
||||||
|
% Check that fold heads picks up on the missing file
|
||||||
|
{async, AllHeadF2} =
|
||||||
|
leveled_bookie:book_returnfolder(Bookie2,
|
||||||
|
{foldheads_allkeys,
|
||||||
|
?RIAK_TAG,
|
||||||
|
FoldHeadsFun}),
|
||||||
|
HeadL2 = length(AllHeadF2()),
|
||||||
|
io:format("Fold head returned ~w objects~n", [HeadL2]),
|
||||||
|
true = HeadL2 < HeadL1,
|
||||||
|
true = HeadL2 > 0,
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
aae_bustedjournal(_Config) ->
|
aae_bustedjournal(_Config) ->
|
||||||
RootPath = testutil:reset_filestructure(),
|
RootPath = testutil:reset_filestructure(),
|
||||||
StartOpts = [{root_path, RootPath},
|
StartOpts = [{root_path, RootPath},
|
||||||
|
@ -223,7 +270,7 @@ aae_bustedjournal(_Config) ->
|
||||||
KeyHashList6 = HashTreeF6(),
|
KeyHashList6 = HashTreeF6(),
|
||||||
true = length(KeyHashList6) > 19000,
|
true = length(KeyHashList6) > 19000,
|
||||||
true = length(KeyHashList6) < HeadCount,
|
true = length(KeyHashList6) < HeadCount,
|
||||||
|
|
||||||
ok = leveled_bookie:book_close(Bookie4),
|
ok = leveled_bookie:book_close(Bookie4),
|
||||||
|
|
||||||
testutil:restore_topending(RootPath, HeadF),
|
testutil:restore_topending(RootPath, HeadF),
|
||||||
|
|
Binary file not shown.
After Width: | Height: | Size: 115 KiB |
Binary file not shown.
After Width: | Height: | Size: 78 KiB |
Loading…
Add table
Add a link
Reference in a new issue