Merge pull request #127 from martinsumner/mas-i115-simplettcache

Mas i115 simplettcache
This commit is contained in:
Martin Sumner 2018-04-05 09:15:51 +01:00 committed by GitHub
commit c6769c6659
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 941 additions and 151 deletions

View file

@ -20,9 +20,50 @@ There were three primary issues with these mechanisms:
To address these weaknesses Active Anti-Entropy (AAE) was introduced to Riak, as a configurable option. Configuring Active Anti-Entropy would start a new AAE "hashtree" store for every primary vnode in the ring. The vnode process would, following a successful put, [update this hashtree store process](https://github.com/basho/riak_kv/blob/2.1.7/src/riak_kv_vnode.erl#L2139-L2169) by sending it the updated object after converting it from its binary format. This would generally happen in via async message passing, but periodically the change would block the vnode to confirm that the AAE process was keeping up. The hashtree store process would hash the riak object to create a hash for the update, and hash the Key to map it to one of 1024 * 1024 segments - and then in batches update the store with a key of {$t, Partition, Segment, Key}, and a value of the object hash.
From this persisted store a [Merkle tree](https://en.wikipedia.org/wiki/Merkle_tree) is maintained for each Partition. These Merkle trees can then then be exchanged with another vnode's AAE hashtree store if that vnode is also a primary vnode for that same partition. Exchanging the Merkle tree would highlight any segments which had a variance - and then it would be possible to iterate over the store segment by segment to discover which keys actually differed. The objects associated with these keys could then be re-read within the actual vnode stores, so that read repair would correct any entropy that had been indicated by the discrepancy between the hashtree stores.
From this persisted store a [Merkle tree](https://en.wikipedia.org/wiki/Merkle_tree) is maintained for each preflist head and n-val combination supported by the vnode (if and only if the vnode is a primary for that preflist). So if there was a ring-size of 8 in a cluster there would be 8 preflists for each n-val, so for n-val of 3:
The process of maintaining the hashtree is partially deferred to the point of the exchange, and this update process is of a low but not-necessarily non-trivial cost. the cost is low enough so that these exchanges can occur with reasonable frequency (i.e. many minutes between exchanges) without creating significant background load.
{P0, 3} -> {V0, V1, V2}
{P1, 3} -> {V1, V2, V3}
...
{P7, 3} -> {V7, V0, V1}
For n-val of 4:
{P0, 4} -> {V0, V1, V2, V3}
{P1, 4} -> {V1, V2, V3, V4}
...
{P7, 4} -> {V7, V0, V1, V2}
So for Vnode0, it would keep 7 hashtrees:
{P0, 3} - to be exchanged with V1 and V2
{P7, 3} - to be exchanged with V7 and V1
{P6, 3} - to be exchanged with V6 and V7
{P0, 4} - to be exchanged with V1 and V2 and V3
{P7, 4} - to be exchanged with V7 and V1 and V2
{P6, 4} - to be exchanged with V6 and V7 and V1
{P5, 4} - to be exchanged with V5 and V6 and V7
So with a ring-size of 128, and n-vals of 3 and 5, there would be 8 hashtrees per vnode to be kept (1024 overall), and 26 exchanges per vnode (1,664 exchanges overall for full synchronisation - assuming each exchange achieves bi-directional synchronisation).
[Note that, the volume of hashtrees to be exchanged could be reduced by keeping just a single hashtree for each vnode with which there is a need to exchange. However, then for each PUT there would be a need to update up to `n-1` hashtrees]
The process of maintaining the hashtree is partially deferred to the point of the exchange, and this update process is of a low but not-necessarily non-trivial cost. At the point of receiving an PUT the PUT at each vnode will belong to precisely one hashtree, and that hashtree will have the segment of the Merkle tree associated with that Bucket and Key to be marked as dirty. When an exchange is prompted, for each dirty segment in the hashtree to be exchanges, there must be a range query to discover all the Keys and Hashes for the required combination of `{Partition, NVal, Segment}`, for all dirty segments in that hashtree.
Infrequently, but regularly, the hashtree store would be cleared and rebuilt from an object fold over the vnode store to ensure that it reflected the actual persisted state in the store. This rebuild process depends on some cluster-wide lock acquisition and other throttling techniques, as it has to avoid generating false negative results from exchanges scheduled to occur during the rebuild, avoid unexpected conditions on shutdown during the rebuild, avoid excessive concurrency of rebuild operations within the cluster, and avoid flooding the cluster with read-repair events following a rebuild. Despite precautionary measures within the design, the rebuild process is, when compared to other Riak features, a relatively common trigger for production issues.
@ -46,7 +87,7 @@ Although this represented an improvement in terms of entropy management, there w
- The hashtrees are not externally exposed, and so cannot be used for externally managed replication (e.g. to another database).
- The rebuilds of the hashtree still require the relatively expensive fold_objects operation, and so parallelisation of rebuilds may need to be controlled to prevent an impact on cluster performance. Measuring the impact is difficult in pre-production load tests due to the scheduled and infrequent nature of AAE rebuilds.
- The rebuilds of the hashtree still require the relatively expensive fold_objects operation, and so parallelisation of rebuilds may need to be controlled to prevent an impact on cluster performance. Measuring the impact is difficult in pre-production load tests due to the scheduled and infrequent nature of AAE rebuilds, and this was mitigated through the throttling of rebuild folds within Riak.
- Improvements to hashtrees require significant devleopment and test for transition, due to the potential for hashtree changes to break many things (e.g. Solr integration, MDC), and also the difficulty in coordinating changes between different dependent systems that independently build state over long periods of time.
@ -288,9 +329,9 @@ Some notes on re-using this alternative anti-entropy mechanism within Riak:
* A surprising feature of read repair is that it will read repair to fallback nodes, not just primary nodes. This means that in read-intensive workloads, write activity may dramatically increase during node failure (as a large proportion of reads will become write events) - increasing the chance of servers falling domino style. However, in some circumstances the extra duplication can also [increase the chance of data loss](https://github.com/russelldb/russelldb.github.io/blob/master/3.2.kv679-solution.md)! This also increases greatly the volume of unnecessary data to be handed-off when the primary returns. Without active anti-entropy, and in the absence of other safety checks like `notfound_ok` being set to false, or `pr` being set to at least 1 - there will be scenarios where this feature may be helpful. As part of improving active anti-entropy, it may be wise to re-visit the tuning of anti-entropy features that existed prior to AAE, in particular should it be possible to configure read-repair to act on primary nodes only.
## Some notes on the experience of Riak implementation
## Phase 1 - Initial Implementation Notes
### Phase 1 - Initial Test of Folds with Core node_worker_pool
### Initial Test of Folds with Core node_worker_pool
As an initial proving stage of implementation, the riak_core_node_worker_pool has been implemented in riak_kv and riak_core, and then the listkeys function has been change so that it can be switched between using the node_worker_pool (new behaviour) and running in parallel using the vnode_worker_pool (old behaviour).
@ -384,6 +425,14 @@ The AAE hashtree lock situation is complex, but can be summarised as:
- however during a coverage fold for MDC AAE, a build may crash the fold so there may be a need to check for running folds before prompting a rebuild.
### Phase 2
## Phase 2 - Reconsider
For phase 2 the issues of how to efficiently manage AAE queries in leveldb and bitcask will be put to one side, and the focus will be on getting an effective solution up and running with leveled.
Feedback, and the implementation experience of the initial design raised the following issues:
- This was an answer that worked with the Leveled backend, but not other backends (specifically not bitcask). It would be better if there could be feature compatibility between backends, and in particular can entropy management be improved without requiring a migration of backends.
- Separating out the problem of near real-time anti-entropy adds complexity. Production experience of AAE full-sync was that hen it worked (and sometimes it didn't), it was very fast (e.g. sub minute) and low cost. It seems a retrograde step to no longer do full database synchronisation at this speed and cost.
- Timing problems would lead to false negative results, and false negative results were expensive. For example on a busy cluster it is likely that there may be o(1000) differences just because of timing deltas when vnodes in different clusters received requests (racing with the timing difference in arrivals of PUTs).
## Phase 3 - Redesign

View file

@ -0,0 +1,128 @@
# Background
Further helpful details on the background to this work can be found in the previous [Anti-Entropy](ANTI_ENTROPY.md) write-up.
The aim is to provide a better answer to the active anti-entropy in Riak. Specifically, it would be preferable to resolve the following issues:
- Rebuild times. Both the cost of rebuilds but also the cost in the failure of AAE-dependent processes during rebuilds. For example, due to the [rate-limiting of rebuilds](https://github.com/basho/riak_kv/blob/2.1.7/src/riak_kv_index_hashtree.erl#L98-L101), rebuilding a single vnode can take o(10) hours. during this rebuild time, these partitions are not subject to internal AAE, and Multi-Data Centre AAE [may be blocked altogether](https://github.com/basho/riak_repl/issues/772).
- Version inconsistencies. The process of trying to make the transition from one version of AAE to another smooth, is potentially [too disruptive](https://github.com/basho/riak_kv/issues/1659), and leaves a long legacy in [future versions](https://github.com/basho/riak_kv/issues/1656).
- Cost of AAE. Every AAE exchange requires in effect a [range scan](https://github.com/basho/riak_core/blob/2.1.9/src/hashtree.erl#L65-L72) in the key-store for every key updated since the last exchange for that partition. This contributes to a 10% performance overhead associated with running AAE.
- Support for native AAE support within backends. The Leveled backend can support optimisations for by-segment scanning over its native key-store, potentially rendering the need to keep (and periodically rebuild) a dedicated key-store for AAE unnecessary. It would be beneficial to have an improved AAE that can exploit this advantage, without preventing the anti-entropy solution form being used on backends that would require a dedicated anti-entropy store.
# Overview Of Needs
The high level changes proposed are:
- Have an AAE solution per vnode where the key-store is both optional (and so can be avoided where native support renders it unnecessary), and has swappable backends (including a pure Erlang alternative to Leveldb).
- Keep the actual AAE Merkle Trees cached using TicTac trees to support updates to the tree without scanning.
- Use per-partition TicTac trees so that the Merkle trees can be merged across vnodes, to make AAE backed synchronisation possible between clusters of different ring sizes.
- Allow rebuilds to take place in parallel to maintaining the old store and cache of the Merkle tree - so exchanges can continue through the rebuild process.
- Formalise the use of dotted version vector as the basis for the object hash to reduce the cost of object binary changes and copying. Also allow for intelligent comparisons between clusters by exchanging keys & clocks, not just keys & hashes.
- Have the new AAE solution work in parallel to the legacy solution, so that migration is controlled through administration/configuration, and the legacy solution can be permanently forgotten by the cluster.
- Externalise the AAE functions, so that the same functions can be used for synchronisation with different database platforms, without requiring internal changes to Riak.
# AAE design
## Actors, States and Messages
The primary actors
- AAEController (1 per vnode) - gen_server
- KeyStore (1 per Controller) - gen_fsm
- TreeCache (n per Controller) - gen_fsm
### AAEController
The AAEController will manage all communication between the vnode and the other parts of the AAE system. The AAEController can receive data updates from the vnode in four forms:
- {IndexN, Bucket, Key, CurrentClock, unidentified} for PUTs marshalled via the blind_put (for LWW buckets without 2i support in the backend e.g. LWW -> Bitcask), or when a rehash request has been made for a single object;
- {IndexN, Bucket, Key, CurrentClock, PreviousClock} for standard object updates (PreviousClock will be none for fresh objects);
- {IndexN, Bucket, Key, none, PreviousClock} for actual backend deletes (e.g. post tombstone).
The AAE Controller will handle these update messages by casting a message to the appropriate TreeCache to prompt an update, and then adding the update to a queue to be batch written to the KeyStore. There is an additional penalty for changes where the PreviousClock is unidentified in that they will require a range query to fetch the previous result from the KeyStore.
The AAE controller may also receive requests to retrieve the branch or leaf hashes for a given partition TreeCache, as well as trigger rebuilds or rehashes.
### KeyStore
The KeyStore needs to support three operations:
- A batch PUT of objects
- An object fold bounded by multiple ranges supporting different object folds and accumulators
- An is_empty check
this can be achieved by having a KeyStore run in `parallel` mode or `native` mode. In `parallel` mode the KeyStore has its own dedicated backend store (which can be any supported Key/Value store). In `native` mode, the vnode store must support the efficient object fold operations (in particular old by segmentID), and so a separate KeyStore is not required, and requests are diverted to the native vnode backend. It is assumed that only the Leveled backend will support `native` mode.
On startup the AAEController must be informed by the vnode the is_empty status of the actual vnode key store, and this should match the is_empty status of the AAE key store. If there is a mismatch then the KeyStore must be rebuilt.
When a KeyStore is being rebuilt at startup, it is rebuilt from a fold of the actual vnode backend store - and new PUTs of objects (following the snapshot) will be queued to be played into the store after the fold has been completed.
When a KeyStore is being rebuilt due to expiry of an existing store, the existing store is kept online until the rebuild is complete (and used for answering any fold requests). Both stores are kept up to date with new changes (through queueing on the rebuilt store, and application on the expiring store).
As vnode changes are made, these changes should be reflected in batches in the KeyStore. The Bucket part of the Key used in the KeyStore will be `{IndexN, SegmentID}`, and Key part will be the actual `{Bucket, Key}` of the object. The Value of the object should be a tuple of `{Version, VectorClock, Hash, ObjectSize, SiblingCount, Head}`:
- Version represents the version of this object currently in use (to allow the value format to be changed in future releases);
- VectorClock is the actual current VectorClock of the object;
- Hash is the hash of the VectorClock;
- ObjectSize is the byte size of the object which the Key and Clock represent;
- SiblingCount is the number of siblings for that object within this vnode;
- Head is the object header as a binary (a riak object binary with contents removed).
by default Head will be the atom `disabled`. Changing the mode of anti-entropy can enable the storage of the Head part - and this can then be used for 2i anti-entropy, of for metadata-based coverage folds.
Activity in the KeyStore should be optimised for the vast majority of traffic being PUTs. Queries are only used for the KeyStore when:
- Folding over all objects by IndexN and SegmentID to return Keys/Clocks for a given segment;
- Folding over all objects to recalculate an AAE tree for each IndexN;
- Fetching of a specific object by IndexN, SegmentID, Bucket and Key to recalculate a specific hash in the AAE tree when the update to the AAEController has a PreviousClock of `unidentified`.
A manifest file should be kept to indicate which is the current active store to be used on a restart.
If the vnode backend has native support for the queries required by the AAE KeyStore, then the KeyStore can be run in native mode - ignoring the batch puts, and re-directing the queries to the actual vnode backend. In native mode `unidentified` previous clocks cannot be supported (and should not be needed).
### TreeCache
There is a TreeCache process for each IndexN managed by the AAEController. The TreeCache receives changes in the form {SegmentID, HashChange}. The HashChange is calculated by performing an XOR operation on the hash of the current clock, and the hash of the previous clock. The SegmentID is calculated from the hash of the <<Bucket, Key>> binary.
The TreeCache process should respond to each update by changing the tree to reflect that update.
The TreeCache can be in a `loading` state, for instance when a new cache is being built by the AAEController, or when expiry has occurred. In this sate the TreeCache continues to update the existing tree, but also queues up received changes, whilst a new Tree is built from the KeyStore. Once the TreeCache has been built the TreeCache should replace the old tree with the new tree and replay the queued updates.
## Startup and Shutdown
On shutdown any incomplete batches should be passed to the KeyStore and the KeyStore shutdown. All functioning TreeCaches should be shutdown, and on shutdown should write a CRC-checked file containing the serialised tree. At the point the shutdown is requested, the TreeCache may be at a more advanced state than the KeyStore, and if sync_on_write is not enabled in the vnode backend the KeyStore could be in advance of the backend. To try and protect against situations on startup where the TreeCache reflects a more advanced state than the actual vnode - the TreeCache should not be persisted until the vnode backend and the AAE KeyStore have both successfully closed.
On startup, if shutdown was completed normally, the TreeCaches should be restored from disk, as well as the KeyStore. Any partially rebuilt KeyStore should be destroyed.
On recovering a TreeCache from disk, the TreeCache process should delete the TreeCache from disk before receiving any update.
If the shutdown was unclean, and there is a KeyStore, but no persisted TreeCache, then before completing startup the AAEController should enforce a fold over the KeyStore to rebuild the TreeCaches.
If the KeyStore has missing updates due to an abrupt shutdown, this will cause (potentially false) repairs of the keys, and the repair will also trigger a rehash. the rehash should prompt a correction in the AAE KeyStore (through an `unidentified`) previous clock to bring the TreeCache and KeyStore back into line.
## Rebuilds and Rehashes
If an AAE KeyStore is used in non-native mode, periodically the Keystore should be rebuilt, should there be entropy from disk in the actual KeyStore. This is achieved using the `replacing-store` state in the AAEController.
When replacing a store, the previous version of the store will be kept up to date and used throughout the rebuild process, in order to prevent the blocking of exchanges. The only exception to this is when a rebuild has been prompted by a conflict of `is_empty` properties on startup - in which case the vnode startup process should be paused to allow for the rebuild to complete.
To avoid the need to do reads before writes when updating the AAE KeyStore from the vnode backend fold (so as not to replace a new update with an older snapshot value from the backend) new updates must be parked in a DiskLog process whilst the fold completes. Once the fold is complete, the rebuild of store can be finished by catching up on updates from the DiskLog.
At this stage the old Keystore can be deleted, and the new KeyStore be used. At this stage though, the TreeCache does not necessarily reflect the state of the new KeyStore - the `replacing-tree` state is used to resolve this. When replacing the tree, new empty TreeCaches are started and maintained in parallel to the existing TreeCaches (which continue to be used in exchanges). A fold of the KeyStore is now commenced, whilst new updates are cached in a DiskLog. Once the fold is complete, the new updates are applied and the TreeCache can be migrated from the old cache to the new cache.

View file

@ -380,3 +380,21 @@ CPU utilisation was also generally high (and notably higher than in the Riak/lev
All this has implications for future backend choices, but also for the nature of the GET and PUT FSMs. The most positive non-functional characteristic is the external response time stability in face of internal resource pressure. What isn't clear is to the extent that this is delivered simply through a backend change, or by the change in the nature of the FSM which naturally diverts load away from vnodes with longer queues (e.g. delays) evening out the load in face of localised pressures.
It will be interesting to see in the case of both leveldb and leveled backends the potential improvements which may arise from the use of [vnode_proxy soft overload checks and a switch to 1 GET, n-1 HEADS](https://github.com/basho/riak_kv/issues/1661).
### Extending to 24 hours
Running the test over 24 hours provides this comparison between 200M and 400M accumulated operations:
![](pics/28Feb_24HourTest.png)
The trendline has been removed from the Leveled graph as the trend is obvious. The difference between the trends for the two backends is consistently 30%-35% throughout the extended portion.
These graphs show side-by-side comparisons of disk utilisation (median, mean and max), and read_await and write_await times - with the Leveled test in the first 24 hours, and the leveldb test in the second 24 hour period.
![](pics/28Feb_DiskUtilCompare.png)
![](pics/28Feb_AwaitCompare.png)
Both tests become constrained by disk, but the Leveled test pushes the disk in a more consistent manner producing more predictable results.
The other notable factor in running the test for 24 hours was that the mean 2i response time continued to rise in the Leveldb test, but not the leveled test. By the 24th hour of the test the Leveldb test had a mean 2i response time of over 3s, whereas the mean response time in the Leveled remained constant at around 120ms.

Binary file not shown.

After

Width:  |  Height:  |  Size: 462 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 384 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 347 KiB

View file

@ -5,10 +5,15 @@
-define(STD_TAG, o).
%% Tag used for secondary index keys
-define(IDX_TAG, i).
%% Tag used for head-only objects
-define(HEAD_TAG, h).
%% Inker key type used for 'normal' objects
-define(INKT_STND, stnd).
%% Inker key type used for 'batch' objects
-define(INKT_MPUT, mput).
%% Inker key type used for objects which contain no value, only key changes
%% This is used currently for objects formed under a 'retain' strategy on Inker
%% compaction

View file

@ -51,6 +51,8 @@
book_put/5,
book_put/6,
book_tempput/7,
book_mput/2,
book_mput/3,
book_delete/4,
book_get/3,
book_get/4,
@ -60,8 +62,10 @@
book_snapshot/4,
book_compactjournal/2,
book_islastcompactionpending/1,
book_trimjournal/1,
book_close/1,
book_destroy/1]).
book_destroy/1,
book_isempty/2]).
-export([get_opt/2,
get_opt/3,
@ -85,6 +89,7 @@
-define(COMPRESSION_POINT, on_receipt).
-define(TIMING_SAMPLESIZE, 100).
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(DUMMY, dummy). % Dummy key used for mput operations
-record(ledger_cache, {mem :: ets:tab(),
loader = leveled_tree:empty(?CACHE_TYPE)
@ -97,10 +102,14 @@
-record(state, {inker :: pid() | undefined,
penciller :: pid() | undefined,
cache_size :: integer() | undefined,
recent_aae :: false | #recent_aae{} | undefined,
recent_aae :: recent_aae(),
ledger_cache = #ledger_cache{},
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
head_only = false :: boolean(),
head_lookup = true :: boolean(),
put_countdown = 0 :: integer(),
get_countdown = 0 :: integer(),
fold_countdown = 0 :: integer(),
@ -137,6 +146,7 @@
-type fold_timings() :: no_timing|#fold_timings{}.
-type head_timings() :: no_timing|#head_timings{}.
-type timing_types() :: head|get|put|fold.
-type recent_aae() :: false|#recent_aae{}|undefined.
%%%============================================================================
%%% API
@ -310,6 +320,32 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
infinity).
-spec book_mput(pid(), list(tuple())) -> ok|pause.
%% @doc
%%
%% When the store is being run in head_only mode, batches fo object specs may
%% be inserted in to the store using book_mput/2. ObjectSpecs should be
%% of the form {ObjectOp, Bucket, Key, SubKey, Value}. The Value will be
%% stored within the HEAD of the object (in the Ledger), so the full object
%% is retrievable using a HEAD request. The ObjectOp is either add or remove.
%%
%% The list should be de-duplicated before it is passed to the bookie.
book_mput(Pid, ObjectSpecs) ->
book_mput(Pid, ObjectSpecs, infinity).
-spec book_mput(pid(), list(tuple()), infinity|integer()) -> ok|pause.
%% @doc
%%
%% When the store is being run in head_only mode, batches fo object specs may
%% be inserted in to the store using book_mput/2. ObjectSpecs should be
%% of the form {action, Bucket, Key, SubKey, Value}. The Value will be
%% stored within the HEAD of the object (in the Ledger), so the full object
%% is retrievable using a HEAD request.
%%
%% The list should be de-duplicated before it is passed to the bookie.
book_mput(Pid, ObjectSpecs, TTL) ->
gen_server:call(Pid, {mput, ObjectSpecs, TTL}, infinity).
-spec book_delete(pid(), any(), any(), list()) -> ok|pause.
%% @doc
@ -419,6 +455,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
-spec book_compactjournal(pid(), integer()) -> ok.
-spec book_islastcompactionpending(pid()) -> boolean().
-spec book_trimjournal(pid()) -> ok.
%% @doc Call for compaction of the Journal
%%
@ -433,6 +470,13 @@ book_compactjournal(Pid, Timeout) ->
book_islastcompactionpending(Pid) ->
gen_server:call(Pid, confirm_compact, infinity).
%% @doc Trim the journal when in head_only mode
%%
%% In head_only mode the journlacna be trimmed of entries which are before the
%% persisted SQN. This is much quicker than compacting the journal
book_trimjournal(Pid) ->
gen_server:call(Pid, trim, infinity).
-spec book_close(pid()) -> ok.
-spec book_destroy(pid()) -> ok.
@ -441,15 +485,24 @@ book_islastcompactionpending(Pid) ->
%%
%% A clean shutdown will persist all the information in the Penciller memory
%% before closing, so shutdown is not instantaneous.
book_close(Pid) ->
gen_server:call(Pid, close, infinity).
%% @doc Close and clean-out files
book_destroy(Pid) ->
gen_server:call(Pid, destroy, infinity).
-spec book_isempty(pid(), atom()) -> boolean().
%% @doc
%% Confirm if the store is empty, or if it contains a Key and Value for a
%% given tag
book_isempty(Pid, Tag) ->
FoldAccT = {fun(_B, _Acc) -> false end, true},
{async, Runner} =
gen_server:call(Pid, {return_runner, {first_bucket, Tag, FoldAccT}}),
Runner().
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
@ -474,18 +527,31 @@ init([Opts]) ->
limit_minutes = LimitMinutes,
unit_minutes = UnitMinutes}
end,
{HeadOnly, HeadLookup} =
case get_opt(head_only, Opts, false) of
false ->
{false, true};
with_lookup ->
{true, true};
no_lookup ->
{true, false}
end,
State0 = #state{cache_size=CacheSize,
recent_aae=RecentAAE,
is_snapshot=false,
head_only=HeadOnly,
head_lookup = HeadLookup},
{Inker, Penciller} =
startup(InkerOpts, PencillerOpts, RecentAAE),
startup(InkerOpts, PencillerOpts, State0),
NewETS = ets:new(mem, [ordered_set]),
leveled_log:log("B0001", [Inker, Penciller]),
{ok, #state{inker=Inker,
penciller=Penciller,
cache_size=CacheSize,
recent_aae=RecentAAE,
ledger_cache=#ledger_cache{mem = NewETS},
is_snapshot=false}};
{ok, State0#state{inker=Inker,
penciller=Penciller,
ledger_cache=#ledger_cache{mem = NewETS}}};
Bookie ->
{ok, Penciller, Inker} =
book_snapshot(Bookie, store, undefined, true),
@ -496,7 +562,8 @@ init([Opts]) ->
end.
handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
when State#state.head_only == false ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
SW0 = os:timestamp(),
{ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
@ -511,7 +578,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
Object,
ObjSize,
{IndexSpecs, TTL},
State#state.recent_aae),
State),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
{_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
@ -541,7 +608,34 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
put_countdown = CountDown,
slow_offer = true}}
end;
handle_call({get, Bucket, Key, Tag}, _From, State) ->
handle_call({mput, ObjectSpecs, TTL}, From, State)
when State#state.head_only == true ->
{ok, SQN} =
leveled_inker:ink_mput(State#state.inker, dummy, {ObjectSpecs, TTL}),
Changes =
preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
SQN, null, length(ObjectSpecs),
{ObjectSpecs, TTL},
State),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
case State#state.slow_offer of
true ->
gen_server:reply(From, pause);
false ->
gen_server:reply(From, ok)
end,
case maybepush_ledgercache(State#state.cache_size,
Cache0,
State#state.penciller) of
{ok, NewCache} ->
{noreply, State#state{ledger_cache = NewCache,
slow_offer = false}};
{returned, NewCache} ->
{noreply, State#state{ledger_cache = NewCache,
slow_offer = true}}
end;
handle_call({get, Bucket, Key, Tag}, _From, State)
when State#state.head_only == false ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
SWh = os:timestamp(),
HeadResult =
@ -586,7 +680,8 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
update_statetimings(get, Timings2, State#state.get_countdown),
{reply, Reply, State#state{get_timings = Timings,
get_countdown = CountDown}};
handle_call({head, Bucket, Key, Tag}, _From, State) ->
handle_call({head, Bucket, Key, Tag}, _From, State)
when State#state.head_lookup == true ->
SWp = os:timestamp(),
LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of
@ -634,17 +729,29 @@ handle_call({return_runner, QueryType}, _From, State) ->
update_statetimings(fold, Timings1, State#state.fold_countdown),
{reply, Runner, State#state{fold_timings = Timings,
fold_countdown = CountDown}};
handle_call({compact_journal, Timeout}, _From, State) ->
handle_call({compact_journal, Timeout}, _From, State)
when State#state.head_only == false ->
ok = leveled_inker:ink_compactjournal(State#state.inker,
self(),
Timeout),
{reply, ok, State};
handle_call(confirm_compact, _From, State) ->
handle_call(confirm_compact, _From, State)
when State#state.head_only == false ->
{reply, leveled_inker:ink_compactionpending(State#state.inker), State};
handle_call(trim, _From, State) when State#state.head_only == true ->
PSQN = leveled_penciller:pcl_persistedsqn(State#state.penciller),
{reply, leveled_inker:ink_trim(State#state.inker, PSQN), State};
handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
{stop, destroy, ok, State}.
leveled_log:log("B0011", []),
{ok, InkPathList} = leveled_inker:ink_doom(State#state.inker),
{ok, PCLPathList} = leveled_penciller:pcl_doom(State#state.penciller),
lists:foreach(fun(DirPath) -> delete_path(DirPath) end, InkPathList),
lists:foreach(fun(DirPath) -> delete_path(DirPath) end, PCLPathList),
{stop, normal, ok, State};
handle_call(Msg, _From, State) ->
{reply, {unsupported_message, element(1, Msg)}, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
@ -652,17 +759,22 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(destroy, State) ->
leveled_log:log("B0011", []),
{ok, InkPathList} = leveled_inker:ink_doom(State#state.inker),
{ok, PCLPathList} = leveled_penciller:pcl_doom(State#state.penciller),
lists:foreach(fun(DirPath) -> delete_path(DirPath) end, InkPathList),
lists:foreach(fun(DirPath) -> delete_path(DirPath) end, PCLPathList),
ok;
terminate(Reason, State) ->
leveled_log:log("B0003", [Reason]),
ok = leveled_inker:ink_close(State#state.inker),
ok = leveled_penciller:pcl_close(State#state.penciller).
ok =
case is_process_alive(State#state.inker) of
true ->
leveled_inker:ink_close(State#state.inker);
false ->
ok
end,
ok =
case is_process_alive(State#state.penciller) of
true ->
leveled_penciller:pcl_close(State#state.penciller);
false ->
ok
end.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -838,16 +950,36 @@ get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold, sqn_order}) ->
SnapFun = return_snapfun(State, store, undefined, true, SnapPreFold),
leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun, sqn_order);
get_runner(State,
{foldheads_bybucket,
Tag,
BucketList, bucket_list,
FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
KeyRangeFun =
fun(Bucket) ->
{StartKey, EndKey, _} = return_ledger_keyrange(Tag, Bucket, all),
{StartKey, EndKey}
end,
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_bybucket(SnapFun,
Tag,
lists:map(KeyRangeFun, BucketList),
FoldFun,
JournalCheck, SegmentList);
get_runner(State,
{foldheads_bybucket,
Tag, Bucket, KeyRange,
Tag,
Bucket, KeyRange,
FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold),
leveled_runner:foldheads_bybucket(SnapFun,
{Tag, StartKey, EndKey},
Tag,
[{StartKey, EndKey}],
FoldFun,
JournalCheck, SegmentList);
get_runner(State,
@ -858,7 +990,8 @@ get_runner(State,
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapFun = return_snapfun(State, store, SnapQ, true, SnapPreFold),
leveled_runner:foldobjects_bybucket(SnapFun,
{Tag, StartKey, EndKey},
Tag,
[{StartKey, EndKey}],
FoldFun);
get_runner(State,
{foldobjects_byindex,
@ -869,7 +1002,14 @@ get_runner(State,
leveled_runner:foldobjects_byindex(SnapFun,
{Tag, Bucket, Field, FromTerm, ToTerm},
FoldObjectsFun);
get_runner(State, {binary_bucketlist, Tag, FoldAccT}) ->
{FoldBucketsFun, Acc} = FoldAccT,
SnapFun = return_snapfun(State, ledger, no_lookup, false, false),
leveled_runner:binary_bucketlist(SnapFun, Tag, FoldBucketsFun, Acc);
get_runner(State, {first_bucket, Tag, FoldAccT}) ->
{FoldBucketsFun, Acc} = FoldAccT,
SnapFun = return_snapfun(State, ledger, no_lookup, false, false),
leveled_runner:binary_bucketlist(SnapFun, Tag, FoldBucketsFun, Acc, 1);
%% Set of specific runners, primarily used as exmaples for tests
get_runner(State, DeprecatedQuery) ->
get_deprecatedrunner(State, DeprecatedQuery).
@ -878,7 +1018,7 @@ get_runner(State, DeprecatedQuery) ->
-spec get_deprecatedrunner(book_state(), tuple()) -> {async, fun()}.
%% @doc
%% Get an {async, Runner} for a given fold type. Fold types have different
%% tuple inputs. These folds are currentyl used in tests, but are deprecated.
%% tuple inputs. These folds are currently used in tests, but are deprecated.
%% Most of these folds should be achievable through other available folds.
get_deprecatedrunner(State, {bucket_stats, Bucket}) ->
SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
@ -886,10 +1026,6 @@ get_deprecatedrunner(State, {bucket_stats, Bucket}) ->
get_deprecatedrunner(State, {riakbucket_stats, Bucket}) ->
SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
leveled_runner:bucket_sizestats(SnapFun, Bucket, ?RIAK_TAG);
get_deprecatedrunner(State, {binary_bucketlist, Tag, FoldAccT}) ->
{FoldKeysFun, Acc} = FoldAccT,
SnapFun = return_snapfun(State, ledger, no_lookup, false, false),
leveled_runner:binary_bucketlist(SnapFun, Tag, FoldKeysFun, Acc);
get_deprecatedrunner(State, {hashlist_query, Tag, JournalCheck}) ->
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, true),
@ -919,7 +1055,7 @@ get_deprecatedrunner(State,
PartitionFilter).
-spec return_ledger_keyrange(atom(), any(), tuple()) ->
-spec return_ledger_keyrange(atom(), any(), tuple()|all) ->
{tuple(), tuple(), tuple()|no_lookup}.
%% @doc
%% Convert a range of binary keys into a ledger key range, returning
@ -950,7 +1086,6 @@ return_ledger_keyrange(Tag, Bucket, KeyRange) ->
end,
{StartKey, EndKey, SnapQuery}.
maybe_longrunning(SW, Aspect) ->
case timer:now_diff(os:timestamp(), SW) of
@ -1081,14 +1216,14 @@ set_options(Opts) ->
levelzero_cointoss = true,
compression_method = CompressionMethod}}.
startup(InkerOpts, PencillerOpts, RecentAAE) ->
startup(InkerOpts, PencillerOpts, State) ->
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
leveled_log:log("B0005", [LedgerSQN]),
ok = leveled_inker:ink_loadpcl(Inker,
LedgerSQN + 1,
get_loadfun(RecentAAE),
get_loadfun(State),
Penciller),
{Inker, Penciller}.
@ -1118,22 +1253,34 @@ fetch_head(Key, Penciller, LedgerCache) ->
end.
-spec preparefor_ledgercache(atom(), any(), integer(), any(),
integer(), tuple(), book_state()) ->
{integer()|no_lookup, integer(), list()}.
%% @doc
%% Prepare an object and its related key changes for addition to the Ledger
%% via the Ledger Cache.
preparefor_ledgercache(?INKT_MPUT,
?DUMMY, SQN, _O, _S, {ObjSpecs, TTL},
_State) ->
ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
{no_lookup, SQN, ObjChanges};
preparefor_ledgercache(?INKT_KEYD,
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
_AAE) ->
_State) ->
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
KeyChanges =
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
{no_lookup, SQN, KeyChanges};
preparefor_ledgercache(_InkTag,
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
AAE) ->
State) ->
{Bucket, Key, MetaValue, {KeyH, ObjH}, LastMods} =
leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
KeyChanges =
[{LedgerKey, MetaValue}] ++
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++
leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, ObjH, LastMods),
leveled_codec:aae_indexspecs(State#state.recent_aae,
Bucket, Key, SQN, ObjH, LastMods),
{KeyH, SQN, KeyChanges}.
@ -1192,10 +1339,10 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
end.
get_loadfun(RecentAAE) ->
get_loadfun(State) ->
PrepareFun =
fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, RecentAAE)
preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, State)
end,
LoadFun =
fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
@ -1754,7 +1901,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
{foldheads_bybucket,
?STD_TAG,
"BucketB",
{"Key", "Key4zzzz"},
{"Key", "Key4|"},
FoldHeadsFun,
true, false, false}),
KeyHashList2E = HTFolder2E(),
@ -1763,13 +1910,17 @@ foldobjects_vs_foldheads_bybucket_testto() ->
{foldheads_bybucket,
?STD_TAG,
"BucketB",
{"Key5", <<"all">>},
{"Key5", "Key|"},
FoldHeadsFun,
true, false, false}),
KeyHashList2F = HTFolder2F(),
?assertMatch(true, length(KeyHashList2E) > 0),
?assertMatch(true, length(KeyHashList2F) > 0),
io:format("Length of 2B ~w 2E ~w 2F ~w~n",
[length(KeyHashList2B),
length(KeyHashList2E),
length(KeyHashList2F)]),
?assertMatch(true,
lists:usort(KeyHashList2B) ==
lists:usort(KeyHashList2E ++ KeyHashList2F)),
@ -1777,6 +1928,21 @@ foldobjects_vs_foldheads_bybucket_testto() ->
ok = book_close(Bookie1),
reset_filestructure().
is_empty_test() ->
RootPath = reset_filestructure(),
{ok, Bookie1} = book_start([{root_path, RootPath},
{max_journalsize, 1000000},
{cache_size, 500}]),
% Put in an object with a TTL in the future
Future = leveled_codec:integer_now() + 300,
?assertMatch(true, leveled_bookie:book_isempty(Bookie1, ?STD_TAG)),
ok = book_tempput(Bookie1,
<<"B">>, <<"K">>, {value, <<"V">>}, [],
?STD_TAG, Future),
?assertMatch(false, leveled_bookie:book_isempty(Bookie1, ?STD_TAG)),
?assertMatch(true, leveled_bookie:book_isempty(Bookie1, ?RIAK_TAG)),
ok = leveled_bookie:book_close(Bookie1).
scan_table_test() ->
K1 = leveled_codec:to_ledgerkey(<<"B1">>,

View file

@ -62,6 +62,7 @@
get_size/2,
get_keyandobjhash/2,
idx_indexspecs/5,
obj_objectspecs/3,
aae_indexspecs/6,
generate_uuid/0,
integer_now/0,
@ -91,12 +92,17 @@
%% speed can be gained if just the segment ID is known - but more can be
%% gained should the extended hash (with the second element) is known
segment_hash(Key) when is_binary(Key) ->
<<SegmentID:16/integer, ExtraHash:32/integer, _Rest/binary>> =
crypto:hash(md5, Key),
{segment_hash, SegmentID, ExtraHash} = leveled_tictac:keyto_segment48(Key),
{SegmentID, ExtraHash};
segment_hash({?RIAK_TAG, Bucket, Key, null})
when is_binary(Bucket), is_binary(Key) ->
segment_hash(<<Bucket/binary, Key/binary>>);
segment_hash({?HEAD_TAG, Bucket, Key, SubK})
when is_binary(Bucket), is_binary(Key), is_binary(SubK) ->
segment_hash(<<Bucket/binary, Key/binary, SubK/binary>>);
segment_hash({?HEAD_TAG, Bucket, Key, _SubK})
when is_binary(Bucket), is_binary(Key) ->
segment_hash(<<Bucket/binary, Key/binary>>);
segment_hash(Key) ->
segment_hash(term_to_binary(Key)).
@ -222,15 +228,18 @@ from_ledgerkey({?IDX_TAG, ?ALL_BUCKETS, {_IdxFld, IdxVal}, {Bucket, Key}}) ->
{Bucket, Key, IdxVal};
from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) ->
{Bucket, Key, IdxVal};
from_ledgerkey({_Tag, Bucket, Key, null}) ->
from_ledgerkey({_Tag, Bucket, Key, _SubKey}) ->
{Bucket, Key}.
to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
{?IDX_TAG, Bucket, {Field, Value}, Key}.
to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG) ->
{?HEAD_TAG, Bucket, Key, SubKey};
to_ledgerkey(Bucket, Key, Tag) ->
{Tag, Bucket, Key, null}.
%% Return the Key, Value and Hash Option for this object. The hash option
%% indicates whether the key would ever be looked up directly, and so if it
%% requires an entry in the hash table
@ -404,6 +413,8 @@ split_inkvalue(VBin) when is_binary(VBin) ->
check_forinkertype(_LedgerKey, delete) ->
?INKT_TOMB;
check_forinkertype(_LedgerKey, head_only) ->
?INKT_MPUT;
check_forinkertype(_LedgerKey, _Object) ->
?INKT_STND.
@ -424,6 +435,14 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
endkey_passed(EndKey, CheckingKey) ->
EndKey < CheckingKey.
obj_objectspecs(ObjectSpecs, SQN, TTL) ->
lists:map(fun({IdxOp, Bucket, Key, SubKey, Value}) ->
gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL)
end,
ObjectSpecs).
idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
lists:map(
fun({IdxOp, IdxFld, IdxTrm}) ->
@ -433,14 +452,7 @@ idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
).
gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) ->
Status =
case IdxOp of
add ->
{active, TTL};
remove ->
%% TODO: timestamps for delayed reaping
tomb
end,
Status = set_status(IdxOp, TTL),
case Bucket of
{all, RealBucket} ->
{to_ledgerkey(?ALL_BUCKETS,
@ -458,6 +470,18 @@ gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) ->
{SQN, Status, no_lookup, null}}
end.
gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL) ->
Status = set_status(IdxOp, TTL),
K = to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG),
{K, {SQN, Status, segment_hash(K), Value}}.
set_status(add, TTL) ->
{active, TTL};
set_status(remove, _TTL) ->
%% TODO: timestamps for delayed reaping
tomb.
-spec aae_indexspecs(false|recent_aae(),
any(), any(),
integer(), integer(),
@ -641,12 +665,14 @@ get_objhash(Tag, ObjMetaData) ->
build_metadata_object(PrimaryKey, MD) ->
{Tag, _Bucket, _Key, null} = PrimaryKey,
{Tag, _Bucket, _Key, _SubKey} = PrimaryKey,
case Tag of
?RIAK_TAG ->
{SibData, Vclock, _Hash, _Size} = MD,
riak_metadata_to_binary(Vclock, SibData);
?STD_TAG ->
MD;
?HEAD_TAG ->
MD
end.
@ -936,5 +962,13 @@ delayedupdate_aaeidx_test() ->
AAESpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
?assertMatch(0, length(AAESpecs)).
head_segment_compare_test() ->
% Reminder to align native and parallel(leveled_ko) key stores for
% kv_index_tictactree
H1 = segment_hash({?HEAD_TAG, <<"B1">>, <<"K1">>, null}),
H2 = segment_hash({?RIAK_TAG, <<"B1">>, <<"K1">>, null}),
H3 = segment_hash({?HEAD_TAG, <<"B1">>, <<"K1">>, <<>>}),
?assertMatch(H1, H2),
?assertMatch(H1, H3).
-endif.

View file

@ -82,6 +82,7 @@
clerk_new/1,
clerk_compact/7,
clerk_hashtablecalc/3,
clerk_trim/3,
clerk_stop/1,
code_change/3]).
@ -144,6 +145,12 @@ clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) ->
Inker,
TimeO}).
-spec clerk_trim(pid(), pid(), integer()) -> ok.
%% @doc
%% Trim the Inker back to the persisted SQN
clerk_trim(Pid, Inker, PersistedSQN) ->
gen_server:cast(Pid, {trim, Inker, PersistedSQN}).
-spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok.
%% @doc
%% Spawn a dedicated clerk for the process of calculating the binary view
@ -235,6 +242,12 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
ok = CloseFun(FilterServer),
{noreply, State}
end;
handle_cast({trim, Inker, PersistedSQN}, State) ->
ManifestAsList = leveled_inker:ink_getmanifest(Inker),
FilesToDelete =
leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList),
ok = update_inker(Inker, [], FilesToDelete),
{noreply, State};
handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
{IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos),
ok = leveled_cdb:cdb_returnhashtable(CDBpid, IndexList, HashTreeBin),
@ -527,7 +540,7 @@ update_inker(Inker, ManifestSlice, FilesToDelete) ->
Inker)
end,
FilesToDelete),
ok.
ok.
compact_files(BestRun, CDBopts, FilterFun, FilterServer,
MaxSQN, RStrategy, PressMethod) ->

View file

@ -14,6 +14,7 @@
append_lastkey/3,
remove_entry/2,
find_entry/2,
find_persistedentries/2,
head_entry/1,
to_list/1,
from_list/1,
@ -21,7 +22,6 @@
writer/3,
printer/1,
complete_filex/0
]).
-define(MANIFEST_FILEX, "man").
@ -106,9 +106,26 @@ find_entry(SQN, [{SQNMarker, SubL}|_Tail]) when SQN >= SQNMarker ->
find_entry(SQN, [_TopEntry|Tail]) ->
find_entry(SQN, Tail).
-spec find_persistedentries(integer(), list()) -> list(manifest_entry()).
%% @doc
%% Find the entries in the manifest where all items are < than the persisted
%% SQN in the ledger
find_persistedentries(SQN, ManifestAsList) ->
DropFun =
fun({ME_SQN, _FN, _ME_P, _LK}) ->
ME_SQN > SQN
end,
Entries = lists:dropwhile(DropFun, ManifestAsList),
case Entries of
[_Head|Tail] ->
Tail;
[] ->
[]
end.
-spec head_entry(manifest()) -> manifest_entry().
%% @doc
%% Return the head manifets entry (the most recent journal)
%% Return the head manifest entry (the most recent journal)
head_entry(Manifest) ->
[{_SQNMarker, SQNL}|_Tail] = Manifest,
[HeadEntry|_SQNL_Tail] = SQNL,
@ -239,6 +256,19 @@ buildfromend_test() ->
test_testmanifest(Man0),
?assertMatch(ManL, to_list(Man0)).
findpersisted_test() ->
Man = from_list(build_testmanifest_aslist()),
FilesToDelete1 = find_persistedentries(2001, to_list(Man)),
?assertMatch(2, length(FilesToDelete1)),
FilesToDelete2 = find_persistedentries(3000, to_list(Man)),
?assertMatch(3, length(FilesToDelete2)),
FilesToDelete3 = find_persistedentries(2999, to_list(Man)),
?assertMatch(2, length(FilesToDelete3)),
FilesToDelete4 = find_persistedentries(999, to_list(Man)),
?assertMatch([], FilesToDelete4),
FilesToDelete5 = find_persistedentries(0, to_list(Man)),
?assertMatch([], FilesToDelete5).
buildrandomfashion_test() ->
ManL0 = build_testmanifest_aslist(),
RandMapFun =

View file

@ -95,6 +95,7 @@
code_change/3,
ink_start/1,
ink_put/4,
ink_mput/3,
ink_get/3,
ink_fetch/3,
ink_keycheck/3,
@ -105,6 +106,7 @@
ink_compactjournal/3,
ink_compactioncomplete/1,
ink_compactionpending/1,
ink_trim/2,
ink_getmanifest/1,
ink_updatemanifest/3,
ink_printmanifest/1,
@ -185,6 +187,16 @@ ink_start(InkerOpts) ->
ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity).
-spec ink_mput(pid(), any(), {list(), integer()|infinity}) -> {ok, integer()}.
%% @doc
%% MPUT as series of object specifications, which will be converted into
%% objects in the Ledger. This should only be used when the Bookie is
%% running in head_only mode. The journal entries arekept only for handling
%% consistency on startup
ink_mput(Pid, PrimaryKey, ObjectChanges) ->
gen_server:call(Pid, {mput, PrimaryKey, ObjectChanges}, infinity).
-spec ink_get(pid(),
{atom(), any(), any(), any()}|string(),
integer()) ->
@ -361,10 +373,17 @@ ink_compactioncomplete(Pid) ->
-spec ink_compactionpending(pid()) -> boolean().
%% @doc
%% Is there ongoing compaction work? No compaction work should be initiated
%5 if there is already some compaction work ongoing.
%% if there is already some compaction work ongoing.
ink_compactionpending(Pid) ->
gen_server:call(Pid, compaction_pending, infinity).
-spec ink_trim(pid(), integer()) -> ok.
%% @doc
%% Trim the Journal to just those files that contain entries since the
%% Penciller's persisted SQN
ink_trim(Pid, PersistedSQN) ->
gen_server:call(Pid, {trim, PersistedSQN}, infinity).
-spec ink_getmanifest(pid()) -> list().
%% @doc
%% Allows the clerk to fetch the manifest at the point it starts a compaction
@ -420,6 +439,11 @@ handle_call({put, Key, Object, KeyChanges}, _From, State) ->
{_, UpdState, ObjSize} ->
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
end;
handle_call({mput, Key, ObjChanges}, _From, State) ->
case put_object(Key, head_only, ObjChanges, State) of
{_, UpdState, _ObjSize} ->
{reply, {ok, UpdState#state.journal_sqn}, UpdState}
end;
handle_call({fetch, Key, SQN}, _From, State) ->
case get_object(Key, SQN, State#state.manifest, true) of
{{SQN, Key}, {Value, _IndexSpecs}} ->
@ -503,6 +527,9 @@ handle_call(compaction_complete, _From, State) ->
{reply, ok, State#state{compaction_pending=false}};
handle_call(compaction_pending, _From, State) ->
{reply, State#state.compaction_pending, State};
handle_call({trim, PersistedSQN}, _From, State) ->
ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
{reply, ok, State};
handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call(doom, _From, State) ->

View file

@ -189,7 +189,8 @@
pcl_registersnapshot/5,
pcl_getstartupsequencenumber/1,
pcl_checkbloomtest/2,
pcl_checkforwork/1]).
pcl_checkforwork/1,
pcl_persistedsqn/1]).
-export([
sst_rootpath/1,
@ -504,6 +505,14 @@ pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem, LR) ->
pcl_releasesnapshot(Pid, Snapshot) ->
gen_server:cast(Pid, {release_snapshot, Snapshot}).
-spec pcl_persistedsqn(pid()) -> integer().
%% @doc
%% Return the persisted SQN, the highest SQN which has been persisted into the
%% Ledger
pcl_persistedsqn(Pid) ->
gen_server:call(Pid, persisted_sqn, infinity).
-spec pcl_close(pid()) -> ok.
%% @doc
%% Close the penciller neatly, trying to persist to disk anything in the memory
@ -668,7 +677,7 @@ handle_call({fetch_keys,
{AccFun, InitAcc},
{SegmentList, MaxKeys}),
{reply, Acc, State#state{levelzero_astree = L0AsList}};
{reply, Acc, State};
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State};
handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) ->
@ -781,7 +790,9 @@ handle_call({checkbloom_fortest, Key, Hash}, _From, State) ->
handle_call(check_for_work, _From, State) ->
{_WL, WC} = leveled_pmanifest:check_for_work(State#state.manifest,
?LEVEL_SCALEFACTOR),
{reply, WC > 0, State}.
{reply, WC > 0, State};
handle_call(persisted_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State}.
handle_cast({manifest_change, NewManifest}, State) ->
NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest),
@ -1262,15 +1273,6 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
{SegmentList, MaxKeys}) ->
{StartKey, EndKey} = KeyRange,
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
{true, _} ->
% Normally everything is pre-filterd, but the IMM iterator can
% be re-used and so may be behind the StartKey if the StartKey has
% advanced from the previous use
keyfolder({NxIMMiterator, SSTiterator},
KeyRange,
{AccFun, Acc},
{SegmentList, MaxKeys});
{false, true} ->
% There are no more keys in-range in the in-memory
% iterator, so take action as if this iterator is empty

View file

@ -24,14 +24,15 @@
-export([
bucket_sizestats/3,
binary_bucketlist/4,
binary_bucketlist/5,
index_query/3,
bucketkey_query/4,
hashlist_query/3,
tictactree/5,
foldheads_allkeys/5,
foldobjects_allkeys/4,
foldheads_bybucket/5,
foldobjects_bybucket/3,
foldheads_bybucket/6,
foldobjects_bybucket/4,
foldobjects_byindex/3
]).
@ -69,10 +70,20 @@ bucket_sizestats(SnapFun, Bucket, Tag) ->
%% @doc
%% List buckets for tag, assuming bucket names are all binary type
binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc) ->
binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc, -1).
-spec binary_bucketlist(fun(), atom(), fun(), any(), integer())
-> {async, fun()}.
%% @doc
%% set Max Buckets to -1 to list all buckets, otherwise will only return
%% MaxBuckets (use 1 to confirm that there exists any bucket for a given Tag)
binary_bucketlist(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) ->
Runner =
fun() ->
{ok, LedgerSnapshot, _JournalSnapshot} = SnapFun(),
BucketAcc = get_nextbucket(null, null, Tag, LedgerSnapshot, []),
BucketAcc =
get_nextbucket(null, null,
Tag, LedgerSnapshot, [], {0, MaxBuckets}),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
lists:foldl(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end,
InitAcc,
@ -222,9 +233,11 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun,
Tag, StartKey, EndKey,
Tag,
[{StartKey, EndKey}],
FoldFun,
{true, JournalCheck}, SegmentList).
{true, JournalCheck},
SegmentList).
-spec foldobjects_allkeys(fun(), atom(), fun(), key_order|sqn_order)
-> {async, fun()}.
@ -234,9 +247,11 @@ foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun,
Tag, StartKey, EndKey,
Tag,
[{StartKey, EndKey}],
FoldFun,
false, false);
false,
false);
foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
% Fold over the journal in order of receipt
{FoldFun, InitAcc} =
@ -321,31 +336,37 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
{async, Folder}.
-spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) ->
-spec foldobjects_bybucket(fun(), atom(), list({any(), any()}), fun()) ->
{async, fun()}.
%% @doc
%% Fold over all objects within a given key range in a bucket
foldobjects_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun) ->
foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) ->
foldobjects(SnapFun,
Tag, StartKey, EndKey,
Tag,
KeyRanges,
FoldFun,
false, false).
false,
false).
-spec foldheads_bybucket(fun(),
{atom(), any(), any()},
atom(),
list({any(), any()}),
fun(),
boolean(), false|list(integer()))
-> {async, fun()}.
%% @doc
%% Fold over all object metadata within a given key range in a bucket
foldheads_bybucket(SnapFun,
{Tag, StartKey, EndKey},
Tag,
KeyRanges,
FoldFun,
JournalCheck, SegmentList) ->
foldobjects(SnapFun,
Tag, StartKey, EndKey,
Tag,
KeyRanges,
FoldFun,
{true, JournalCheck}, SegmentList).
{true, JournalCheck},
SegmentList).
-spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}.
%% @doc
@ -357,9 +378,11 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
EndKey =
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
foldobjects(SnapFun,
Tag, StartKey, EndKey,
Tag,
[{StartKey, EndKey}],
FoldFun,
false, false).
false,
false).
@ -368,7 +391,9 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
%%% Internal functions
%%%============================================================================
get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
get_nextbucket(_NextB, _NextK, _Tag, _LS, BKList, {Limit, Limit}) ->
BKList;
get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) ->
Now = leveled_codec:integer_now(),
StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
@ -393,13 +418,15 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
null,
Tag,
LedgerSnapshot,
[{B, K}|BKList]);
[{B, K}|BKList],
{C + 1, L});
false ->
get_nextbucket(B,
<<K/binary, 0>>,
Tag,
LedgerSnapshot,
BKList)
BKList,
{C, L})
end;
{NB, _V} ->
leveled_log:log("B0010",[NB]),
@ -407,7 +434,7 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
end.
-spec foldobjects(fun(), atom(), tuple(), tuple(), fun(),
-spec foldobjects(fun(), atom(), list(), fun(),
false|{true, boolean()}, false|list(integer())) ->
{async, fun()}.
%% @doc
@ -417,19 +444,16 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
%% will be created that if understood by the fold function will allow the fold
%% function to work on the head of the object, and defer fetching the body in
%% case such a fetch is unecessary.
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldObjectsFun,
DeferredFetch, SegmentList) ->
foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) ->
{FoldFun, InitAcc} =
case is_tuple(FoldObjectsFun) of
case is_tuple(FoldObjFun) of
true ->
% FoldObjectsFun is already a tuple with a Fold function and an
% initial accumulator
FoldObjectsFun;
FoldObjFun;
false ->
% no initial accumulatr passed, and so should be just a list
{FoldObjectsFun, []}
{FoldObjFun, []}
end,
Folder =
@ -440,12 +464,17 @@ foldobjects(SnapFun,
JournalSnapshot,
Tag,
DeferredFetch),
Acc = leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
StartKey,
EndKey,
AccFun,
InitAcc,
SegmentList),
ListFoldFun =
fun({StartKey, EndKey}, FoldAcc) ->
leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
StartKey,
EndKey,
AccFun,
FoldAcc,
SegmentList)
end,
Acc = lists:foldl(ListFoldFun, InitAcc, KeyRanges),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
case DeferredFetch of
{true, false} ->
@ -553,17 +582,19 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
SQN),
case InJournal of
probably ->
ProxyObj = make_proxy_object(LK, JK,
MD, V,
InkerClone),
ProxyObj =
make_proxy_object(Tag,
LK, JK, MD, V,
InkerClone),
FoldObjectsFun(B, K, ProxyObj, Acc);
missing ->
Acc
end;
{true, false} ->
ProxyObj = make_proxy_object(LK, JK,
MD, V,
InkerClone),
ProxyObj =
make_proxy_object(Tag,
LK, JK, MD, V,
InkerClone),
FoldObjectsFun(B, K, ProxyObj, Acc);
false ->
R = leveled_bookie:fetch_value(InkerClone, JK),
@ -581,7 +612,10 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
end,
AccFun.
make_proxy_object(LK, JK, MD, V, InkerClone) ->
make_proxy_object(?HEAD_TAG, _LK, _JK, MD, _V, _InkerClone) ->
MD;
make_proxy_object(_Tag, LK, JK, MD, V, InkerClone) ->
Size = leveled_codec:get_size(LK, V),
MDBin = leveled_codec:build_metadata_object(LK, MD),
term_to_binary({proxy_object,

View file

@ -319,6 +319,13 @@ sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
%%
%% To make the range open-ended (either to start, end or both) the all atom
%% can be used in place of the Key tuple.
%%
%% A segment list can also be passed, which inidcates a subset of segment
%% hashes of interest in the query.
%%
%% TODO: Optimise this so that passing a list of segments that tune to the
%% same hash is faster - perhaps provide an exportable function in
%% leveled_tictac
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) ->
SegList0 = tune_seglist(SegList),
case gen_fsm:sync_send_event(Pid,

View file

@ -69,23 +69,28 @@
import_tree/1,
valid_size/1,
keyto_segment32/1,
generate_segmentfilter_list/2
keyto_segment48/1,
generate_segmentfilter_list/2,
merge_binaries/2,
join_segment/2
]).
-include_lib("eunit/include/eunit.hrl").
-define(HASH_SIZE, 4).
-define(L2_CHUNKSIZE, 256).
-define(L2_BITSIZE, 8).
%% UNSUUPPORTED tree sizes for accelerated segment filtering
-define(XXSMALL, {6, 64, 64 * 64}).
-define(XSMALL, {7, 128, 128 * 128}).
-define(XXSMALL, 16).
-define(XSMALL, 64).
%% SUPPORTED tree sizes for accelerated segment filtering
-define(SMALL, {8, 256, 256 * 256}).
-define(MEDIUM, {9, 512, 512 * 512}).
-define(LARGE, {10, 1024, 1024 * 1024}).
-define(XLARGE, {11, 2048, 2048 * 2048}).
-define(SMALL, 256).
-define(MEDIUM, 1024).
-define(LARGE, 4096).
-define(XLARGE, 16384).
-define(EMPTY, <<0:8/integer>>).
@ -94,7 +99,6 @@
-record(tictactree, {treeID :: any(),
size :: xxsmall|xsmall|small|medium|large|xlarge,
width :: integer(),
bitwidth :: integer(),
segment_count :: integer(),
level1 :: binary(),
level2 :: any() % an array - but OTP compatibility
@ -119,15 +123,14 @@ new_tree(TreeID) ->
new_tree(TreeID, small).
new_tree(TreeID, Size) ->
{BitWidth, Width, SegmentCount} = get_size(Size),
Width = get_size(Size),
Lv1Width = Width * ?HASH_SIZE * 8,
Lv1Init = <<0:Lv1Width/integer>>,
Lv2Init = array:new([{size, Width}, {default, ?EMPTY}]),
#tictactree{treeID = TreeID,
size = Size,
width = Width,
bitwidth = BitWidth,
segment_count = SegmentCount,
segment_count = Width * ?L2_CHUNKSIZE,
level1 = Lv1Init,
level2 = Lv2Init}.
@ -156,12 +159,11 @@ import_tree(ExportedTree) ->
[{<<"level1">>, L1Base64},
{<<"level2">>, {struct, L2List}}]} = ExportedTree,
L1Bin = base64:decode(L1Base64),
Sizes =
lists:map(fun(SizeTag) -> {SizeTag, element(2, get_size(SizeTag))} end,
?VALID_SIZES),
Sizes = lists:map(fun(SizeTag) -> {SizeTag, get_size(SizeTag)} end,
?VALID_SIZES),
Width = byte_size(L1Bin) div ?HASH_SIZE,
{Size, Width} = lists:keyfind(Width, 2, Sizes),
{BitWidth, Width, SegmentCount} = get_size(Size),
Width = get_size(Size),
Lv2Init = array:new([{size, Width}]),
FoldFun =
fun({X, EncodedL2SegBin}, L2Array) ->
@ -172,8 +174,7 @@ import_tree(ExportedTree) ->
#tictactree{treeID = import,
size = Size,
width = Width,
bitwidth = BitWidth,
segment_count = SegmentCount,
segment_count = Width * ?L2_CHUNKSIZE,
level1 = L1Bin,
level2 = Lv2}.
@ -190,10 +191,11 @@ add_kv(TicTacTree, Key, Value, BinExtractFun) ->
Segment = get_segment(SegHash, TicTacTree#tictactree.segment_count),
Level2Pos =
Segment band (TicTacTree#tictactree.width - 1),
Segment band (?L2_CHUNKSIZE - 1),
Level1Pos =
(Segment bsr TicTacTree#tictactree.bitwidth)
(Segment bsr ?L2_BITSIZE)
band (TicTacTree#tictactree.width - 1),
Level2BytePos = ?HASH_SIZE * Level2Pos,
Level1BytePos = ?HASH_SIZE * Level1Pos,
@ -228,7 +230,6 @@ add_kv(TicTacTree, Key, Value, BinExtractFun) ->
find_dirtyleaves(SrcTree, SnkTree) ->
_Size = SrcTree#tictactree.size,
_Size = SnkTree#tictactree.size,
Width = SrcTree#tictactree.width,
IdxList = find_dirtysegments(fetch_root(SrcTree), fetch_root(SnkTree)),
SrcLeaves = fetch_leaves(SrcTree, IdxList),
@ -239,7 +240,7 @@ find_dirtyleaves(SrcTree, SnkTree) ->
{Idx, SrcLeaf} = lists:keyfind(Idx, 1, SrcLeaves),
{Idx, SnkLeaf} = lists:keyfind(Idx, 1, SnkLeaves),
L2IdxList = segmentcompare(SrcLeaf, SnkLeaf),
Acc ++ lists:map(fun(X) -> X + Idx * Width end, L2IdxList)
Acc ++ lists:map(fun(X) -> X + Idx * ?L2_CHUNKSIZE end, L2IdxList)
end,
lists:sort(lists:foldl(FoldFun, [], IdxList)).
@ -304,7 +305,7 @@ merge_trees(TreeA, TreeB) ->
get_segment(Hash, SegmentCount) when is_integer(SegmentCount) ->
Hash band (SegmentCount - 1);
get_segment(Hash, TreeSize) ->
get_segment(Hash, element(3, get_size(TreeSize))).
get_segment(Hash, ?L2_CHUNKSIZE * get_size(TreeSize)).
-spec tictac_hash(binary(), any()) -> {integer(), integer()}.
@ -329,12 +330,23 @@ tictac_hash(BinKey, Val) when is_binary(BinKey) ->
%% @doc
%% The first 16 bits of the segment hash used in the tictac tree should be
%% made up of the segment ID part (which is used to accelerate queries)
keyto_segment32(BinKey) when is_binary(BinKey) ->
{SegmentID, ExtraHash} = leveled_codec:segment_hash(BinKey),
keyto_segment32({segment_hash, SegmentID, ExtraHash})
when is_integer(SegmentID), is_integer(ExtraHash) ->
(ExtraHash band 65535) bsl 16 + SegmentID;
keyto_segment32(BinKey) when is_binary(BinKey) ->
keyto_segment32(keyto_segment48(BinKey));
keyto_segment32(Key) ->
keyto_segment32(term_to_binary(Key)).
-spec keyto_segment48(binary()) -> {segment_hash, integer(), integer()}.
%% @doc
%% Produce a segment with an Extra Hash part - for tictac use most of the
%% ExtraHash will be discarded
keyto_segment48(BinKey) ->
<<SegmentID:16/integer, ExtraHash:32/integer, _Rest/binary>> =
crypto:hash(md5, BinKey),
{segment_hash, SegmentID, ExtraHash}.
-spec generate_segmentfilter_list(list(integer()), atom())
-> false|list(integer()).
%% @doc
@ -361,6 +373,12 @@ generate_segmentfilter_list(SegmentList, Size) ->
SegmentList
end.
-spec join_segment(integer(), integer()) -> integer().
%% @doc
%% Generate a segment ID for the Branch and Leaf ID co-ordinates
join_segment(BranchID, LeafID) ->
BranchID bsl ?L2_BITSIZE + LeafID.
%%%============================================================================
%%% Internal functions
%%%============================================================================
@ -369,7 +387,7 @@ generate_segmentfilter_list(SegmentList, Size) ->
get_level2(TicTacTree, L1Pos) ->
case array:get(L1Pos, TicTacTree#tictactree.level2) of
?EMPTY ->
Lv2SegBinSize = TicTacTree#tictactree.width * ?HASH_SIZE * 8,
Lv2SegBinSize = ?L2_CHUNKSIZE * ?HASH_SIZE * 8,
<<0:Lv2SegBinSize/integer>>;
SrcL2 ->
SrcL2
@ -461,6 +479,18 @@ simple_test_withsize(Size) ->
Tree0 = new_tree(0, Size),
Tree1 = add_kv(Tree0, K1, {caine, 1}, BinFun),
% Check that we can get to the segment ID that has changed, and confirm it
% is the segment ID expected
Root1 = fetch_root(Tree1),
Root0 = fetch_root(Tree0),
[BranchID] = find_dirtysegments(Root0, Root1),
[{BranchID, Branch1}] = fetch_leaves(Tree1, [BranchID]),
[{BranchID, Branch0}] = fetch_leaves(Tree0, [BranchID]),
[LeafID] = find_dirtysegments(Branch0, Branch1),
SegK1 = keyto_segment32(K1) band (get_size(Size) * 256 - 1),
?assertMatch(SegK1, join_segment(BranchID, LeafID)),
Tree2 = add_kv(Tree1, K2, {caine, 2}, BinFun),
Tree3 = add_kv(Tree2, K3, {caine, 3}, BinFun),
Tree3A = add_kv(Tree3, K3, {caine, 4}, BinFun),

View file

@ -9,12 +9,14 @@
-export([single_object_with2i/1,
small_load_with2i/1,
query_count/1,
multibucket_fold/1,
rotating_objects/1]).
all() -> [
single_object_with2i,
small_load_with2i,
query_count,
multibucket_fold,
rotating_objects
].
@ -454,7 +456,7 @@ count_termsonindex(Bucket, IdxField, Book, QType) ->
lists:foldl(fun(X, Acc) ->
SW = os:timestamp(),
ST = integer_to_list(X),
ET = ST ++ "~",
ET = ST ++ "|",
Q = {index_query,
Bucket,
{fun testutil:foldkeysfun/3, []},
@ -473,6 +475,78 @@ count_termsonindex(Bucket, IdxField, Book, QType) ->
0,
lists:seq(190, 221)).
multibucket_fold(_Config) ->
RootPath = testutil:reset_filestructure(),
{ok, Bookie1} = leveled_bookie:book_start(RootPath,
2000,
50000000,
testutil:sync_strategy()),
ObjectGen = testutil:get_compressiblevalue_andinteger(),
IndexGen = fun() -> [] end,
ObjL1 = testutil:generate_objects(13000,
uuid,
[],
ObjectGen,
IndexGen,
<<"Bucket1">>),
testutil:riakload(Bookie1, ObjL1),
ObjL2 = testutil:generate_objects(17000,
uuid,
[],
ObjectGen,
IndexGen,
<<"Bucket2">>),
testutil:riakload(Bookie1, ObjL2),
ObjL3 = testutil:generate_objects(7000,
uuid,
[],
ObjectGen,
IndexGen,
<<"Bucket3">>),
testutil:riakload(Bookie1, ObjL3),
ObjL4 = testutil:generate_objects(23000,
uuid,
[],
ObjectGen,
IndexGen,
<<"Bucket4">>),
testutil:riakload(Bookie1, ObjL4),
Q1 = {foldheads_bybucket,
?RIAK_TAG,
[<<"Bucket1">>, <<"Bucket4">>], bucket_list,
fun(B, K, _PO, Acc) ->
[{B, K}|Acc]
end,
false,
true,
false},
{async, R1} = leveled_bookie:book_returnfolder(Bookie1, Q1),
O1 = length(R1()),
io:format("Result R1 of length ~w~n", [O1]),
Q2 = {foldheads_bybucket,
?RIAK_TAG,
[<<"Bucket2">>, <<"Bucket3">>], bucket_list,
{fun(_B, _K, _PO, Acc) ->
Acc +1
end,
0},
false,
true,
false},
{async, R2} = leveled_bookie:book_returnfolder(Bookie1, Q2),
O2 = R2(),
io:format("Result R2 of ~w~n", [O2]),
true = 36000 == O1,
true = 24000 == O2,
ok = leveled_bookie:book_close(Bookie1),
testutil:reset_filestructure().
rotating_objects(_Config) ->
RootPath = testutil:reset_filestructure(),

View file

@ -8,7 +8,8 @@
recent_aae_noaae/1,
recent_aae_allaae/1,
recent_aae_bucketaae/1,
recent_aae_expiry/1
recent_aae_expiry/1,
basic_headonly/1
]).
all() -> [
@ -17,7 +18,8 @@ all() -> [
recent_aae_noaae,
recent_aae_allaae,
recent_aae_bucketaae,
recent_aae_expiry
recent_aae_expiry,
basic_headonly
].
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
@ -1010,6 +1012,177 @@ recent_aae_expiry(_Config) ->
true = length(DL4_0) == 0.
basic_headonly(_Config) ->
ObjectCount = 200000,
RemoveCount = 100,
basic_headonly_test(ObjectCount, RemoveCount, with_lookup),
basic_headonly_test(ObjectCount, RemoveCount, no_lookup).
basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) ->
% Load some AAE type objects into Leveled using the read_only mode. This
% should allow for the items to be added in batches. Confirm that the
% journal is garbage collected as expected, and that it is possible to
% perform a fold_heads style query
RootPathHO = testutil:reset_filestructure("testHO"),
StartOpts1 = [{root_path, RootPathHO},
{max_pencillercachesize, 16000},
{sync_strategy, sync},
{head_only, HeadOnly},
{max_journalsize, 500000}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{B1, K1, V1, S1, MD} = {"Bucket",
"Key1.1.4567.4321",
"Value1",
[],
[{"MDK1", "MDV1"}]},
{TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
{unsupported_message, put} =
testutil:book_riakput(Bookie1, TestObject, TestSpec),
ObjectSpecFun =
fun(Op) ->
fun(N) ->
Bucket = <<"B", N:8/integer>>,
Key = <<"K", N:32/integer>>,
<<SegmentID:20/integer, _RestBS/bitstring>> =
crypto:hash(md5, term_to_binary({Bucket, Key})),
<<Hash:32/integer, _RestBN/bitstring>> =
crypto:hash(md5, <<N:32/integer>>),
{Op, <<SegmentID:32/integer>>, Bucket, Key, Hash}
end
end,
ObjectSpecL = lists:map(ObjectSpecFun(add), lists:seq(1, ObjectCount)),
SW0 = os:timestamp(),
ok = load_objectspecs(ObjectSpecL, 32, Bookie1),
io:format("Loaded an object count of ~w in ~w microseconds with ~w~n",
[ObjectCount, timer:now_diff(os:timestamp(), SW0), HeadOnly]),
FoldFun =
fun(_B, _K, V, {HashAcc, CountAcc}) ->
{HashAcc bxor V, CountAcc + 1}
end,
InitAcc = {0, 0},
RunnerDefinition =
{foldheads_allkeys, h, {FoldFun, InitAcc}, false, false, false},
{async, Runner1} =
leveled_bookie:book_returnfolder(Bookie1, RunnerDefinition),
SW1 = os:timestamp(),
{AccH1, AccC1} = Runner1(),
io:format("AccH and AccC of ~w ~w in ~w microseconds~n",
[AccH1, AccC1, timer:now_diff(os:timestamp(), SW1)]),
true = AccC1 == ObjectCount,
JFP = RootPathHO ++ "/journal/journal_files",
{ok, FNs} = file:list_dir(JFP),
ok = leveled_bookie:book_trimjournal(Bookie1),
WaitForTrimFun =
fun(N, _Acc) ->
{ok, PollFNs} = file:list_dir(JFP),
case length(PollFNs) < length(FNs) of
true ->
true;
false ->
timer:sleep(N * 1000),
false
end
end,
true = lists:foldl(WaitForTrimFun, false, [1, 2, 3, 5, 8, 13]),
{ok, FinalFNs} = file:list_dir(JFP),
[{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL,
case HeadOnly of
with_lookup ->
% If we allow HEAD_TAG to be suubject to a lookup, then test this
% here
{ok, Hash0} =
leveled_bookie:book_head(Bookie1,
SegmentID0,
{Bucket0, Key0},
h);
no_lookup ->
{unsupported_message, head} =
leveled_bookie:book_head(Bookie1,
SegmentID0,
{Bucket0, Key0},
h)
end,
ok = leveled_bookie:book_close(Bookie1),
{ok, FinalJournals} = file:list_dir(JFP),
io:format("Trim has reduced journal count from " ++
"~w to ~w and ~w after restart~n",
[length(FNs), length(FinalFNs), length(FinalJournals)]),
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
{async, Runner2} =
leveled_bookie:book_returnfolder(Bookie2, RunnerDefinition),
{AccH2, AccC2} = Runner2(),
true = AccC2 == ObjectCount,
case HeadOnly of
with_lookup ->
% If we allow HEAD_TAG to be suubject to a lookup, then test this
% here
{ok, Hash0} =
leveled_bookie:book_head(Bookie2,
SegmentID0,
{Bucket0, Key0},
h);
no_lookup ->
{unsupported_message, head} =
leveled_bookie:book_head(Bookie2,
SegmentID0,
{Bucket0, Key0},
h)
end,
RemoveSpecL0 = lists:sublist(ObjectSpecL, RemoveCount),
RemoveSpecL1 =
lists:map(fun(Spec) -> setelement(1, Spec, remove) end, RemoveSpecL0),
ok = load_objectspecs(RemoveSpecL1, 32, Bookie2),
{async, Runner3} =
leveled_bookie:book_returnfolder(Bookie2, RunnerDefinition),
{AccH3, AccC3} = Runner3(),
true = AccC3 == (ObjectCount - RemoveCount),
false = AccH3 == AccH2,
ok = leveled_bookie:book_close(Bookie2).
load_objectspecs([], _SliceSize, _Bookie) ->
ok;
load_objectspecs(ObjectSpecL, SliceSize, Bookie)
when length(ObjectSpecL) < SliceSize ->
load_objectspecs(ObjectSpecL, length(ObjectSpecL), Bookie);
load_objectspecs(ObjectSpecL, SliceSize, Bookie) ->
{Head, Tail} = lists:split(SliceSize, ObjectSpecL),
case leveled_bookie:book_mput(Bookie, Head) of
ok ->
load_objectspecs(Tail, SliceSize, Bookie);
pause ->
timer:sleep(10),
load_objectspecs(Tail, SliceSize, Bookie)
end.
load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
SW_StartLoad, TreeSize, UnitMins,