Merge pull request #225 from martinsumner/mas-i222-extractmetadata
Mas i222 extractmetadata
This commit is contained in:
commit
d395b4fb48
14 changed files with 705 additions and 364 deletions
61
README.md
61
README.md
|
@ -2,17 +2,19 @@
|
|||
|
||||
## Introduction
|
||||
|
||||
Leveled is a <b>work-in-progress</b> prototype of a simple Key-Value store based on the concept of Log-Structured Merge Trees, with the following characteristics:
|
||||
Leveled is a simple Key-Value store based on the concept of Log-Structured Merge Trees, with the following characteristics:
|
||||
|
||||
- Optimised for workloads with <b>larger values</b> (e.g. > 4KB).
|
||||
|
||||
- Explicitly supports <b>HEAD requests</b> in addition to GET requests.
|
||||
- Splits the storage of value between keys/metadata and body,
|
||||
- Explicitly supports <b>HEAD requests</b> in addition to GET requests:
|
||||
- Splits the storage of value between keys/metadata and body (assuming some definition of metadata is provided);
|
||||
- Allows for the application to define what constitutes object metadata and what constitutes the body (value-part) of the object - and assign tags to objects to manage multiple object-types with different extract rules;
|
||||
- Stores keys/metadata in a merge tree and the full object in a journal of [CDB files](https://en.wikipedia.org/wiki/Cdb_(software))
|
||||
- allowing for HEAD requests which have lower overheads than GET requests, and
|
||||
- queries which traverse keys/metadatas to be supported with fewer side effects on the page cache.
|
||||
- allowing for HEAD requests which have lower overheads than GET requests; and
|
||||
- queries which traverse keys/metadatas to be supported with fewer side effects on the page cache than folds over keys/objects.
|
||||
|
||||
- Support for tagging of <b>object types</b> and the implementation of alternative store behaviour based on type.
|
||||
- Allows for changes to extract specific information as metadata to be returned from HEAD requests;
|
||||
- Potentially usable for objects with special retention or merge properties.
|
||||
|
||||
- Support for low-cost clones without locking to provide for <b>scanning queries</b> (e.g. secondary indexes).
|
||||
|
@ -24,6 +26,10 @@ The store has been developed with a <b>focus on being a potential backend to a R
|
|||
|
||||
An optimised version of Riak KV has been produced in parallel which will exploit the availability of HEAD requests (to access object metadata including version vectors), where a full GET is not required. This, along with reduced write amplification when compared to leveldb, is expected to offer significant improvement in the volume and predictability of throughput for workloads with larger (> 4KB) object sizes, as well as reduced tail latency.
|
||||
|
||||
There may be more general uses of Leveled, with the following caveats:
|
||||
- Leveled should be extended to define new tags that specify what metadata is to be extracted for the inserted objects (or to override the behaviour for the ?STD_TAG). Without this, there will be limited scope to take advantage of the relative efficiency of HEAD and FOLD_HEAD requests.
|
||||
- If objects are small, the [`head_only` mode](docs/STARTUP_OPTIONS.md#head-only) may be used, which will cease separation of object body from header and use the Key/Metadata store as the only long-term persisted store. In this mode all of the object is treated as Metadata, and the behaviour is closer to that of the leveldb LSM-tree, although with higher median latency.
|
||||
|
||||
## More Details
|
||||
|
||||
For more details on the store:
|
||||
|
@ -71,28 +77,6 @@ More information can be found in the [volume testing section](docs/VOLUME.md).
|
|||
|
||||
As a general rule though, the most interesting thing is the potential to enable [new features](docs/FUTURE.md). The tagging of different object types, with an ability to set different rules for both compaction and metadata creation by tag, is a potential enabler for further change. Further, having a separate key/metadata store which can be scanned without breaking the page cache or working against mitigation for write amplifications, is also potentially an enabler to offer features to both the developer and the operator.
|
||||
|
||||
## Next Steps
|
||||
|
||||
Further volume test scenarios are the immediate priority, in particular volume test scenarios with:
|
||||
|
||||
- Significant use of secondary indexes;
|
||||
|
||||
- Use of newly available [EC2 hardware](https://aws.amazon.com/about-aws/whats-new/2017/02/now-available-amazon-ec2-i3-instances-next-generation-storage-optimized-high-i-o-instances/) which potentially is a significant changes to assumptions about hardware efficiency and cost.
|
||||
|
||||
- Create riak_test tests for new Riak features enabled by leveled.
|
||||
|
||||
However a number of other changes are planned in the next month to (my branch of) riak_kv to better use leveled:
|
||||
|
||||
- Support for rapid rebuild of hashtrees
|
||||
|
||||
- Fixes to [priority issues](https://github.com/martinsumner/leveled/issues)
|
||||
|
||||
- Experiments with flexible sync on write settings
|
||||
|
||||
- A cleaner and easier build of Riak with leveled included, including cuttlefish configuration support
|
||||
|
||||
More information can be found in the [future section](docs/FUTURE.md).
|
||||
|
||||
## Feedback
|
||||
|
||||
Please create an issue if you have any suggestions. You can ping me <b>@masleeds</b> if you wish
|
||||
|
@ -104,28 +88,14 @@ Unit and current tests in leveled should run with rebar3. Leveled has been test
|
|||
A new database can be started by running
|
||||
|
||||
```
|
||||
{ok, Bookie} = leveled_bookie:book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy)
|
||||
{ok, Bookie} = leveled_bookie:book_start(StartupOptions)
|
||||
```
|
||||
|
||||
This will start a new Bookie. It will start and look for existing data files, under the RootPath, and start empty if none exist. A LedgerCacheSize of `2000`, a JournalSize of `500000000` (500MB) and a SyncStrategy of `none` should work OK. Further information on startup options can be found [here](docs/STARTUP_OPTIONS.md).
|
||||
This will start a new Bookie. It will start and look for existing data files, under the RootPath, and start empty if none exist. Further information on startup options can be found here [here](docs/STARTUP_OPTIONS.md).
|
||||
|
||||
The book_start method should respond once startup is complete. The [leveled_bookie module](src/leveled_bookie.erl) includes the full API for external use of the store.
|
||||
|
||||
It should run anywhere that OTP will run - it has been tested on Ubuntu 14, MAC OS X and Windows 10.
|
||||
|
||||
Running in Riak requires one of the branches of riak_kv referenced [here](docs/FUTURE.md). There is a [Riak branch](https://github.com/martinsumner/riak/tree/mas-leveleddb) intended to support the automatic build of this, and the configuration via cuttlefish. However, the auto-build fails due to other dependencies (e.g. riak_search) bringing in an alternative version of riak_kv, and the configuration via cuttlefish is broken for reasons unknown.
|
||||
|
||||
Building this from source as part of Riak will require a bit of fiddling around.
|
||||
|
||||
- clone and build [riak](https://github.com/martinsumner/riak/tree/mas-leveleddb)
|
||||
- cd deps
|
||||
- rm -rf riak_kv
|
||||
- git clone -b mas-leveled-putfsm --single-branch https://github.com/martinsumner/riak_kv.git
|
||||
- cd ..
|
||||
- make rel
|
||||
- remember to set the storage backend to leveled in riak.conf
|
||||
|
||||
To help with the breakdown of cuttlefish, leveled parameters can be set via riak_kv/include/riak_kv_leveled.hrl - although a new make will be required for these changes to take effect.
|
||||
Running in Riak requires Riak 2.9 or beyond, which is available from January 2019.
|
||||
|
||||
### Contributing
|
||||
|
||||
|
@ -136,5 +106,4 @@ ct with 100% coverage.
|
|||
|
||||
To have rebar3 execute the full set of tests, run:
|
||||
|
||||
rebar3 as test do cover --reset, eunit --cover, ct --cover, cover --verbose
|
||||
|
||||
`rebar3 as test do cover --reset, eunit --cover, ct --cover, cover --verbose`
|
||||
|
|
|
@ -82,6 +82,8 @@ Three types are initially supported:
|
|||
|
||||
All Ledger Keys created for any type must be 4-tuples starting with the tag. Abstraction with regards to types is currently imperfect, but the expectation is that these types will make support for application specific behaviours easier to achieve, such as behaviours which maybe required to support different [CRDTs](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type).
|
||||
|
||||
Currently user-defined tags are supported as an experimental feature along with the ability to override the function which controls how metadata is split from the object value. Good choice of metadata is important to ensure the improved efficiency of folds over heads (as opposed to folds over objects), and the use of HEAD requests (as opposed to GET requests), can be exploited by applications using leveled.
|
||||
|
||||
## GET/PUT Paths
|
||||
|
||||
The PUT path for new objects and object changes depends on the Bookie interacting with the Inker to ensure that the change has been persisted with the Journal, the Ledger is updated in batches after the PUT has been completed.
|
||||
|
@ -128,7 +130,7 @@ Backups are taken of the Journal only, as the Ledger can be recreated on startu
|
|||
|
||||
The backup uses hard-links, so at the point the backup is taken, there will be a minimal change to the on-disk footprint of the store. However, as journal compaction is run, the hard-links will prevent space from getting released by the dropping of replaced journal files - so backups will cause the size of the store to grow faster than it would otherwise do. It is an operator responsibility to garbage collect old backups, to prevent this growth from being an issue.
|
||||
|
||||
As backups depend on hard-links, they cannot be taken with a `BackupPath` on a different file system to the standard data path. The move a backup across to a different file system, standard tools should be used such as rsync. The leveled backups should be relatively friendly for rsync-like delta-based backup approaches due to significantly lower write amplification when compared to other LSM stores (e.g. leveldb).
|
||||
As backups depend on hard-links, they cannot be taken with a `BackupPath` on a different file system to the standard data path. The move a backup across to a different file system, standard tools should be used such as rsync. The leveled backups should be relatively friendly for rsync-like delta-based backup approaches due to significantly lower write amplification when compared to other LSM stores (e.g. leveldb).
|
||||
|
||||
## Head only
|
||||
|
||||
|
|
|
@ -22,6 +22,18 @@ There is no stats facility within leveled, the stats are only available from the
|
|||
|
||||
The `forced_logs` option will force a particular log reference to be logged regardless of the log level that has been set. This can be used to log at a higher level than `info`, whilst allowing for specific logs to still be logged out, such as logs providing sample performance statistics.
|
||||
|
||||
## User-Defined Tags
|
||||
|
||||
There are 2 primary object tags - ?STD_TAG (o) which is the default, and ?RIAK_TAG (o_rkv). Objects PUT into the store with different tags may have different behaviours in leveled.
|
||||
|
||||
The differences between tags are encapsulated within the `leveled_head` module. The primary difference of interest is the alternative handling within the function `extract_metadata/3`. Significant efficiency can be gained in leveled (as opposed to other LSM-stores) through using book_head requests when book_get would otherwise be necessary. If 80% of the requests are interested in less than 20% of the information within an object, then having that 20% in the object metadata and switching fetch requests to the book_head API, will improve efficiency. Also folds over heads are much more efficient that folds over objects, so significant improvements can be also be made within folds by having the right information within the metadata.
|
||||
|
||||
To make use of this efficiency, metadata needs to be extracted on PUT, and made into leveled object metadata. For the ?RIAK_TAG this work is within the `leveled_head` module. If an application wants to control this behaviour for its application, then a tag can be created, and the `leveled_head` module updated. However, it is also possible to have more dynamic definitions for handling of application-defined tags, by passing in alternative versions of one or more of the functions `extract_metadata/3`, `build_head/1` and `key_to_canonicalbinary/1` on start-up. These functions will be applied to user-defined tags (but will not override the behaviour for pre-defined tags).
|
||||
|
||||
The startup option `override_functions` can be used to manage this override. [This test](../test/end_to_end/appdefined_SUITE.erl) provides a simple example of using override_functions.
|
||||
|
||||
This option is currently experimental. Issues such as versioning, and handling a failure to consistently start a store with the same override_functions, should be handled by the application.
|
||||
|
||||
## Max Journal Size
|
||||
|
||||
The maximum size of an individual Journal file can be set using `{max_journalsize, integer()}`, which sets the size in bytes. The default value is 1,000,000,000 (~1GB). The maximum size, which cannot be exceed is `2^32`. It is not expected that the Journal Size should normally set to lower than 100 MB, it should be sized to hold many thousands of objects at least.
|
||||
|
@ -61,13 +73,13 @@ The purpose of the reload strategy is to define the behaviour at compaction of t
|
|||
|
||||
By default nothing is compacted from the Journal if the SQN of the Journal entry is greater than the largest sequence number which has been persisted in the Ledger. So when an object is compacted in the Journal (as it has been replaced), it should not need to be replayed from the Journal into the Ledger in the future - as it, and all its related key changes, have already been persisted to the Ledger.
|
||||
|
||||
However, what if the Ledger had been erased? This could happen due to some corruption, or perhaps because only the Journal is to be backed up. As the object has been replaced, the value is not required - however KeyChanges ay be required (such as indexes which are built incrementally across a series of object changes). So to revert the indexes to their previous state the Key Changes would need to be retained in this case, so the indexes in the Ledger would be correctly rebuilt.
|
||||
However, what if the Ledger had been erased? This could happen due to some corruption, or perhaps because only the Journal is to be backed up. As the object has been replaced, the value is not required - however KeyChanges may be required (such as indexes which are built incrementally across a series of object changes). So to revert the indexes to their previous state the Key Changes would need to be retained in this case, so the indexes in the Ledger would be correctly rebuilt.
|
||||
|
||||
The are three potential strategies:
|
||||
|
||||
`skip` - don't worry about this scenario, require the Ledger to be backed up;
|
||||
`retain` - discard the object itself on compaction but keep the key changes;
|
||||
`recalc` - recalculate the indexes on reload by comparing the information on the object with the current state of the Ledger (as would be required by the PUT process when comparing IndexSpecs at PUT time).
|
||||
- `skip` - don't worry about this scenario, require the Ledger to be backed up;
|
||||
- `retain` - discard the object itself on compaction but keep the key changes;
|
||||
- `recalc` - recalculate the indexes on reload by comparing the information on the object with the current state of the Ledger (as would be required by the PUT process when comparing IndexSpecs at PUT time).
|
||||
|
||||
There is no code for `recalc` at present it is simply a logical possibility. So to set a reload strategy there should be an entry like `{reload_strategy, [{TagName, skip|retain}]}`. By default tags are pre-set to `retain`. If there is no need to handle a corrupted Ledger, then all tags could be set to `skip`.
|
||||
|
||||
|
|
|
@ -130,7 +130,8 @@
|
|||
{compression_method, ?COMPRESSION_METHOD},
|
||||
{compression_point, ?COMPRESSION_POINT},
|
||||
{log_level, ?LOG_LEVEL},
|
||||
{forced_logs, []}]).
|
||||
{forced_logs, []},
|
||||
{override_functions, []}]).
|
||||
|
||||
-record(ledger_cache, {mem :: ets:tab(),
|
||||
loader = leveled_tree:empty(?CACHE_TYPE)
|
||||
|
@ -314,7 +315,7 @@
|
|||
% moving to higher log levels will at present make the operator
|
||||
% blind to sample performance statistics of leveled sub-components
|
||||
% etc
|
||||
{forced_logs, list(string())}
|
||||
{forced_logs, list(string())} |
|
||||
% Forced logs allow for specific info level logs, such as those
|
||||
% logging stats to be logged even when the default log level has
|
||||
% been set to a higher log level. Using:
|
||||
|
@ -323,6 +324,9 @@
|
|||
% "P0032", "SST12", "CDB19", "SST13", "I0019"]}
|
||||
% Will log all timing points even when log_level is not set to
|
||||
% support info
|
||||
{override_functions, list(leveled_head:appdefinable_function_tuple())}
|
||||
% Provide a list of override functions that will be used for
|
||||
% user-defined tags
|
||||
].
|
||||
|
||||
|
||||
|
@ -456,7 +460,7 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
|
|||
leveled_codec:index_specs(),
|
||||
leveled_codec:tag(), infinity|integer()) -> ok|pause.
|
||||
|
||||
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
|
||||
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_atom(Tag) ->
|
||||
gen_server:call(Pid,
|
||||
{put, Bucket, Key, Object, IndexSpecs, Tag, TTL},
|
||||
infinity).
|
||||
|
@ -1029,10 +1033,8 @@ book_destroy(Pid) ->
|
|||
%% The function will be 1-arity, and can be passed the absolute folder name
|
||||
%% to store the backup.
|
||||
%%
|
||||
%% Backup files are hard-linked. Does not work in head_only mode
|
||||
%%
|
||||
%% TODO: Can extend to head_only mode, and also support another parameter
|
||||
%% which would backup persisted part of ledger (to make restart faster)
|
||||
%% Backup files are hard-linked. Does not work in head_only mode, or if
|
||||
%% index changes are used with a `skip` compaction/reload strategy
|
||||
book_hotbackup(Pid) ->
|
||||
gen_server:call(Pid, hot_backup, infinity).
|
||||
|
||||
|
@ -1065,6 +1067,13 @@ init([Opts]) ->
|
|||
ForcedLogs = proplists:get_value(forced_logs, Opts),
|
||||
ok = application:set_env(leveled, forced_logs, ForcedLogs),
|
||||
|
||||
OverrideFunctions = proplists:get_value(override_functions, Opts),
|
||||
SetFun =
|
||||
fun({FuncName, Func}) ->
|
||||
application:set_env(leveled, FuncName, Func)
|
||||
end,
|
||||
lists:foreach(SetFun, OverrideFunctions),
|
||||
|
||||
ConfiguredCacheSize =
|
||||
max(proplists:get_value(cache_size, Opts), ?MIN_CACHE_SIZE),
|
||||
CacheJitter =
|
||||
|
@ -1270,7 +1279,7 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
|
|||
not_found ->
|
||||
not_found;
|
||||
_ ->
|
||||
{ok, leveled_codec:build_metadata_object(LK, LedgerMD)}
|
||||
{ok, leveled_head:build_head(Tag, LedgerMD)}
|
||||
end,
|
||||
{_SW, UpdTimingsR} =
|
||||
update_timings(SWr, {head, rsp}, UpdTimingsP),
|
||||
|
@ -1437,7 +1446,7 @@ snapshot_store(State, SnapType, Query, LongRunning) ->
|
|||
Query,
|
||||
LongRunning).
|
||||
|
||||
-spec fetch_value(pid(), {any(), integer()}) -> not_present|any().
|
||||
-spec fetch_value(pid(), leveled_codec:journal_ref()) -> not_present|any().
|
||||
%% @doc
|
||||
%% Fetch a value from the Journal
|
||||
fetch_value(Inker, {Key, SQN}) ->
|
||||
|
@ -2517,7 +2526,7 @@ foldobjects_vs_hashtree_testto() ->
|
|||
MD,
|
||||
_Size,
|
||||
_Fetcher} = binary_to_term(ProxyV),
|
||||
{Hash, _Size} = MD,
|
||||
{Hash, _Size, _UserDefinedMD} = MD,
|
||||
[{B, K, Hash}|Acc]
|
||||
end,
|
||||
|
||||
|
|
|
@ -2,28 +2,8 @@
|
|||
%%
|
||||
%% Functions for manipulating keys and values within leveled.
|
||||
%%
|
||||
%%
|
||||
%% Within the LEDGER:
|
||||
%% Keys are of the form -
|
||||
%% {Tag, Bucket, Key, SubKey|null}
|
||||
%% Values are of the form
|
||||
%% {SQN, Status, MD}
|
||||
%%
|
||||
%% Within the JOURNAL:
|
||||
%% Keys are of the form -
|
||||
%% {SQN, LedgerKey}
|
||||
%% Values are of the form
|
||||
%% {Object, IndexSpecs} (as a binary)
|
||||
%%
|
||||
%% IndexSpecs are of the form of a Ledger Key/Value
|
||||
%%
|
||||
%% Tags need to be set during PUT operations and each Tag used must be
|
||||
%% supported in an extract_metadata and a build_metadata_object function clause
|
||||
%%
|
||||
%% Currently the only tags supported are:
|
||||
%% - o (standard objects)
|
||||
%% - o_rkv (riak objects)
|
||||
%% - i (index entries)
|
||||
%% Any thing specific to handling of a given tag should be encapsulated
|
||||
%% within the leveled_head module
|
||||
|
||||
|
||||
-module(leveled_codec).
|
||||
|
@ -48,40 +28,33 @@
|
|||
to_ledgerkey/5,
|
||||
from_ledgerkey/1,
|
||||
from_ledgerkey/2,
|
||||
isvalid_ledgerkey/1,
|
||||
to_inkerkey/2,
|
||||
to_inkerkv/6,
|
||||
from_inkerkv/1,
|
||||
from_inkerkv/2,
|
||||
from_journalkey/1,
|
||||
compact_inkerkvc/2,
|
||||
revert_to_keydeltas/2,
|
||||
split_inkvalue/1,
|
||||
check_forinkertype/2,
|
||||
get_tagstrategy/2,
|
||||
maybe_compress/2,
|
||||
create_value_for_journal/3,
|
||||
build_metadata_object/2,
|
||||
generate_ledgerkv/5,
|
||||
get_size/2,
|
||||
get_keyandobjhash/2,
|
||||
idx_indexspecs/5,
|
||||
obj_objectspecs/3,
|
||||
riak_extract_metadata/2,
|
||||
segment_hash/1,
|
||||
to_lookup/1,
|
||||
riak_metadata_to_binary/2,
|
||||
next_key/1]).
|
||||
next_key/1,
|
||||
return_proxy/4]).
|
||||
|
||||
-define(V1_VERS, 1).
|
||||
-define(MAGIC, 53). % riak_kv -> riak_object
|
||||
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
|
||||
-define(NRT_IDX, "$aae.").
|
||||
|
||||
-type riak_metadata() :: {binary()|delete, % Sibling Metadata
|
||||
binary()|null, % Vclock Metadata
|
||||
integer()|null, % Hash of vclock - non-exportable
|
||||
integer()}. % Size in bytes of real object
|
||||
|
||||
-type tag() ::
|
||||
?STD_TAG|?RIAK_TAG|?IDX_TAG|?HEAD_TAG.
|
||||
leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG|atom().
|
||||
-type key() ::
|
||||
binary()|string()|{binary(), binary()}.
|
||||
% Keys SHOULD be binary()
|
||||
|
@ -113,12 +86,16 @@
|
|||
{sqn(), ledger_status(), segment_hash(), metadata(), last_moddate()}.
|
||||
-type ledger_kv() ::
|
||||
{ledger_key(), ledger_value()}.
|
||||
-type compaction_method() ::
|
||||
retain|skip|recalc.
|
||||
-type compaction_strategy() ::
|
||||
list({tag(), retain|skip|recalc}).
|
||||
list({tag(), compaction_method()}).
|
||||
-type journal_key_tag() ::
|
||||
?INKT_STND|?INKT_TOMB|?INKT_MPUT|?INKT_KEYD.
|
||||
-type journal_key() ::
|
||||
{integer(), journal_key_tag(), ledger_key()}.
|
||||
{sqn(), journal_key_tag(), ledger_key()}.
|
||||
-type journal_ref() ::
|
||||
{ledger_key(), sqn()}.
|
||||
-type object_spec_v0() ::
|
||||
{add|remove, key(), key(), key()|null, any()}.
|
||||
-type object_spec_v1() ::
|
||||
|
@ -139,6 +116,17 @@
|
|||
% first element must be re_pattern, but tuple may change legnth with
|
||||
% versions
|
||||
|
||||
-type value_fetcher() ::
|
||||
{fun((pid(), leveled_codec:journal_key()) -> any()),
|
||||
pid(), leveled_codec:journal_key()}.
|
||||
% A 2-arity function, which when passed the other two elements of the tuple
|
||||
% will return the value
|
||||
-type proxy_object() ::
|
||||
{proxy_object, leveled_head:head(), non_neg_integer(), value_fetcher()}.
|
||||
% Returns the head, size and a tuple for accessing the value
|
||||
-type proxy_objectbin() ::
|
||||
binary().
|
||||
% using term_to_binary(proxy_object())
|
||||
|
||||
|
||||
-type segment_list()
|
||||
|
@ -153,8 +141,10 @@
|
|||
ledger_value/0,
|
||||
ledger_kv/0,
|
||||
compaction_strategy/0,
|
||||
compaction_method/0,
|
||||
journal_key_tag/0,
|
||||
journal_key/0,
|
||||
journal_ref/0,
|
||||
compression_method/0,
|
||||
journal_keychanges/0,
|
||||
index_specs/0,
|
||||
|
@ -162,7 +152,9 @@
|
|||
maybe_lookup/0,
|
||||
last_moddate/0,
|
||||
lastmod_range/0,
|
||||
regular_expression/0]).
|
||||
regular_expression/0,
|
||||
value_fetcher/0,
|
||||
proxy_object/0]).
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
|
@ -179,29 +171,33 @@ segment_hash(Key) when is_binary(Key) ->
|
|||
{segment_hash, SegmentID, ExtraHash, _AltHash}
|
||||
= leveled_tictac:keyto_segment48(Key),
|
||||
{SegmentID, ExtraHash};
|
||||
segment_hash({?RIAK_TAG, Bucket, Key, null})
|
||||
when is_binary(Bucket), is_binary(Key) ->
|
||||
segment_hash(<<Bucket/binary, Key/binary>>);
|
||||
segment_hash({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey})
|
||||
when is_binary(BucketType), is_binary(Bucket) ->
|
||||
segment_hash({?RIAK_TAG,
|
||||
<<BucketType/binary, Bucket/binary>>,
|
||||
Key,
|
||||
SubKey});
|
||||
segment_hash({?HEAD_TAG, Bucket, Key, SubK})
|
||||
segment_hash(KeyTuple) when is_tuple(KeyTuple) ->
|
||||
BinKey =
|
||||
case element(1, KeyTuple) of
|
||||
?HEAD_TAG ->
|
||||
headkey_to_canonicalbinary(KeyTuple);
|
||||
_ ->
|
||||
leveled_head:key_to_canonicalbinary(KeyTuple)
|
||||
end,
|
||||
segment_hash(BinKey).
|
||||
|
||||
|
||||
headkey_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK})
|
||||
when is_binary(Bucket), is_binary(Key), is_binary(SubK) ->
|
||||
segment_hash(<<Bucket/binary, Key/binary, SubK/binary>>);
|
||||
segment_hash({?HEAD_TAG, Bucket, Key, _SubK})
|
||||
<<Bucket/binary, Key/binary, SubK/binary>>;
|
||||
headkey_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null})
|
||||
when is_binary(Bucket), is_binary(Key) ->
|
||||
segment_hash(<<Bucket/binary, Key/binary>>);
|
||||
segment_hash({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey})
|
||||
<<Bucket/binary, Key/binary>>;
|
||||
headkey_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey})
|
||||
when is_binary(BucketType), is_binary(Bucket) ->
|
||||
segment_hash({?HEAD_TAG,
|
||||
<<BucketType/binary, Bucket/binary>>,
|
||||
Key,
|
||||
SubKey});
|
||||
segment_hash(Key) ->
|
||||
segment_hash(term_to_binary(Key)).
|
||||
headkey_to_canonicalbinary({?HEAD_TAG,
|
||||
<<BucketType/binary, Bucket/binary>>,
|
||||
Key,
|
||||
SubKey});
|
||||
headkey_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG ->
|
||||
% In unit tests head specs can have non-binary keys, so handle
|
||||
% this through hashing the whole key
|
||||
term_to_binary(Key).
|
||||
|
||||
|
||||
-spec to_lookup(ledger_key()) -> maybe_lookup().
|
||||
|
@ -330,6 +326,15 @@ to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG) ->
|
|||
to_ledgerkey(Bucket, Key, Tag) ->
|
||||
{Tag, Bucket, Key, null}.
|
||||
|
||||
%% No spec - due to tests
|
||||
%% @doc
|
||||
%% Check that the ledgerkey is a valid format, to handle un-checksummed keys
|
||||
%% that may be returned corrupted (such as from the Journal)
|
||||
isvalid_ledgerkey({Tag, _B, _K, _SK}) ->
|
||||
is_atom(Tag);
|
||||
isvalid_ledgerkey(_LK) ->
|
||||
false.
|
||||
|
||||
-spec endkey_passed(ledger_key(), ledger_key()) -> boolean().
|
||||
%% @oc
|
||||
%% Compare a key against a query key, only comparing elements that are non-null
|
||||
|
@ -355,7 +360,9 @@ endkey_passed(EndKey, CheckingKey) ->
|
|||
%% Take the default startegy for compaction, and override the approach for any
|
||||
%% tags passed in
|
||||
inker_reload_strategy(AltList) ->
|
||||
ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
|
||||
ReloadStrategy0 =
|
||||
lists:map(fun leveled_head:default_reload_strategy/1,
|
||||
leveled_head:defined_objecttags()),
|
||||
lists:foldl(fun({X, Y}, SList) ->
|
||||
lists:keyreplace(X, 1, SList, {X, Y})
|
||||
end,
|
||||
|
@ -363,47 +370,17 @@ inker_reload_strategy(AltList) ->
|
|||
AltList).
|
||||
|
||||
|
||||
-spec compact_inkerkvc({journal_key(), any(), boolean()},
|
||||
compaction_strategy()) ->
|
||||
skip|{retain, any()}|{recalc, null}.
|
||||
%% @doc
|
||||
%% Decide whether a superceded object should be replicated in the compacted
|
||||
%% file and in what format.
|
||||
compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
|
||||
skip;
|
||||
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
|
||||
skip;
|
||||
compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
|
||||
case get_tagstrategy(LK, Strategy) of
|
||||
skip ->
|
||||
skip;
|
||||
retain ->
|
||||
{retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}};
|
||||
TagStrat ->
|
||||
{TagStrat, null}
|
||||
end;
|
||||
compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
|
||||
case get_tagstrategy(LK, Strategy) of
|
||||
skip ->
|
||||
skip;
|
||||
retain ->
|
||||
{_V, KeyDeltas} = revert_value_from_journal(V),
|
||||
{retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
|
||||
TagStrat ->
|
||||
{TagStrat, null}
|
||||
end.
|
||||
|
||||
-spec get_tagstrategy(ledger_key(), compaction_strategy())
|
||||
-> skip|retain|recalc.
|
||||
%% @doc
|
||||
%% Work out the compaction startegy for the key
|
||||
%% Work out the compaction strategy for the key
|
||||
get_tagstrategy({Tag, _, _, _}, Strategy) ->
|
||||
case lists:keyfind(Tag, 1, Strategy) of
|
||||
{Tag, TagStrat} ->
|
||||
TagStrat;
|
||||
false ->
|
||||
leveled_log:log("IC012", [Tag, Strategy]),
|
||||
skip
|
||||
retain
|
||||
end.
|
||||
|
||||
%%%============================================================================
|
||||
|
@ -427,6 +404,17 @@ to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
|
|||
create_value_for_journal({Object, KeyChanges}, Compress, PressMethod),
|
||||
{{SQN, InkerType, LedgerKey}, Value}.
|
||||
|
||||
-spec revert_to_keydeltas(journal_key(), any()) -> {journal_key(), any()}.
|
||||
%% @doc
|
||||
%% If we wish to retain key deltas when an object in the Journal has been
|
||||
%% replaced - then this converts a Journal Key and Value into one which has no
|
||||
%% object body just the key deltas.
|
||||
revert_to_keydeltas({SQN, ?INKT_STND, LedgerKey}, InkerV) ->
|
||||
{_V, KeyDeltas} = revert_value_from_journal(InkerV),
|
||||
{{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}};
|
||||
revert_to_keydeltas(JournalKey, InkerV) ->
|
||||
{JournalKey, InkerV}.
|
||||
|
||||
%% Used when fetching objects, so only handles standard, hashable entries
|
||||
from_inkerkv(Object) ->
|
||||
from_inkerkv(Object, false).
|
||||
|
@ -571,8 +559,6 @@ check_forinkertype(_LedgerKey, head_only) ->
|
|||
check_forinkertype(_LedgerKey, _Object) ->
|
||||
?INKT_STND.
|
||||
|
||||
hash(Obj) ->
|
||||
erlang:phash2(term_to_binary(Obj)).
|
||||
|
||||
|
||||
|
||||
|
@ -624,6 +610,27 @@ gen_headspec({IdxOp, Bucket, Key, SubKey, Value}, SQN, TTL) ->
|
|||
{K, {SQN, Status, segment_hash(K), Value, undefined}}.
|
||||
|
||||
|
||||
-spec return_proxy(leveled_head:object_tag()|leveled_head:headonly_tag(),
|
||||
leveled_head:object_metadata(),
|
||||
pid(), journal_ref())
|
||||
-> proxy_objectbin()|leveled_head:object_metadata().
|
||||
%% @doc
|
||||
%% If the object has a value, return the metadata and a proxy through which
|
||||
%% the applictaion or runner can access the value. If it is a ?HEAD_TAG
|
||||
%% then it has no value, so just return the metadata
|
||||
return_proxy(?HEAD_TAG, ObjectMetadata, _InkerClone, _JR) ->
|
||||
% Object has no value - so proxy object makese no sense, just return the
|
||||
% metadata as is
|
||||
ObjectMetadata;
|
||||
return_proxy(Tag, ObjMetadata, InkerClone, JournalRef) ->
|
||||
Size = leveled_head:get_size(Tag, ObjMetadata),
|
||||
HeadBin = leveled_head:build_head(Tag, ObjMetadata),
|
||||
term_to_binary({proxy_object,
|
||||
HeadBin,
|
||||
Size,
|
||||
{fun leveled_bookie:fetch_value/2,
|
||||
InkerClone,
|
||||
JournalRef}}).
|
||||
|
||||
set_status(add, TTL) ->
|
||||
{active, TTL};
|
||||
|
@ -657,8 +664,8 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
|
|||
{active, TS}
|
||||
end,
|
||||
Hash = segment_hash(PrimaryKey),
|
||||
{MD, LastMods} = extract_metadata(Obj, Size, Tag),
|
||||
ObjHash = get_objhash(Tag, MD),
|
||||
{MD, LastMods} = leveled_head:extract_metadata(Tag, Size, Obj),
|
||||
ObjHash = leveled_head:get_hash(Tag, MD),
|
||||
Value = {SQN,
|
||||
Status,
|
||||
Hash,
|
||||
|
@ -679,23 +686,10 @@ get_last_lastmodification(LastMods) ->
|
|||
{Mega, Sec, _Micro} = lists:max(LastMods),
|
||||
Mega * 1000000 + Sec.
|
||||
|
||||
|
||||
extract_metadata(Obj, Size, ?RIAK_TAG) ->
|
||||
riak_extract_metadata(Obj, Size);
|
||||
extract_metadata(Obj, Size, ?STD_TAG) ->
|
||||
{{hash(Obj), Size}, []}.
|
||||
|
||||
get_size(PK, Value) ->
|
||||
{Tag, _Bucket, _Key, _} = PK,
|
||||
MD = element(4, Value),
|
||||
case Tag of
|
||||
?RIAK_TAG ->
|
||||
{_RMD, _VC, _Hash, Size} = MD,
|
||||
Size;
|
||||
?STD_TAG ->
|
||||
{_Hash, Size} = MD,
|
||||
Size
|
||||
end.
|
||||
leveled_head:get_size(Tag, MD).
|
||||
|
||||
-spec get_keyandobjhash(tuple(), tuple()) -> tuple().
|
||||
%% @doc
|
||||
|
@ -709,105 +703,9 @@ get_keyandobjhash(LK, Value) ->
|
|||
?IDX_TAG ->
|
||||
from_ledgerkey(LK); % returns {Bucket, Key, IdxValue}
|
||||
_ ->
|
||||
{Bucket, Key, get_objhash(Tag, MD)}
|
||||
{Bucket, Key, leveled_head:get_hash(Tag, MD)}
|
||||
end.
|
||||
|
||||
get_objhash(Tag, ObjMetaData) ->
|
||||
case Tag of
|
||||
?RIAK_TAG ->
|
||||
{_RMD, _VC, Hash, _Size} = ObjMetaData,
|
||||
Hash;
|
||||
?STD_TAG ->
|
||||
{Hash, _Size} = ObjMetaData,
|
||||
Hash
|
||||
end.
|
||||
|
||||
|
||||
build_metadata_object(PrimaryKey, MD) ->
|
||||
{Tag, _Bucket, _Key, _SubKey} = PrimaryKey,
|
||||
case Tag of
|
||||
?RIAK_TAG ->
|
||||
{SibData, Vclock, _Hash, _Size} = MD,
|
||||
riak_metadata_to_binary(Vclock, SibData);
|
||||
?STD_TAG ->
|
||||
MD;
|
||||
?HEAD_TAG ->
|
||||
MD
|
||||
end.
|
||||
|
||||
|
||||
-spec riak_extract_metadata(binary()|delete, non_neg_integer()) ->
|
||||
{riak_metadata(), list()}.
|
||||
%% @doc
|
||||
%% Riak extract metadata should extract a metadata object which is a
|
||||
%% five-tuple of:
|
||||
%% - Binary of sibling Metadata
|
||||
%% - Binary of vector clock metadata
|
||||
%% - Non-exportable hash of the vector clock metadata
|
||||
%% - The largest last modified date of the object
|
||||
%% - Size of the object
|
||||
%%
|
||||
%% The metadata object should be returned with the full list of last
|
||||
%% modified dates (which will be used for recent anti-entropy index creation)
|
||||
riak_extract_metadata(delete, Size) ->
|
||||
{{delete, null, null, Size}, []};
|
||||
riak_extract_metadata(ObjBin, Size) ->
|
||||
{VclockBin, SibBin, LastMods} = riak_metadata_from_binary(ObjBin),
|
||||
{{SibBin,
|
||||
VclockBin,
|
||||
erlang:phash2(lists:sort(binary_to_term(VclockBin))),
|
||||
Size},
|
||||
LastMods}.
|
||||
|
||||
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
%%% VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
|
||||
|
||||
riak_metadata_to_binary(VclockBin, SibMetaBin) ->
|
||||
VclockLen = byte_size(VclockBin),
|
||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer,
|
||||
VclockLen:32/integer, VclockBin/binary,
|
||||
SibMetaBin/binary>>.
|
||||
|
||||
riak_metadata_from_binary(V1Binary) ->
|
||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
Rest/binary>> = V1Binary,
|
||||
<<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
|
||||
{SibMetaBin, LastMods} =
|
||||
case SibCount of
|
||||
SC when is_integer(SC) ->
|
||||
get_metadata_from_siblings(SibsBin,
|
||||
SibCount,
|
||||
<<SibCount:32/integer>>,
|
||||
[])
|
||||
end,
|
||||
{VclockBin, SibMetaBin, LastMods}.
|
||||
|
||||
get_metadata_from_siblings(<<>>, 0, SibMetaBin, LastMods) ->
|
||||
{SibMetaBin, LastMods};
|
||||
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
|
||||
SibCount,
|
||||
SibMetaBin,
|
||||
LastMods) ->
|
||||
<<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
|
||||
<<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
|
||||
LastMod =
|
||||
case MetaBin of
|
||||
<<MegaSec:32/integer,
|
||||
Sec:32/integer,
|
||||
MicroSec:32/integer,
|
||||
_Rest/binary>> ->
|
||||
{MegaSec, Sec, MicroSec};
|
||||
_ ->
|
||||
{0, 0, 0}
|
||||
end,
|
||||
get_metadata_from_siblings(Rest2,
|
||||
SibCount - 1,
|
||||
<<SibMetaBin/binary,
|
||||
0:32/integer,
|
||||
MetaLen:32/integer,
|
||||
MetaBin:MetaLen/binary>>,
|
||||
[LastMod|LastMods]).
|
||||
|
||||
-spec next_key(key()) -> key().
|
||||
%% @doc
|
||||
%% Get the next key to iterate from a given point
|
||||
|
@ -825,6 +723,14 @@ next_key({Type, Bucket}) when is_binary(Type), is_binary(Bucket) ->
|
|||
|
||||
-ifdef(TEST).
|
||||
|
||||
valid_ledgerkey_test() ->
|
||||
UserDefTag = {user_defined, <<"B">>, <<"K">>, null},
|
||||
?assertMatch(true, isvalid_ledgerkey(UserDefTag)),
|
||||
KeyNotTuple = [?STD_TAG, <<"B">>, <<"K">>, null],
|
||||
?assertMatch(false, isvalid_ledgerkey(KeyNotTuple)),
|
||||
TagNotAtom = {"tag", <<"B">>, <<"K">>, null},
|
||||
?assertMatch(false, isvalid_ledgerkey(TagNotAtom)),
|
||||
?assertMatch(retain, get_tagstrategy(UserDefTag, inker_reload_strategy([]))).
|
||||
|
||||
indexspecs_test() ->
|
||||
IndexSpecs = [{add, "t1_int", 456},
|
||||
|
@ -849,44 +755,6 @@ endkey_passed_test() ->
|
|||
?assertMatch(true, endkey_passed(TestKey, K2)).
|
||||
|
||||
|
||||
general_skip_strategy_test() ->
|
||||
% Confirm that we will skip if the strategy says so
|
||||
TagStrat1 = compact_inkerkvc({{1,
|
||||
?INKT_STND,
|
||||
{?STD_TAG, "B1", "K1andSK", null}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}]),
|
||||
?assertMatch(skip, TagStrat1),
|
||||
TagStrat2 = compact_inkerkvc({{1,
|
||||
?INKT_KEYD,
|
||||
{?STD_TAG, "B1", "K1andSK", null}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}]),
|
||||
?assertMatch(skip, TagStrat2),
|
||||
TagStrat3 = compact_inkerkvc({{1,
|
||||
?INKT_KEYD,
|
||||
{?IDX_TAG, "B1", "K1", "SK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}]),
|
||||
?assertMatch(skip, TagStrat3),
|
||||
TagStrat4 = compact_inkerkvc({{1,
|
||||
?INKT_KEYD,
|
||||
{?IDX_TAG, "B1", "K1", "SK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}, {?IDX_TAG, recalc}]),
|
||||
?assertMatch({recalc, null}, TagStrat4),
|
||||
TagStrat5 = compact_inkerkvc({{1,
|
||||
?INKT_TOMB,
|
||||
{?IDX_TAG, "B1", "K1", "SK"}},
|
||||
{},
|
||||
true},
|
||||
[{?STD_TAG, skip}, {?IDX_TAG, recalc}]),
|
||||
?assertMatch(skip, TagStrat5).
|
||||
|
||||
|
||||
%% Test below proved that the overhead of performing hashes was trivial
|
||||
%% Maybe 5 microseconds per hash
|
||||
|
|
318
src/leveled_head.erl
Normal file
318
src/leveled_head.erl
Normal file
|
@ -0,0 +1,318 @@
|
|||
%% -------- Metadata Seperation - Head and Body ---------
|
||||
%%
|
||||
%% The definition of the part of the object that belongs to the HEAD, and
|
||||
%% the part which belongs to the body.
|
||||
%%
|
||||
%% For the ?RIAK tag this is pre-defined. For the ?STD_TAG there is minimal
|
||||
%% definition. For best use of Riak define a new tag and use pattern matching
|
||||
%% to extend these exported functions.
|
||||
%%
|
||||
%% Dynamic user-defined tags are allowed, and to support these user-defined
|
||||
%% shadow versions of the functions:
|
||||
%% - key_to_canonicalbinary/1 -> binary(),
|
||||
%% - build_head/2 -> head(),
|
||||
%% - extract_metadata/3 -> {std_metadata(), list(erlang:timestamp()}
|
||||
%% That support all the user-defined tags that are to be used
|
||||
|
||||
-module(leveled_head).
|
||||
|
||||
-include("include/leveled.hrl").
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-export([key_to_canonicalbinary/1,
|
||||
build_head/2,
|
||||
extract_metadata/3
|
||||
]).
|
||||
|
||||
-export([get_size/2,
|
||||
get_hash/2,
|
||||
defined_objecttags/0,
|
||||
default_reload_strategy/1,
|
||||
standard_hash/1
|
||||
]).
|
||||
|
||||
%% Exported for testing purposes
|
||||
-export([riak_metadata_to_binary/2,
|
||||
riak_extract_metadata/2]).
|
||||
|
||||
|
||||
-define(MAGIC, 53). % riak_kv -> riak_object
|
||||
-define(V1_VERS, 1).
|
||||
|
||||
-type object_tag() :: ?STD_TAG|?RIAK_TAG.
|
||||
% tags assigned to objects
|
||||
% (not other special entities such as ?HEAD or ?IDX)
|
||||
-type headonly_tag() :: ?HEAD_TAG.
|
||||
% Tag assigned to head_only objects. Behaviour cannot be changed
|
||||
|
||||
-type riak_metadata() :: {binary()|delete,
|
||||
% Sibling Metadata
|
||||
binary()|null,
|
||||
% Vclock Metadata
|
||||
non_neg_integer()|null,
|
||||
% Hash of vclock - non-exportable
|
||||
non_neg_integer()
|
||||
% Size in bytes of real object
|
||||
}.
|
||||
-type std_metadata() :: {non_neg_integer()|null,
|
||||
% Hash of value
|
||||
non_neg_integer(),
|
||||
% Size in bytes of real object
|
||||
list(tuple())|undefined
|
||||
% User-define metadata
|
||||
}.
|
||||
-type head_metadata() :: {non_neg_integer()|null,
|
||||
% Hash of value
|
||||
non_neg_integer()
|
||||
% Size in bytes of real object
|
||||
}.
|
||||
|
||||
-type object_metadata() :: riak_metadata()|std_metadata()|head_metadata().
|
||||
|
||||
-type appdefinable_function() ::
|
||||
key_to_canonicalbinary | build_head | extract_metadata.
|
||||
% Functions for which default behaviour can be over-written for the
|
||||
% application's own tags
|
||||
-type appdefinable_function_tuple() ::
|
||||
{appdefinable_function(), fun()}.
|
||||
|
||||
-type head() ::
|
||||
binary()|tuple().
|
||||
% TODO:
|
||||
% This is currently not always a binary. Wish is to migrate this so that
|
||||
% it is predictably a binary
|
||||
|
||||
|
||||
-export_type([object_tag/0,
|
||||
head/0,
|
||||
object_metadata/0,
|
||||
appdefinable_function_tuple/0]).
|
||||
|
||||
%%%============================================================================
|
||||
%%% Mutable External Functions
|
||||
%%%============================================================================
|
||||
|
||||
-spec key_to_canonicalbinary(tuple()) -> binary().
|
||||
%% @doc
|
||||
%% Convert a key to a binary in a consistent way for the tag. The binary will
|
||||
%% then be used to create the hash
|
||||
key_to_canonicalbinary({?RIAK_TAG, Bucket, Key, null})
|
||||
when is_binary(Bucket), is_binary(Key) ->
|
||||
<<Bucket/binary, Key/binary>>;
|
||||
key_to_canonicalbinary({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey})
|
||||
when is_binary(BucketType), is_binary(Bucket) ->
|
||||
key_to_canonicalbinary({?RIAK_TAG,
|
||||
<<BucketType/binary, Bucket/binary>>,
|
||||
Key,
|
||||
SubKey});
|
||||
key_to_canonicalbinary(Key) when element(1, Key) == ?STD_TAG ->
|
||||
default_key_to_canonicalbinary(Key);
|
||||
key_to_canonicalbinary(Key) ->
|
||||
OverrideFun =
|
||||
get_appdefined_function(key_to_canonicalbinary,
|
||||
fun default_key_to_canonicalbinary/1,
|
||||
1),
|
||||
OverrideFun(Key).
|
||||
|
||||
default_key_to_canonicalbinary(Key) ->
|
||||
term_to_binary(Key).
|
||||
|
||||
|
||||
-spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head().
|
||||
%% @doc
|
||||
%% Return the object metadata as a binary to be the "head" of the object
|
||||
build_head(?HEAD_TAG, Value) ->
|
||||
% Metadata is not extracted with head objects, the head response is
|
||||
% just the unfiltered value that was input.
|
||||
default_build_head(?HEAD_TAG, Value);
|
||||
build_head(?RIAK_TAG, Metadata) ->
|
||||
{SibData, Vclock, _Hash, _Size} = Metadata,
|
||||
riak_metadata_to_binary(Vclock, SibData);
|
||||
build_head(?STD_TAG, Metadata) ->
|
||||
default_build_head(?STD_TAG, Metadata);
|
||||
build_head(Tag, Metadata) ->
|
||||
OverrideFun =
|
||||
get_appdefined_function(build_head,
|
||||
fun default_build_head/2,
|
||||
2),
|
||||
OverrideFun(Tag, Metadata).
|
||||
|
||||
default_build_head(_Tag, Metadata) ->
|
||||
Metadata.
|
||||
|
||||
|
||||
-spec extract_metadata(object_tag(), non_neg_integer(), any())
|
||||
-> {object_metadata(), list(erlang:timestamp())}.
|
||||
%% @doc
|
||||
%% Take the inbound object and extract from it the metadata to be stored within
|
||||
%% the ledger (and ultimately returned from a leveled_boookie:book_head/4
|
||||
%% request (after conversion using build_head/2).
|
||||
%%
|
||||
%% As part of the response also return a list of last_modification_dates
|
||||
%% associated with the object - with those dates being expressed as erlang
|
||||
%% timestamps.
|
||||
%%
|
||||
%% The Object Size passed in to this function is as calculated when writing
|
||||
%% the object to the Journal. It may be recalculated here, if an alternative
|
||||
%% view of size is required within the header
|
||||
%%
|
||||
%% Note objects with a ?HEAD_TAG should never be passed, as there is no
|
||||
extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) ->
|
||||
riak_extract_metadata(RiakObj, SizeAsStoredInJournal);
|
||||
extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj) ->
|
||||
default_extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj);
|
||||
extract_metadata(Tag, SizeAsStoredInJournal, Obj) ->
|
||||
OverrideFun =
|
||||
get_appdefined_function(extract_metadata,
|
||||
fun default_extract_metadata/3,
|
||||
3),
|
||||
OverrideFun(Tag, SizeAsStoredInJournal, Obj).
|
||||
|
||||
default_extract_metadata(_Tag, SizeAsStoredInJournal, Obj) ->
|
||||
{{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}.
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% Standard External Functions
|
||||
%%%============================================================================
|
||||
|
||||
-spec defined_objecttags() -> list(object_tag()).
|
||||
%% @doc
|
||||
%% Return the list of object tags
|
||||
defined_objecttags() ->
|
||||
[?STD_TAG, ?RIAK_TAG].
|
||||
|
||||
|
||||
-spec default_reload_strategy(object_tag())
|
||||
-> {object_tag(),
|
||||
leveled_codec:compaction_method()}.
|
||||
%% @doc
|
||||
%% State the compaction_method to be used when reloading the Ledger from the
|
||||
%% journal for each object tag. Note, no compaction startegy required for
|
||||
%% head_only tag
|
||||
default_reload_strategy(Tag) ->
|
||||
{Tag, retain}.
|
||||
|
||||
|
||||
-spec get_size(object_tag()|headonly_tag(), object_metadata())
|
||||
-> non_neg_integer().
|
||||
%% @doc
|
||||
%% Fetch the size from the metadata
|
||||
get_size(?RIAK_TAG, RiakObjectMetadata) ->
|
||||
element(4, RiakObjectMetadata);
|
||||
get_size(_Tag, ObjectMetadata) ->
|
||||
element(2, ObjectMetadata).
|
||||
|
||||
|
||||
-spec get_hash(object_tag()|headonly_tag(), object_metadata())
|
||||
-> non_neg_integer().
|
||||
%% @doc
|
||||
%% Fetch the hash from the metadata
|
||||
get_hash(?RIAK_TAG, RiakObjectMetadata) ->
|
||||
element(3, RiakObjectMetadata);
|
||||
get_hash(_Tag, ObjectMetadata) ->
|
||||
element(1, ObjectMetadata).
|
||||
|
||||
-spec standard_hash(any()) -> non_neg_integer().
|
||||
%% @doc
|
||||
%% Hash the whole object
|
||||
standard_hash(Obj) ->
|
||||
erlang:phash2(term_to_binary(Obj)).
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% Handling Override Functions
|
||||
%%%============================================================================
|
||||
|
||||
-spec get_appdefined_function(appdefinable_function(),
|
||||
fun(),
|
||||
non_neg_integer()) -> fun().
|
||||
%% @doc
|
||||
%% If a keylist of [{function_name, fun()}] has been set as an environment
|
||||
%% variable for a tag, then this FunctionName can be used instead of the
|
||||
%% default
|
||||
get_appdefined_function(FunctionName, DefaultFun, RequiredArity) ->
|
||||
case application:get_env(leveled, FunctionName) of
|
||||
undefined ->
|
||||
DefaultFun;
|
||||
{ok, Fun} when is_function(Fun, RequiredArity) ->
|
||||
Fun
|
||||
end.
|
||||
|
||||
%%%============================================================================
|
||||
%%% Tag-specific Internal Functions
|
||||
%%%============================================================================
|
||||
|
||||
|
||||
-spec riak_extract_metadata(binary()|delete, non_neg_integer()) ->
|
||||
{riak_metadata(), list()}.
|
||||
%% @doc
|
||||
%% Riak extract metadata should extract a metadata object which is a
|
||||
%% five-tuple of:
|
||||
%% - Binary of sibling Metadata
|
||||
%% - Binary of vector clock metadata
|
||||
%% - Non-exportable hash of the vector clock metadata
|
||||
%% - The largest last modified date of the object
|
||||
%% - Size of the object
|
||||
%%
|
||||
%% The metadata object should be returned with the full list of last
|
||||
%% modified dates (which will be used for recent anti-entropy index creation)
|
||||
riak_extract_metadata(delete, Size) ->
|
||||
{{delete, null, null, Size}, []};
|
||||
riak_extract_metadata(ObjBin, Size) ->
|
||||
{VclockBin, SibBin, LastMods} = riak_metadata_from_binary(ObjBin),
|
||||
{{SibBin,
|
||||
VclockBin,
|
||||
erlang:phash2(lists:sort(binary_to_term(VclockBin))),
|
||||
Size},
|
||||
LastMods}.
|
||||
|
||||
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
%%% VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
|
||||
|
||||
riak_metadata_to_binary(VclockBin, SibMetaBin) ->
|
||||
VclockLen = byte_size(VclockBin),
|
||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer,
|
||||
VclockLen:32/integer, VclockBin/binary,
|
||||
SibMetaBin/binary>>.
|
||||
|
||||
riak_metadata_from_binary(V1Binary) ->
|
||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
Rest/binary>> = V1Binary,
|
||||
<<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
|
||||
{SibMetaBin, LastMods} =
|
||||
case SibCount of
|
||||
SC when is_integer(SC) ->
|
||||
get_metadata_from_siblings(SibsBin,
|
||||
SibCount,
|
||||
<<SibCount:32/integer>>,
|
||||
[])
|
||||
end,
|
||||
{VclockBin, SibMetaBin, LastMods}.
|
||||
|
||||
get_metadata_from_siblings(<<>>, 0, SibMetaBin, LastMods) ->
|
||||
{SibMetaBin, LastMods};
|
||||
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
|
||||
SibCount,
|
||||
SibMetaBin,
|
||||
LastMods) ->
|
||||
<<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
|
||||
<<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
|
||||
LastMod =
|
||||
case MetaBin of
|
||||
<<MegaSec:32/integer,
|
||||
Sec:32/integer,
|
||||
MicroSec:32/integer,
|
||||
_Rest/binary>> ->
|
||||
{MegaSec, Sec, MicroSec};
|
||||
_ ->
|
||||
{0, 0, 0}
|
||||
end,
|
||||
get_metadata_from_siblings(Rest2,
|
||||
SibCount - 1,
|
||||
<<SibMetaBin/binary,
|
||||
0:32/integer,
|
||||
MetaLen:32/integer,
|
||||
MetaBin:MetaLen/binary>>,
|
||||
[LastMod|LastMods]).
|
|
@ -654,29 +654,56 @@ split_positions_into_batches(Positions, Journal, Batches) ->
|
|||
Batches ++ [{Journal, ThisBatch}]).
|
||||
|
||||
|
||||
%% @doc
|
||||
%% For the Keys and values taken from the Journal file, which are required
|
||||
%% in the compacted journal file. To be required, they must still be active
|
||||
%% (i.e. be the current SQN for that LedgerKey in the Ledger). However, if
|
||||
%% it is not active, we still need to retain some information if for this
|
||||
%% object tag we want to be able to rebuild the KeyStore by relaoding the
|
||||
%% KeyDeltas (the retain reload strategy)
|
||||
%%
|
||||
%% If the reload strategy is recalc, we assume that we can reload by
|
||||
%% recalculating the KeyChanges by looking at the object when we reload. So
|
||||
%% old objects can be discarded.
|
||||
%%
|
||||
%% If the strategy is skip, we don't care about KeyDeltas. Note though, that
|
||||
%% if the ledger is deleted it may not be possible to safely rebuild a KeyStore
|
||||
%% if it contains index entries. The hot_backup approach is also not safe with
|
||||
%% a `skip` strategy.
|
||||
filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
|
||||
lists:foldl(fun(KVC0, Acc) ->
|
||||
R = leveled_codec:compact_inkerkvc(KVC0, ReloadStrategy),
|
||||
case R of
|
||||
skip ->
|
||||
Acc;
|
||||
{TStrat, KVC1} ->
|
||||
{K, _V, CrcCheck} = KVC0,
|
||||
{SQN, LedgerKey} = leveled_codec:from_journalkey(K),
|
||||
KeyValid = FilterFun(FilterServer, LedgerKey, SQN),
|
||||
case {KeyValid, CrcCheck, SQN > MaxSQN, TStrat} of
|
||||
{false, true, false, retain} ->
|
||||
Acc ++ [KVC1];
|
||||
{false, true, false, _} ->
|
||||
Acc;
|
||||
_ ->
|
||||
Acc ++ [KVC0]
|
||||
end
|
||||
FoldFun =
|
||||
fun(KVC0, Acc) ->
|
||||
case KVC0 of
|
||||
{_InkKey, crc_wonky, false} ->
|
||||
% Bad entry, disregard, don't check
|
||||
Acc;
|
||||
{JK, JV, _Check} ->
|
||||
{SQN, LK} =
|
||||
leveled_codec:from_journalkey(JK),
|
||||
CompactStrategy =
|
||||
leveled_codec:get_tagstrategy(LK, ReloadStrategy),
|
||||
KeyValid = FilterFun(FilterServer, LK, SQN),
|
||||
IsInMemory = SQN > MaxSQN,
|
||||
case {KeyValid or IsInMemory, CompactStrategy} of
|
||||
{true, _} ->
|
||||
% This entry may still be required regardless of
|
||||
% strategy
|
||||
[KVC0|Acc];
|
||||
{false, retain} ->
|
||||
% If we have a retain startegy, it can't be
|
||||
% discarded - but the value part is no longer
|
||||
% required as this version has been replaced
|
||||
{JK0, JV0} =
|
||||
leveled_codec:revert_to_keydeltas(JK, JV),
|
||||
[{JK0, JV0, null}|Acc];
|
||||
{false, _} ->
|
||||
% This is out of date and not retained - discard
|
||||
Acc
|
||||
end
|
||||
end,
|
||||
[],
|
||||
KVCs).
|
||||
|
||||
end
|
||||
end,
|
||||
lists:reverse(lists:foldl(FoldFun, [], KVCs)).
|
||||
|
||||
|
||||
write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) ->
|
||||
{Journal0, ManSlice0};
|
||||
|
|
|
@ -357,7 +357,8 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
|
|||
ink_compactjournal(Pid, Bookie, Timeout) ->
|
||||
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
|
||||
CheckerCloseFun = fun leveled_penciller:pcl_close/1,
|
||||
CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3,
|
||||
CheckerFilterFun =
|
||||
wrap_checkfilterfun(fun leveled_penciller:pcl_checksequencenumber/3),
|
||||
gen_server:call(Pid,
|
||||
{compact,
|
||||
Bookie,
|
||||
|
@ -1185,6 +1186,20 @@ initiate_penciller_snapshot(Bookie) ->
|
|||
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
|
||||
{LedgerSnap, MaxSQN}.
|
||||
|
||||
|
||||
-spec wrap_checkfilterfun(fun()) -> fun().
|
||||
%% @doc
|
||||
%% Make a check of the validity of the key being passed into the CheckFilterFun
|
||||
wrap_checkfilterfun(CheckFilterFun) ->
|
||||
fun(Pcl, LK, SQN) ->
|
||||
case leveled_codec:isvalid_ledgerkey(LK) of
|
||||
true ->
|
||||
CheckFilterFun(Pcl, LK, SQN);
|
||||
false ->
|
||||
false
|
||||
end
|
||||
end.
|
||||
|
||||
%%%============================================================================
|
||||
%%% Test
|
||||
%%%============================================================================
|
||||
|
@ -1438,6 +1453,16 @@ empty_manifest_test() ->
|
|||
ink_close(Ink2),
|
||||
clean_testdir(RootPath).
|
||||
|
||||
|
||||
wrapper_test() ->
|
||||
KeyNotTuple = [?STD_TAG, <<"B">>, <<"K">>, null],
|
||||
TagNotAtom = {"tag", <<"B">>, <<"K">>, null},
|
||||
CheckFilterFun = fun(_Pcl, _LK, _SQN) -> true end,
|
||||
WrappedFun = wrap_checkfilterfun(CheckFilterFun),
|
||||
?assertMatch(false, WrappedFun(null, KeyNotTuple, 1)),
|
||||
?assertMatch(false, WrappedFun(null, TagNotAtom, 1)).
|
||||
|
||||
|
||||
coverage_cheat_test() ->
|
||||
{noreply, _State0} = handle_info(timeout, #state{}),
|
||||
{ok, _State1} = code_change(null, #state{}, null).
|
||||
|
|
|
@ -300,6 +300,7 @@
|
|||
:: {pos_integer(),
|
||||
list(leveled_codec:ledger_kv()|leveled_sst:expandable_pointer())}.
|
||||
-type iterator() :: list(iterator_entry()).
|
||||
-type bad_ledgerkey() :: list().
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
|
@ -472,7 +473,7 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
|
|||
infinity).
|
||||
|
||||
-spec pcl_checksequencenumber(pid(),
|
||||
leveled_codec:ledger_key(),
|
||||
leveled_codec:ledger_key()|bad_ledgerkey(),
|
||||
integer()) -> boolean().
|
||||
%% @doc
|
||||
%% Check if the sequence number of the passed key is not replaced by a change
|
||||
|
@ -2003,6 +2004,7 @@ simple_server_test() ->
|
|||
"Key0004",
|
||||
null},
|
||||
3004)),
|
||||
|
||||
% Add some more keys and confirm that check sequence number still
|
||||
% sees the old version in the previous snapshot, but will see the new
|
||||
% version in a new snapshot
|
||||
|
@ -2299,6 +2301,7 @@ handle_down_test() ->
|
|||
pcl_close(PCLr),
|
||||
clean_testdir(RootPath).
|
||||
|
||||
|
||||
%% the fake bookie. Some calls to leveled_bookie (like the two below)
|
||||
%% do not go via the gen_server (but it looks like they expect to be
|
||||
%% called by the gen_server, internally!) they use "self()" to
|
||||
|
|
|
@ -678,27 +678,25 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
|||
end,
|
||||
JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
|
||||
case DeferredFetch of
|
||||
{true, true} ->
|
||||
InJournal =
|
||||
leveled_inker:ink_keycheck(InkerClone,
|
||||
LK,
|
||||
SQN),
|
||||
case InJournal of
|
||||
probably ->
|
||||
ProxyObj =
|
||||
make_proxy_object(Tag,
|
||||
LK, JK, MD, V,
|
||||
InkerClone),
|
||||
FoldObjectsFun(B, K, ProxyObj, Acc);
|
||||
missing ->
|
||||
Acc
|
||||
end;
|
||||
{true, false} ->
|
||||
{true, JournalCheck} ->
|
||||
ProxyObj =
|
||||
make_proxy_object(Tag,
|
||||
LK, JK, MD, V,
|
||||
InkerClone),
|
||||
FoldObjectsFun(B, K, ProxyObj, Acc);
|
||||
leveled_codec:return_proxy(Tag, MD,
|
||||
InkerClone, JK),
|
||||
case JournalCheck of
|
||||
true ->
|
||||
InJournal =
|
||||
leveled_inker:ink_keycheck(InkerClone,
|
||||
LK,
|
||||
SQN),
|
||||
case InJournal of
|
||||
probably ->
|
||||
FoldObjectsFun(B, K, ProxyObj, Acc);
|
||||
missing ->
|
||||
Acc
|
||||
end;
|
||||
false ->
|
||||
FoldObjectsFun(B, K, ProxyObj, Acc)
|
||||
end;
|
||||
false ->
|
||||
R = leveled_bookie:fetch_value(InkerClone, JK),
|
||||
case R of
|
||||
|
@ -706,7 +704,6 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
|||
Acc;
|
||||
Value ->
|
||||
FoldObjectsFun(B, K, Value, Acc)
|
||||
|
||||
end
|
||||
end;
|
||||
false ->
|
||||
|
@ -716,16 +713,6 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
|||
AccFun.
|
||||
|
||||
|
||||
make_proxy_object(?HEAD_TAG, _LK, _JK, MD, _V, _InkerClone) ->
|
||||
MD;
|
||||
make_proxy_object(_Tag, LK, JK, MD, V, InkerClone) ->
|
||||
Size = leveled_codec:get_size(LK, V),
|
||||
MDBin = leveled_codec:build_metadata_object(LK, MD),
|
||||
term_to_binary({proxy_object,
|
||||
MDBin,
|
||||
Size,
|
||||
{fun leveled_bookie:fetch_value/2, InkerClone, JK}}).
|
||||
|
||||
check_presence(Key, Value, InkerClone) ->
|
||||
{LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
|
||||
case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
|
||||
|
|
|
@ -3259,7 +3259,7 @@ nonsense_coverage_test() ->
|
|||
|
||||
hashmatching_bytreesize_test() ->
|
||||
B = <<"Bucket">>,
|
||||
V = leveled_codec:riak_metadata_to_binary(term_to_binary([{"actor1", 1}]),
|
||||
V = leveled_head:riak_metadata_to_binary(term_to_binary([{"actor1", 1}]),
|
||||
<<1:32/integer,
|
||||
0:32/integer,
|
||||
0:32/integer>>),
|
||||
|
|
121
test/end_to_end/appdefined_SUITE.erl
Normal file
121
test/end_to_end/appdefined_SUITE.erl
Normal file
|
@ -0,0 +1,121 @@
|
|||
-module(appdefined_SUITE).
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include("include/leveled.hrl").
|
||||
-export([all/0]).
|
||||
-export([application_defined_tag/1
|
||||
]).
|
||||
|
||||
all() -> [
|
||||
application_defined_tag
|
||||
].
|
||||
|
||||
|
||||
|
||||
application_defined_tag(_Config) ->
|
||||
T1 = os:timestamp(),
|
||||
application_defined_tag_tester(40000, ?STD_TAG, [], false),
|
||||
io:format("Completed with std tag in ~w ms~n",
|
||||
[timer:now_diff(os:timestamp(), T1)/1000]),
|
||||
|
||||
T2 = os:timestamp(),
|
||||
application_defined_tag_tester(40000, bespoke_tag1, [], false),
|
||||
io:format("Completed with app tag but not function in ~w ms~n",
|
||||
[timer:now_diff(os:timestamp(), T2)/1000]),
|
||||
|
||||
ExtractMDFun =
|
||||
fun(Tag, Size, Obj) ->
|
||||
[{hash, Hash}, {shard, Shard}, {random, Random}, {value, _V}]
|
||||
= Obj,
|
||||
case Tag of
|
||||
bespoke_tag1 ->
|
||||
{{Hash, Size, [{shard, Shard}, {random, Random}]}, []};
|
||||
bespoke_tag2 ->
|
||||
{{Hash, Size, [{shard, Shard}]}, [os:timestamp()]}
|
||||
end
|
||||
end,
|
||||
|
||||
T3 = os:timestamp(),
|
||||
application_defined_tag_tester(40000, ?STD_TAG,
|
||||
[{extract_metadata, ExtractMDFun}],
|
||||
false),
|
||||
io:format("Completed with std tag and override function in ~w ms~n",
|
||||
[timer:now_diff(os:timestamp(), T3)/1000]),
|
||||
|
||||
T4 = os:timestamp(),
|
||||
application_defined_tag_tester(40000, bespoke_tag1,
|
||||
[{extract_metadata, ExtractMDFun}],
|
||||
true),
|
||||
io:format("Completed with app tag and override function in ~w ms~n",
|
||||
[timer:now_diff(os:timestamp(), T4)/1000]),
|
||||
|
||||
|
||||
T5 = os:timestamp(),
|
||||
application_defined_tag_tester(40000, bespoke_tag2,
|
||||
[{extract_metadata, ExtractMDFun}],
|
||||
true),
|
||||
io:format("Completed with app tag and override function in ~w ms~n",
|
||||
[timer:now_diff(os:timestamp(), T5)/1000]).
|
||||
|
||||
|
||||
application_defined_tag_tester(KeyCount, Tag, Functions, ExpectMD) ->
|
||||
RootPath = testutil:reset_filestructure(),
|
||||
StartOpts1 = [{root_path, RootPath},
|
||||
{sync_strategy, testutil:sync_strategy()},
|
||||
{log_level, warn},
|
||||
{override_functions, Functions}],
|
||||
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
||||
Value = leveled_rand:rand_bytes(512),
|
||||
MapFun =
|
||||
fun(C) ->
|
||||
{C, object_generator(C, Value)}
|
||||
end,
|
||||
CBKVL = lists:map(MapFun, lists:seq(1, KeyCount)),
|
||||
|
||||
PutFun =
|
||||
fun({_C, {B, K, O}}) ->
|
||||
R = leveled_bookie:book_put(Bookie1, B, K, O, [], Tag),
|
||||
case R of
|
||||
ok -> ok;
|
||||
pause -> timer:sleep(100)
|
||||
end
|
||||
end,
|
||||
lists:foreach(PutFun, CBKVL),
|
||||
|
||||
CheckFun =
|
||||
fun(Book) ->
|
||||
fun({C, {B, K, O}}) ->
|
||||
{ok, O} = leveled_bookie:book_get(Book, B, K, Tag),
|
||||
{ok, H} = leveled_bookie:book_head(Book, B, K, Tag),
|
||||
MD = element(3, H),
|
||||
case ExpectMD of
|
||||
true ->
|
||||
true =
|
||||
{shard, C rem 10} == lists:keyfind(shard, 1, MD);
|
||||
false ->
|
||||
true =
|
||||
undefined == MD
|
||||
end
|
||||
end
|
||||
end,
|
||||
|
||||
lists:foreach(CheckFun(Bookie1), CBKVL),
|
||||
|
||||
ok = leveled_bookie:book_close(Bookie1),
|
||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
|
||||
|
||||
lists:foreach(CheckFun(Bookie2), CBKVL),
|
||||
|
||||
ok = leveled_bookie:book_close(Bookie2).
|
||||
|
||||
|
||||
|
||||
|
||||
object_generator(Count, V) ->
|
||||
Hash = erlang:phash2({count, V}),
|
||||
Random = leveled_rand:uniform(1000),
|
||||
Key = list_to_binary(leveled_util:generate_uuid()),
|
||||
Bucket = <<"B">>,
|
||||
{Bucket,
|
||||
Key,
|
||||
[{hash, Hash}, {shard, Count rem 10},
|
||||
{random, Random}, {value, V}]}.
|
|
@ -69,9 +69,9 @@ simple_test_withlog(LogLevel, ForcedLogs) ->
|
|||
ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", "Value2",
|
||||
[{add, "Index1", "Term1"}]),
|
||||
{ok, "Value2"} = leveled_bookie:book_get(Bookie2, "Bucket1", "Key2"),
|
||||
{ok, {62888926, 60}} = leveled_bookie:book_head(Bookie2,
|
||||
"Bucket1",
|
||||
"Key2"),
|
||||
{ok, {62888926, 60, undefined}} = leveled_bookie:book_head(Bookie2,
|
||||
"Bucket1",
|
||||
"Key2"),
|
||||
testutil:check_formissingobject(Bookie2, "Bucket1", "Key2"),
|
||||
ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", <<"Value2">>,
|
||||
[{remove, "Index1", "Term1"},
|
||||
|
|
|
@ -363,7 +363,7 @@ check_forobject(Bookie, TestObject) ->
|
|||
TestObject#r_object.bucket,
|
||||
TestObject#r_object.key),
|
||||
{{_SibMetaBin, Vclock, _Hash, size}, _LMS}
|
||||
= leveled_codec:riak_extract_metadata(HeadBinary, size),
|
||||
= leveled_head:riak_extract_metadata(HeadBinary, size),
|
||||
true = binary_to_term(Vclock) == TestObject#r_object.vclock.
|
||||
|
||||
check_formissingobject(Bookie, Bucket, Key) ->
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue