diff --git a/docs/ANTI_ENTROPY.md b/docs/ANTI_ENTROPY.md
new file mode 100644
index 0000000..f92eb19
--- /dev/null
+++ b/docs/ANTI_ENTROPY.md
@@ -0,0 +1,281 @@
+# Anti-Entropy
+
+## Background
+
+In the early history of Riak, there were three levels of protection against loss of data, where loss is caused by either a backend store not receiving data (because it was unavailable), or losing writes (due to a crash, or corruption of previously written data):
+
+- [Read repair](http://docs.basho.com/riak/kv/2.2.3/learn/concepts/replication/#read-repair), whenever an object was read, the finite-state machine managing the GET would wait for a response from all vnodes; after replying to the client, the FSM would update any vnode which had returned an out-of-date version of the object.
+
+- [Hinted handoff](http://docs.basho.com/riak/kv/2.2.3/using/reference/handoff/#types-of-handoff), if a fallback node had taken responsibility for writes to a given vnode due to a temporary ring change in the cluster (e.g. due to a node failure), then when the expected primary returned to service the fallback node would be triggered to hand off any data it held (from this or any previous fallback period) to the expected primary vnode. Once handoff was complete the fallback vnode would self-destruct and remove any durable state. Fallback nodes start vnodes for the same ring partition as the primary vnode. A fallback node is selected because it owns the next vnode in the ring, but it starts a new vnode to replace the primary vnode; it does not store data in the vnode backend which caused it to be considered a fallback (fallback is to a node, not to a vnode) - so handoff does not normally need to be selective about the data that is handed off.
+
+- [Key-listing for multi-data-centre replication](http://docs.basho.com/riak/kv/2.2.3/using/reference/v2-multi-datacenter/architecture/#fullsync-replication), for customers with the proprietary Riak Enterprise software there was a mechanism whereby vnode by vnode there would be a fold over all the objects in the vnode, for a replicated bucket, calculating a hash for the object and sending the keys and hashes to a replicated cluster for comparison with the result of its equivalent object fold. Any variances would then be repaired by streaming those missing updates between the clusters to be re-added across all required vnodes.
+
+There were three primary issues with these mechanisms:
+
+- Some objects may be read very infrequently, and such objects may be lost due to a series of failure or disk-corruption events that occurred between reads and hence without the protection of read repair.
+
+- For large stores per-vnode object folding required for MDC was an expensive operation, and when run in parallel with standard database load could lead to unpredictable response times.
+
+- Some read events do not validate across multiple vnodes, primarily secondary index queries, so an inconsistent index due to a failed write would never be detected by the database. Secondary index queries were not necessarily eventually consistent, but were potentially never consistent.
+
+To address these weaknesses Active Anti-Entropy (AAE) was introduced to Riak, as a configurable option. Configuring Active Anti-Entropy would start a new AAE "hashtree" store for every primary vnode in the ring. The vnode process would, following a successful put, [update this hashtree store process](https://github.com/basho/riak_kv/blob/2.1.7/src/riak_kv_vnode.erl#L2139-L2169) by sending it the updated object after converting it from its binary format. This would generally happen via async message passing, but periodically the change would block the vnode to confirm that the AAE process was keeping up. The hashtree store process would hash the Riak object to create a hash for the update, and hash the Key to map it to one of 1024 * 1024 segments - and then in batches update the store with a key of {$t, Partition, Segment, Key}, and a value of the object hash.
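
As an illustrative sketch of this keying scheme - in Python rather than Erlang, with hypothetical helper names and hash choices standing in for the real implementation:

```python
import hashlib

SEGMENT_COUNT = 1024 * 1024  # number of leaf segments in the Riak hashtree

def segment_for(key: bytes) -> int:
    """Hash the key to map it to one of the 1024 * 1024 segments."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % SEGMENT_COUNT

def aae_store_update(store: dict, partition: int, key: bytes, obj: bytes) -> None:
    """Write a {$t, Partition, Segment, Key} -> ObjectHash entry, mimicking
    the batched updates made by the hashtree store process."""
    segment = segment_for(key)
    store[("$t", partition, segment, key)] = hashlib.md5(obj).hexdigest()
```

The real store batches these writes into its backend; the hash functions here are placeholders for those actually used.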
+
+From this persisted store a [Merkle tree](https://en.wikipedia.org/wiki/Merkle_tree) is maintained for each Partition. These Merkle trees can then be exchanged with another vnode's AAE hashtree store if that vnode is also a primary vnode for that same partition. Exchanging the Merkle tree would highlight any segments which had a variance - and then it would be possible to iterate over the store segment by segment to discover which keys actually differed. The objects associated with these keys could then be re-read within the actual vnode stores, so that read repair would correct any entropy that had been indicated by the discrepancy between the hashtree stores.
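
The exchange can be sketched as two steps - compare the trees' leaf hashes to find differing segments, then compare only the keys under those segments (a simplified Python model; the real exchange happens over the wire between hashtree processes):

```python
def differing_segments(tree_a: dict, tree_b: dict) -> set:
    """Compare two segment -> hash maps and return the segments that vary."""
    return {s for s in set(tree_a) | set(tree_b)
            if tree_a.get(s) != tree_b.get(s)}

def keys_to_repair(store_a: dict, store_b: dict, segments: set) -> set:
    """For each differing segment, compare the {Key: Hash} entries beneath
    it and return the keys needing read repair."""
    repair = set()
    for seg in segments:
        keys_a, keys_b = store_a.get(seg, {}), store_b.get(seg, {})
        repair |= {k for k in set(keys_a) | set(keys_b)
                   if keys_a.get(k) != keys_b.get(k)}
    return repair
```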
+
+The process of maintaining the hashtree is partially deferred to the point of the exchange, and this deferred update process has a low, though not trivial, cost. The cost is low enough that these exchanges can occur with reasonable frequency (i.e. many minutes between exchanges) without creating significant background load.
+
+Infrequently, but regularly, the hashtree store would be cleared and rebuilt from an object fold over the vnode store to ensure that it reflected the actual persisted state in the store. This rebuild process depends on some cluster-wide lock acquisition and other throttling techniques, as it has to avoid generating false negative results from exchanges scheduled to occur during the rebuild, avoid unexpected conditions on shutdown during the rebuild, avoid excessive concurrency of rebuild operations within the cluster, and avoid flooding the cluster with read-repair events following a rebuild. Despite precautionary measures within the design, the rebuild process is, when compared to other Riak features, a relatively common trigger for production issues.
+
+Prior to AAE being available there were three primary issues with anti-entropy in Riak, as listed above. The impact of introducing AAE on those three issues was:
+
+- Repair was de-coupled from reads, so unread objects now had a vastly reduced risk of disappearing following node failure and disk corruption events.
+
+- Exchanges have a predictable and limited impact on cluster load, relative to object folds, if the variance highlighted by the exchange is small.
+
+- Secondary index queries would be made consistent following a PUT failure after the next AAE exchange, and those exchanges are regular. Consistency is maintained by comparison of the actual object, not the index entries within the backend, so loss of index data due to backend corruption would still not be detected by AAE.
+
+Although this represented an improvement in terms of entropy management, there were still some imperfections with the approach:
+
+- The hash of the object was *not* based on a canonicalised version of the object, so could be inconsistent between trees ([riak_kv issue 1189](https://github.com/basho/riak_kv/issues/1189)).
+
+- Converting the object with from_binary and sending it to another process (to pass from the `riak_kv_vnode` to the `riak_kv_index_hashtree`) has a potentially non-trivial cost for larger objects with significant amounts of metadata (e.g. 2i terms).
+
+- Hashtrees may become mysteriously inconsistent following rebuilds, if the rebuild followed a cluster change operation (e.g. adding/removing a node) - prompting storms of read actions that would not lead to repairs.
+
+- The anti-entropy mechanism is tightly coupled with the partitioning of the cluster, and so cannot be used between clusters of different ring sizes. Replication therefore cannot support safe ring-size changes (i.e. we cannot change ring size by starting another cluster with a different size and replicating to that cluster).
+
+- The hashtrees are not externally exposed, and so cannot be used for externally managed replication (e.g. to another database).
+
+- The rebuilds of the hashtree still require the relatively expensive fold_objects operation, and so parallelisation of rebuilds may need to be controlled to prevent an impact on cluster performance. Measuring the impact is difficult in pre-production load tests due to the scheduled and infrequent nature of AAE rebuilds.
+
+- Improvements to hashtrees require significant development and test effort for transition, due to the potential for hashtree changes to break many things (e.g. Solr integration, MDC), and also the difficulty in coordinating changes between different dependent systems that independently build state over long periods of time.
+
+## Leveled and AAE
+
+Leveled is primarily designed to be a backend for Riak. As part of the ongoing community work to build improvements into a new pure open-source release of Riak, some features have been added directly to Leveled to explore some potential enhancements to anti-entropy. These features are concerned with:
+
+- Allowing for the database state within a Leveled store or stores to be compared with another store or stores which should share a portion of that state;
+
+- Allowing for quicker checking that recent changes to one store or stores have also been received by another store or stores that should be receiving the same changes.
+
+The aim is to use these as new backend capabilities, combined with new coverage FSM query behaviour, to allow for new Riak anti-entropy mechanisms with the following features:
+
+- Comparison can be made between clusters with different ring-sizes - comparison is not coupled to partitioning.
+
+- Comparison can use a consistent approach to compare state within and between clusters.
+
+- Comparison does not rely on duplication of database state to a separate anti-entropy database, with further anti-entropy required to manage state variance between the actual stores and anti-entropy stores.
+
+- Comparison of state can be abstracted from Riak specific implementation so that mechanisms to compare between Riak clusters can be re-used to compare between a Riak cluster and another database store. Coordination with another data store (e.g. Solr) can be controlled by the Riak user not just the Riak developer.
+
+- Comparison can be controlled at a bucket level, so that buckets can be configured to be either specifically whitelisted into the anti-entropy scope, or blacklisted from it - with the option to support different schedules for anti-entropy operations for different buckets when whitelisting is used.
+
+- Through the use of key types, allowing the flexibility to calculate anti-entropy in a way specific to the type of object being stored (e.g. to support alternative mechanisms for some CRDT types).
+
+## Merkle Trees
+
+Riak has historically used [Merkle trees](https://en.wikipedia.org/wiki/Merkle_tree) as a way to communicate state efficiently between actors. Merkle trees have been designed to be cryptographically secure so that they don't leak details of the individual transactions themselves. This strength is useful in many Merkle tree use cases, and is in part derived from the use of concatenation when calculating branch hashes from leaf hashes:
+
+> A hash tree is a tree of hashes in which the leaves are hashes of data blocks in, for instance, a file or set of files. Nodes further up in the tree are the hashes of their respective children. For example, in the picture hash 0 is the result of hashing the concatenation of hash 0-0 and hash 0-1. That is, hash 0 = hash( hash 0-0 + hash 0-1 ) where + denotes concatenation.
+
+A side effect of the concatenation decision is that trees cannot be calculated incrementally, when elements are not ordered by segment. To calculate the hash of an individual leaf (or segment), the hashes of all the elements under that leaf must be accumulated first. In the case of the leaf segments in Riak, the leaf segments are made up of a hash of the concatenation of {Key, Hash} pairs under that leaf:
+
+``hash([{K1, H1}, {K2, H2} .. {Kn, Hn}])``
+
+This requires all of the keys and hashes to be pulled into memory to build the hashtree - unless the tree is being built segment by segment. The Riak hashtree data store is therefore ordered by segment so that it can be incrementally built. The segments which have had key changes are tracked, and at exchange time all "dirty segments" are re-scanned in the store segment by segment, so that the hashtree can be rebuilt. Note though, that this is necessary in the current hashtree implementation even if there was an incrementally buildable Merkle Tree, as there is no read before write into the hashtree to inform the process of what update (if any) to reverse out of the Tree as well as which update to add in.
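
A minimal sketch of why concatenation forces this ordering (Python, with illustrative hash choices): the leaf hash depends on the order of the {Key, Hash} pairs, so pairs arriving out of segment order cannot be folded into an existing leaf value.

```python
import hashlib

def leaf_hash_concat(pairs) -> str:
    """Hash the concatenation of {Key, Hash} pairs, as in a secure Merkle tree."""
    h = hashlib.sha256()
    for key, obj_hash in pairs:
        h.update(key + obj_hash)
    return h.hexdigest()

# The same pairs in a different order produce a different leaf hash, so all
# pairs under a leaf must be accumulated (in order) before hashing.
assert (leaf_hash_concat([(b"K1", b"H1"), (b"K2", b"H2")])
        != leaf_hash_concat([(b"K2", b"H2"), (b"K1", b"H1")]))
```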
+
+## Tic-Tac Merkle Trees
+
+Anti-entropy in leveled is supported using the [leveled_tictac](https://github.com/martinsumner/leveled/blob/mas-tictac/src/leveled_tictac.erl) module. This module uses a less secure form of Merkle tree that does not prevent information from leaking out or make the tree tamper-proof, but does allow trees to be built incrementally, and incrementally built trees to be merged. We call these Merkle trees Tic-Tac trees, after the [Tic-Tac language](https://en.wikipedia.org/wiki/Tic-tac), to fit in with the bookmaker-based naming conventions of leveled. The Tic-Tac language has historically been used on racecourses to communicate the state of the market between participants; although the more widespread use of mobile communications means that the use of Tic-Tac is petering out, and rather like Basho employees, there are now only three Tic-Tac practitioners left.
+
+The first change from secure Merkle trees is simply to XOR together hashes to combine them, rather than re-hash a concatenation of keys and hashes. Combined with the use of trees of fixed sizes, this allows for tree merging to be managed through XOR operations. So a segment leaf is calculated from:
+
+``hash(K1, H1) XOR hash(K2, H2) XOR ... hash(Kn, Hn)``
+
+The Keys and hashes can now be combined in any order with any grouping. The use of XOR instead of concatenation is [discouraged in secure Merkle Trees](https://security.stackexchange.com/questions/89847/can-xor-be-used-in-a-merkle-tree-instead-of-concatenation) but is not novel in its use within [trees focused on anti-entropy](http://distributeddatastore.blogspot.co.uk/2013/07/cassandra-using-merkle-trees-to-detect.html).
+
+This enables two things:
+
+- The tree can be built incrementally when scanning across a store not in segment order (i.e. scanning across a store in key order) without needing to hold any state in memory beyond the fixed size of the tree.
+
+- Two trees from stores with non-overlapping key ranges can be merged to reflect the combined state of those stores, i.e. the trees for each store can be built independently and in parallel, and then subsequently merged without needing to build an interim view of the combined state.
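
Both properties can be demonstrated with a small sketch (Python; the hash width and function are stand-ins for those used in leveled_tictac):

```python
import hashlib

def pair_hash(key: bytes, obj_hash: bytes) -> int:
    """Hash a single {Key, Hash} pair to a fixed-width integer."""
    return int.from_bytes(hashlib.sha256(key + obj_hash).digest()[:16], "big")

def leaf_hash_xor(pairs) -> int:
    """XOR-combined leaf hash: order and grouping of updates do not matter."""
    acc = 0
    for key, obj_hash in pairs:
        acc ^= pair_hash(key, obj_hash)
    return acc

pairs = [(b"K1", b"H1"), (b"K2", b"H2"), (b"K3", b"H3")]
# Incremental build in any order gives the same leaf value...
assert leaf_hash_xor(pairs) == leaf_hash_xor(list(reversed(pairs)))
# ...and leaves built over disjoint key ranges merge with a further XOR.
assert leaf_hash_xor(pairs) == leaf_hash_xor(pairs[:2]) ^ leaf_hash_xor(pairs[2:])
```

XOR is also self-inverse, so where the previous hash for a key is known it can be reversed out of a leaf without re-scanning the whole segment.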
+
+It is assumed that the trees will only be transferred securely between trusted actors already permitted to view, store and transfer the real data: so the loss of cryptographic strength of the tree is irrelevant to the overall security of the system.
+
+## Recent and Whole
+
+### Current Riak AAE
+
+Anti-entropy in Riak is a dual-track process:
+
+- there is a capacity to efficiently and rapidly provide an update on overall vnode state that reflects recent additions, by maintaining a separate anti-entropy store in parallel to each primary store;
+
+- there is a capacity to ensure that the anti-entropy view of state represents the state of the whole database, as actually exists on disk, by periodically rebuilding that view from the primary store.
+
+Within the current Riak AAE implementation, tracking recent changes is supported by having a dedicated anti-entropy store organised by segments (an identifier of a leaf of the Merkle tree) so that the Merkle tree can be updated incrementally to reflect recent changes. When a new update is received an AAE tree update is made following the vnode update, and the segment is marked as requiring an update before completing the next Merkle tree exchange.
+
+However, as the view of the whole state is maintained in a different store to that holding the actual data, there is an entropy problem between the actual store and the AAE store: e.g. data could be lost from the real store, and go undetected as it is not lost from the AAE store. So periodically the AAE store is rebuilt by scanning the whole of the real store. This rebuild can be an expensive process, and the cost is commonly controlled by performing this task infrequently. Prior to the end of Basho there were changes pending in riak\_kv's develop branch to better throttle and schedule these updates - through the `riak_kv_sweeper` - so that the store could be built more frequently with safety (and so that the scans necessary to build the store could be multi-purpose).
+
+The AAE store also needs to be partially scanned on a regular basis to update the current view of the Merkle tree, to reflect the segments which have been altered by recent changes. If a vnode has 100M keys, and there have been 1000 updates since the Merkle tree was last updated - then there will need to be O(1000) seeks across subsets of the store returning O(100K) keys in total. As the store grows, the AAE store can grow to a non-trivial size, and these operations may have an impact on the page-cache and disk busyness within the node.
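
The arithmetic behind these estimates, as a quick check (assuming a tree of 1024 * 1024 segments and, at worst, one dirty segment per update):

```python
SEGMENTS = 1024 * 1024        # leaf segments in the tree
VNODE_KEYS = 100_000_000      # keys held by the vnode
DIRTY = 1000                  # updates since the tree was last refreshed

keys_per_segment = VNODE_KEYS / SEGMENTS      # roughly 95 keys per segment
seeks = DIRTY                                 # one seek per dirty segment
keys_scanned = seeks * keys_per_segment       # roughly 95K keys, i.e. O(100K)
```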
+
+The AAE store is re-usable for checking consistency between databases, but with the following limitations:
+
+- the two stores need to be partitioned equally, constraining replication to other database technologies, and preventing replication from being used as an approach to re-partitioning (ring re-sizing).
+
+- the AAE store is not split by bucket, and so supporting replication configured per bucket is challenging.
+
+The AAE process in production systems commonly raises false positives (prompts repairs that are unnecessary), sometimes for [known reasons](https://github.com/basho/riak_kv/issues/1189), sometimes for unknown reasons, especially following rebuilds which follow ring changes. The repair process has [a throttle](http://docs.basho.com/riak/kv/2.2.3/using/cluster-operations/active-anti-entropy/#throttling) to prevent this from impacting a production system, but this commonly needs to be re-tuned based on experience.
+
+### Proposed Leveled AAE
+
+The first stage in considering an alternative approach to anti-entropy was to question the necessity of having a dedicated AAE database that needs to reflect all key changes in the actual vnode store. This separate store is currently necessary as the hashtree needs a store sorted by segment ID to make it easy to scan for rebuilds of the tree, hence avoiding the three main costs of scanning over the primary database:
+
+- the impact on the page cache as all keys and values have to be read from disk, including not-recently used values;
+
+- the overall I/O load (primarily disk-related) of scanning the full database from disk;
+
+- the overall I/O load (primarily network-related) of streaming results from the fold.
+
+The third cost can be addressed by the fold output being an incrementally updatable tree of a fixed size; i.e. if the fold builds a Tic-Tac tree rather than streaming results (as a list-keys fold does), it guarantees a fixed-size output both from a single partition and following merging across multiple partitions.
+
+Within Leveled the first two costs are reduced by design due to the separation of Keys and Metadata from the object value, reducing significantly the workload associated with such a scan; especially where values are large.
+
+The [testing of traditional Riak AAE](https://github.com/martinsumner/leveled/blob/master/docs/VOLUME.md#leveled-aae-rebuild-with-journal-check) already undertaken has shown that scanning the database is not necessarily such a big issue in Leveled. So it does seem potentially feasible to scan the store on a regular basis. The testing of Leveldb with the `riak_kv_sweeper` feature shows that with the improved throttling more regular scanning is also possible here: testing with `riak_kv_sweeper` managed to achieve 10 x the number of sweeps, with only a 9% drop in throughput.
+
+A hypothesis is proposed that regular scanning of the full store to produce a Tic-Tac tree is certainly feasible in Leveled, and potentially tolerable in other back-ends. However, frequent scanning is likely to remain impractical. If it is not possible to scan the database frequently, then a discrepancy between stores caused by a recent failure event will not be detected in a timely manner. It is therefore suggested that there should be an alternative form of anti-entropy that can be run in addition to scanning - one that is lower cost and can be run frequently in support of whole-database scanning. This additional anti-entropy mechanism would focus on the job of verifying that recent changes have been received.
+
+So there would be two anti-entropy mechanisms, one which can be run frequently (minutes) to check for the receipt of recent changes, and one that can be run regularly but infrequently (hours/days) to check that full database state is consistent.
+
+It is proposed to compare full database state by scanning the actual store, but producing a Tic-Tac Merkle tree as the outcome, one that can be merged across partitions through a coverage query to provide an overall view of the database state. This view could be compared with different coverage query offsets within the same cluster, and with different replicated clusters.
+
+To provide a check on recent changes it is proposed to add a temporary index within the store, with an entry for each change that is built from a rounded last modified date and the hash of the value, so that the index can be scanned to form a Tic-Tac tree of recent changes. This assumes that each object has a Last Modified Date that is consistent (for that version) across all points where that particular version is stored, to use as the field name for the index. The term of the index is based on the segment ID (for the tree) and the hash of the value. This allows for a scan to build a tree of changes for a given range of modified dates, as well as a scan for keys and hashes to be returned for a given segment ID and date range.
+
+As this index only covers recent changes, it will be limited in size, and mainly in-memory, and so it can be scanned frequently in a cost-effective manner to both gather trees for comparison, and discover Keys in segments with variations.
+
+Within Leveled the index can be made temporary by giving the entry a time-to-live, independent of any object time-to-live. So once the change is beyond the timescale in which the operator wishes to check for recent changes, it will naturally be removed from the database (through deletion on the next compaction event that hits the entry in the Ledger). Therefore in the long term there is no need to maintain additional state outside of the primary database stores in order to manage anti-entropy. This may also be possible using TTL features in leveldb.
+
+Hence overall this should give:
+
+- A low cost mechanism for checking for the distribution of recent changes.
+
+- A mechanism for infrequently comparing overall state that is naturally consistent with the actual store state, without compromising operational stability of the store.
+
+- No additional long-term overhead (i.e. duplicate key store for anti-entropy).
+
+
+## Leveled Implementation
+
+### Full Database Anti-Entropy
+
+There are two parts to the full database anti-entropy mechanism: the Tic-Tac trees implemented in the `leveled_tictac` module; and the queries required to build the trees, available through the `book_returnfolder` function. Two types of query are supported:
+
+```
+{tictactree_obj,
+ {Tag, Bucket, StartKey, EndKey, CheckPresence},
+ TreeSize,
+ PartitionFilter}
+```
+
+```
+{tictactree_idx,
+ {Bucket, IdxField, StartValue, EndValue},
+ TreeSize,
+ PartitionFilter}
+```
+
+The `tictactree_obj` folder produces a Tic-Tac tree from a fold across the objects (or more precisely the heads of the objects in the Ledger) using the constraints Tag, Bucket, StartKey and EndKey. CheckPresence can be used to require the folder to confirm if the value is present in the Journal before including it in the tree - this will slow down the fold significantly, but protect from corruption in the Journal not represented in the Ledger. The partition filter can be used where the store holds data from multiple partitions and only data from a subset of partitions should be included, with the partition filter being a function on the Bucket and Key that makes that decision.
+
+The `tictactree_idx` folder produces a Tic-Tac tree from a range of an index, and so can be used like `tictactree_obj` but for checking that an index is consistent between coverage offsets or between databases.
+
+These two folds are tested in the ``tictac_SUITE`` test suite in the ``many_put_compare`` and ``index_compare`` tests.
+
+### Near Real-Time Anti-Entropy
+
+The near real-time anti-entropy process can be run in two modes: blacklisting and whitelisting. In blacklisting mode, specific buckets can be excluded from anti-entropy management, and all buckets not excluded are managed in a single "$all" bucket. Anti-entropy queries will need to always be requested against the "$all" bucket. In whitelisting mode, only specific buckets are included in anti-entropy management. Anti-entropy queries will need to be requested separately for each whitelisted bucket, and may be scheduled differently for each bucket.
+
+The index entry is then of the form:
+
+- Tag: ?IDX_TAG
+
+- Bucket: Bucket
+
+- Field: Last Modified Date (rounded down to a configured unit in minutes)
+
+- Term: Segment ++ "." ++ Hash
+
+- Key : Key
+
+In blacklist mode the Bucket will be $all, and the Key will actually be a {Bucket, Key} pair.
+
+The index entry is given a TTL of a configurable amount (e.g. 1 hour) - and no index entry may be added if the change is already considered to be too far in the past. The index entry is added to the Ledger in the same transaction as an object value update, and will be re-calculated and re-added out of the Journal under restart conditions where the change has not reached a persisted state in the Ledger prior to the close, for example after a crash.
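
A hedged sketch of how such an index entry might be composed (Python, with illustrative constants and hash choices; the real entry is written into the Ledger as Erlang terms):

```python
import hashlib

SEGMENT_COUNT = 1024 * 1024   # leaf segments in the Tic-Tac tree
UNIT_MINUTES = 5              # configured rounding granularity
TTL_SECONDS = 3600            # configurable time-to-live, e.g. 1 hour

def nrt_index_entry(bucket, key: bytes, value: bytes, now: int,
                    blacklist_mode: bool = False) -> dict:
    """Build the near-real-time AAE index entry described above."""
    unit = UNIT_MINUTES * 60
    rounded_lmd = (now // unit) * unit            # Field: rounded last modified
    segment = int(hashlib.md5(key).hexdigest()[:8], 16) % SEGMENT_COUNT
    value_hash = hashlib.md5(value).hexdigest()
    return {"tag": "?IDX_TAG",
            "bucket": "$all" if blacklist_mode else bucket,
            "field": rounded_lmd,
            "term": "%d.%s" % (segment, value_hash),  # Segment ++ "." ++ Hash
            "key": (bucket, key) if blacklist_mode else key,
            "expires": now + TTL_SECONDS}
```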
+
+Querying this anti-entropy index can re-use the ``tictactree_idx`` query feature used for Full Database Anti-Entropy.
+
+The near real-time entropy index currently has four ct tests:
+
+- `recent_aae_noaae` (confirming loading a store with real-time aae disabled has no impact);
+
+- `recent_aae_allaae` (confirming that a single store loaded with data can be compared with a store where the same data is spread across three leveled instances - with all buckets covered by anti-entropy);
+
+- `recent_aae_bucketaae` (confirming that a single store loaded with data can be compared with a store where the same data is spread across three leveled instances - with a single bucket covered by anti-entropy);
+
+- `recent_aae_expiry` (confirming that aae index will expire).
+
+### Clock Drift
+
+The proposed near-real-time anti-entropy mechanism depends on a timestamp, so ultimately some false positives and false negatives are unavoidable - especially if clock drift is large. The assumptions are:
+
+- that this method is never a single dependency for ensuring consistency; it is supported by other mechanisms of further validation that would detect false negatives.
+
+- that recovery from false positives will be safely implemented, so that a falsely identified discrepancy is validated before a change is made (e.g. read-repair).
+
+Even with these mitigations, the volume of false positives and negatives needs to be controlled, in particular where clock drift is small (i.e. measured in seconds), and hence likely. If the object has a last-modified date set in one place, as in Riak, there is no issue with different actors seeing a different last-modified date for the same change. However, as the index entry should expire, the risk exists that a store will set an inappropriate expiry time, or will not index the object at all because it considers the modification too far in the past. The near-real-time AAE process has the concept of unit minutes, which represents the level of granularity to which all times are rounded. All expiry times are set with a tolerance equal to the unit minutes, to avoid false positives or negatives when clock drift is small.
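
A small sketch of the rounding and tolerance rules (Python; UNIT_MINUTES and the TTL values are illustrative configuration, not the actual defaults):

```python
UNIT_MINUTES = 5   # granularity to which all times are rounded

def rounded(ts: int) -> int:
    """Round a last-modified timestamp down to the unit-minutes boundary."""
    unit = UNIT_MINUTES * 60
    return (ts // unit) * unit

def within_index_window(lmd: int, now: int, ttl: int) -> bool:
    """Decide whether a change should still be indexed: the expiry check is
    padded by one unit of tolerance, so small clock drift does not cause one
    store to index a change that a peer has already rejected as too old."""
    tolerance = UNIT_MINUTES * 60
    return rounded(lmd) + ttl + tolerance > now
```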
+
+## Alternative Approaches
+
+The approach considered for Leveled has been to modify the Merkle trees used to ease their implementation, as well as specifically separating out anti-entropy for recent changes as a different problem to long-term anti-entropy of global state.
+
+[Recent research](http://haslab.uminho.pt/tome/files/global_logical_clocks.pdf) has been released which examines using dotted version vectors at a node level to improve the efficiency of managing key-level consistency, reducing the risks associated with deletes (without permanent tombstones), but also provide an inherently integrated approach to active anti-entropy.
+
+The [Global Logical Clock](https://github.com/ricardobcl/DottedDB) approach does assume that durable state, once written, is not lost:
+
+> Nodes have access to durable storage; nodes can crash but
+> eventually will recover with the content of the durable storage as at the time of
+> the crash.
+
+It is strongly preferred that our anti-entropy approach can deal with the loss of data that had been persisted to disk (e.g. perhaps through administrative error or disk failure), not just the loss of updates not received. This doesn't mean that such an approach is invalid as:
+
+- the near real-time approach element of anti-entropy *is* only focused on the loss of updates not received;
+
+- it may be possible to periodically rebuild the state of bitmapped version vectors based on the data found on disk (similarly to the current hashtree rebuild process in Riak AAE).
+
+Some further consideration has been given to using a version of this Global Logical Clock approach to managing near-real-time anti-entropy only. More understanding of the approach is required to progress though, in particular:
+
+- How to manage comparisons between clusters with different partitioning algorithms (e.g different ring-sizes);
+
+- How to discover key IDs from missing dots where the controlling node for the update has recently failed.
+
+These likely represent gaps in current understanding, rather than flaws in the approach. The evolution of this research will be tracked with interest.
+
+## Some Notes on Riak implementation
+
+Some notes on re-using this alternative anti-entropy mechanism within Riak:
+
+* There is divergence between Leveled and LevelDB with regards to how async folds are implemented. Within LevelDB, requesting an async fold returns a folder function that will take a snapshot when it is called. Within Leveled the option exists to take the snapshot before returning the folder function, so that calling the folder function will work on a snapshot of the store taken when the fold was requested. This difference caused issues when testing with riak_kv_sweeper, as the scheduling in the sweeper meant that folds would be requested and then left on a queue for long enough to have timed out by the time they were called. The quick fix for riak_kv_sweeper testing was to make the folder snapshot behaviour in Leveled consistent with LevelDB. However, the original behaviour opens up some interesting possibilities for AAE implementation, in that a coverage set of vnodes could be snapshotted at a point in time, without all folds needing to run concurrently for the result to be consistent to that point in time. This would allow folds to be directly throttled during the coverage process to manage the number of folds running on each node at once, without opening up a time-gap between snapshots that would increase the number of false repairs.
+
+ - It may be possible to make the leveldb snapshot behaviour match leveled's original behaviour, whilst keeping the fold itself async. The fold function currently contains both the setup of the iterator and the fold itself, and perhaps these could be separated such that the iterator would be set up prior to the fold function being returned:
+
+ ```
+ fold_keys(Ref, Fun, Acc0, Opts) ->
+ {ok, Itr} = iterator(Ref, Opts, keys_only),
+ do_fold(Itr, Fun, Acc0, Opts).
+ ```
+ - Likewise with bitcask, the fold is currently async with the snapshot effectively taken inside the async folder function that is returned (for bitcask this means opening a new bitcask store in read-only mode). The snapshot could be moved outside of the async part but, unlike with leveldb and leveled snapshots, this is a relatively expensive operation - so doing it directly would block the main bitcask process in an unhealthy way. Finding a simple way of snapshotting prior to the fold, and outside of the async process, would therefore require more work in Bitcask.
+
+ - riak_core supports vnode_worker_pools (currently only one) and riak_kv sets up a pool for folds. The potential may also exist to have a node_worker_pool on each node. It may then be possible to divert snapped async work to this pool (i.e. where the response is {snap, Work, From, NewModState} as opposed to [async](https://github.com/basho/riak_core/blob/2.1.8/src/riak_core_vnode.erl#L358-#L362), the node_worker_pool would be asked to fulfill this work). The second pool could have a more constrained number of concurrent workers, perhaps just one. Therefore no more than one vnode on the node would be active doing this sort of work at any one time, and when that work is finished the next vnode in the queue would pick up and commence its fold.
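+
+ The snapshot/fold separation suggested above for leveldb could be sketched as follows. This is illustrative only - `iterator/3` and `do_fold/4` stand in for eleveldb internals, and the exact shape of the async return is an assumption:
+
+ ```
+ fold_keys(Ref, Fun, Acc0, Opts) ->
+     % Take the iterator (and hence the snapshot) at request time ...
+     {ok, Itr} = iterator(Ref, Opts, keys_only),
+     % ... and return a folder that closes over that iterator, so the
+     % fold can be queued and run later while remaining consistent to
+     % the point in time at which it was requested
+     Folder = fun() -> do_fold(Itr, Fun, Acc0, Opts) end,
+     {async, Folder}.
+ ```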
+
+* In Leveled a special fold currently supports the Tic-Tac tree generation for indexes, and one for objects. It may be better to support this by offering a more open capability to pass different fold functions and accumulators into index folds. This could be re-used for "reporting indexes", where we want to count terms of different types rather than return all those terms via an accumulating list e.g. an index may have a bitmap style part, and the function will apply a wildcard mask to the bitmap and count the number of hits against each possible output.
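+
+  As an illustration of the "reporting indexes" idea (the index layout and names here are hypothetical), a passed-in fold function could count bitmap hits rather than accumulate terms:
+
+  ```
+  % Hypothetical accumulator for a reporting index fold: each index
+  % entry is paired with an integer bitmap, and the fold counts the
+  % entries whose bitmap matches a wildcard mask, rather than
+  % returning every term in an accumulating list
+  CountFun =
+      fun({_Term, Bitmap}, Acc) ->
+          Mask = 2#00001111, % example wildcard mask
+          case Bitmap band Mask of
+              0 -> Acc;
+              _ -> Acc + 1
+          end
+      end,
+  HitCount = lists:foldl(CountFun, 0, IndexEntries).
+  ```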
+
+* The initial intention is to implement the hashtree query functions based around the coverage_fsm behaviour, but with the option to stipulate the offset externally. So to test for differences between clusters, the user could concurrently query the two clusters for the same offset (or a random offset), whereas to find entropy within a cluster two concurrently-run queries could be compared for different offsets.
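+
+  A comparison between two such coverage queries might be sketched as below. This assumes `merge_trees/2` and `find_dirtyleaves/2` functions in `leveled_tictac` for combining and comparing trees - the exact names and signatures may differ:
+
+  ```
+  % Run the same tictactree_obj coverage query against two clusters
+  % (or the same cluster at two offsets), then merge each coverage
+  % set's trees and compare the results. The returned segment IDs
+  % identify where follow-up key-range queries should be targeted.
+  compare_coverage(TreeListA, TreeListB) ->
+      MergeFun =
+          fun(Tree, Acc) -> leveled_tictac:merge_trees(Tree, Acc) end,
+      TreeA = lists:foldl(MergeFun, hd(TreeListA), tl(TreeListA)),
+      TreeB = lists:foldl(MergeFun, hd(TreeListB), tl(TreeListB)),
+      leveled_tictac:find_dirtyleaves(TreeA, TreeB).
+  ```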
+
+* A surprising feature of read repair is that it will read repair to fallback nodes, not just primary nodes. This means that in read-intensive workloads, write activity may dramatically increase during node failure (as a large proportion of reads will become write events) - increasing the chance of servers falling domino style. However, in some circumstances the extra duplication can also [increase the chance of data loss](https://github.com/russelldb/russelldb.github.io/blob/master/3.2.kv679-solution.md)! It also greatly increases the volume of unnecessary data to be handed off when the primary returns. Without active anti-entropy, and in the absence of other safety checks like `notfound_ok` being set to false, or `pr` being set to at least 1, there will be scenarios where this feature may be helpful. As part of improving active anti-entropy, it may be wise to revisit the tuning of anti-entropy features that existed prior to AAE - in particular, whether it should be possible to configure read repair to act on primary nodes only.
diff --git a/include/leveled.hrl b/include/leveled.hrl
index f6b0294..12b2d07 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -67,6 +67,33 @@
waste_retention_period :: integer(),
reload_strategy = [] :: list()}).
+-record(recent_aae, {filter :: whitelist|blacklist,
+ % the buckets list should either be a
+ % - whitelist - specific buckets are included, and
+ % entries are indexed by bucket name
+ % - blacklist - specific buckets are excluded, and
+ % all other entries are indexed using the special
+ % $all bucket
+
+ buckets :: list(),
+ % whitelist or blacklist of buckets to support recent
+ % AAE
+
+ limit_minutes :: integer(),
+ % how long to retain entries in the temporary index for.
+ % Entries will actually be retained for limit + unit minutes.
+ % 60 minutes seems sensible
+
+ unit_minutes :: integer(),
+ % What the minimum unit size will be for a query
+ % e.g. the minimum time duration to be used in range
+ % queries of the aae index
+ % 5 minutes seems sensible
+
+ tree_size = small :: atom()
+ % Just defaulted to small for now
+ }).
+
-record(r_content, {
metadata,
value :: term()
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index d12687c..2881a87 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -82,6 +82,7 @@
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 80000).
+-define(RECENT_AAE, false).
-record(ledger_cache, {mem :: ets:tab(),
loader = leveled_tree:empty(?CACHE_TYPE)
@@ -94,6 +95,7 @@
-record(state, {inker :: pid(),
penciller :: pid(),
cache_size :: integer(),
+ recent_aae :: false|#recent_aae{},
ledger_cache = #ledger_cache{},
is_snapshot :: boolean(),
slow_offer = false :: boolean(),
@@ -157,7 +159,7 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
%%
%% TODO:
%% The reload_strategy is exposed as currently no firm decision has been made
-%% about how recovery should work. For instance if we were to trust evrything
+%% about how recovery should work. For instance if we were to trust everything
%% as permanent in the Ledger once it is persisted, then there would be no
%% need to retain a skinny history of key changes in the Journal after
%% compaction. If, as an alternative we assume the Ledger is never permanent,
@@ -308,8 +310,26 @@ book_head(Pid, Bucket, Key, Tag) ->
%% {keylist, Tag, {FoldKeysFun, Acc}} -> list all keys with tag
%% {keylist, Tag, Bucket, {FoldKeysFun, Acc}} -> list all keys within given
%% bucket
-%% {hashtree_query, Tag, JournalCheck} -> return keys and hashes for all
+%% {hashlist_query, Tag, JournalCheck} -> return keys and hashes for all
%% objects with a given tag
+%% {tictactree_idx,
+%% {Bucket, IdxField, StartValue, EndValue},
+%% TreeSize,
+%% PartitionFilter}
+%% -> compile a hashtree for the items in the index. A partition filter is
+%% required to avoid including index entries held by this vnode as a fallback.
+%% There is no de-duplication of results; duplicate results corrupt the tree.
+%% {tictactree_obj,
+%% {Bucket, StartKey, EndKey, CheckPresence},
+%% TreeSize,
+%% PartitionFilter}
+%% -> compile a hashtree for all the objects in the range. A partition filter
+%% may be passed to restrict the query to a given partition on this vnode. The
+%% filter should be a function that takes (Bucket, Key) as inputs and outputs
+%% one of the atoms accumulate or pass. There is no de-duplication of results;
+%% duplicate results corrupt the tree.
+%% CheckPresence can be used if there is a need to do a deeper check to ensure
+%% that the object is in the Journal (or at least indexed within the Journal).
%% {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> fold over all objects
%% in a given bucket
%% {foldobjects_byindex,
@@ -365,15 +385,29 @@ init([Opts]) ->
undefined ->
% Start from file not snapshot
{InkerOpts, PencillerOpts} = set_options(Opts),
- {Inker, Penciller} = startup(InkerOpts, PencillerOpts),
+
CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
+ erlang:phash2(self()) rem CacheJitter,
+ RecentAAE =
+ case get_opt(recent_aae, Opts, ?RECENT_AAE) of
+ false ->
+ false;
+ {FilterType, BucketList, LimitMinutes, UnitMinutes} ->
+ #recent_aae{filter = FilterType,
+ buckets = BucketList,
+ limit_minutes = LimitMinutes,
+ unit_minutes = UnitMinutes}
+ end,
+
+ {Inker, Penciller} = startup(InkerOpts, PencillerOpts, RecentAAE),
+
NewETS = ets:new(mem, [ordered_set]),
leveled_log:log("B0001", [Inker, Penciller]),
{ok, #state{inker=Inker,
penciller=Penciller,
cache_size=CacheSize,
+ recent_aae=RecentAAE,
ledger_cache=#ledger_cache{mem = NewETS},
is_snapshot=false}};
Bookie ->
@@ -400,7 +434,8 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
SQN,
Object,
ObjSize,
- {IndexSpecs, TTL}),
+ {IndexSpecs, TTL},
+ State#state.recent_aae),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
T1 = timer:now_diff(os:timestamp(), SW) - T0,
PutTimes = leveled_log:put_timing(bookie, State#state.put_timing, T0, T1),
@@ -531,9 +566,35 @@ handle_call({return_folder, FolderType}, _From, State) ->
{reply,
bucketkey_query(State, Tag, Bucket, {FoldKeysFun, Acc}),
State};
- {hashtree_query, Tag, JournalCheck} ->
+ {hashlist_query, Tag, JournalCheck} ->
{reply,
- hashtree_query(State, Tag, JournalCheck),
+ hashlist_query(State, Tag, JournalCheck),
+ State};
+ {tictactree_obj,
+ {Tag, Bucket, StartKey, EndKey, CheckPresence},
+ TreeSize,
+ PartitionFilter} ->
+ {reply,
+ tictactree(State,
+ Tag,
+ Bucket,
+ {StartKey, EndKey},
+ CheckPresence,
+ TreeSize,
+ PartitionFilter),
+ State};
+ {tictactree_idx,
+ {Bucket, IdxField, StartValue, EndValue},
+ TreeSize,
+ PartitionFilter} ->
+ {reply,
+ tictactree(State,
+ ?IDX_TAG,
+ Bucket,
+ {IdxField, StartValue, EndValue},
+ false,
+ TreeSize,
+ PartitionFilter),
State};
{foldheads_allkeys, Tag, FoldHeadsFun} ->
{reply,
@@ -818,7 +879,7 @@ index_query(State,
{async, Folder}.
-hashtree_query(State, Tag, JournalCheck) ->
+hashlist_query(State, Tag, JournalCheck) ->
SnapType = case JournalCheck of
false ->
ledger;
@@ -848,6 +909,76 @@ hashtree_query(State, Tag, JournalCheck) ->
end,
{async, Folder}.
+tictactree(State, Tag, Bucket, Query, JournalCheck, TreeSize, Filter) ->
+ % Journal check can be used for object key folds to confirm that the
+ % object is still indexed within the journal
+ SnapType = case JournalCheck of
+ false ->
+ ledger;
+ check_presence ->
+ store
+ end,
+ {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State,
+ SnapType,
+ no_lookup),
+ Tree = leveled_tictac:new_tree(temp, TreeSize),
+ Folder =
+ fun() ->
+ % The start key and end key will vary depending on whether the
+ % fold is to fold over an index or a key range
+ {StartKey, EndKey, HashFun} =
+ case Tag of
+ ?IDX_TAG ->
+ {IdxField, StartIdx, EndIdx} = Query,
+ HashIdxValFun =
+ fun(_Key, IdxValue) ->
+ erlang:phash2(IdxValue)
+ end,
+ {leveled_codec:to_ledgerkey(Bucket,
+ null,
+ ?IDX_TAG,
+ IdxField,
+ StartIdx),
+ leveled_codec:to_ledgerkey(Bucket,
+ null,
+ ?IDX_TAG,
+ IdxField,
+ EndIdx),
+ HashIdxValFun};
+ _ ->
+ {StartObjKey, EndObjKey} = Query,
+ PassHashFun = fun(_Key, Hash) -> Hash end,
+ {leveled_codec:to_ledgerkey(Bucket,
+ StartObjKey,
+ Tag),
+ leveled_codec:to_ledgerkey(Bucket,
+ EndObjKey,
+ Tag),
+ PassHashFun}
+ end,
+
+ AccFun = accumulate_tree(Filter,
+ JournalCheck,
+ JournalSnapshot,
+ HashFun),
+ Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+ StartKey,
+ EndKey,
+ AccFun,
+ Tree),
+
+ % Close down snapshot when complete so as not to hold removed
+ % files open
+ ok = leveled_penciller:pcl_close(LedgerSnapshot),
+ case JournalCheck of
+ false ->
+ ok;
+ check_presence ->
+ leveled_inker:ink_close(JournalSnapshot)
+ end,
+ Acc
+ end,
+ {async, Folder}.
foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
@@ -1037,14 +1168,14 @@ set_options(Opts) ->
max_inmemory_tablesize = PCLL0CacheSize,
levelzero_cointoss = true}}.
-startup(InkerOpts, PencillerOpts) ->
+startup(InkerOpts, PencillerOpts, RecentAAE) ->
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
leveled_log:log("B0005", [LedgerSQN]),
ok = leveled_inker:ink_loadpcl(Inker,
LedgerSQN + 1,
- fun load_fun/5,
+ get_loadfun(RecentAAE),
Penciller),
{Inker, Penciller}.
@@ -1088,27 +1219,51 @@ accumulate_size() ->
AccFun.
accumulate_hashes(JournalCheck, InkerClone) ->
+ AddKeyFun =
+ fun(B, K, H, Acc) ->
+ [{B, K, H}|Acc]
+ end,
+ get_hashaccumulator(JournalCheck,
+ InkerClone,
+ AddKeyFun).
+
+accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) ->
+ AddKeyFun =
+ fun(B, K, H, Tree) ->
+ case FilterFun(B, K) of
+ accumulate ->
+ leveled_tictac:add_kv(Tree, K, H, HashFun);
+ pass ->
+ Tree
+ end
+ end,
+ get_hashaccumulator(JournalCheck,
+ InkerClone,
+ AddKeyFun).
+
+get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
Now = leveled_codec:integer_now(),
- AccFun = fun(LK, V, KHList) ->
- case leveled_codec:is_active(LK, V, Now) of
- true ->
- {B, K, H} = leveled_codec:get_keyandhash(LK, V),
- Check = random:uniform() < ?CHECKJOURNAL_PROB,
- case {JournalCheck, Check} of
- {check_presence, true} ->
- case check_presence(LK, V, InkerClone) of
- true ->
- [{B, K, H}|KHList];
- false ->
- KHList
- end;
- _ ->
- [{B, K, H}|KHList]
+ AccFun =
+ fun(LK, V, Acc) ->
+ case leveled_codec:is_active(LK, V, Now) of
+ true ->
+ {B, K, H} = leveled_codec:get_keyandobjhash(LK, V),
+ Check = random:uniform() < ?CHECKJOURNAL_PROB,
+ case {JournalCheck, Check} of
+ {check_presence, true} ->
+ case check_presence(LK, V, InkerClone) of
+ true ->
+ AddKeyFun(B, K, H, Acc);
+ false ->
+ Acc
end;
- false ->
- KHList
- end
- end,
+ _ ->
+ AddKeyFun(B, K, H, Acc)
+ end;
+ false ->
+ Acc
+ end
+ end,
AccFun.
@@ -1179,8 +1334,6 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
AccFun.
-
-
check_presence(Key, Value, InkerClone) ->
{LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
@@ -1245,26 +1398,22 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
preparefor_ledgercache(?INKT_KEYD,
- LedgerKey, SQN, _Obj, _Size, {IndexSpecs, TTL}) ->
+ LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
+ _AAE) ->
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
- KeyChanges = leveled_codec:convert_indexspecs(IndexSpecs,
- Bucket,
- Key,
- SQN,
- TTL),
+ KeyChanges =
+ leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
{no_lookup, SQN, KeyChanges};
-preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
- {Bucket, Key, ObjKeyChange, H} = leveled_codec:generate_ledgerkv(LedgerKey,
- SQN,
- Obj,
- Size,
- TTL),
- KeyChanges = [ObjKeyChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
- Bucket,
- Key,
- SQN,
- TTL),
- {H, SQN, KeyChanges}.
+preparefor_ledgercache(_InkTag,
+ LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
+ AAE) ->
+ {Bucket, Key, MetaValue, {KeyH, ObjH}, LastMods} =
+ leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
+ KeyChanges =
+ [{LedgerKey, MetaValue}] ++
+ leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++
+ leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, ObjH, LastMods),
+ {KeyH, SQN, KeyChanges}.
addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
@@ -1322,35 +1471,40 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
end.
-
-load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ->
- {MinSQN, MaxSQN, OutputTree} = Acc0,
- {SQN, Type, PK} = KeyInJournal,
- % VBin may already be a term
- {VBin, VSize} = ExtractFun(ValueInJournal),
- {Obj, IndexSpecs} = leveled_codec:split_inkvalue(VBin),
- case SQN of
- SQN when SQN < MinSQN ->
- {loop, Acc0};
- SQN when SQN < MaxSQN ->
- Changes = preparefor_ledgercache(Type, PK, SQN,
- Obj, VSize, IndexSpecs),
- {loop,
- {MinSQN,
- MaxSQN,
- addto_ledgercache(Changes, OutputTree, loader)}};
- MaxSQN ->
- leveled_log:log("B0006", [SQN]),
- Changes = preparefor_ledgercache(Type, PK, SQN,
- Obj, VSize, IndexSpecs),
- {stop,
- {MinSQN,
- MaxSQN,
- addto_ledgercache(Changes, OutputTree, loader)}};
- SQN when SQN > MaxSQN ->
- leveled_log:log("B0007", [MaxSQN, SQN]),
- {stop, Acc0}
- end.
+get_loadfun(RecentAAE) ->
+ PrepareFun =
+ fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
+ preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, RecentAAE)
+ end,
+ LoadFun =
+ fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
+ {MinSQN, MaxSQN, OutputTree} = Acc0,
+ {SQN, InkTag, PK} = KeyInJournal,
+ % VBin may already be a term
+ {VBin, VSize} = ExtractFun(ValueInJournal),
+ {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
+ case SQN of
+ SQN when SQN < MinSQN ->
+ {loop, Acc0};
+ SQN when SQN < MaxSQN ->
+ Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
+ {loop,
+ {MinSQN,
+ MaxSQN,
+ addto_ledgercache(Chngs, OutputTree, loader)}};
+ MaxSQN ->
+ leveled_log:log("B0006", [SQN]),
+ Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
+ {stop,
+ {MinSQN,
+ MaxSQN,
+ addto_ledgercache(Chngs, OutputTree, loader)}};
+ SQN when SQN > MaxSQN ->
+ leveled_log:log("B0007", [MaxSQN, SQN]),
+ {stop, Acc0}
+ end
+ end,
+ LoadFun.
get_opt(Key, Opts) ->
@@ -1484,7 +1638,7 @@ ttl_test() ->
ok = book_close(Bookie2),
reset_filestructure().
-hashtree_query_test() ->
+hashlist_query_test() ->
RootPath = reset_filestructure(),
{ok, Bookie1} = book_start([{root_path, RootPath},
{max_journalsize, 1000000},
@@ -1507,22 +1661,22 @@ hashtree_query_test() ->
ObjL2),
% Scan the store for the Bucket, Keys and Hashes
{async, HTFolder} = book_returnfolder(Bookie1,
- {hashtree_query,
+ {hashlist_query,
?STD_TAG,
false}),
KeyHashList = HTFolder(),
- lists:foreach(fun({B, _K, H}) ->
- ?assertMatch("Bucket", B),
- ?assertMatch(true, is_integer(H))
- end,
- KeyHashList),
+ lists:foreach(fun({B, _K, H}) ->
+ ?assertMatch("Bucket", B),
+ ?assertMatch(true, is_integer(H))
+ end,
+ KeyHashList),
?assertMatch(1200, length(KeyHashList)),
ok = book_close(Bookie1),
{ok, Bookie2} = book_start([{root_path, RootPath},
{max_journalsize, 200000},
{cache_size, 500}]),
{async, HTFolder2} = book_returnfolder(Bookie2,
- {hashtree_query,
+ {hashlist_query,
?STD_TAG,
false}),
L0 = length(KeyHashList),
@@ -1532,7 +1686,7 @@ hashtree_query_test() ->
ok = book_close(Bookie2),
reset_filestructure().
-hashtree_query_withjournalcheck_test() ->
+hashlist_query_withjournalcheck_test() ->
RootPath = reset_filestructure(),
{ok, Bookie1} = book_start([{root_path, RootPath},
{max_journalsize, 1000000},
@@ -1546,12 +1700,12 @@ hashtree_query_withjournalcheck_test() ->
Future) end,
ObjL1),
{async, HTFolder1} = book_returnfolder(Bookie1,
- {hashtree_query,
+ {hashlist_query,
?STD_TAG,
false}),
KeyHashList = HTFolder1(),
{async, HTFolder2} = book_returnfolder(Bookie1,
- {hashtree_query,
+ {hashlist_query,
?STD_TAG,
check_presence}),
?assertMatch(KeyHashList, HTFolder2()),
@@ -1572,7 +1726,7 @@ foldobjects_vs_hashtree_test() ->
Future) end,
ObjL1),
{async, HTFolder1} = book_returnfolder(Bookie1,
- {hashtree_query,
+ {hashlist_query,
?STD_TAG,
false}),
KeyHashList1 = lists:usort(HTFolder1()),
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 1d2bc13..0d49997 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -58,8 +58,9 @@
build_metadata_object/2,
generate_ledgerkv/5,
get_size/2,
- get_keyandhash/2,
- convert_indexspecs/5,
+ get_keyandobjhash/2,
+ idx_indexspecs/5,
+ aae_indexspecs/6,
generate_uuid/0,
integer_now/0,
riak_extract_metadata/2,
@@ -68,22 +69,20 @@
-define(V1_VERS, 1).
-define(MAGIC, 53). % riak_kv -> riak_object
+-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
+-define(NRT_IDX, "$aae.").
+-define(ALL_BUCKETS, <<"$all">>).
+-type recent_aae() :: #recent_aae{}.
+
+-spec magic_hash(any()) -> integer().
+%% @doc
%% Use DJ Bernstein magic hash function. Note, this is more expensive than
%% phash2 but provides a much more balanced result.
%%
%% Hash function contains mysterious constants, some explanation here as to
%% what they are -
%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
-
-to_lookup(Key) ->
- case element(1, Key) of
- ?IDX_TAG ->
- no_lookup;
- _ ->
- lookup
- end.
-
magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) ->
magic_hash({Bucket, Key});
magic_hash({?STD_TAG, Bucket, Key, _SubKey}) ->
@@ -100,7 +99,21 @@ hash1(H, <<B:8/integer, Rest/bitstring>>) ->
H2 = H1 bxor B,
hash1(H2, Rest).
+%% @doc
+%% Should it be possible to look up a key in the merge tree? This is not true
+%% for keys that should only be read through range queries. Direct lookup
+%% keys will have presence in bloom filters and other lookup accelerators.
+to_lookup(Key) ->
+ case element(1, Key) of
+ ?IDX_TAG ->
+ no_lookup;
+ _ ->
+ lookup
+ end.
+-spec generate_uuid() -> list().
+%% @doc
+%% Generate a new globally unique ID as a string.
%% Credit to
%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
generate_uuid() ->
@@ -168,9 +181,10 @@ is_active(Key, Value, Now) ->
false
end.
-from_ledgerkey({Tag, Bucket, {_IdxField, IdxValue}, Key})
- when Tag == ?IDX_TAG ->
- {Bucket, Key, IdxValue};
+from_ledgerkey({?IDX_TAG, ?ALL_BUCKETS, {_IdxFld, IdxVal}, {Bucket, Key}}) ->
+ {Bucket, Key, IdxVal};
+from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) ->
+ {Bucket, Key, IdxVal};
from_ledgerkey({_Tag, Bucket, Key, null}) ->
{Bucket, Key}.
@@ -363,21 +377,151 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
endkey_passed(EndKey, CheckingKey) ->
EndKey < CheckingKey.
-convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
- lists:map(fun({IndexOp, IdxField, IdxValue}) ->
- Status = case IndexOp of
- add ->
- {active, TTL};
- remove ->
- %% TODO: timestamps for delayed reaping
- tomb
- end,
- {to_ledgerkey(Bucket, Key, ?IDX_TAG,
- IdxField, IdxValue),
- {SQN, Status, no_lookup, null}}
- end,
- IndexSpecs).
+idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
+ lists:map(
+ fun({IdxOp, IdxFld, IdxTrm}) ->
+ gen_indexspec(Bucket, Key, IdxOp, IdxFld, IdxTrm, SQN, TTL)
+ end,
+ IndexSpecs
+ ).
+gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) ->
+ Status =
+ case IdxOp of
+ add ->
+ {active, TTL};
+ remove ->
+ %% TODO: timestamps for delayed reaping
+ tomb
+ end,
+ case Bucket of
+ {all, RealBucket} ->
+ {to_ledgerkey(?ALL_BUCKETS,
+ {RealBucket, Key},
+ ?IDX_TAG,
+ IdxField,
+ IdxTerm),
+ {SQN, Status, no_lookup, null}};
+ _ ->
+ {to_ledgerkey(Bucket,
+ Key,
+ ?IDX_TAG,
+ IdxField,
+ IdxTerm),
+ {SQN, Status, no_lookup, null}}
+ end.
+
+-spec aae_indexspecs(false|recent_aae(),
+ any(), any(),
+ integer(), integer(),
+ list())
+ -> list().
+%% @doc
+%% Generate an additional index term representing the change, if the last
+%% modified date for the change is within the definition of recency.
+%%
+%% The object may have multiple last modified dates (siblings), and in this
+%% case index entries for all dates within the range are added.
+%%
+%% The index entry should auto-expire in the future (when it is no longer
+%% relevant to assessing recent changes).
+aae_indexspecs(false, _Bucket, _Key, _SQN, _H, _LastMods) ->
+ [];
+aae_indexspecs(_AAE, _Bucket, _Key, _SQN, _H, []) ->
+ [];
+aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
+ InList = lists:member(Bucket, AAE#recent_aae.buckets),
+ Bucket0 =
+ case AAE#recent_aae.filter of
+ blacklist ->
+ case InList of
+ true ->
+ false;
+ false ->
+ {all, Bucket}
+ end;
+ whitelist ->
+ case InList of
+ true ->
+ Bucket;
+ false ->
+ false
+ end
+ end,
+ case Bucket0 of
+ false ->
+ [];
+ Bucket0 ->
+ GenIdxFun =
+ fun(LMD0, Acc) ->
+ Dates = parse_date(LMD0,
+ AAE#recent_aae.unit_minutes,
+ AAE#recent_aae.limit_minutes,
+ integer_now()),
+ case Dates of
+ no_index ->
+ Acc;
+ {LMD1, TTL} ->
+ TreeSize = AAE#recent_aae.tree_size,
+ SegID =
+ leveled_tictac:get_segment(Key, TreeSize),
+ IdxFldStr = ?NRT_IDX ++ LMD1 ++ "_bin",
+ IdxTrmStr =
+ string:right(integer_to_list(SegID), 8, $0) ++
+ "." ++
+ string:right(integer_to_list(H), 8, $0),
+ {IdxK, IdxV} =
+ gen_indexspec(Bucket0, Key,
+ add,
+ list_to_binary(IdxFldStr),
+ list_to_binary(IdxTrmStr),
+ SQN, TTL),
+ [{IdxK, IdxV}|Acc]
+ end
+ end,
+ lists:foldl(GenIdxFun, [], LastMods)
+ end.
+
+-spec parse_date(tuple(), integer(), integer(), integer()) ->
+ no_index|{binary(), integer()}.
+%% @doc
+%% Parse the last modified date and the AAE date configuration to return a
+%% binary to be used as the last modified date part of the index, and an
+%% integer to be used as the TTL of the index entry.
+%% Return no_index if the change is not recent.
+parse_date(LMD, UnitMins, LimitMins, Now) ->
+ LMDsecs = integer_time(LMD),
+ Recent = (LMDsecs + LimitMins * 60) > Now,
+ case Recent of
+ false ->
+ no_index;
+ true ->
+ {{Y, M, D}, {Hour, Minute, _Second}} =
+ calendar:now_to_datetime(LMD),
+ RoundMins =
+ UnitMins * (Minute div UnitMins),
+ StrTime =
+ lists:flatten(io_lib:format(?LMD_FORMAT,
+ [Y, M, D, Hour, RoundMins])),
+ TTL = min(Now, LMDsecs) + (LimitMins + UnitMins) * 60,
+ {StrTime, TTL}
+ end.
+
+-spec generate_ledgerkv(
+ tuple(), integer(), any(), integer(), tuple()|infinity) ->
+ {any(), any(), any(), {integer()|no_lookup, integer()}, list()}.
+%% @doc
+%% Function to extract from an object the information necessary to populate
+%% the Penciller's ledger.
+%% Outputs -
+%% Bucket - original Bucket extracted from the PrimaryKey
+%% Key - original Key extracted from the PrimaryKey
+%% Value - the value to be used in the Ledger (essentially the extracted
+%% metadata)
+%% {Hash, ObjHash} - A magic hash of the key to accelerate lookups, and a hash
+%% of the value to be used for equality checking between objects
+%% LastMods - the last modified dates for the object (may be multiple due to
+%% siblings)
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
{Tag, Bucket, Key, _} = PrimaryKey,
Status = case Obj of
@@ -387,11 +531,13 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
{active, TS}
end,
Hash = magic_hash(PrimaryKey),
+ {MD, LastMods} = extract_metadata(Obj, Size, Tag),
+ ObjHash = get_objhash(Tag, MD),
Value = {SQN,
Status,
Hash,
- extract_metadata(Obj, Size, Tag)},
- {Bucket, Key, {PrimaryKey, Value}, Hash}.
+ MD},
+ {Bucket, Key, Value, {Hash, ObjHash}, LastMods}.
integer_now() ->
@@ -404,7 +550,7 @@ integer_time(TS) ->
extract_metadata(Obj, Size, ?RIAK_TAG) ->
riak_extract_metadata(Obj, Size);
extract_metadata(Obj, Size, ?STD_TAG) ->
- {hash(Obj), Size}.
+ {{hash(Obj), Size}, []}.
get_size(PK, Value) ->
{Tag, _Bucket, _Key, _} = PK,
@@ -417,20 +563,33 @@ get_size(PK, Value) ->
{_Hash, Size} = MD,
Size
end.
-
-get_keyandhash(LK, Value) ->
+
+-spec get_keyandobjhash(tuple(), tuple()) -> tuple().
+%% @doc
+%% Return a tuple of {Bucket, Key, Hash}, where Hash is a hash of the object,
+%% not the key (for example with Riak tagged objects this will be a hash of
+%% the sorted vclock)
+get_keyandobjhash(LK, Value) ->
{Tag, Bucket, Key, _} = LK,
{_, _, _, MD} = Value,
case Tag of
- ?RIAK_TAG ->
- {_RMD, _VC, Hash, _Size} = MD,
- {Bucket, Key, Hash};
- ?STD_TAG ->
- {Hash, _Size} = MD,
- {Bucket, Key, Hash}
+ ?IDX_TAG ->
+ from_ledgerkey(LK); % returns {Bucket, Key, IdxValue}
+ _ ->
+ {Bucket, Key, get_objhash(Tag, MD)}
end.
-
+get_objhash(Tag, ObjMetaData) ->
+ case Tag of
+ ?RIAK_TAG ->
+ {_RMD, _VC, Hash, _Size} = ObjMetaData,
+ Hash;
+ ?STD_TAG ->
+ {Hash, _Size} = ObjMetaData,
+ Hash
+ end.
+
+
build_metadata_object(PrimaryKey, MD) ->
{Tag, _Bucket, _Key, null} = PrimaryKey,
case Tag of
@@ -443,10 +602,14 @@ build_metadata_object(PrimaryKey, MD) ->
riak_extract_metadata(delete, Size) ->
- {delete, null, null, Size};
+ {{delete, null, null, Size}, []};
riak_extract_metadata(ObjBin, Size) ->
- {Vclock, SibBin} = riak_metadata_from_binary(ObjBin),
- {SibBin, Vclock, erlang:phash2(ObjBin), Size}.
+ {VclockBin, SibBin, LastMods} = riak_metadata_from_binary(ObjBin),
+ {{SibBin,
+ VclockBin,
+ erlang:phash2(lists:sort(binary_to_term(VclockBin))),
+ Size},
+ LastMods}.
%% <>.
@@ -461,28 +624,41 @@ riak_metadata_from_binary(V1Binary) ->
    <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
        Rest/binary>> = V1Binary,
    <<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
- SibMetaBin =
+ {SibMetaBin, LastMods} =
case SibCount of
SC when is_integer(SC) ->
get_metadata_from_siblings(SibsBin,
SibCount,
-                                       <<SibCount:32/integer>>)
+                                       <<SibCount:32/integer>>,
+                                       [])
end,
- {VclockBin, SibMetaBin}.
+ {VclockBin, SibMetaBin, LastMods}.
-get_metadata_from_siblings(<<>>, 0, SibMetaBin) ->
- SibMetaBin;
+get_metadata_from_siblings(<<>>, 0, SibMetaBin, LastMods) ->
+ {SibMetaBin, LastMods};
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
SibCount,
- SibMetaBin) ->
+ SibMetaBin,
+ LastMods) ->
<<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
    <<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
+ LastMod =
+ case MetaBin of
+            <<MegaSec:32/integer, Sec:32/integer, MicroSec:32/integer,
+                _Rest/binary>> ->
+ {MegaSec, Sec, MicroSec};
+ _ ->
+ {0, 0, 0}
+ end,
get_metadata_from_siblings(Rest2,
SibCount - 1,
-                                <<SibMetaBin/binary, ValLen:32/integer,
-                                    MetaLen:32/integer, MetaBin:MetaLen/binary>>).
+                                <<SibMetaBin/binary, ValLen:32/integer,
+                                    MetaLen:32/integer,
+ MetaBin:MetaLen/binary>>,
+ [LastMod|LastMods]).
@@ -498,7 +674,7 @@ indexspecs_test() ->
IndexSpecs = [{add, "t1_int", 456},
{add, "t1_bin", "adbc123"},
{remove, "t1_bin", "abdc456"}],
- Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
+ Changes = idx_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
{1, {active, infinity}, no_lookup, null}},
lists:nth(1, Changes)),
@@ -608,5 +784,79 @@ magichashperf_test() ->
{TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]).
+parsedate_test() ->
+ {MeS, S, MiS} = os:timestamp(),
+ timer:sleep(100),
+ Now = integer_now(),
+ UnitMins = 5,
+ LimitMins = 60,
+ PD = parse_date({MeS, S, MiS}, UnitMins, LimitMins, Now),
+ io:format("Parsed Date ~w~n", [PD]),
+ ?assertMatch(true, is_tuple(PD)),
+ check_pd(PD, UnitMins),
+ CheckFun =
+ fun(Offset) ->
+ ModDate = {MeS, S + Offset * 60, MiS},
+ check_pd(parse_date(ModDate, UnitMins, LimitMins, Now), UnitMins)
+ end,
+ lists:foreach(CheckFun, lists:seq(1, 60)).
+
+check_pd(PD, UnitMins) ->
+ {LMDstr, _TTL} = PD,
+ Minutes = list_to_integer(lists:nthtail(10, LMDstr)),
+ ?assertMatch(0, Minutes rem UnitMins).
+
+parseolddate_test() ->
+ LMD = os:timestamp(),
+ timer:sleep(100),
+ Now = integer_now() + 60 * 60,
+ UnitMins = 5,
+ LimitMins = 60,
+ PD = parse_date(LMD, UnitMins, LimitMins, Now),
+ io:format("Parsed Date ~w~n", [PD]),
+ ?assertMatch(no_index, PD).
+
+genaaeidx_test() ->
+ AAE = #recent_aae{filter=blacklist,
+ buckets=[],
+ limit_minutes=60,
+ unit_minutes=5},
+ Bucket = <<"Bucket1">>,
+ Key = <<"Key1">>,
+ SQN = 1,
+ H = erlang:phash2(null),
+ LastMods = [os:timestamp(), os:timestamp()],
+
+ AAESpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
+ ?assertMatch(2, length(AAESpecs)),
+
+ LastMods1 = [os:timestamp()],
+ AAESpecs1 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods1),
+ ?assertMatch(1, length(AAESpecs1)),
+ IdxB = element(2, element(1, lists:nth(1, AAESpecs1))),
+ io:format(user, "AAE IDXSpecs1 ~w~n", [AAESpecs1]),
+ ?assertMatch(<<"$all">>, IdxB),
+
+ LastMods0 = [],
+ AAESpecs0 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods0),
+ ?assertMatch(0, length(AAESpecs0)),
+
+ AAE0 = AAE#recent_aae{filter=whitelist,
+ buckets=[<<"Bucket0">>]},
+ AAESpecsB0 = aae_indexspecs(AAE0, Bucket, Key, SQN, H, LastMods1),
+ ?assertMatch(0, length(AAESpecsB0)),
+
+ AAESpecsB1 = aae_indexspecs(AAE0, <<"Bucket0">>, Key, SQN, H, LastMods1),
+ ?assertMatch(1, length(AAESpecsB1)),
+ [{{?IDX_TAG, <<"Bucket0">>, {Fld, Term}, <<"Key1">>},
+ {SQN, {active, TS}, no_lookup, null}}] = AAESpecsB1,
+ ?assertMatch(true, is_integer(TS)),
+ ?assertMatch(17, length(binary_to_list(Term))),
+ ?assertMatch("$aae.", lists:sublist(binary_to_list(Fld), 5)),
+
+ AAE1 = AAE#recent_aae{filter=blacklist,
+ buckets=[<<"Bucket0">>]},
+ AAESpecsB2 = aae_indexspecs(AAE1, <<"Bucket0">>, Key, SQN, H, LastMods1),
+ ?assertMatch(0, length(AAESpecsB2)).
-endif.
\ No newline at end of file
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index bfb2bff..41e3732 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -1002,7 +1002,7 @@ plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
element(1, R).
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
- PosList = leveled_pmem:check_index(Hash, L0Index),
+ PosList = leveled_pmem:check_index(Hash, L0Index),
L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
case L0Check of
{false, not_found} ->
diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl
index a36d4ac..b5ea64d 100644
--- a/src/leveled_pmem.erl
+++ b/src/leveled_pmem.erl
@@ -351,7 +351,11 @@ compare_method_test() ->
[timer:now_diff(os:timestamp(), SWb), Sz1]),
?assertMatch(Sz0, Sz1).
-with_index_test() ->
+with_index_test_() ->
+ % Otherwise this test may timeout when run with coverage enabled
+ {timeout, 60, fun with_index_test2/0}.
+
+with_index_test2() ->
IndexPrepareFun =
fun({K, _V}, Acc) ->
H = leveled_codec:magic_hash(K),
diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl
index 74f84db..6e2073a 100644
--- a/src/leveled_sst.erl
+++ b/src/leveled_sst.erl
@@ -1481,17 +1481,13 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
BRand = random:uniform(BRange),
BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0),
KNumber = string:right(integer_to_list(random:uniform(1000)), 6, $0),
- LedgerKey = leveled_codec:to_ledgerkey("Bucket" ++ BNumber,
- "Key" ++ KNumber,
- o),
- {_B, _K, KV, _H} = leveled_codec:generate_ledgerkv(LedgerKey,
- Seqn,
- crypto:rand_bytes(64),
- 64,
- infinity),
+ LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o),
+ Chunk = crypto:rand_bytes(64),
+ {_B, _K, MV, _H, _LMs} =
+ leveled_codec:generate_ledgerkv(LK, Seqn, Chunk, 64, infinity),
generate_randomkeys(Seqn + 1,
Count - 1,
- [KV|Acc],
+ [{LK, MV}|Acc],
BucketLow,
BRange).
@@ -1507,11 +1503,11 @@ generate_indexkeys(Count, IndexList) ->
generate_indexkey(Term, Count) ->
IndexSpecs = [{add, "t1_int", Term}],
- leveled_codec:convert_indexspecs(IndexSpecs,
- "Bucket",
- "Key" ++ integer_to_list(Count),
- Count,
- infinity).
+ leveled_codec:idx_indexspecs(IndexSpecs,
+ "Bucket",
+ "Key" ++ integer_to_list(Count),
+ Count,
+ infinity).
form_slot_test() ->
% If a skip key happens, mustn't switch to loookup by accident as could be
diff --git a/src/leveled_tictac.erl b/src/leveled_tictac.erl
new file mode 100644
index 0000000..e7ced1e
--- /dev/null
+++ b/src/leveled_tictac.erl
@@ -0,0 +1,397 @@
+%% -------- TIC-TAC ACTOR ---------
+%%
+%% The TicTac actor is responsible for tracking the state of the store and
+%% signalling that state to other trusted actors
+%%
+%% https://en.wikipedia.org/wiki/Tic-tac
+%%
+%% This is achieved through the exchange of merkle trees, but *not* trees
+%% that are secure against interference - there is no attempt to protect the
+%% tree from Byzantine faults or tampering. The tree is only suited for use
+%% between trusted actors across secure channels.
+%%
+%% In dropping the cryptographic security requirement, a simpler tree is
+%% possible, and also one that allows for trees of a partitioned database to
+%% be quickly merged to represent a global view of state for the database
+%% across the partition boundaries.
+%%
+%% -------- PERSPECTIVES OF STATE ---------
+%%
+%% The insecure Merkle trees (Tic-Tac Trees) are intended to be used in two
+%% ways:
+%% - To support the building of a merkle tree across a coverage plan to
+%% represent global state across many stores (or vnodes) i.e. scanning over
+%% the real data by bucket, by key range or by index.
+%% - To track changes with "recent" modification dates.
+%%
+%% -------- TIC-TAC TREES ---------
+%%
+%% The Tic-Tac tree is split into a power-of-two number of segments (from
+%% 256 * 256 for a small tree up to 2048 * 2048 for an xlarge tree). Every
+%% key is hashed to map it to one of those segment leaves using the
+%% erlang:phash2 function.
+%%
+%% External to the leveled_tictac module, the value should also have been
+%% hashed to a 4-byte integer (presumably based on a tag-specific hash
+%% function). The combination of the Object Key and the Hash is then
+%% hashed together to get a segment-change hash.
+%%
+%% To change a segment-leaf hash, the segment-leaf hash is XORd with the
+%% segment-change hash associated with the changing key. This assumes that
+%% only one version of the key is ever added to the segment-leaf hash if the
+%% tree is to represent the state of the store (or partition of the store).
+%% If not, the segment-leaf hash can only represent a history of changes
+%% under that leaf, not the current state (unless the previous segment-change
+%% hash for the key is removed by XORing it once more from the segment-leaf
+%% hash that already contains it).
+%%
+%% Each Level 1 hash is then created by XORing the Level 2 segment-leaf
+%% hashes in the branch below it (or, on an update, by XORing both the
+%% previous version and the new version of the changed segment-leaf hash
+%% into the previous Level 1 hash).
+%%
+
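The XOR mechanics described above can be demonstrated in isolation. The following escript is illustrative only (not part of the patch; the key and value terms are made up) and shows that leaf updates are order-independent and self-inverse:

```erlang
#!/usr/bin/env escript
%% Illustrative only: the XOR properties the segment-leaf hashes rely on.
%% Key/value terms here are made up.
main(_) ->
    H1 = erlang:phash2({{o, "B1", "K1", null}, 1001}),
    H2 = erlang:phash2({{o, "B1", "K2", null}, 2002}),
    %% Order of addition does not matter
    LeafA = (0 bxor H1) bxor H2,
    LeafB = (0 bxor H2) bxor H1,
    LeafA = LeafB,
    %% XORing a segment-change hash in a second time removes it - this is
    %% how a superseded version of a key can be retired from a leaf
    0 = (LeafA bxor H1) bxor H2,
    io:format("ok~n").
```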
+
+-module(leveled_tictac).
+
+-include("include/leveled.hrl").
+
+-export([
+ new_tree/1,
+ new_tree/2,
+ add_kv/4,
+ find_dirtyleaves/2,
+ find_dirtysegments/2,
+ fetch_root/1,
+ fetch_leaves/2,
+ merge_trees/2,
+ get_segment/2,
+ tictac_hash/2
+ ]).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(HASH_SIZE, 4).
+-define(SMALL, {8, 256, 256 * 256}).
+-define(MEDIUM, {9, 512, 512 * 512}).
+-define(LARGE, {10, 1024, 1024 * 1024}).
+-define(XLARGE, {11, 2048, 2048 * 2048}).
+
+-record(tictactree, {treeID :: any(),
+ size :: small|medium|large|xlarge,
+ width :: integer(),
+ bitwidth :: integer(),
+ segment_count :: integer(),
+ level1 :: binary(),
+ level2 :: any() % an array, but typed any() for OTP compatibility
+ }).
+
+-type tictactree() :: #tictactree{}.
+
+%%%============================================================================
+%%% External functions
+%%%============================================================================
+
+-spec new_tree(any()) -> tictactree().
+%% @doc
+%% Create a new tree, zeroed out.
+new_tree(TreeID) ->
+ new_tree(TreeID, small).
+
+new_tree(TreeID, Size) ->
+ {BitWidth, Width, SegmentCount} = get_size(Size),
+ Lv1Width = Width * ?HASH_SIZE * 8,
+ Lv1Init = <<0:Lv1Width/integer>>,
+ Lv2SegBinSize = Width * ?HASH_SIZE * 8,
+ Lv2SegBinInit = <<0:Lv2SegBinSize/integer>>,
+ Lv2Init = array:new([{size, Width}, {default, Lv2SegBinInit}]),
+ #tictactree{treeID = TreeID,
+ size = Size,
+ width = Width,
+ bitwidth = BitWidth,
+ segment_count = SegmentCount,
+ level1 = Lv1Init,
+ level2 = Lv2Init}.
+
+-spec add_kv(tictactree(), tuple(), tuple(), fun()) -> tictactree().
+%% @doc
+%% Add a Key and value to a tictactree using the HashFun to calculate the Hash
+%% based on that key and value
+add_kv(TicTacTree, Key, Value, HashFun) ->
+ HashV = HashFun(Key, Value),
+ SegChangeHash = tictac_hash(Key, HashV),
+ Segment = get_segment(Key, TicTacTree#tictactree.segment_count),
+
+ Level2Pos =
+ Segment band (TicTacTree#tictactree.width - 1),
+ Level1Pos =
+ (Segment bsr TicTacTree#tictactree.bitwidth)
+ band (TicTacTree#tictactree.width - 1),
+ Level2BytePos = ?HASH_SIZE * Level2Pos,
+ Level1BytePos = ?HASH_SIZE * Level1Pos,
+
+ Level2 = array:get(Level1Pos, TicTacTree#tictactree.level2),
+
+ HashIntLength = ?HASH_SIZE * 8,
+ <<PreL2:Level2BytePos/binary, SegLeaf2:HashIntLength/integer,
+ PostL2/binary>> = Level2,
+ <<PreL1:Level1BytePos/binary, SegLeaf1:HashIntLength/integer,
+ PostL1/binary>> = TicTacTree#tictactree.level1,
+
+ SegLeaf2Upd = SegLeaf2 bxor SegChangeHash,
+ SegLeaf1Upd = SegLeaf1 bxor SegChangeHash,
+
+ Level1Upd = <<PreL1:Level1BytePos/binary,
+ SegLeaf1Upd:HashIntLength/integer, PostL1/binary>>,
+ Level2Upd = <<PreL2:Level2BytePos/binary,
+ SegLeaf2Upd:HashIntLength/integer, PostL2/binary>>,
+ TicTacTree#tictactree{level1 = Level1Upd,
+ level2 = array:set(Level1Pos,
+ Level2Upd,
+ TicTacTree#tictactree.level2)}.
+
+-spec find_dirtyleaves(tictactree(), tictactree()) -> list(integer()).
+%% @doc
+%% Returns a list of segment IDs which hold differences between the state
+%% represented by the two trees.
+find_dirtyleaves(SrcTree, SnkTree) ->
+ _Size = SrcTree#tictactree.size,
+ _Size = SnkTree#tictactree.size,
+ Width = SrcTree#tictactree.width,
+
+ IdxList = find_dirtysegments(fetch_root(SrcTree), fetch_root(SnkTree)),
+ SrcLeaves = fetch_leaves(SrcTree, IdxList),
+ SnkLeaves = fetch_leaves(SnkTree, IdxList),
+
+ FoldFun =
+ fun(Idx, Acc) ->
+ {Idx, SrcLeaf} = lists:keyfind(Idx, 1, SrcLeaves),
+ {Idx, SnkLeaf} = lists:keyfind(Idx, 1, SnkLeaves),
+ L2IdxList = segmentcompare(SrcLeaf, SnkLeaf),
+ Acc ++ lists:map(fun(X) -> X + Idx * Width end, L2IdxList)
+ end,
+ lists:sort(lists:foldl(FoldFun, [], IdxList)).
+
+-spec find_dirtysegments(binary(), binary()) -> list(integer()).
+%% @doc
+%% Returns a list of branch IDs that contain differences between the trees.
+%% Pass in level 1 binaries to make the comparison.
+find_dirtysegments(SrcBin, SinkBin) ->
+ segmentcompare(SrcBin, SinkBin).
+
+-spec fetch_root(tictactree()) -> binary().
+%% @doc
+%% Return the level1 binary for a tree.
+fetch_root(TicTacTree) ->
+ TicTacTree#tictactree.level1.
+
+-spec fetch_leaves(tictactree(), list(integer())) -> list().
+%% @doc
+%% Return a keylist for the segment hashes for the leaves of the tree based on
+%% the list of branch IDs provided
+fetch_leaves(TicTacTree, BranchList) ->
+ MapFun =
+ fun(Idx) ->
+ {Idx, array:get(Idx, TicTacTree#tictactree.level2)}
+ end,
+ lists:map(MapFun, BranchList).
+
+-spec merge_trees(tictactree(), tictactree()) -> tictactree().
+%% @doc
+%% Merge two trees providing a result that represents the combined state,
+%% assuming that the two trees were correctly partitioned pre-merge. If a key
+%% and value has been added to both trees, then the merge will not give the
+%% expected outcome.
+merge_trees(TreeA, TreeB) ->
+ Size = TreeA#tictactree.size,
+ Size = TreeB#tictactree.size,
+
+ MergedTree = new_tree(merge, Size),
+
+ L1A = fetch_root(TreeA),
+ L1B = fetch_root(TreeB),
+ NewLevel1 = merge_binaries(L1A, L1B),
+
+ MergeFun =
+ fun(SQN, MergeL2) ->
+ L2A = array:get(SQN, TreeA#tictactree.level2),
+ L2B = array:get(SQN, TreeB#tictactree.level2),
+ NewLevel2 = merge_binaries(L2A, L2B),
+ array:set(SQN, NewLevel2, MergeL2)
+ end,
+ NewLevel2 = lists:foldl(MergeFun,
+ MergedTree#tictactree.level2,
+ lists:seq(0, MergedTree#tictactree.width - 1)),
+
+ MergedTree#tictactree{level1 = NewLevel1, level2 = NewLevel2}.
+
+-spec get_segment(any(), integer()|small|medium|large|xlarge) -> integer().
+%% @doc
+%% Return the segment ID for a Key. Can pass the tree size or the actual
+%% segment count derived from the size
+get_segment(Key, SegmentCount) when is_integer(SegmentCount) ->
+ erlang:phash2(Key) band (SegmentCount - 1);
+get_segment(Key, TreeSize) ->
+ get_segment(Key, element(3, get_size(TreeSize))).
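As a sketch of the mask-and-shift arithmetic behind get_segment/2 and the position calculations in add_kv/4, the following standalone escript uses the small tree constants; the ledger key is illustrative:

```erlang
#!/usr/bin/env escript
%% Illustrative only: segment selection is a mask over erlang:phash2, so the
%% segment count must be a power of two (as all the defined sizes are).
main(_) ->
    SegmentCount = 256 * 256,              % small: {8, 256, 256 * 256}
    Key = {o, "Bucket1", "Key1", null},    % made-up ledger key
    Segment = erlang:phash2(Key) band (SegmentCount - 1),
    true = (Segment >= 0) and (Segment < SegmentCount),
    %% Branch (level 1) and leaf slot (level 2) are carved from the segment
    Width = 256,
    BitWidth = 8,
    Level2Pos = Segment band (Width - 1),
    Level1Pos = (Segment bsr BitWidth) band (Width - 1),
    %% The segment ID can be rebuilt from the two positions
    Segment = Level1Pos * Width + Level2Pos,
    io:format("segment ~w -> branch ~w, slot ~w~n",
              [Segment, Level1Pos, Level2Pos]).
```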
+
+
+-spec tictac_hash(tuple(), any()) -> integer().
+%% @doc
+%% Hash the key and term
+tictac_hash(Key, Term) ->
+ erlang:phash2({Key, Term}).
+
+%%%============================================================================
+%%% Internal functions
+%%%============================================================================
+
+get_size(Size) ->
+ case Size of
+ small ->
+ ?SMALL;
+ medium ->
+ ?MEDIUM;
+ large ->
+ ?LARGE;
+ xlarge ->
+ ?XLARGE
+ end.
+
+segmentcompare(SrcBin, SinkBin) when byte_size(SrcBin)==byte_size(SinkBin) ->
+ segmentcompare(SrcBin, SinkBin, [], 0).
+
+segmentcompare(<<>>, <<>>, Acc, _Counter) ->
+ Acc;
+segmentcompare(SrcBin, SnkBin, Acc, Counter) ->
+ <<SrcHash:?HASH_SIZE/binary, SrcTail/binary>> = SrcBin,
+ <<SnkHash:?HASH_SIZE/binary, SnkTail/binary>> = SnkBin,
+ case SrcHash of
+ SnkHash ->
+ segmentcompare(SrcTail, SnkTail, Acc, Counter + 1);
+ _ ->
+ segmentcompare(SrcTail, SnkTail, [Counter|Acc], Counter + 1)
+ end.
+
+checktree(TicTacTree) ->
+ checktree(TicTacTree#tictactree.level1, TicTacTree, 0).
+
+checktree(<<>>, TicTacTree, Counter) ->
+ true = TicTacTree#tictactree.width == Counter;
+checktree(Level1Bin, TicTacTree, Counter) ->
+ BitSize = ?HASH_SIZE * 8,
+ <<TopHash:BitSize/integer, Tail/binary>> = Level1Bin,
+ L2Bin = array:get(Counter, TicTacTree#tictactree.level2),
+ true = TopHash == segmentsummarise(L2Bin, 0),
+ checktree(Tail, TicTacTree, Counter + 1).
+
+segmentsummarise(<<>>, L1Acc) ->
+ L1Acc;
+segmentsummarise(L2Bin, L1Acc) ->
+ BitSize = ?HASH_SIZE * 8,
+ <<TopHash:BitSize/integer, Tail/binary>> = L2Bin,
+ segmentsummarise(Tail, L1Acc bxor TopHash).
+
+merge_binaries(BinA, BinB) ->
+ BitSize = bit_size(BinA),
+ BitSize = bit_size(BinB),
+ <<AInt:BitSize/integer>> = BinA,
+ <<BInt:BitSize/integer>> = BinB,
+ MergedInt = AInt bxor BInt,
+ <<MergedInt:BitSize/integer>>.
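A one-line illustration of why merge_trees/2 requires the two trees to be correctly partitioned pre-merge: if both trees contain the same key and value, the XOR merge cancels that key's contribution entirely. A standalone escript sketch (the key/value term is made up):

```erlang
#!/usr/bin/env escript
%% Illustrative only: XOR-merging two trees that both saw the same key and
%% value removes that key's contribution from the merged state.
main(_) ->
    H = erlang:phash2({{o, "B1", "K1", null}, 42}),
    LeafA = 0 bxor H,       % key present in tree A's leaf
    LeafB = 0 bxor H,       % same key also present in tree B's leaf
    0 = LeafA bxor LeafB,   % merged leaf no longer records the key at all
    io:format("ok~n").
```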
+
+%%%============================================================================
+%%% Test
+%%%============================================================================
+
+-ifdef(TEST).
+
+
+simple_bysize_test() ->
+ simple_test_withsize(small),
+ simple_test_withsize(medium),
+ simple_test_withsize(large),
+ simple_test_withsize(xlarge).
+
+simple_test_withsize(Size) ->
+ HashFun = fun(_K, V) -> erlang:phash2(V) end,
+
+ Tree0 = new_tree(0, Size),
+ Tree1 = add_kv(Tree0, {o, "B1", "K1", null}, {caine, 1}, HashFun),
+ Tree2 = add_kv(Tree1, {o, "B1", "K2", null}, {caine, 2}, HashFun),
+ Tree3 = add_kv(Tree2, {o, "B1", "K3", null}, {caine, 3}, HashFun),
+ Tree3A = add_kv(Tree3, {o, "B1", "K3", null}, {caine, 4}, HashFun),
+ ?assertMatch(true, Tree0#tictactree.level1 == Tree0#tictactree.level1),
+ ?assertMatch(false, Tree0#tictactree.level1 == Tree1#tictactree.level1),
+ ?assertMatch(false, Tree1#tictactree.level1 == Tree2#tictactree.level1),
+ ?assertMatch(false, Tree2#tictactree.level1 == Tree3#tictactree.level1),
+ ?assertMatch(false, Tree3#tictactree.level1 == Tree3A#tictactree.level1),
+
+ Tree0X = new_tree(0, Size),
+ Tree1X = add_kv(Tree0X, {o, "B1", "K3", null}, {caine, 3}, HashFun),
+ Tree2X = add_kv(Tree1X, {o, "B1", "K1", null}, {caine, 1}, HashFun),
+ Tree3X = add_kv(Tree2X, {o, "B1", "K2", null}, {caine, 2}, HashFun),
+ Tree3XA = add_kv(Tree3X, {o, "B1", "K3", null}, {caine, 4}, HashFun),
+ ?assertMatch(false, Tree1#tictactree.level1 == Tree1X#tictactree.level1),
+ ?assertMatch(false, Tree2#tictactree.level1 == Tree2X#tictactree.level1),
+ ?assertMatch(true, Tree3#tictactree.level1 == Tree3X#tictactree.level1),
+ ?assertMatch(true, Tree3A#tictactree.level1 == Tree3XA#tictactree.level1),
+
+ SC = Tree0#tictactree.segment_count,
+
+ DL0 = find_dirtyleaves(Tree1, Tree0),
+ ?assertMatch(true, lists:member(get_segment({o, "B1", "K1", null}, SC), DL0)),
+ DL1 = find_dirtyleaves(Tree3, Tree1),
+ ?assertMatch(true, lists:member(get_segment({o, "B1", "K2", null}, SC), DL1)),
+ ?assertMatch(true, lists:member(get_segment({o, "B1", "K3", null}, SC), DL1)),
+ ?assertMatch(false, lists:member(get_segment({o, "B1", "K1", null}, SC), DL1)).
+
+merge_bysize_small_test() ->
+ merge_test_withsize(small).
+
+merge_bysize_medium_test() ->
+ merge_test_withsize(medium).
+
+merge_bysize_large_test() ->
+ merge_test_withsize(large).
+
+merge_bysize_xlarge_test_() ->
+ {timeout, 60, fun merge_bysize_xlarge_test2/0}.
+
+merge_bysize_xlarge_test2() ->
+ merge_test_withsize(xlarge).
+
+merge_test_withsize(Size) ->
+ HashFun = fun(_K, V) -> erlang:phash2(V) end,
+
+ TreeX0 = new_tree(0, Size),
+ TreeX1 = add_kv(TreeX0, {o, "B1", "X1", null}, {caine, 1}, HashFun),
+ TreeX2 = add_kv(TreeX1, {o, "B1", "X2", null}, {caine, 2}, HashFun),
+ TreeX3 = add_kv(TreeX2, {o, "B1", "X3", null}, {caine, 3}, HashFun),
+ TreeX4 = add_kv(TreeX3, {o, "B1", "X3", null}, {caine, 4}, HashFun),
+
+ TreeY0 = new_tree(0, Size),
+ TreeY1 = add_kv(TreeY0, {o, "B1", "Y1", null}, {caine, 101}, HashFun),
+ TreeY2 = add_kv(TreeY1, {o, "B1", "Y2", null}, {caine, 102}, HashFun),
+ TreeY3 = add_kv(TreeY2, {o, "B1", "Y3", null}, {caine, 103}, HashFun),
+ TreeY4 = add_kv(TreeY3, {o, "B1", "Y3", null}, {caine, 104}, HashFun),
+
+ TreeZ1 = add_kv(TreeX4, {o, "B1", "Y1", null}, {caine, 101}, HashFun),
+ TreeZ2 = add_kv(TreeZ1, {o, "B1", "Y2", null}, {caine, 102}, HashFun),
+ TreeZ3 = add_kv(TreeZ2, {o, "B1", "Y3", null}, {caine, 103}, HashFun),
+ TreeZ4 = add_kv(TreeZ3, {o, "B1", "Y3", null}, {caine, 104}, HashFun),
+
+ TreeM0 = merge_trees(TreeX4, TreeY4),
+ checktree(TreeM0),
+ ?assertMatch(true, TreeM0#tictactree.level1 == TreeZ4#tictactree.level1),
+
+ TreeM1 = merge_trees(TreeX3, TreeY4),
+ checktree(TreeM1),
+ ?assertMatch(false, TreeM1#tictactree.level1 == TreeZ4#tictactree.level1).
+
+-endif.
+
+
+
+
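The two-step comparison performed by find_dirtyleaves/2 can be sketched in a self-contained escript, using a toy width of 4 and 32-bit hashes (all values and names are illustrative):

```erlang
#!/usr/bin/env escript
%% Illustrative only: the two-level comparison used by find_dirtyleaves/2.
main(_) ->
    Width = 4,
    %% Step 1 - compare the level 1 roots; branch 2 differs
    SrcRoot = <<10:32, 20:32, 30:32, 40:32>>,
    SnkRoot = <<10:32, 20:32, 31:32, 40:32>>,
    [2] = compare(SrcRoot, SnkRoot),
    %% Step 2 - compare the level 2 leaves under branch 2; slot 1 differs
    SrcLeaf = <<5:32, 6:32, 7:32, 8:32>>,
    SnkLeaf = <<5:32, 9:32, 7:32, 8:32>>,
    [1] = compare(SrcLeaf, SnkLeaf),
    %% The global segment ID combines branch and slot, as in find_dirtyleaves/2
    9 = 1 + 2 * Width,
    io:format("ok~n").

%% Walk two equal-length binaries one hash at a time, recording the indices
%% where they differ (mirrors segmentcompare/4 in the module above)
compare(BinA, BinB) when byte_size(BinA) == byte_size(BinB) ->
    lists:reverse(compare(BinA, BinB, [], 0)).

compare(<<>>, <<>>, Acc, _C) ->
    Acc;
compare(<<A:32/integer, TailA/binary>>, <<B:32/integer, TailB/binary>>,
        Acc, C) ->
    case A of
        B -> compare(TailA, TailB, Acc, C + 1);
        _ -> compare(TailA, TailB, [C|Acc], C + 1)
    end.
```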
diff --git a/src/leveled_tinybloom.erl b/src/leveled_tinybloom.erl
index 9d0ae32..15bd732 100644
--- a/src/leveled_tinybloom.erl
+++ b/src/leveled_tinybloom.erl
@@ -238,17 +238,13 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
BRand = random:uniform(BRange),
BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0),
KNumber = string:right(integer_to_list(random:uniform(10000)), 6, $0),
- LedgerKey = leveled_codec:to_ledgerkey("Bucket" ++ BNumber,
- "Key" ++ KNumber,
- o),
- {_B, _K, KV, _H} = leveled_codec:generate_ledgerkv(LedgerKey,
- Seqn,
- crypto:rand_bytes(64),
- 64,
- infinity),
+ LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o),
+ Chunk = crypto:rand_bytes(64),
+ {_B, _K, MV, _H, _LMs} =
+ leveled_codec:generate_ledgerkv(LK, Seqn, Chunk, 64, infinity),
generate_randomkeys(Seqn + 1,
Count - 1,
- [KV|Acc],
+ [{LK, MV}|Acc],
BucketLow,
BRange).
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index 2f3337d..ce8f4b0 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -123,11 +123,11 @@ journal_compaction(_Config) ->
ChkList1 = lists:sublist(lists:sort(ObjList1), 10000),
testutil:check_forlist(Bookie1, ChkList1),
testutil:check_forobject(Bookie1, TestObject),
- {B2, K2, V2, Spec2, MD} = {"Bucket1",
- "Key1",
- "Value1",
+ {B2, K2, V2, Spec2, MD} = {"Bucket2",
+ "Key2",
+ "Value2",
[],
- [{"MDK1", "MDV1"}]},
+ [{"MDK2", "MDV2"}]},
{TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2,
V2, Spec2, MD),
ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2),
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index ccca1ea..70d0b97 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -80,7 +80,7 @@ recovr_strategy(_Config) ->
Q = fun(RT) -> {index_query,
"Bucket6",
{fun testutil:foldkeysfun/3, []},
- {"idx1_bin", "#", "~"},
+ {"idx1_bin", "#", "|"},
{RT, undefined}}
end,
{async, TFolder} = leveled_bookie:book_returnfolder(Book1, Q(true)),
@@ -190,13 +190,13 @@ aae_bustedjournal(_Config) ->
true = GetCount < HeadCount,
{async, HashTreeF1} = leveled_bookie:book_returnfolder(Bookie2,
- {hashtree_query,
+ {hashlist_query,
?RIAK_TAG,
false}),
KeyHashList1 = HashTreeF1(),
20001 = length(KeyHashList1),
{async, HashTreeF2} = leveled_bookie:book_returnfolder(Bookie2,
- {hashtree_query,
+ {hashlist_query,
?RIAK_TAG,
check_presence}),
KeyHashList2 = HashTreeF2(),
@@ -205,8 +205,12 @@ aae_bustedjournal(_Config) ->
% Will need to remove the file or corrupt the hashtree to get presence to
% fail
- FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc]
- end,
+ FoldObjectsFun =
+ fun(B, K, V, Acc) ->
+ VC = testutil:get_vclock(V),
+ H = erlang:phash2(lists:sort(VC)),
+ [{B, K, H}|Acc]
+ end,
SW = os:timestamp(),
{async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2,
{foldobjects_allkeys,
@@ -264,7 +268,7 @@ aae_bustedjournal(_Config) ->
length(KeyHashList5)]),
{async, HashTreeF6} = leveled_bookie:book_returnfolder(Bookie4,
- {hashtree_query,
+ {hashlist_query,
?RIAK_TAG,
check_presence}),
KeyHashList6 = HashTreeF6(),
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index 923b81e..2952d48 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -31,6 +31,7 @@
get_randomindexes_generator/1,
name_list/0,
load_objects/5,
+ load_objects/6,
put_indexed_objects/3,
put_altered_indexed_objects/3,
put_altered_indexed_objects/4,
@@ -42,6 +43,7 @@
find_journals/1,
wait_for_compaction/1,
foldkeysfun/3,
+ foldkeysfun_returnbucket/3,
sync_strategy/0]).
-define(RETURN_TERMS, {true, undefined}).
@@ -52,6 +54,7 @@
-define(MD_LASTMOD, <<"X-Riak-Last-Modified">>).
-define(MD_DELETED, <<"X-Riak-Deleted">>).
-define(EMPTY_VTAG_BIN, <<"e">>).
+-define(ROOT_PATH, "test").
%% =================================================
%% From riak_object
@@ -169,13 +172,17 @@ riakload(Bookie, ObjectList) ->
reset_filestructure() ->
- reset_filestructure(0).
+ reset_filestructure(0, ?ROOT_PATH).
-reset_filestructure(Wait) ->
- io:format("Waiting ~w ms to give a chance for all file closes " ++
+reset_filestructure(Wait) when is_integer(Wait) ->
+ reset_filestructure(Wait, ?ROOT_PATH);
+reset_filestructure(RootPath) when is_list(RootPath) ->
+ reset_filestructure(0, RootPath).
+
+reset_filestructure(Wait, RootPath) ->
+ io:format("Waiting ~w ms to give a chance for all file closes " ++
"to complete~n", [Wait]),
- timer:sleep(Wait),
- RootPath = "test",
+ timer:sleep(Wait),
filelib:ensure_dir(RootPath ++ "/journal/"),
filelib:ensure_dir(RootPath ++ "/ledger/"),
leveled_inker:clean_testdir(RootPath ++ "/journal"),
@@ -258,10 +265,8 @@ check_forobject(Bookie, TestObject) ->
{ok, HeadBinary} = book_riakhead(Bookie,
TestObject#r_object.bucket,
TestObject#r_object.key),
- {_SibMetaBin,
- Vclock,
- _Hash,
- size} = leveled_codec:riak_extract_metadata(HeadBinary, size),
+ {{_SibMetaBin, Vclock, _Hash, size}, _LMS}
+ = leveled_codec:riak_extract_metadata(HeadBinary, size),
true = binary_to_term(Vclock) == TestObject#r_object.vclock.
check_formissingobject(Bookie, Bucket, Key) ->
@@ -278,8 +283,12 @@ generate_testobject() ->
generate_testobject(B1, K1, V1, Spec1, MD).
generate_testobject(B, K, V, Spec, MD) ->
- Content = #r_content{metadata=dict:from_list(MD), value=V},
- {#r_object{bucket=B, key=K, contents=[Content], vclock=[{'a',1}]},
+ MD0 = [{?MD_LASTMOD, os:timestamp()}|MD],
+ Content = #r_content{metadata=dict:from_list(MD0), value=V},
+ {#r_object{bucket=B,
+ key=K,
+ contents=[Content],
+ vclock=generate_vclock()},
Spec}.
@@ -370,7 +379,8 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
{remove, IdxF, IdxV} end,
Indexes2Remove),
[{"MDK", "MDV" ++ Key},
- {"MDK2", "MDV" ++ Key}]},
+ {"MDK2", "MDV" ++ Key},
+ {?MD_LASTMOD, os:timestamp()}]},
{B1, K1, V1, Spec1, MD} = Obj,
Content = #r_content{metadata=dict:from_list(MD), value=V1},
{#r_object{bucket=B1,
@@ -420,6 +430,9 @@ get_vclock(ObjectBin) ->
binary_to_term(VclockBin).
load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
+ load_objects(ChunkSize, GenList, Bookie, TestObject, Generator, 1000).
+
+load_objects(ChunkSize, GenList, Bookie, TestObject, Generator, SubListL) ->
lists:map(fun(KN) ->
ObjListA = Generator(ChunkSize, KN),
StartWatchA = os:timestamp(),
@@ -433,7 +446,7 @@ load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
true ->
check_forobject(Bookie, TestObject)
end,
- lists:sublist(ObjListA, 1000) end,
+ lists:sublist(ObjListA, SubListL) end,
GenList).
@@ -472,6 +485,11 @@ get_randomdate() ->
foldkeysfun(_Bucket, Item, Acc) -> Acc ++ [Item].
+foldkeysfun_returnbucket(Bucket, {Term, Key}, Acc) ->
+ Acc ++ [{Term, {Bucket, Key}}];
+foldkeysfun_returnbucket(Bucket, Key, Acc) ->
+ Acc ++ [{Bucket, Key}].
+
check_indexed_objects(Book, B, KSpecL, V) ->
% Check all objects match, return what should be the results of an all
% index query
@@ -490,7 +508,7 @@ check_indexed_objects(Book, B, KSpecL, V) ->
{fun foldkeysfun/3, []},
{"idx1_bin",
"0",
- "~"},
+ "|"},
?RETURN_TERMS}),
SW = os:timestamp(),
{async, Fldr} = R,
diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl
new file mode 100644
index 0000000..3c65327
--- /dev/null
+++ b/test/end_to_end/tictac_SUITE.erl
@@ -0,0 +1,1025 @@
+-module(tictac_SUITE).
+-include_lib("common_test/include/ct.hrl").
+-include("include/leveled.hrl").
+-export([all/0]).
+-export([
+ many_put_compare/1,
+ index_compare/1,
+ recent_aae_noaae/1,
+ recent_aae_allaae/1,
+ recent_aae_bucketaae/1,
+ recent_aae_expiry/1
+ ]).
+
+all() -> [
+ many_put_compare,
+ index_compare,
+ recent_aae_noaae,
+ recent_aae_allaae,
+ recent_aae_bucketaae,
+ recent_aae_expiry
+ ].
+
+-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
+
+many_put_compare(_Config) ->
+ TreeSize = small,
+ SegmentCount = 256 * 256,
+ % Test requires multiple different databases, so want to mount them all
+ % on individual file paths
+ RootPathA = testutil:reset_filestructure("testA"),
+ RootPathB = testutil:reset_filestructure("testB"),
+ RootPathC = testutil:reset_filestructure("testC"),
+ RootPathD = testutil:reset_filestructure("testD"),
+
+ % Start the first database, load a test object, close it, start it again
+ StartOpts1 = [{root_path, RootPathA},
+ {max_pencillercachesize, 16000},
+ {sync_strategy, riak_sync}],
+ {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+ {B1, K1, V1, S1, MD} = {"Bucket",
+ "Key1.1.4567.4321",
+ "Value1",
+ [],
+ [{"MDK1", "MDV1"}]},
+ {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
+ ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
+ testutil:check_forobject(Bookie1, TestObject),
+ ok = leveled_bookie:book_close(Bookie1),
+ StartOpts2 = [{root_path, RootPathA},
+ {max_journalsize, 500000000},
+ {max_pencillercachesize, 32000},
+ {sync_strategy, testutil:sync_strategy()}],
+ {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
+ testutil:check_forobject(Bookie2, TestObject),
+
+ % Generate 200K objects to be used within the test, and load them into
+ % the first store (outputting the generated objects as a list of lists)
+ % to be used elsewhere
+
+ GenList = [2, 20002, 40002, 60002, 80002,
+ 100002, 120002, 140002, 160002, 180002],
+ CLs = testutil:load_objects(20000,
+ GenList,
+ Bookie2,
+ TestObject,
+ fun testutil:generate_smallobjects/2,
+ 20000),
+
+ % Start a new store, and load the same objects (except for the original
+ % test object) into this store
+
+ StartOpts3 = [{root_path, RootPathB},
+ {max_journalsize, 200000000},
+ {max_pencillercachesize, 16000},
+ {sync_strategy, testutil:sync_strategy()}],
+ {ok, Bookie3} = leveled_bookie:book_start(StartOpts3),
+ lists:foreach(fun(ObjL) -> testutil:riakload(Bookie3, ObjL) end, CLs),
+
+ % Now run a tictac query against both stores to see the extent to which
+ % state between stores is consistent
+
+ TicTacQ = {tictactree_obj,
+ {o_rkv, "Bucket", null, null, false},
+ TreeSize,
+ fun(_B, _K) -> accumulate end},
+ {async, TreeAFolder} = leveled_bookie:book_returnfolder(Bookie2, TicTacQ),
+ {async, TreeBFolder} = leveled_bookie:book_returnfolder(Bookie3, TicTacQ),
+ SWA0 = os:timestamp(),
+ TreeA = TreeAFolder(),
+ io:format("Build tictac tree with 200K objects in ~w~n",
+ [timer:now_diff(os:timestamp(), SWA0)]),
+ SWB0 = os:timestamp(),
+ TreeB = TreeBFolder(),
+ io:format("Build tictac tree with 200K objects in ~w~n",
+ [timer:now_diff(os:timestamp(), SWB0)]),
+ SWC0 = os:timestamp(),
+ SegList0 = leveled_tictac:find_dirtyleaves(TreeA, TreeB),
+ io:format("Compare tictac trees with 200K objects in ~w~n",
+ [timer:now_diff(os:timestamp(), SWC0)]),
+ io:format("Tree comparison shows ~w different leaves~n",
+ [length(SegList0)]),
+ AltList = leveled_tictac:find_dirtyleaves(TreeA,
+ leveled_tictac:new_tree(0)),
+ io:format("Tree comparison shows ~w altered leaves~n",
+ [length(AltList)]),
+ true = length(SegList0) == 1,
+ % only the test object should be different
+ true = length(AltList) > 10000,
+ % check there are a significant number of differences from empty
+
+ FoldKeysFun =
+ fun(SegListToFind) ->
+ fun(_B, K, Acc) ->
+ Seg = leveled_tictac:get_segment(K, SegmentCount),
+ case lists:member(Seg, SegListToFind) of
+ true ->
+ [K|Acc];
+ false ->
+ Acc
+ end
+ end
+ end,
+ SegQuery = {keylist, o_rkv, "Bucket", {FoldKeysFun(SegList0), []}},
+ {async, SegKeyFinder} =
+ leveled_bookie:book_returnfolder(Bookie2, SegQuery),
+ SWSKL0 = os:timestamp(),
+ SegKeyList = SegKeyFinder(),
+ io:format("Finding ~w keys in ~w dirty segments in ~w~n",
+ [length(SegKeyList),
+ length(SegList0),
+ timer:now_diff(os:timestamp(), SWSKL0)]),
+
+ true = length(SegKeyList) >= 1,
+ true = length(SegKeyList) < 10,
+ true = lists:member("Key1.1.4567.4321", SegKeyList),
+
+ % Now remove the object which represents the difference between these
+ % stores and confirm that the tictac trees will now match
+
+ testutil:book_riakdelete(Bookie2, B1, K1, []),
+ {async, TreeAFolder0} = leveled_bookie:book_returnfolder(Bookie2, TicTacQ),
+ SWA1 = os:timestamp(),
+ TreeA0 = TreeAFolder0(),
+ io:format("Build tictac tree with 200K objects in ~w~n",
+ [timer:now_diff(os:timestamp(), SWA1)]),
+
+ SegList1 = leveled_tictac:find_dirtyleaves(TreeA0, TreeB),
+ io:format("Tree comparison following delete shows ~w different leaves~n",
+ [length(SegList1)]),
+ true = length(SegList1) == 0,
+ % Removed test object so tictac trees should match
+
+ ok = testutil:book_riakput(Bookie3, TestObject, TestSpec),
+ {async, TreeBFolder0} = leveled_bookie:book_returnfolder(Bookie3, TicTacQ),
+ SWB1 = os:timestamp(),
+ TreeB0 = TreeBFolder0(),
+ io:format("Build tictac tree with 200K objects in ~w~n",
+ [timer:now_diff(os:timestamp(), SWB1)]),
+ SegList2 = leveled_tictac:find_dirtyleaves(TreeA0, TreeB0),
+ true = SegList2 == SegList0,
+ % The difference is identical, but now the difference is on Bookie3 rather
+ % than Bookie2 (previously it was in Bookie2 and not Bookie3)
+
+ ok = leveled_bookie:book_close(Bookie3),
+
+ % Replace Bookie 3 with two stores Bookie 4 and Bookie 5 where the objects
+ % have been randomly split between the stores
+
+ StartOpts4 = [{root_path, RootPathC},
+ {max_journalsize, 200000000},
+ {max_pencillercachesize, 24000},
+ {sync_strategy, testutil:sync_strategy()}],
+ {ok, Bookie4} = leveled_bookie:book_start(StartOpts4),
+ StartOpts5 = [{root_path, RootPathD},
+ {max_journalsize, 200000000},
+ {max_pencillercachesize, 24000},
+ {sync_strategy, testutil:sync_strategy()}],
+ {ok, Bookie5} = leveled_bookie:book_start(StartOpts5),
+
+ SplitFun =
+ fun(Obj) ->
+ case erlang:phash2(Obj) rem 2 of
+ 0 ->
+ true;
+ 1 ->
+ false
+ end
+ end,
+ lists:foreach(fun(ObjL) ->
+ {ObjLA, ObjLB} = lists:partition(SplitFun, ObjL),
+ testutil:riakload(Bookie4, ObjLA),
+ testutil:riakload(Bookie5, ObjLB)
+ end,
+ CLs),
+
+ % query both the stores, then merge the trees - the result should be the
+ % same as the result from the tree created against the store with both
+ % partitions
+
+ {async, TreeC0Folder} = leveled_bookie:book_returnfolder(Bookie4, TicTacQ),
+ {async, TreeC1Folder} = leveled_bookie:book_returnfolder(Bookie5, TicTacQ),
+ SWD0 = os:timestamp(),
+ TreeC0 = TreeC0Folder(),
+ io:format("Build tictac tree with 100K objects in ~w~n",
+ [timer:now_diff(os:timestamp(), SWD0)]),
+ SWD1 = os:timestamp(),
+ TreeC1 = TreeC1Folder(),
+ io:format("Build tictac tree with 100K objects in ~w~n",
+ [timer:now_diff(os:timestamp(), SWD1)]),
+
+ TreeC2 = leveled_tictac:merge_trees(TreeC0, TreeC1),
+ SegList3 = leveled_tictac:find_dirtyleaves(TreeC2, TreeB),
+ io:format("Tree comparison following delete shows ~w different leaves~n",
+ [length(SegList3)]),
+ true = length(SegList3) == 0,
+
+
+ ok = leveled_bookie:book_close(Bookie2),
+ ok = leveled_bookie:book_close(Bookie4),
+ ok = leveled_bookie:book_close(Bookie5).
+
+
+index_compare(_Config) ->
+ TreeSize = small,
+ LS = 2000,
+ JS = 50000000,
+ SS = testutil:sync_strategy(),
+ SegmentCount = 256 * 256,
+
+ % Test requires multiple different databases, so want to mount them all
+ % on individual file paths
+ RootPathA = testutil:reset_filestructure("testA"),
+ RootPathB = testutil:reset_filestructure("testB"),
+ RootPathC = testutil:reset_filestructure("testC"),
+ RootPathD = testutil:reset_filestructure("testD"),
+ % Book1A to get all objects
+ {ok, Book1A} = leveled_bookie:book_start(RootPathA, LS, JS, SS),
+ % Book1B/C/D will have objects partitioned across it
+ {ok, Book1B} = leveled_bookie:book_start(RootPathB, LS, JS, SS),
+ {ok, Book1C} = leveled_bookie:book_start(RootPathC, LS, JS, SS),
+ {ok, Book1D} = leveled_bookie:book_start(RootPathD, LS, JS, SS),
+
+ % Generate nine lists of objects
+ BucketBin = list_to_binary("Bucket"),
+ GenMapFun =
+ fun(_X) ->
+ V = testutil:get_compressiblevalue(),
+ Indexes = testutil:get_randomindexes_generator(8),
+ testutil:generate_objects(10000, binary_uuid, [], V, Indexes)
+ end,
+
+ ObjLists = lists:map(GenMapFun, lists:seq(1, 9)),
+
+ % Load all nine lists into Book1A
+ lists:foreach(fun(ObjL) -> testutil:riakload(Book1A, ObjL) end,
+ ObjLists),
+
+ % Split nine lists across Book1B to Book1D, three object lists in each
+ lists:foreach(fun(ObjL) -> testutil:riakload(Book1B, ObjL) end,
+ lists:sublist(ObjLists, 1, 3)),
+ lists:foreach(fun(ObjL) -> testutil:riakload(Book1C, ObjL) end,
+ lists:sublist(ObjLists, 4, 3)),
+ lists:foreach(fun(ObjL) -> testutil:riakload(Book1D, ObjL) end,
+ lists:sublist(ObjLists, 7, 3)),
+
+ GetTicTacTreeFun =
+ fun(X, Bookie) ->
+ SW = os:timestamp(),
+ ST = "!",
+ ET = "|",
+ Q = {tictactree_idx,
+ {BucketBin, "idx" ++ integer_to_list(X) ++ "_bin", ST, ET},
+ TreeSize,
+ fun(_B, _K) -> accumulate end},
+ {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Q),
+ R = Folder(),
+ io:format("TicTac Tree for index ~w took " ++
+ "~w microseconds~n",
+ [X, timer:now_diff(os:timestamp(), SW)]),
+ R
+ end,
+
+ % Get a TicTac tree representing one of the indexes in Bucket A
+ TicTacTree1_Full = GetTicTacTreeFun(1, Book1A),
+ TicTacTree1_P1 = GetTicTacTreeFun(1, Book1B),
+ TicTacTree1_P2 = GetTicTacTreeFun(1, Book1C),
+ TicTacTree1_P3 = GetTicTacTreeFun(1, Book1D),
+
+ % Merge the tree across the partitions
+ TicTacTree1_Joined = lists:foldl(fun leveled_tictac:merge_trees/2,
+ TicTacTree1_P1,
+ [TicTacTree1_P2, TicTacTree1_P3]),
+
+ % Go compare! Also check we're not comparing empty trees
+ DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTree1_Full,
+ TicTacTree1_Joined),
+ EmptyTree = leveled_tictac:new_tree(empty, TreeSize),
+ DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTree1_Full, EmptyTree),
+ true = DL1_0 == [],
+ true = length(DL1_1) > 100,
+
+ ok = leveled_bookie:book_close(Book1A),
+ ok = leveled_bookie:book_close(Book1B),
+ ok = leveled_bookie:book_close(Book1C),
+ ok = leveled_bookie:book_close(Book1D),
+
+ % Double check all is still well after a restart
+ % Book1A to get all objects
+ {ok, Book2A} = leveled_bookie:book_start(RootPathA, LS, JS, SS),
+ % Book2B/C/D will have objects partitioned across them
+ {ok, Book2B} = leveled_bookie:book_start(RootPathB, LS, JS, SS),
+ {ok, Book2C} = leveled_bookie:book_start(RootPathC, LS, JS, SS),
+ {ok, Book2D} = leveled_bookie:book_start(RootPathD, LS, JS, SS),
+ % Get a TicTac tree representing one of the indexes in Bucket A
+ TicTacTree2_Full = GetTicTacTreeFun(2, Book2A),
+ TicTacTree2_P1 = GetTicTacTreeFun(2, Book2B),
+ TicTacTree2_P2 = GetTicTacTreeFun(2, Book2C),
+ TicTacTree2_P3 = GetTicTacTreeFun(2, Book2D),
+
+ % Merge the tree across the partitions
+ TicTacTree2_Joined = lists:foldl(fun leveled_tictac:merge_trees/2,
+ TicTacTree2_P1,
+ [TicTacTree2_P2, TicTacTree2_P3]),
+
+ % Go compare! Also check we're not comparing empty trees
+ DL2_0 = leveled_tictac:find_dirtyleaves(TicTacTree2_Full,
+ TicTacTree2_Joined),
+ EmptyTree = leveled_tictac:new_tree(empty, TreeSize),
+ DL2_1 = leveled_tictac:find_dirtyleaves(TicTacTree2_Full, EmptyTree),
+ true = DL2_0 == [],
+ true = length(DL2_1) > 100,
+
+ IdxSpc = {add, "idx2_bin", "zz999"},
+ {TestObj, TestSpc} = testutil:generate_testobject(BucketBin,
+ term_to_binary("K9.Z"),
+ "Value1",
+ [IdxSpc],
+ [{"MDK1", "MDV1"}]),
+ ok = testutil:book_riakput(Book2C, TestObj, TestSpc),
+ testutil:check_forobject(Book2C, TestObj),
+
+ TicTacTree3_Full = GetTicTacTreeFun(2, Book2A),
+ TicTacTree3_P1 = GetTicTacTreeFun(2, Book2B),
+ TicTacTree3_P2 = GetTicTacTreeFun(2, Book2C),
+ TicTacTree3_P3 = GetTicTacTreeFun(2, Book2D),
+
+ % Merge the tree across the partitions
+ TicTacTree3_Joined = lists:foldl(fun leveled_tictac:merge_trees/2,
+ TicTacTree3_P1,
+ [TicTacTree3_P2, TicTacTree3_P3]),
+
+ % Find all keys index, and then just the last key
+ IdxQ1 = {index_query,
+ BucketBin,
+ {fun testutil:foldkeysfun/3, []},
+ {"idx2_bin", "zz", "zz|"},
+ {true, undefined}},
+ {async, IdxFolder1} = leveled_bookie:book_returnfolder(Book2C, IdxQ1),
+ true = length(IdxFolder1()) >= 1,
+
+ DL_3to2B = leveled_tictac:find_dirtyleaves(TicTacTree2_P1,
+ TicTacTree3_P1),
+ DL_3to2C = leveled_tictac:find_dirtyleaves(TicTacTree2_P2,
+ TicTacTree3_P2),
+ DL_3to2D = leveled_tictac:find_dirtyleaves(TicTacTree2_P3,
+ TicTacTree3_P3),
+ io:format("Individual tree comparison found dirty leaves of ~w ~w ~w~n",
+ [DL_3to2B, DL_3to2C, DL_3to2D]),
+
+ true = length(DL_3to2B) == 0,
+ true = length(DL_3to2C) == 1,
+ true = length(DL_3to2D) == 0,
+
+ % Go compare! Should find a difference in one leaf
+ DL3_0 = leveled_tictac:find_dirtyleaves(TicTacTree3_Full,
+ TicTacTree3_Joined),
+ io:format("Different leaves count ~w~n", [length(DL3_0)]),
+ true = length(DL3_0) == 1,
+
+ % Now we want to find the {Term, Key} pairs that make up the segment
+ % difference (there should be only one)
+ %
+ % We want the database to filter on segment - so this doesn't have the
+ % overheads of key listing
+
+ FoldKeysIndexQFun =
+ fun(_Bucket, {Term, Key}, Acc) ->
+ Seg = leveled_tictac:get_segment(Key, SegmentCount),
+ case lists:member(Seg, DL3_0) of
+ true ->
+ [{Term, Key}|Acc];
+ false ->
+ Acc
+ end
+ end,
+
+ MismatchQ = {index_query,
+ BucketBin,
+ {FoldKeysIndexQFun, []},
+ {"idx2_bin", "!", "|"},
+ {true, undefined}},
+ {async, MMFldr_2A} = leveled_bookie:book_returnfolder(Book2A, MismatchQ),
+ {async, MMFldr_2B} = leveled_bookie:book_returnfolder(Book2B, MismatchQ),
+ {async, MMFldr_2C} = leveled_bookie:book_returnfolder(Book2C, MismatchQ),
+ {async, MMFldr_2D} = leveled_bookie:book_returnfolder(Book2D, MismatchQ),
+
+ SWSS = os:timestamp(),
+ SL_Joined = MMFldr_2B() ++ MMFldr_2C() ++ MMFldr_2D(),
+ SL_Full = MMFldr_2A(),
+ io:format("Segment search across both clusters took ~w~n",
+ [timer:now_diff(os:timestamp(), SWSS)]),
+
+ io:format("Joined SegList ~w~n", [SL_Joined]),
+ io:format("Full SegList ~w~n", [SL_Full]),
+
+ Diffs = lists:subtract(SL_Full, SL_Joined)
+ ++ lists:subtract(SL_Joined, SL_Full),
+
+ io:format("Differences between lists ~w~n", [Diffs]),
+
+ % The actual difference is discovered
+ true = lists:member({"zz999", term_to_binary("K9.Z")}, Diffs),
+ % Without discovering too many others
+ true = length(Diffs) < 20,
+
+
+ ok = leveled_bookie:book_close(Book2A),
+ ok = leveled_bookie:book_close(Book2B),
+ ok = leveled_bookie:book_close(Book2C),
+ ok = leveled_bookie:book_close(Book2D).
+
+
+recent_aae_noaae(_Config) ->
+ % Starts databases with recent_aae disabled, and confirms that an attempt
+ % to query for recent aae trees returns empty trees, as no index entries
+ % are found.
+
+ TreeSize = small,
+ % SegmentCount = 256 * 256,
+ UnitMins = 2,
+
+ % Test requires multiple different databases, so we want to mount them
+ % all on individual file paths
+ RootPathA = testutil:reset_filestructure("testA"),
+ RootPathB = testutil:reset_filestructure("testB"),
+ RootPathC = testutil:reset_filestructure("testC"),
+ RootPathD = testutil:reset_filestructure("testD"),
+ StartOptsA = aae_startopts(RootPathA, false),
+ StartOptsB = aae_startopts(RootPathB, false),
+ StartOptsC = aae_startopts(RootPathC, false),
+ StartOptsD = aae_startopts(RootPathD, false),
+
+ % Book1A to get all objects
+ {ok, Book1A} = leveled_bookie:book_start(StartOptsA),
+ % Book1B/C/D will have objects partitioned across them
+ {ok, Book1B} = leveled_bookie:book_start(StartOptsB),
+ {ok, Book1C} = leveled_bookie:book_start(StartOptsC),
+ {ok, Book1D} = leveled_bookie:book_start(StartOptsD),
+
+ {B1, K1, V1, S1, MD} = {"Bucket",
+ "Key1.1.4567.4321",
+ "Value1",
+ [],
+ [{"MDK1", "MDV1"}]},
+ {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
+
+ SW_StartLoad = os:timestamp(),
+
+ ok = testutil:book_riakput(Book1A, TestObject, TestSpec),
+ ok = testutil:book_riakput(Book1B, TestObject, TestSpec),
+ testutil:check_forobject(Book1A, TestObject),
+ testutil:check_forobject(Book1B, TestObject),
+
+ {TicTacTreeJoined, TicTacTreeFull, EmptyTree, _LMDIndexes} =
+ load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
+ SW_StartLoad, TreeSize, UnitMins,
+ false),
+ % Go compare! Also confirm we're not comparing empty trees
+ DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
+ TicTacTreeJoined),
+
+ DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
+ true = DL1_0 == [],
+ true = length(DL1_1) == 0,
+
+ ok = leveled_bookie:book_close(Book1A),
+ ok = leveled_bookie:book_close(Book1B),
+ ok = leveled_bookie:book_close(Book1C),
+ ok = leveled_bookie:book_close(Book1D).
+
+
+recent_aae_allaae(_Config) ->
+ % Leveled is started in blacklisted mode with no buckets blacklisted.
+ %
+ % A number of changes are then loaded into a store, and also partitioned
+ % across a separate set of three stores. A merged tree is returned from
+ % both the single store and the partitioned stores, and the trees are
+ % proven to compare as equal.
+ %
+ % A single change is then made, but to one half of the system only. The
+ % aae index is then re-queried and it is verified that a single segment
+ % difference is found.
+ %
+ % The segment Id found is then used in a query to find the Keys that make
+ % up that segment, and the delta discovered should be just that one key
+ % which was known to have been changed.
+
+ TreeSize = small,
+ % SegmentCount = 256 * 256,
+ UnitMins = 2,
+ AAE = {blacklist, [], 60, UnitMins},
+
+ % Test requires multiple different databases, so we want to mount them
+ % all on individual file paths
+ RootPathA = testutil:reset_filestructure("testA"),
+ RootPathB = testutil:reset_filestructure("testB"),
+ RootPathC = testutil:reset_filestructure("testC"),
+ RootPathD = testutil:reset_filestructure("testD"),
+ StartOptsA = aae_startopts(RootPathA, AAE),
+ StartOptsB = aae_startopts(RootPathB, AAE),
+ StartOptsC = aae_startopts(RootPathC, AAE),
+ StartOptsD = aae_startopts(RootPathD, AAE),
+
+ % Book1A to get all objects
+ {ok, Book1A} = leveled_bookie:book_start(StartOptsA),
+ % Book1B/C/D will have objects partitioned across them
+ {ok, Book1B} = leveled_bookie:book_start(StartOptsB),
+ {ok, Book1C} = leveled_bookie:book_start(StartOptsC),
+ {ok, Book1D} = leveled_bookie:book_start(StartOptsD),
+
+ {B1, K1, V1, S1, MD} = {"Bucket",
+ "Key1.1.4567.4321",
+ "Value1",
+ [],
+ [{"MDK1", "MDV1"}]},
+ {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
+
+ SW_StartLoad = os:timestamp(),
+
+ ok = testutil:book_riakput(Book1A, TestObject, TestSpec),
+ ok = testutil:book_riakput(Book1B, TestObject, TestSpec),
+ testutil:check_forobject(Book1A, TestObject),
+ testutil:check_forobject(Book1B, TestObject),
+
+ {TicTacTreeJoined, TicTacTreeFull, EmptyTree, LMDIndexes} =
+ load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
+ SW_StartLoad, TreeSize, UnitMins,
+ false),
+ % Go compare! Also confirm we're not comparing empty trees
+ DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
+ TicTacTreeJoined),
+
+ DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
+ true = DL1_0 == [],
+ true = length(DL1_1) > 100,
+
+ ok = leveled_bookie:book_close(Book1A),
+ ok = leveled_bookie:book_close(Book1B),
+ ok = leveled_bookie:book_close(Book1C),
+ ok = leveled_bookie:book_close(Book1D),
+
+ % Book2A to get all objects
+ {ok, Book2A} = leveled_bookie:book_start(StartOptsA),
+ % Book2B/C/D will have objects partitioned across them
+ {ok, Book2B} = leveled_bookie:book_start(StartOptsB),
+ {ok, Book2C} = leveled_bookie:book_start(StartOptsC),
+ {ok, Book2D} = leveled_bookie:book_start(StartOptsD),
+
+ {TicTacTreeJoined, TicTacTreeFull, EmptyTree, LMDIndexes} =
+ load_and_check_recentaae(Book2A, Book2B, Book2C, Book2D,
+ SW_StartLoad, TreeSize, UnitMins,
+ LMDIndexes),
+ % Go compare! Also confirm we're not comparing empty trees
+ DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
+ TicTacTreeJoined),
+
+ DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
+ true = DL1_0 == [],
+ true = length(DL1_1) > 100,
+
+ V2 = "Value2",
+ {TestObject2, TestSpec2} =
+ testutil:generate_testobject(B1, K1, V2, S1, MD),
+
+ New_startTS = os:timestamp(),
+
+ ok = testutil:book_riakput(Book2B, TestObject2, TestSpec2),
+ testutil:check_forobject(Book2B, TestObject2),
+ testutil:check_forobject(Book2A, TestObject),
+
+ New_endTS = os:timestamp(),
+ NewLMDIndexes = determine_lmd_indexes(New_startTS, New_endTS, UnitMins),
+ {TicTacTreeJoined2, TicTacTreeFull2, _EmptyTree, NewLMDIndexes} =
+ load_and_check_recentaae(Book2A, Book2B, Book2C, Book2D,
+ New_startTS, TreeSize, UnitMins,
+ NewLMDIndexes),
+ DL2_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull2,
+ TicTacTreeJoined2),
+
+ % DL2_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
+ true = length(DL2_0) == 1,
+
+ [DirtySeg] = DL2_0,
+ TermPrefix = string:right(integer_to_list(DirtySeg), 8, $0),
+
+ LMDSegFolder =
+ fun(LMD, {Acc, Bookie}) ->
+ IdxLMD = list_to_binary("$aae." ++ LMD ++ "_bin"),
+ IdxQ1 =
+ {index_query,
+ <<"$all">>,
+ {fun testutil:foldkeysfun_returnbucket/3, []},
+ {IdxLMD,
+ list_to_binary(TermPrefix ++ "."),
+ list_to_binary(TermPrefix ++ "|")},
+ {true, undefined}},
+ {async, IdxFolder} =
+ leveled_bookie:book_returnfolder(Bookie, IdxQ1),
+ {Acc ++ IdxFolder(), Bookie}
+ end,
+ {KeysTerms2A, _} = lists:foldl(LMDSegFolder,
+ {[], Book2A},
+ lists:usort(LMDIndexes ++ NewLMDIndexes)),
+ true = length(KeysTerms2A) >= 1,
+
+ {KeysTerms2B, _} = lists:foldl(LMDSegFolder,
+ {[], Book2B},
+ lists:usort(LMDIndexes ++ NewLMDIndexes)),
+ {KeysTerms2C, _} = lists:foldl(LMDSegFolder,
+ {[], Book2C},
+ lists:usort(LMDIndexes ++ NewLMDIndexes)),
+ {KeysTerms2D, _} = lists:foldl(LMDSegFolder,
+ {[], Book2D},
+ lists:usort(LMDIndexes ++ NewLMDIndexes)),
+
+ KeysTerms2Joined = KeysTerms2B ++ KeysTerms2C ++ KeysTerms2D,
+ DeltaX = lists:subtract(KeysTerms2A, KeysTerms2Joined),
+ DeltaY = lists:subtract(KeysTerms2Joined, KeysTerms2A),
+
+ io:format("DeltaX ~w~n", [DeltaX]),
+ io:format("DeltaY ~w~n", [DeltaY]),
+
+ true = length(DeltaX) == 0, % This hasn't seen any extra changes
+ true = length(DeltaY) == 1, % This has seen an extra change
+ [{_, {B1, K1}}] = DeltaY,
+
+ ok = leveled_bookie:book_close(Book2A),
+ ok = leveled_bookie:book_close(Book2B),
+ ok = leveled_bookie:book_close(Book2C),
+ ok = leveled_bookie:book_close(Book2D).
+
+
+
+recent_aae_bucketaae(_Config) ->
+ % Configure AAE to work only on a single whitelisted bucket
+ % Confirm that we can spot a delta in this bucket, but not
+ % in another bucket
+
+ TreeSize = small,
+ % SegmentCount = 256 * 256,
+ UnitMins = 2,
+ AAE = {whitelist, [<<"Bucket">>], 60, UnitMins},
+
+ % Test requires multiple different databases, so we want to mount them
+ % all on individual file paths
+ RootPathA = testutil:reset_filestructure("testA"),
+ RootPathB = testutil:reset_filestructure("testB"),
+ RootPathC = testutil:reset_filestructure("testC"),
+ RootPathD = testutil:reset_filestructure("testD"),
+ StartOptsA = aae_startopts(RootPathA, AAE),
+ StartOptsB = aae_startopts(RootPathB, AAE),
+ StartOptsC = aae_startopts(RootPathC, AAE),
+ StartOptsD = aae_startopts(RootPathD, AAE),
+
+ % Book1A to get all objects
+ {ok, Book1A} = leveled_bookie:book_start(StartOptsA),
+ % Book1B/C/D will have objects partitioned across them
+ {ok, Book1B} = leveled_bookie:book_start(StartOptsB),
+ {ok, Book1C} = leveled_bookie:book_start(StartOptsC),
+ {ok, Book1D} = leveled_bookie:book_start(StartOptsD),
+
+ {B1, K1, V1, S1, MD} = {<<"Bucket">>,
+ "Key1.1.4567.4321",
+ "Value1",
+ [],
+ [{"MDK1", "MDV1"}]},
+ {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
+
+ SW_StartLoad = os:timestamp(),
+
+ ok = testutil:book_riakput(Book1A, TestObject, TestSpec),
+ ok = testutil:book_riakput(Book1B, TestObject, TestSpec),
+ testutil:check_forobject(Book1A, TestObject),
+ testutil:check_forobject(Book1B, TestObject),
+
+ {TicTacTreeJoined, TicTacTreeFull, EmptyTree, LMDIndexes} =
+ load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
+ SW_StartLoad, TreeSize, UnitMins,
+ false, <<"Bucket">>),
+ % Go compare! Also confirm we're not comparing empty trees
+ DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
+ TicTacTreeJoined),
+
+ DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
+ true = DL1_0 == [],
+ true = length(DL1_1) > 100,
+
+ ok = leveled_bookie:book_close(Book1A),
+ ok = leveled_bookie:book_close(Book1B),
+ ok = leveled_bookie:book_close(Book1C),
+ ok = leveled_bookie:book_close(Book1D),
+
+ % Book2A to get all objects
+ {ok, Book2A} = leveled_bookie:book_start(StartOptsA),
+ % Book2B/C/D will have objects partitioned across them
+ {ok, Book2B} = leveled_bookie:book_start(StartOptsB),
+ {ok, Book2C} = leveled_bookie:book_start(StartOptsC),
+ {ok, Book2D} = leveled_bookie:book_start(StartOptsD),
+
+ % Change the value for a key in another bucket
+ % If we get trees for this period, no difference should be found
+
+ V2 = "Value2",
+ {TestObject2, TestSpec2} =
+ testutil:generate_testobject(<<"NotBucket">>, K1, V2, S1, MD),
+
+ New_startTS2 = os:timestamp(),
+
+ ok = testutil:book_riakput(Book2B, TestObject2, TestSpec2),
+ testutil:check_forobject(Book2B, TestObject2),
+ testutil:check_forobject(Book2A, TestObject),
+
+ New_endTS2 = os:timestamp(),
+ NewLMDIndexes2 = determine_lmd_indexes(New_startTS2, New_endTS2, UnitMins),
+ {TicTacTreeJoined2, TicTacTreeFull2, _EmptyTree, NewLMDIndexes2} =
+ load_and_check_recentaae(Book2A, Book2B, Book2C, Book2D,
+ New_startTS2, TreeSize, UnitMins,
+ NewLMDIndexes2, <<"Bucket">>),
+ DL2_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull2,
+ TicTacTreeJoined2),
+ true = length(DL2_0) == 0,
+
+ % Now create an object that is a change to an existing key in the
+ % monitored bucket. A difference should be found
+
+ {TestObject3, TestSpec3} =
+ testutil:generate_testobject(B1, K1, V2, S1, MD),
+
+ New_startTS3 = os:timestamp(),
+
+ ok = testutil:book_riakput(Book2B, TestObject3, TestSpec3),
+ testutil:check_forobject(Book2B, TestObject3),
+ testutil:check_forobject(Book2A, TestObject),
+
+ New_endTS3 = os:timestamp(),
+ NewLMDIndexes3 = determine_lmd_indexes(New_startTS3, New_endTS3, UnitMins),
+ {TicTacTreeJoined3, TicTacTreeFull3, _EmptyTree, NewLMDIndexes3} =
+ load_and_check_recentaae(Book2A, Book2B, Book2C, Book2D,
+ New_startTS3, TreeSize, UnitMins,
+ NewLMDIndexes3, <<"Bucket">>),
+ DL3_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull3,
+ TicTacTreeJoined3),
+
+ % DL2_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
+ true = length(DL3_0) == 1,
+
+ % Find the dirty segment, and use that to find the dirty key
+ %
+ % Note that unlike when monitoring $all, fold_keys can be used as there
+ % is no need to return the Bucket (as the bucket is known)
+
+ [DirtySeg] = DL3_0,
+ TermPrefix = string:right(integer_to_list(DirtySeg), 8, $0),
+
+ LMDSegFolder =
+ fun(LMD, {Acc, Bookie}) ->
+ IdxLMD = list_to_binary("$aae." ++ LMD ++ "_bin"),
+ IdxQ1 =
+ {index_query,
+ <<"Bucket">>,
+ {fun testutil:foldkeysfun/3, []},
+ {IdxLMD,
+ list_to_binary(TermPrefix ++ "."),
+ list_to_binary(TermPrefix ++ "|")},
+ {true, undefined}},
+ {async, IdxFolder} =
+ leveled_bookie:book_returnfolder(Bookie, IdxQ1),
+ {Acc ++ IdxFolder(), Bookie}
+ end,
+ {KeysTerms2A, _} = lists:foldl(LMDSegFolder,
+ {[], Book2A},
+ lists:usort(LMDIndexes ++ NewLMDIndexes3)),
+ true = length(KeysTerms2A) >= 1,
+
+ {KeysTerms2B, _} = lists:foldl(LMDSegFolder,
+ {[], Book2B},
+ lists:usort(LMDIndexes ++ NewLMDIndexes3)),
+ {KeysTerms2C, _} = lists:foldl(LMDSegFolder,
+ {[], Book2C},
+ lists:usort(LMDIndexes ++ NewLMDIndexes3)),
+ {KeysTerms2D, _} = lists:foldl(LMDSegFolder,
+ {[], Book2D},
+ lists:usort(LMDIndexes ++ NewLMDIndexes3)),
+
+ KeysTerms2Joined = KeysTerms2B ++ KeysTerms2C ++ KeysTerms2D,
+ DeltaX = lists:subtract(KeysTerms2A, KeysTerms2Joined),
+ DeltaY = lists:subtract(KeysTerms2Joined, KeysTerms2A),
+
+ io:format("DeltaX ~w~n", [DeltaX]),
+ io:format("DeltaY ~w~n", [DeltaY]),
+
+ true = length(DeltaX) == 0, % This hasn't seen any extra changes
+ true = length(DeltaY) == 1, % This has seen an extra change
+ [{_, K1}] = DeltaY,
+
+ ok = leveled_bookie:book_close(Book2A),
+ ok = leveled_bookie:book_close(Book2B),
+ ok = leveled_bookie:book_close(Book2C),
+ ok = leveled_bookie:book_close(Book2D).
+
+
+recent_aae_expiry(_Config) ->
+ % Prove that the index entries do indeed expire
+
+ TreeSize = small,
+ % SegmentCount = 256 * 256,
+ UnitMins = 1,
+ TotalMins = 2,
+ AAE = {blacklist, [], TotalMins, UnitMins},
+
+ % This test requires only a single database, mounted on an individual
+ % file path
+ RootPathA = testutil:reset_filestructure("testA"),
+ StartOptsA = aae_startopts(RootPathA, AAE),
+
+ % Book1A to get all objects
+ {ok, Book1A} = leveled_bookie:book_start(StartOptsA),
+
+ GenMapFun =
+ fun(_X) ->
+ V = testutil:get_compressiblevalue(),
+ Indexes = testutil:get_randomindexes_generator(8),
+ testutil:generate_objects(5000,
+ binary_uuid,
+ [],
+ V,
+ Indexes)
+ end,
+
+ ObjLists = lists:map(GenMapFun, lists:seq(1, 3)),
+
+ SW0 = os:timestamp(),
+ % Load all three lists into Book1A
+ lists:foreach(fun(ObjL) -> testutil:riakload(Book1A, ObjL) end,
+ ObjLists),
+ SW1 = os:timestamp(),
+ % All index entries will expire after (TotalMins + UnitMins) minutes
+ GetTicTacTreeFun =
+ fun(Bookie) ->
+ get_tictactree_fun(Bookie, <<"$all">>, TreeSize)
+ end,
+ EmptyTree = leveled_tictac:new_tree(empty, TreeSize),
+ LMDIndexes = determine_lmd_indexes(SW0, SW1, UnitMins),
+
+ % Should get a non-empty answer to the query
+ TicTacTree1_Full =
+ lists:foldl(GetTicTacTreeFun(Book1A), EmptyTree, LMDIndexes),
+ DL3_0 = leveled_tictac:find_dirtyleaves(TicTacTree1_Full, EmptyTree),
+ true = length(DL3_0) > 0,
+
+ SecondsSinceLMD = timer:now_diff(os:timestamp(), SW0) div 1000000,
+ SecondsToExpiry = (TotalMins + UnitMins) * 60,
+
+ case SecondsToExpiry > SecondsSinceLMD of
+ true ->
+ % Sleep until all index entries will have expired
+ timer:sleep((SecondsToExpiry - SecondsSinceLMD) * 1000);
+ false ->
+ timer:sleep(0)
+ end,
+
+ % Should now get an empty answer - all entries have expired
+ TicTacTree2_Full =
+ lists:foldl(GetTicTacTreeFun(Book1A), EmptyTree, LMDIndexes),
+
+ DL4_0 = leveled_tictac:find_dirtyleaves(TicTacTree2_Full, EmptyTree),
+ true = length(DL4_0) == 0.
+
+
+
+load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
+ SW_StartLoad, TreeSize, UnitMins,
+ LMDIndexes_Loaded) ->
+ load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
+ SW_StartLoad, TreeSize, UnitMins,
+ LMDIndexes_Loaded, <<"$all">>).
+
+load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
+ SW_StartLoad, TreeSize, UnitMins,
+ LMDIndexes_Loaded, Bucket) ->
+ LMDIndexes =
+ case LMDIndexes_Loaded of
+ false ->
+ % Generate nine lists of objects
+ % BucketBin = list_to_binary("Bucket"),
+ GenMapFun =
+ fun(_X) ->
+ V = testutil:get_compressiblevalue(),
+ Indexes = testutil:get_randomindexes_generator(8),
+ testutil:generate_objects(5000,
+ binary_uuid,
+ [],
+ V,
+ Indexes)
+ end,
+
+ ObjLists = lists:map(GenMapFun, lists:seq(1, 9)),
+
+ % Load all nine lists into Book1A
+ lists:foreach(fun(ObjL) -> testutil:riakload(Book1A, ObjL) end,
+ ObjLists),
+
+ % Split nine lists across Book1B to Book1D, three object lists
+ % in each
+ lists:foreach(fun(ObjL) -> testutil:riakload(Book1B, ObjL) end,
+ lists:sublist(ObjLists, 1, 3)),
+ lists:foreach(fun(ObjL) -> testutil:riakload(Book1C, ObjL) end,
+ lists:sublist(ObjLists, 4, 3)),
+ lists:foreach(fun(ObjL) -> testutil:riakload(Book1D, ObjL) end,
+ lists:sublist(ObjLists, 7, 3)),
+
+ SW_EndLoad = os:timestamp(),
+ determine_lmd_indexes(SW_StartLoad, SW_EndLoad, UnitMins);
+ _ ->
+ LMDIndexes_Loaded
+ end,
+
+ EmptyTree = leveled_tictac:new_tree(empty, TreeSize),
+
+ GetTicTacTreeFun =
+ fun(Bookie) ->
+ get_tictactree_fun(Bookie, Bucket, TreeSize)
+ end,
+
+ % Get a TicTac tree representing one of the indexes in Bucket A
+ TicTacTree1_Full =
+ lists:foldl(GetTicTacTreeFun(Book1A), EmptyTree, LMDIndexes),
+
+ TicTacTree1_P1 =
+ lists:foldl(GetTicTacTreeFun(Book1B), EmptyTree, LMDIndexes),
+ TicTacTree1_P2 =
+ lists:foldl(GetTicTacTreeFun(Book1C), EmptyTree, LMDIndexes),
+ TicTacTree1_P3 =
+ lists:foldl(GetTicTacTreeFun(Book1D), EmptyTree, LMDIndexes),
+
+ % Merge the tree across the partitions
+ TicTacTree1_Joined = lists:foldl(fun leveled_tictac:merge_trees/2,
+ TicTacTree1_P1,
+ [TicTacTree1_P2, TicTacTree1_P3]),
+
+ {TicTacTree1_Full, TicTacTree1_Joined, EmptyTree, LMDIndexes}.
+
+
+aae_startopts(RootPath, AAE) ->
+ LS = 2000,
+ JS = 50000000,
+ SS = testutil:sync_strategy(),
+ [{root_path, RootPath},
+ {sync_strategy, SS},
+ {cache_size, LS},
+ {max_journalsize, JS},
+ {recent_aae, AAE}].
+
+
+determine_lmd_indexes(StartTS, EndTS, UnitMins) ->
+ StartDT = calendar:now_to_datetime(StartTS),
+ EndDT = calendar:now_to_datetime(EndTS),
+ StartTimeStr = get_strtime(StartDT, UnitMins),
+ EndTimeStr = get_strtime(EndDT, UnitMins),
+
+ AddTimeFun =
+ fun(X, Acc) ->
+ case lists:member(EndTimeStr, Acc) of
+ true ->
+ Acc;
+ false ->
+ NextTime =
+ UnitMins * 60 * X +
+ calendar:datetime_to_gregorian_seconds(StartDT),
+ NextDT =
+ calendar:gregorian_seconds_to_datetime(NextTime),
+ Acc ++ [get_strtime(NextDT, UnitMins)]
+ end
+ end,
+
+ lists:foldl(AddTimeFun, [StartTimeStr], lists:seq(1, 10)).
+
+
+get_strtime(DateTime, UnitMins) ->
+ {{Y, M, D}, {Hour, Minute, _Second}} = DateTime,
+ RoundMins =
+ UnitMins * (Minute div UnitMins),
+ StrTime =
+ lists:flatten(io_lib:format(?LMD_FORMAT,
+ [Y, M, D, Hour, RoundMins])),
+ StrTime.
+
+
+get_tictactree_fun(Bookie, Bucket, TreeSize) ->
+ fun(LMD, Acc) ->
+ SW = os:timestamp(),
+ ST = <<"0">>,
+ ET = <<"A">>,
+ Q = {tictactree_idx,
+ {Bucket,
+ list_to_binary("$aae." ++ LMD ++ "_bin"),
+ ST,
+ ET},
+ TreeSize,
+ fun(_B, _K) -> accumulate end},
+ {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Q),
+ R = Folder(),
+ io:format("TicTac Tree for index ~s took " ++
+ "~w microseconds~n",
+ [LMD, timer:now_diff(os:timestamp(), SW)]),
+ leveled_tictac:merge_trees(R, Acc)
+ end.
\ No newline at end of file