%% -------- Overview ---------
%%
%% The leveled store is based on the LSM-tree, similar to leveldb, except that:
%% - Keys, Metadata and Values are not persisted together - the Keys and
%% Metadata are kept in a tree-based ledger, whereas the values are stored
%% only in a sequential Journal.
%% - Different file formats are used for the Journal (based on DJ Bernstein
%% constant database), and the ledger (based on sst)
%% - It is not intended to be general purpose, but to be primarily suited for
%% use as a Riak backend in specific circumstances (relatively large values,
%% and frequent use of iterators)
%% - The Journal is an extended nursery log in leveldb terms. It is keyed
%% on the sequence number of the write
%% - The ledger is a merge tree, where the key is the actual object key, and
%% the value is the metadata of the object including the sequence number
%%
%%
%% -------- Actors ---------
%%
%% The store is fronted by a Bookie, who takes support from different actors:
%% - An Inker who persists new data into the journal, and returns items from
%% the journal based on sequence number
%% - A Penciller who periodically redraws the ledger, which associates keys with
%% sequence numbers and other metadata, as well as secondary keys (for index
%% queries)
%% - One or more Clerks, who may be used by either the inker or the penciller
%% to fulfill background tasks
%%
%% Both the Inker and the Penciller maintain a manifest of the files which
%% represent the current state of the Journal and the Ledger respectively.
%% For the Inker the manifest maps ranges of sequence numbers to cdb files.
%% For the Penciller the manifest maps key ranges to files at each level of
%% the Ledger.
%%

-module(leveled_bookie).

-behaviour(gen_server).

-include("include/leveled.hrl").

-export([init/1,
        handle_call/3,
        handle_cast/2,
        handle_info/2,
        terminate/2,
        code_change/3,
        book_start/1,
        book_start/4,
        book_put/5,
        book_put/6,
        book_tempput/7,
        book_delete/4,
        book_get/3,
        book_get/4,
        book_head/3,
        book_head/4,
        book_returnfolder/2,
        book_snapshotstore/3,
        book_snapshotledger/3,
        book_compactjournal/2,
        book_islastcompactionpending/1,
        book_close/1,
        book_destroy/1]).

-export([get_opt/2,
        get_opt/3,
        empty_ledgercache/0,
        loadqueue_ledgercache/1,
        push_ledgercache/2,
        snapshot_store/5,
        fetch_value/2]).

-include_lib("eunit/include/eunit.hrl").

-define(CACHE_SIZE, 2500).
-define(JOURNAL_FP, "journal").
-define(LEDGER_FP, "ledger").
-define(SNAPSHOT_TIMEOUT, 300000).
-define(CHECKJOURNAL_PROB, 0.2).
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 80000).

-record(ledger_cache, {mem :: ets:tab(),
                        loader = leveled_tree:empty(?CACHE_TYPE)
                                    :: tuple()|empty_cache,
                        load_queue = [] :: list(),
                        index = leveled_pmem:new_index(), % array or empty_index
                        min_sqn = infinity :: integer()|infinity,
                        max_sqn = 0 :: integer()}).

-record(state, {inker :: pid(),
                penciller :: pid(),
                cache_size :: integer(),
                ledger_cache = #ledger_cache{},
                is_snapshot :: boolean(),
                slow_offer = false :: boolean(),
                put_timing :: tuple(),
                get_timing :: tuple()}).

%%%============================================================================
%%% API
%%%============================================================================

%% @doc Start a Leveled Key/Value store - limited options support.
%%
%% The most common startup parameters are extracted out from the options to
%% provide this startup method. This will start a KV store from the previous
%% store at root path - or an empty one if there is no store at the path.
%%
%% Fiddling with the LedgerCacheSize and JournalSize may improve performance,
%% but these are primarily exposed to support special situations (e.g. very
%% low memory installations); there should not be huge variance in outcomes
%% from modifying these numbers.
%%
%% The sync_strategy determines if the store is going to flush writes to disk
%% before returning an ack. There are three settings currently supported:
%% - sync - sync to disk by passing the sync flag to the file writer (only
%% works in OTP 18)
%% - riak_sync - sync to disk by explicitly calling data_sync after the write
%% - none - leave it to the operating system to control flushing
%%
%% On startup the Bookie must restart both the Inker to load the Journal, and
%% the Penciller to load the Ledger. Once the Penciller has started, the
%% Bookie should request the highest sequence number in the Ledger, and then
%% try to rebuild any missing information from the Journal.
%%
%% To rebuild the Ledger it requests the Inker to scan over the files from
%% the sequence number and re-generate the Ledger changes - pushing the changes
%% directly back into the Ledger.

book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
    book_start([{root_path, RootPath},
                    {cache_size, LedgerCacheSize},
                    {max_journalsize, JournalSize},
                    {sync_strategy, SyncStrategy}]).
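
%% A minimal usage sketch; the root path and sizes below are illustrative
%% assumptions rather than recommended settings:
%%
%%    {ok, Bookie} = leveled_bookie:book_start("/tmp/leveled_data",
%%                                                2000,       % ledger cache size
%%                                                500000000,  % max journal file size
%%                                                none),      % sync_strategy
%%    ok = leveled_bookie:book_close(Bookie).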

%% @doc Start a Leveled Key/Value store - full options support.
%%
%% Allows an options proplist to be passed for setting options. There are
%% two primary additional options this allows over book_start/4:
%% - retain_strategy
%% - waste_retention_period
%%
%% Both of these relate to compaction in the Journal. The retain_strategy
%% determines if a skinny record of the object should be retained following
%% compaction, and how that should be used when recovering lost state in the
%% Ledger.
%%
%% Currently compacted records no longer in use are not removed but moved to
%% a journal_waste folder, and the waste_retention_period determines how long
%% this history should be kept for (for example to allow for it to be backed
%% up before deletion)
%%
%% TODO:
%% The reload_strategy is exposed as currently no firm decision has been made
%% about how recovery should work. For instance, if we were to trust everything
%% as permanent in the Ledger once it is persisted, then there would be no
%% need to retain a skinny history of key changes in the Journal after
%% compaction. If, as an alternative, we assume the Ledger is never permanent,
%% and retain the skinny history - then backups need only be made against the
%% Journal. The skinny history of key changes is primarily related to the
%% issue of supporting secondary indexes in Riak.
%%
%% These two strategies are referred to as recovr (assume we can recover any
%% deltas from a lost ledger and a lost history through resilience outside of
%% the store), or retain (retain a history of key changes, even when the object
%% value has been compacted). There is a third, unimplemented strategy, which
%% is recalc - which would require, when reloading the Ledger from the Journal,
%% the index changes to be recalculated based on the current state of the
%% Ledger and the object metadata.

book_start(Opts) ->
    gen_server:start(?MODULE, [Opts], []).
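
%% A sketch of starting with the options proplist directly. Only options
%% referenced elsewhere in this module are shown, and the values are
%% illustrative assumptions:
%%
%%    Opts = [{root_path, "/tmp/leveled_data"},
%%            {cache_size, 2500},
%%            {max_journalsize, 500000000},
%%            {sync_strategy, none}],
%%    {ok, Bookie} = leveled_bookie:book_start(Opts).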

%% @doc Put an object with an expiry time
%%
%% Put an item in the store but with a Time To Live - the time when the object
%% should expire, in gregorian seconds (add the required number of seconds to
%% leveled_codec:integer_time/1).
%%
%% There exists the possibility of per object expiry times, not just whole
%% store expiry times as has traditionally been the feature in Riak. Care
%% will need to be taken if implementing per-object times about the choice of
%% reload_strategy. If expired objects are to be compacted entirely, then the
%% history of KeyChanges will be lost on reload.

book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL)
                                                    when is_integer(TTL) ->
    book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL).
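
%% A sketch of a put with a five-minute time-to-live. It assumes that
%% leveled_codec:integer_now/0 (used elsewhere in this module to test expiry)
%% returns the current time in the same units as the TTL:
%%
%%    TTL = leveled_codec:integer_now() + 300,
%%    Ack = leveled_bookie:book_tempput(Bookie,
%%                                        <<"Bucket">>, <<"Key">>, <<"Value">>,
%%                                        [],        % no index changes
%%                                        ?STD_TAG,
%%                                        TTL),
%%    % Ack is ok, or pause if the Penciller is applying back-pressure.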

%% @doc - Standard PUT
%%
%% A PUT request consists of
%% - A Primary Key and a Value
%% - IndexSpecs - a set of secondary key changes associated with the
%% transaction
%% - A tag indicating the type of object. Behaviour for metadata extraction,
%% and ledger compaction will vary by type. There are three currently
%% implemented types: i (Index), o (Standard), o_rkv (Riak). Keys added with
%% Index tags are not fetchable (as they will not be hashed), but are
%% extractable via range query.
%%
%% The Bookie takes the request and passes it first to the Inker to add the
%% request to the journal.
%%
%% The Inker will pass the PK/Value/IndexSpecs to the current (append only)
%% CDB journal file to persist the change. The call should return either 'ok'
%% or 'roll'. 'roll' indicates that the CDB file has insufficient capacity for
%% this write, and a new journal file should be created (with appropriate
%% manifest changes to be made).
%%
%% The Inker will return the SQN which the change has been made at, as well as
%% the object size on disk within the Journal.
%%
%% Once the object has been persisted to the Journal, the Ledger can be updated.
%% The Ledger is updated by the Bookie applying a function (extract_metadata/4)
%% to the Value to return the Object Metadata, a function to generate a hash
%% of the Value and also taking the Primary Key, the IndexSpecs, the Sequence
%% Number in the Journal and the Object Size (returned from the Inker).
%%
%% A set of Ledger Key changes are then generated and placed in the Bookie's
%% Ledger Key cache.
%%
%% The PUT can now be acknowledged. In the background the Bookie may then
%% choose to push the cache to the Penciller for eventual persistence within
%% the ledger. This push will either be accepted or returned (if the
%% Penciller has a backlog of key changes). The back-pressure should lead to
%% the Bookie entering into a slow-offer status, whereby the next PUT will be
%% acknowledged by a PAUSE signal - with the expectation that this will
%% lead to a back-off behaviour.

book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
    book_put(Pid, Bucket, Key, Object, IndexSpecs, ?STD_TAG).

book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
    book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).

book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
    gen_server:call(Pid,
                    {put, Bucket, Key, Object, IndexSpecs, Tag, TTL},
                    infinity).
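
%% A sketch of a PUT carrying secondary index changes. The {add, Field, Term}
%% shape of the index specification is an assumption of this example, not a
%% definition of the IndexSpecs format:
%%
%%    IndexSpecs = [{add, <<"idx1_bin">>, <<"someterm">>}],
%%    Ack = leveled_bookie:book_put(Bookie,
%%                                    <<"Bucket">>, <<"Key">>, <<"Value">>,
%%                                    IndexSpecs,
%%                                    ?STD_TAG),
%%    % Ack is ok, or pause under slow-offer back-pressure.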

%% @doc - Standard delete
%%
%% A thin wrap around the put of a special tombstone object. There is no
%% immediate reclaim of space, simply the addition of a more recent tombstone.

book_delete(Pid, Bucket, Key, IndexSpecs) ->
    book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).

%% @doc - GET and HEAD requests
%%
%% The Bookie supports both GET and HEAD requests, with the HEAD request
%% returning only the metadata and not the actual object value. The HEAD
%% request can be serviced by reference to the Ledger Cache and the Penciller.
%%
%% GET requests first follow the path of a HEAD request, and if an object is
%% found, then fetch the value from the Journal via the Inker.

book_get(Pid, Bucket, Key) ->
    book_get(Pid, Bucket, Key, ?STD_TAG).

book_head(Pid, Bucket, Key) ->
    book_head(Pid, Bucket, Key, ?STD_TAG).

book_get(Pid, Bucket, Key, Tag) ->
    gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).

book_head(Pid, Bucket, Key, Tag) ->
    gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity).
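
%% For example, based on the replies produced in handle_call/3 below:
%%
%%    case leveled_bookie:book_get(Bookie, <<"Bucket">>, <<"Key">>) of
%%        {ok, Value} -> Value;
%%        not_found -> not_found
%%    end,
%%    % book_head/3 follows the same pattern, returning {ok, ObjectMetadata}
%%    % or not_found without fetching the value from the Journal.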

%% @doc Snapshots/Clones
%%
%% If there is a snapshot request (e.g. to iterate over the keys) the Bookie
%% may request a clone of the Penciller, or clones of both the Penciller and
%% the Inker should values also need to be accessed.
%%
%% The clone is seeded with the manifest SQN. The clone should be registered
%% with the real Inker/Penciller, so that the real Inker/Penciller may prevent
%% the deletion of files still in use by a snapshot clone.
%%
%% Iterators should de-register themselves from the Penciller on completion.
%% Iterators should be automatically released after a timeout period. A file
%% can only be deleted from the Ledger if it is no longer in the manifest, and
%% there are no registered iterators from before the point the file was
%% removed from the manifest.
%%
%% Clones are simply new gen_servers with copies of the relevant
%% StateData.
%%
%% There are a series of specific folders implemented that provide pre-canned
%% snapshot functionality (a worked example is sketched after
%% book_returnfolder/2 below):
%%
%% {bucket_stats, Bucket} -> return a key count and total object size within
%% a bucket
%% {riakbucket_stats, Bucket} -> as above, but for buckets with the Riak Tag
%% {binary_bucketlist, Tag, {FoldKeysFun, Acc}} -> if we assume buckets and
%% keys are binaries, provides a fast bucket list function
%% {index_query,
%%        Constraint,
%%        {FoldKeysFun, Acc},
%%        {IdxField, StartValue, EndValue},
%%        {ReturnTerms, TermRegex}} -> secondary index query
%% {keylist, Tag, {FoldKeysFun, Acc}} -> list all keys with tag
%% {keylist, Tag, Bucket, {FoldKeysFun, Acc}} -> list all keys within given
%% bucket
%% {hashtree_query, Tag, JournalCheck} -> return keys and hashes for all
%% objects with a given tag
%% {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> fold over all objects
%% in a given bucket
%% {foldobjects_byindex,
%%        Tag,
%%        Bucket,
%%        {Field, FromTerm, ToTerm},
%%        FoldObjectsFun} -> fold over all objects with an entry in a given
%% range on a given index

book_returnfolder(Pid, FolderType) ->
    gen_server:call(Pid, {return_folder, FolderType}, infinity).
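
%% All folder requests return {async, Folder}; the caller then runs the fold
%% by calling Folder(). A sketch of a per-bucket key listing - the arity-3
%% FoldKeysFun shape (Bucket, Key, Accumulator) matches accumulate_keys/1
%% below:
%%
%%    FoldKeysFun = fun(_B, K, Acc) -> [K|Acc] end,
%%    {async, Folder} =
%%        leveled_bookie:book_returnfolder(Bookie,
%%                                            {keylist,
%%                                                ?STD_TAG,
%%                                                <<"Bucket">>,
%%                                                {FoldKeysFun, []}}),
%%    KeyList = Folder().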

book_snapshotstore(Pid, Requestor, Timeout) ->
    gen_server:call(Pid, {snapshot, Requestor, store, Timeout}, infinity).

book_snapshotledger(Pid, Requestor, Timeout) ->
    gen_server:call(Pid, {snapshot, Requestor, ledger, Timeout}, infinity).

%% @doc Call for compaction of the Journal
%%
%% The scheduling of Journal compaction is called externally, so it is assumed
%% in Riak it will be triggered by a vnode callback.

book_compactjournal(Pid, Timeout) ->
    gen_server:call(Pid, {compact_journal, Timeout}, infinity).

%% @doc Check on progress of the last compaction

book_islastcompactionpending(Pid) ->
    gen_server:call(Pid, confirm_compact, infinity).
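
%% A sketch of triggering a compaction run and then polling for completion;
%% the timeout value is an illustrative assumption:
%%
%%    ok = leveled_bookie:book_compactjournal(Bookie, 30000),
%%    Pending = leveled_bookie:book_islastcompactionpending(Bookie),
%%    % Pending is the result of leveled_inker:ink_compactionpending/1,
%%    % indicating whether the requested compaction is still outstanding.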

%% @doc Clean shutdown
%%
%% A clean shutdown will persist all the information in the Penciller memory
%% before closing, so shutdown is not instantaneous.

book_close(Pid) ->
    gen_server:call(Pid, close, infinity).

%% @doc Close and clean-out files

book_destroy(Pid) ->
    gen_server:call(Pid, destroy, infinity).
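
%% For example, to shut the store down cleanly, or to remove it entirely:
%%
%%    ok = leveled_bookie:book_close(Bookie),
%%    % or, to delete the Journal and Ledger files as well:
%%    ok = leveled_bookie:book_destroy(Bookie).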

%%%============================================================================
%%% gen_server callbacks
%%%============================================================================

init([Opts]) ->
    SW = os:timestamp(),
    random:seed(erlang:phash2(self()), element(2, SW), element(3, SW)),
    case get_opt(snapshot_bookie, Opts) of
        undefined ->
            % Start from file not snapshot
            {InkerOpts, PencillerOpts} = set_options(Opts),
            {Inker, Penciller} = startup(InkerOpts, PencillerOpts),
            CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
            CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
                        + erlang:phash2(self()) rem CacheJitter,
            NewETS = ets:new(mem, [ordered_set]),
            leveled_log:log("B0001", [Inker, Penciller]),
            {ok, #state{inker=Inker,
                        penciller=Penciller,
                        cache_size=CacheSize,
                        ledger_cache=#ledger_cache{mem = NewETS},
                        is_snapshot=false}};
        Bookie ->
            {ok, Penciller, Inker} = book_snapshotstore(Bookie,
                                                        self(),
                                                        ?SNAPSHOT_TIMEOUT),
            leveled_log:log("B0002", [Inker, Penciller]),
            {ok, #state{penciller=Penciller,
                        inker=Inker,
                        is_snapshot=true}}
    end.

handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
    LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
    SW = os:timestamp(),
    {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
                                                LedgerKey,
                                                Object,
                                                {IndexSpecs, TTL}),
    T0 = timer:now_diff(os:timestamp(), SW),
    Changes = preparefor_ledgercache(no_type_assigned,
                                        LedgerKey,
                                        SQN,
                                        Object,
                                        ObjSize,
                                        {IndexSpecs, TTL}),
    Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
    T1 = timer:now_diff(os:timestamp(), SW) - T0,
    PutTimes = leveled_log:put_timing(bookie, State#state.put_timing, T0, T1),
    % If the previous push to memory was returned then punish this PUT with a
    % delay. If the back-pressure in the Penciller continues, these delays
    % will become more frequent
    case State#state.slow_offer of
        true ->
            gen_server:reply(From, pause);
        false ->
            gen_server:reply(From, ok)
    end,
    maybe_longrunning(SW, overall_put),
    case maybepush_ledgercache(State#state.cache_size,
                                Cache0,
                                State#state.penciller) of
        {ok, NewCache} ->
            {noreply, State#state{ledger_cache=NewCache,
                                    put_timing=PutTimes,
                                    slow_offer=false}};
        {returned, NewCache} ->
            {noreply, State#state{ledger_cache=NewCache,
                                    put_timing=PutTimes,
                                    slow_offer=true}}
    end;
handle_call({get, Bucket, Key, Tag}, _From, State) ->
    LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
    SWh = os:timestamp(),
    case fetch_head(LedgerKey,
                    State#state.penciller,
                    State#state.ledger_cache) of
        not_present ->
            GT0 = leveled_log:get_timing(State#state.get_timing,
                                            SWh,
                                            head_not_present),
            {reply, not_found, State#state{get_timing=GT0}};
        Head ->
            GT0 = leveled_log:get_timing(State#state.get_timing,
                                            SWh,
                                            head_found),
            SWg = os:timestamp(),
            {Seqn, Status, _MH, _MD} = leveled_codec:striphead_to_details(Head),
            case Status of
                tomb ->
                    {reply, not_found, State};
                {active, TS} ->
                    Active = TS >= leveled_codec:integer_now(),
                    Object = fetch_value(State#state.inker, {LedgerKey, Seqn}),
                    GT1 = leveled_log:get_timing(GT0, SWg, fetch),
                    case {Active, Object} of
                        {_, not_present} ->
                            {reply, not_found, State#state{get_timing=GT1}};
                        {true, Object} ->
                            {reply, {ok, Object}, State#state{get_timing=GT1}};
                        _ ->
                            {reply, not_found, State#state{get_timing=GT1}}
                    end
            end
    end;
handle_call({head, Bucket, Key, Tag}, _From, State) ->
    LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
    case fetch_head(LedgerKey,
                    State#state.penciller,
                    State#state.ledger_cache) of
        not_present ->
            {reply, not_found, State};
        Head ->
            case leveled_codec:striphead_to_details(Head) of
                {_SeqN, tomb, _MH, _MD} ->
                    {reply, not_found, State};
                {_SeqN, {active, TS}, _MH, MD} ->
                    case TS >= leveled_codec:integer_now() of
                        true ->
                            OMD =
                                leveled_codec:build_metadata_object(LedgerKey,
                                                                    MD),
                            {reply, {ok, OMD}, State};
                        false ->
                            {reply, not_found, State}
                    end
            end
    end;
handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
    Reply = snapshot_store(State, SnapType),
    {reply, Reply, State};
handle_call({return_folder, FolderType}, _From, State) ->
    case FolderType of
        {bucket_stats, Bucket} ->
            {reply,
                bucket_stats(State, Bucket, ?STD_TAG),
                State};
        {riakbucket_stats, Bucket} ->
            {reply,
                bucket_stats(State, Bucket, ?RIAK_TAG),
                State};
        {binary_bucketlist, Tag, {FoldKeysFun, Acc}} ->
            {reply,
                binary_bucketlist(State, Tag, {FoldKeysFun, Acc}),
                State};
        {index_query,
                Constraint,
                {FoldKeysFun, Acc},
                {IdxField, StartValue, EndValue},
                {ReturnTerms, TermRegex}} ->
            {Bucket, StartObjKey} =
                case Constraint of
                    {B, SK} ->
                        {B, SK};
                    B ->
                        {B, null}
                end,
            {reply,
                index_query(State,
                            Bucket,
                            StartObjKey,
                            {FoldKeysFun, Acc},
                            {IdxField, StartValue, EndValue},
                            {ReturnTerms, TermRegex}),
                State};
        {keylist, Tag, {FoldKeysFun, Acc}} ->
            {reply,
                allkey_query(State, Tag, {FoldKeysFun, Acc}),
                State};
        {keylist, Tag, Bucket, {FoldKeysFun, Acc}} ->
            {reply,
                bucketkey_query(State, Tag, Bucket, {FoldKeysFun, Acc}),
                State};
        {hashtree_query, Tag, JournalCheck} ->
            {reply,
                hashtree_query(State, Tag, JournalCheck),
                State};
        {foldheads_allkeys, Tag, FoldHeadsFun} ->
            {reply,
                foldheads_allkeys(State, Tag, FoldHeadsFun),
                State};
        {foldheads_bybucket, Tag, Bucket, FoldHeadsFun} ->
            {reply,
                foldheads_bybucket(State, Tag, Bucket, FoldHeadsFun),
                State};
        {foldobjects_allkeys, Tag, FoldObjectsFun} ->
            {reply,
                foldobjects_allkeys(State, Tag, FoldObjectsFun),
                State};
        {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} ->
            {reply,
                foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun),
                State};
        {foldobjects_byindex,
                Tag,
                Bucket,
                {Field, FromTerm, ToTerm},
                FoldObjectsFun} ->
            {reply,
                foldobjects_byindex(State,
                                    Tag, Bucket,
                                    Field, FromTerm, ToTerm,
                                    FoldObjectsFun),
                State}
    end;
handle_call({compact_journal, Timeout}, _From, State) ->
    ok = leveled_inker:ink_compactjournal(State#state.inker,
                                            self(),
                                            Timeout),
    {reply, ok, State};
handle_call(confirm_compact, _From, State) ->
    {reply, leveled_inker:ink_compactionpending(State#state.inker), State};
handle_call(close, _From, State) ->
    {stop, normal, ok, State};
handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
    {stop, destroy, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(destroy, State) ->
    leveled_log:log("B0011", []),
    {ok, InkPathList} = leveled_inker:ink_doom(State#state.inker),
    {ok, PCLPathList} = leveled_penciller:pcl_doom(State#state.penciller),
    lists:foreach(fun(DirPath) -> delete_path(DirPath) end, InkPathList),
    lists:foreach(fun(DirPath) -> delete_path(DirPath) end, PCLPathList),
    ok;
terminate(Reason, State) ->
    leveled_log:log("B0003", [Reason]),
    ok = leveled_inker:ink_close(State#state.inker),
    ok = leveled_penciller:pcl_close(State#state.penciller).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%============================================================================
%%% External functions
%%%============================================================================

%% @doc Empty the ledger cache table following a push
empty_ledgercache() ->
    #ledger_cache{mem = ets:new(empty, [ordered_set])}.

%% @doc push the ledgercache to the Penciller - which should respond ok or
%% returned. If the response is ok the cache can be flushed, but if the
%% response is returned the cache should continue to build and it should try
%% to flush at a later date
push_ledgercache(Penciller, Cache) ->
    CacheToLoad = {Cache#ledger_cache.loader,
                    Cache#ledger_cache.index,
                    Cache#ledger_cache.min_sqn,
                    Cache#ledger_cache.max_sqn},
    leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).

%% @doc the ledger cache can be built from a queue, for example when
%% loading the ledger from the head of the journal on startup
%%
%% The queue should be built using [NewKey|Acc] so that the most recent
%% key is kept in the sort
loadqueue_ledgercache(Cache) ->
    SL = lists:ukeysort(1, Cache#ledger_cache.load_queue),
    T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
    Cache#ledger_cache{load_queue = [], loader = T}.

%% @doc Allow a snapshot to be created from part of the store, preferably
%% passing in a query filter so that all of the LoopState does not need to
%% be copied from the real actor to the clone
%%
%% SnapType can be store (requires journal and ledger) or ledger (requires
%% ledger only)
%%
%% Query can be no_lookup, indicating the snapshot will be used for non-specific
%% range queries and not direct fetch requests. {StartKey, EndKey} if the
%% snapshot is to be used for one specific query only (this is much quicker to
%% setup, assuming the range is a small subset of the overall key space).
snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) ->
    LedgerCache = readycache_forsnapshot(LedgerCache0, Query),
    BookiesMem = {LedgerCache#ledger_cache.loader,
                    LedgerCache#ledger_cache.index,
                    LedgerCache#ledger_cache.min_sqn,
                    LedgerCache#ledger_cache.max_sqn},
    PCLopts = #penciller_options{start_snapshot = true,
                                    source_penciller = Penciller,
                                    snapshot_query = Query,
                                    bookies_mem = BookiesMem},
    {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
    case SnapType of
        store ->
            InkerOpts = #inker_options{start_snapshot=true,
                                        source_inker=Inker},
            {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
            {ok, LedgerSnapshot, JournalSnapshot};
        ledger ->
            {ok, LedgerSnapshot, null}
    end.

snapshot_store(State, SnapType) ->
    snapshot_store(State, SnapType, undefined).

snapshot_store(State, SnapType, Query) ->
    snapshot_store(State#state.ledger_cache,
                    State#state.penciller,
                    State#state.inker,
                    SnapType,
                    Query).

fetch_value(Inker, {Key, SQN}) ->
    SW = os:timestamp(),
    case leveled_inker:ink_fetch(Inker, Key, SQN) of
        {ok, Value} ->
            maybe_longrunning(SW, inker_fetch),
            Value;
        not_present ->
            not_present
    end.

%%%============================================================================
%%% Internal functions
%%%============================================================================

maybe_longrunning(SW, Aspect) ->
    case timer:now_diff(os:timestamp(), SW) of
        N when N > ?LONG_RUNNING ->
            leveled_log:log("B0013", [N, Aspect]);
        _ ->
            ok
    end.

bucket_stats(State, Bucket, Tag) ->
    {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
                                                            ledger,
                                                            no_lookup),
    Folder = fun() ->
                StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                AccFun = accumulate_size(),
                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                        StartKey,
                                                        EndKey,
                                                        AccFun,
                                                        {0, 0}),
                ok = leveled_penciller:pcl_close(LedgerSnapshot),
                Acc
                end,
    {async, Folder}.

binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
    % List buckets for tag, assuming bucket names are all binary type
    {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
                                                            ledger,
                                                            no_lookup),
    Folder = fun() ->
                BucketAcc = get_nextbucket(null,
                                            Tag,
                                            LedgerSnapshot,
                                            []),
                ok = leveled_penciller:pcl_close(LedgerSnapshot),
                lists:foldl(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end,
                                InitAcc,
                                BucketAcc)
                end,
    {async, Folder}.

get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) ->
    StartKey = leveled_codec:to_ledgerkey(NextBucket, null, Tag),
    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
    ExtractFun = fun(LK, _V, _Acc) -> leveled_codec:from_ledgerkey(LK) end,
    BK = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
                                            StartKey,
                                            EndKey,
                                            ExtractFun,
                                            null),
    case BK of
        null ->
            leveled_log:log("B0008",[]),
            BKList;
        {B, K} when is_binary(B) ->
            leveled_log:log("B0009",[B]),
            get_nextbucket(<<B/binary, 0>>,
                            Tag,
                            LedgerSnapshot,
                            [{B, K}|BKList]);
        NB ->
            leveled_log:log("B0010",[NB]),
            []
    end.

index_query(State,
            Bucket,
            StartObjKey,
            {FoldKeysFun, InitAcc},
            {IdxField, StartValue, EndValue},
            {ReturnTerms, TermRegex}) ->
    StartKey = leveled_codec:to_ledgerkey(Bucket,
                                            StartObjKey,
                                            ?IDX_TAG,
                                            IdxField,
                                            StartValue),
    EndKey = leveled_codec:to_ledgerkey(Bucket,
                                        null,
                                        ?IDX_TAG,
                                        IdxField,
                                        EndValue),
    {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
                                                            ledger,
                                                            {StartKey,
                                                                EndKey}),
    Folder = fun() ->
                AddFun = case ReturnTerms of
                                true ->
                                    fun add_terms/2;
                                _ ->
                                    fun add_keys/2
                            end,
                AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun),
                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                        StartKey,
                                                        EndKey,
                                                        AccFun,
                                                        InitAcc),
                ok = leveled_penciller:pcl_close(LedgerSnapshot),
                Acc
                end,
    {async, Folder}.

hashtree_query(State, Tag, JournalCheck) ->
    SnapType = case JournalCheck of
                    false ->
                        ledger;
                    check_presence ->
                        store
                end,
    {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State,
                                                            SnapType,
                                                            no_lookup),
    Folder = fun() ->
                StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
                EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
                AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                        StartKey,
                                                        EndKey,
                                                        AccFun,
                                                        []),
                ok = leveled_penciller:pcl_close(LedgerSnapshot),
                case JournalCheck of
                    false ->
                        ok;
                    check_presence ->
                        leveled_inker:ink_close(JournalSnapshot)
                end,
                Acc
                end,
    {async, Folder}.

foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).

foldheads_allkeys(State, Tag, FoldHeadsFun) ->
    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
    foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, true).

foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) ->
    StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
    EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).

foldheads_bybucket(State, Tag, Bucket, FoldHeadsFun) ->
    StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
    EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
    foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, true).

foldobjects_byindex(State, Tag, Bucket,
                        Field, FromTerm, ToTerm, FoldObjectsFun) ->
    StartKey =
        leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm),
    EndKey =
        leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).

foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
    {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, store),
    {FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of
                            true ->
                                FoldObjectsFun;
                            false ->
                                {FoldObjectsFun, []}
                        end,
    Folder = fun() ->
                AccFun = accumulate_objects(FoldFun,
                                            JournalSnapshot,
                                            Tag,
                                            DeferredFetch),
                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                        StartKey,
                                                        EndKey,
                                                        AccFun,
                                                        InitAcc),
                ok = leveled_penciller:pcl_close(LedgerSnapshot),
                ok = leveled_inker:ink_close(JournalSnapshot),
                Acc
                end,
    {async, Folder}.

bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
    {ok, LedgerSnapshot, _JournalSnapshot} = snapshot_store(State,
                                                            ledger,
                                                            no_lookup),
    Folder = fun() ->
                SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                AccFun = accumulate_keys(FoldKeysFun),
                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                        SK,
                                                        EK,
                                                        AccFun,
                                                        InitAcc),
                ok = leveled_penciller:pcl_close(LedgerSnapshot),
                Acc
                end,
    {async, Folder}.

allkey_query(State, Tag, {FoldKeysFun, InitAcc}) ->
    bucketkey_query(State, Tag, null, {FoldKeysFun, InitAcc}).

readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
    {KL, MinSQN, MaxSQN} = scan_table(LedgerCache#ledger_cache.mem,
                                        StartKey,
                                        EndKey),
    case KL of
        [] ->
            #ledger_cache{loader=empty_cache,
                            index=empty_index,
                            min_sqn=MinSQN,
                            max_sqn=MaxSQN};
        _ ->
            #ledger_cache{loader=leveled_tree:from_orderedlist(KL,
                                                                ?CACHE_TYPE),
                            index=empty_index,
                            min_sqn=MinSQN,
                            max_sqn=MaxSQN}
    end;
readycache_forsnapshot(LedgerCache, Query) ->
    % Need to convert the Ledger Cache away from using the ETS table
    Tree = leveled_tree:from_orderedset(LedgerCache#ledger_cache.mem,
                                        ?CACHE_TYPE),
    case leveled_tree:tsize(Tree) of
        0 ->
            #ledger_cache{loader=empty_cache,
                            index=empty_index,
                            min_sqn=LedgerCache#ledger_cache.min_sqn,
                            max_sqn=LedgerCache#ledger_cache.max_sqn};
        _ ->
            Idx =
                case Query of
                    no_lookup ->
                        empty_index;
                    _ ->
                        LedgerCache#ledger_cache.index
                end,
            #ledger_cache{loader=Tree,
                            index=Idx,
                            min_sqn=LedgerCache#ledger_cache.min_sqn,
                            max_sqn=LedgerCache#ledger_cache.max_sqn}
    end.

scan_table(Table, StartKey, EndKey) ->
    scan_table(Table, StartKey, EndKey, [], infinity, 0).

scan_table(Table, StartKey, EndKey, Acc, MinSQN, MaxSQN) ->
    case ets:next(Table, StartKey) of
        '$end_of_table' ->
            {lists:reverse(Acc), MinSQN, MaxSQN};
        NextKey ->
            case leveled_codec:endkey_passed(EndKey, NextKey) of
                true ->
                    {lists:reverse(Acc), MinSQN, MaxSQN};
                false ->
                    [{NextKey, NextVal}] = ets:lookup(Table, NextKey),
                    SQN = leveled_codec:strip_to_seqonly({NextKey, NextVal}),
                    scan_table(Table,
                                NextKey,
                                EndKey,
                                [{NextKey, NextVal}|Acc],
                                min(MinSQN, SQN),
                                max(MaxSQN, SQN))
            end
    end.

set_options(Opts) ->
    MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
    JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER),
    MaxJournalSize = MaxJournalSize0 -
                        erlang:phash2(self()) rem JournalSizeJitter,

    SyncStrat = get_opt(sync_strategy, Opts, sync),
    WRP = get_opt(waste_retention_period, Opts),

    AltStrategy = get_opt(reload_strategy, Opts, []),
    ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy),

    PCLL0CacheSize = get_opt(max_pencillercachesize, Opts),
    RootPath = get_opt(root_path, Opts),

    JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
    LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
    ok = filelib:ensure_dir(JournalFP),
    ok = filelib:ensure_dir(LedgerFP),

    {#inker_options{root_path = JournalFP,
                    reload_strategy = ReloadStrategy,
                    max_run_length = get_opt(max_run_length, Opts),
                    waste_retention_period = WRP,
                    cdb_options = #cdb_options{max_size=MaxJournalSize,
                                                binary_mode=true,
                                                sync_strategy=SyncStrat}},
        #penciller_options{root_path = LedgerFP,
                            max_inmemory_tablesize = PCLL0CacheSize,
                            levelzero_cointoss = true}}.

startup(InkerOpts, PencillerOpts) ->
    {ok, Inker} = leveled_inker:ink_start(InkerOpts),
    {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
    LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
    leveled_log:log("B0005", [LedgerSQN]),
    ok = leveled_inker:ink_loadpcl(Inker,
                                    LedgerSQN + 1,
                                    fun load_fun/5,
                                    Penciller),
    {Inker, Penciller}.

fetch_head(Key, Penciller, LedgerCache) ->
    SW = os:timestamp(),
    CacheResult =
        case LedgerCache#ledger_cache.mem of
            undefined ->
                [];
            Tab ->
                ets:lookup(Tab, Key)
        end,
    case CacheResult of
        [{Key, Head}] ->
            Head;
        [] ->
            Hash = leveled_codec:magic_hash(Key),
            case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
                {Key, Head} ->
                    maybe_longrunning(SW, pcl_head),
                    Head;
                not_present ->
                    maybe_longrunning(SW, pcl_head),
                    not_present
            end
    end.

accumulate_size() ->
    Now = leveled_codec:integer_now(),
    AccFun = fun(Key, Value, {Size, Count}) ->
                    case leveled_codec:is_active(Key, Value, Now) of
                        true ->
                            {Size + leveled_codec:get_size(Key, Value),
                                Count + 1};
                        false ->
                            {Size, Count}
                    end
                end,
    AccFun.

accumulate_hashes(JournalCheck, InkerClone) ->
    Now = leveled_codec:integer_now(),
    AccFun = fun(LK, V, KHList) ->
                    case leveled_codec:is_active(LK, V, Now) of
                        true ->
                            {B, K, H} = leveled_codec:get_keyandhash(LK, V),
                            Check = random:uniform() < ?CHECKJOURNAL_PROB,
                            case {JournalCheck, Check} of
                                {check_presence, true} ->
                                    case check_presence(LK, V, InkerClone) of
                                        true ->
                                            [{B, K, H}|KHList];
                                        false ->
                                            KHList
                                    end;
                                _ ->
                                    [{B, K, H}|KHList]
                            end;
                        false ->
                            KHList
                    end
                end,
    AccFun.

accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
    Now = leveled_codec:integer_now(),
    AccFun =
        fun(LK, V, Acc) ->
            case leveled_codec:is_active(LK, V, Now) of
                true ->
                    {SQN, _St, _MH, MD} =
                        leveled_codec:striphead_to_details(V),
                    {B, K} =
                        case leveled_codec:from_ledgerkey(LK) of
                            {B0, K0} ->
                                {B0, K0};
                            {B0, K0, _T0} ->
                                {B0, K0}
                        end,
                    JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
                    case DeferredFetch of
                        true ->
                            Size = leveled_codec:get_size(LK, V),
                            MDBin =
                                leveled_codec:build_metadata_object(LK, MD),
                            Value = {proxy_object,
                                        MDBin,
                                        Size,
                                        {fun fetch_value/2, InkerClone, JK}},
                            FoldObjectsFun(B, K, Value, Acc);
                        false ->
                            R = fetch_value(InkerClone, JK),
                            case R of
                                not_present ->
                                    Acc;
                                Value ->
                                    FoldObjectsFun(B, K, Value, Acc)
                            end
                    end;
                false ->
                    Acc
            end
        end,
    AccFun.

check_presence(Key, Value, InkerClone) ->
    {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
    case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
        probably ->
            true;
        missing ->
            false
    end.

accumulate_keys(FoldKeysFun) ->
    Now = leveled_codec:integer_now(),
    AccFun = fun(Key, Value, Acc) ->
                    case leveled_codec:is_active(Key, Value, Now) of
                        true ->
                            {B, K} = leveled_codec:from_ledgerkey(Key),
                            FoldKeysFun(B, K, Acc);
                        false ->
                            Acc
                    end
                end,
    AccFun.

add_keys(ObjKey, _IdxValue) ->
    ObjKey.

add_terms(ObjKey, IdxValue) ->
    {IdxValue, ObjKey}.

accumulate_index(TermRe, AddFun, FoldKeysFun) ->
    Now = leveled_codec:integer_now(),
    case TermRe of
        undefined ->
            fun(Key, Value, Acc) ->
                case leveled_codec:is_active(Key, Value, Now) of
                    true ->
                        {Bucket,
                            ObjKey,
                            IdxValue} = leveled_codec:from_ledgerkey(Key),
                        FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc);
                    false ->
                        Acc
                end end;
        TermRe ->
            fun(Key, Value, Acc) ->
                case leveled_codec:is_active(Key, Value, Now) of
                    true ->
                        {Bucket,
                            ObjKey,
                            IdxValue} = leveled_codec:from_ledgerkey(Key),
                        case re:run(IdxValue, TermRe) of
                            nomatch ->
                                Acc;
                            _ ->
                                FoldKeysFun(Bucket,
                                            AddFun(ObjKey, IdxValue),
                                            Acc)
                        end;
                    false ->
                        Acc
                end end
    end.
|
|
|
|
|
|
|
|
|
2016-10-31 12:12:06 +00:00
|
|
|
preparefor_ledgercache(?INKT_KEYD,
|
|
|
|
LedgerKey, SQN, _Obj, _Size, {IndexSpecs, TTL}) ->
|
2016-10-25 23:13:14 +01:00
|
|
|
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
|
2017-01-05 21:58:33 +00:00
|
|
|
KeyChanges = leveled_codec:convert_indexspecs(IndexSpecs,
|
|
|
|
Bucket,
|
|
|
|
Key,
|
|
|
|
SQN,
|
|
|
|
TTL),
|
|
|
|
{no_lookup, SQN, KeyChanges};
|
2016-10-31 12:12:06 +00:00
|
|
|
preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
|
2017-01-05 21:58:33 +00:00
|
|
|
{Bucket, Key, ObjKeyChange, H} = leveled_codec:generate_ledgerkv(LedgerKey,
|
|
|
|
SQN,
|
|
|
|
Obj,
|
|
|
|
Size,
|
|
|
|
TTL),
|
|
|
|
KeyChanges = [ObjKeyChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
|
|
|
|
Bucket,
|
|
|
|
Key,
|
|
|
|
SQN,
|
|
|
|
TTL),
|
|
|
|
{H, SQN, KeyChanges}.
|
|
|
|
|
|
|
|
|
|
|
|
addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
|
|
|
|
ets:insert(Cache#ledger_cache.mem, KeyChanges),
|
|
|
|
UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
|
|
|
|
Cache#ledger_cache{index = UpdIndex,
|
|
|
|
min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
|
|
|
|
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
|
|
|
|
|
|
|
|
addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
    UpdQ = KeyChanges ++ Cache#ledger_cache.load_queue,
    UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
    Cache#ledger_cache{index = UpdIndex,
                        load_queue = UpdQ,
                        min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
                        max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.


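%% maybepush_ledgercache/3 decides (with jitter) whether the ledger cache is
%% now big enough to be pushed to the Penciller.  On a successful push a fresh
%% cache with a new ets table is returned; if the Penciller responds with
%% returned, the existing cache is kept so that the push can be re-attempted
%% on a later call.
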
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
    Tab = Cache#ledger_cache.mem,
    CacheSize = ets:info(Tab, size),
    TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
    if
        TimeToPush ->
            CacheToLoad = {Tab,
                            Cache#ledger_cache.index,
                            Cache#ledger_cache.min_sqn,
                            Cache#ledger_cache.max_sqn},
            case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of
                ok ->
                    Cache0 = #ledger_cache{},
                    true = ets:delete(Tab),
                    NewTab = ets:new(mem, [ordered_set]),
                    {ok, Cache0#ledger_cache{mem=NewTab}};
                returned ->
                    {returned, Cache}
            end;
        true ->
            {ok, Cache}
    end.


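%% maybe_withjitter/2 adds jitter to the push decision, so that pushes are
%% spread out rather than always triggering at a fixed threshold.  Once the
%% cache exceeds the maximum size, the chance of pushing on any one call is
%% roughly (CacheSize - MaxCacheSize) / (7 * MaxCacheSize) - for example with
%% a MaxCacheSize of 2500 and a CacheSize of 3000 that is about a 1 in 35
%% chance, and the probability rises the further the cache overshoots.
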
maybe_withjitter(CacheSize, MaxCacheSize) ->
    if
        CacheSize > MaxCacheSize ->
            R = random:uniform(7 * MaxCacheSize),
            if
                (CacheSize - MaxCacheSize) > R ->
                    true;
                true ->
                    false
            end;
        true ->
            false
    end.


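%% load_fun/5 is the fold fun applied over Journal entries when the ledger
%% needs to be rebuilt from the Journal.  The accumulator is
%% {MinSQN, MaxSQN, LedgerCache}: entries below MinSQN are skipped, entries up
%% to MaxSQN are prepared and added to the cache via the loader path, and the
%% fold returns stop once the maximum sequence number is reached or exceeded.
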
load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
    {MinSQN, MaxSQN, OutputTree} = Acc0,
    {SQN, Type, PK} = KeyInLedger,
    % VBin may already be a term
    {VBin, VSize} = ExtractFun(ValueInLedger),
    {Obj, IndexSpecs} = leveled_codec:split_inkvalue(VBin),
    case SQN of
        SQN when SQN < MinSQN ->
            {loop, Acc0};
        SQN when SQN < MaxSQN ->
            Changes = preparefor_ledgercache(Type, PK, SQN,
                                                Obj, VSize, IndexSpecs),
            {loop,
                {MinSQN,
                    MaxSQN,
                    addto_ledgercache(Changes, OutputTree, loader)}};
        MaxSQN ->
            leveled_log:log("B0006", [SQN]),
            Changes = preparefor_ledgercache(Type, PK, SQN,
                                                Obj, VSize, IndexSpecs),
            {stop,
                {MinSQN,
                    MaxSQN,
                    addto_ledgercache(Changes, OutputTree, loader)}};
        SQN when SQN > MaxSQN ->
            leveled_log:log("B0007", [MaxSQN, SQN]),
            {stop, Acc0}
    end.


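%% get_opt/2,3 are simple helpers over proplists for reading startup options.
%% For example:
%%   get_opt(cache_size, [{cache_size, 2500}]) returns 2500
%%   get_opt(cache_size, []) returns undefined
%%   get_opt(cache_size, [], 2000) returns 2000 (the supplied default)
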
get_opt(Key, Opts) ->
    get_opt(Key, Opts, undefined).

get_opt(Key, Opts, Default) ->
    case proplists:get_value(Key, Opts) of
        undefined ->
            Default;
        Value ->
            Value
    end.

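%% delete_path/1 deletes every file directly within the directory before
%% removing the directory itself - the path is assumed to contain files only,
%% with no sub-directories.
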
delete_path(DirPath) ->
    ok = filelib:ensure_dir(DirPath),
    {ok, Files} = file:list_dir(DirPath),
    [file:delete(filename:join([DirPath, File])) || File <- Files],
    file:del_dir(DirPath).

%%%============================================================================
%%% Test
%%%============================================================================

-ifdef(TEST).

reset_filestructure() ->
    RootPath = "../test",
    leveled_inker:clean_testdir(RootPath ++ "/" ++ ?JOURNAL_FP),
    leveled_penciller:clean_testdir(RootPath ++ "/" ++ ?LEDGER_FP),
    RootPath.

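%% Generates a list of {Key, Value, IndexSpec} triples for the tests below.
%% For example, the first element produced from a KeyNumber of 1 would be
%% {"Key1", <<256 random bytes>>, [{add, "idx1_bin", "f1"}]}.
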
generate_multiple_objects(Count, KeyNumber) ->
    generate_multiple_objects(Count, KeyNumber, []).

generate_multiple_objects(0, _KeyNumber, ObjL) ->
    ObjL;
generate_multiple_objects(Count, KeyNumber, ObjL) ->
    Key = "Key" ++ integer_to_list(KeyNumber),
    Value = crypto:rand_bytes(256),
    IndexSpec = [{add, "idx1_bin", "f" ++ integer_to_list(KeyNumber rem 10)}],
    generate_multiple_objects(Count - 1,
                                KeyNumber + 1,
                                ObjL ++ [{Key, Value, IndexSpec}]).


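%% ttl_test confirms TTL behaviour: objects PUT with a future TTL can be read
%% back by GET and HEAD, whereas objects PUT with a past TTL return not_found
%% and are excluded from bucket and index folds - both before and after the
%% store is closed and re-opened.
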
ttl_test() ->
    RootPath = reset_filestructure(),
    {ok, Bookie1} = book_start([{root_path, RootPath}]),
    ObjL1 = generate_multiple_objects(100, 1),
    % Put in all the objects with a TTL in the future
    Future = leveled_codec:integer_now() + 300,
    lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                        "Bucket", K, V, S,
                                                        ?STD_TAG,
                                                        Future) end,
                    ObjL1),
    lists:foreach(fun({K, V, _S}) ->
                        {ok, V} = book_get(Bookie1, "Bucket", K, ?STD_TAG)
                        end,
                    ObjL1),
    lists:foreach(fun({K, _V, _S}) ->
                        {ok, _} = book_head(Bookie1, "Bucket", K, ?STD_TAG)
                        end,
                    ObjL1),

    ObjL2 = generate_multiple_objects(100, 101),
    Past = leveled_codec:integer_now() - 300,
    lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                        "Bucket", K, V, S,
                                                        ?STD_TAG,
                                                        Past) end,
                    ObjL2),
    lists:foreach(fun({K, _V, _S}) ->
                        not_found = book_get(Bookie1, "Bucket", K, ?STD_TAG)
                        end,
                    ObjL2),
    lists:foreach(fun({K, _V, _S}) ->
                        not_found = book_head(Bookie1, "Bucket", K, ?STD_TAG)
                        end,
                    ObjL2),

    {async, BucketFolder} = book_returnfolder(Bookie1,
                                                {bucket_stats, "Bucket"}),
    {_Size, Count} = BucketFolder(),
    ?assertMatch(100, Count),
    FoldKeysFun = fun(_B, Item, FKFAcc) -> FKFAcc ++ [Item] end,
    {async,
        IndexFolder} = book_returnfolder(Bookie1,
                                            {index_query,
                                                "Bucket",
                                                {FoldKeysFun, []},
                                                {"idx1_bin", "f8", "f9"},
                                                {false, undefined}}),
    KeyList = IndexFolder(),
    ?assertMatch(20, length(KeyList)),

    {ok, Regex} = re:compile("f8"),
    {async,
        IndexFolderTR} = book_returnfolder(Bookie1,
                                            {index_query,
                                                "Bucket",
                                                {FoldKeysFun, []},
                                                {"idx1_bin", "f8", "f9"},
                                                {true, Regex}}),
    TermKeyList = IndexFolderTR(),
    ?assertMatch(10, length(TermKeyList)),

    ok = book_close(Bookie1),
    {ok, Bookie2} = book_start([{root_path, RootPath}]),

    {async,
        IndexFolderTR2} = book_returnfolder(Bookie2,
                                            {index_query,
                                                "Bucket",
                                                {FoldKeysFun, []},
                                                {"idx1_bin", "f7", "f9"},
                                                {false, Regex}}),
    KeyList2 = IndexFolderTR2(),
    ?assertMatch(10, length(KeyList2)),

    lists:foreach(fun({K, _V, _S}) ->
                        not_found = book_get(Bookie2, "Bucket", K, ?STD_TAG)
                        end,
                    ObjL2),
    lists:foreach(fun({K, _V, _S}) ->
                        not_found = book_head(Bookie2, "Bucket", K, ?STD_TAG)
                        end,
                    ObjL2),

    ok = book_close(Bookie2),
    reset_filestructure().

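%% hashtree_query_test checks that a hashtree_query fold returns a
%% {Bucket, Key, Hash} entry for every object which has not expired, and that
%% the same result is returned after the store has been closed and re-opened
%% with a different max journal size.
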
hashtree_query_test() ->
    RootPath = reset_filestructure(),
    {ok, Bookie1} = book_start([{root_path, RootPath},
                                {max_journalsize, 1000000},
                                {cache_size, 500}]),
    ObjL1 = generate_multiple_objects(1200, 1),
    % Put in all the objects with a TTL in the future
    Future = leveled_codec:integer_now() + 300,
    lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                        "Bucket", K, V, S,
                                                        ?STD_TAG,
                                                        Future) end,
                    ObjL1),
    ObjL2 = generate_multiple_objects(20, 1201),
    % Put in a few objects with a TTL in the past
    Past = leveled_codec:integer_now() - 300,
    lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                        "Bucket", K, V, S,
                                                        ?STD_TAG,
                                                        Past) end,
                    ObjL2),
    % Scan the store for the Bucket, Keys and Hashes
    {async, HTFolder} = book_returnfolder(Bookie1,
                                            {hashtree_query,
                                                ?STD_TAG,
                                                false}),
    KeyHashList = HTFolder(),
    lists:foreach(fun({B, _K, H}) ->
                        ?assertMatch("Bucket", B),
                        ?assertMatch(true, is_integer(H))
                        end,
                    KeyHashList),
    ?assertMatch(1200, length(KeyHashList)),
    ok = book_close(Bookie1),
    {ok, Bookie2} = book_start([{root_path, RootPath},
                                {max_journalsize, 200000},
                                {cache_size, 500}]),
    {async, HTFolder2} = book_returnfolder(Bookie2,
                                            {hashtree_query,
                                                ?STD_TAG,
                                                false}),
    L0 = length(KeyHashList),
    HTR2 = HTFolder2(),
    ?assertMatch(L0, length(HTR2)),
    ?assertMatch(KeyHashList, HTR2),
    ok = book_close(Bookie2),
    reset_filestructure().

hashtree_query_withjournalcheck_test() ->
    RootPath = reset_filestructure(),
    {ok, Bookie1} = book_start([{root_path, RootPath},
                                {max_journalsize, 1000000},
                                {cache_size, 500}]),
    ObjL1 = generate_multiple_objects(800, 1),
    % Put in all the objects with a TTL in the future
    Future = leveled_codec:integer_now() + 300,
    lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                        "Bucket", K, V, S,
                                                        ?STD_TAG,
                                                        Future) end,
                    ObjL1),
    {async, HTFolder1} = book_returnfolder(Bookie1,
                                            {hashtree_query,
                                                ?STD_TAG,
                                                false}),
    KeyHashList = HTFolder1(),
    {async, HTFolder2} = book_returnfolder(Bookie1,
                                            {hashtree_query,
                                                ?STD_TAG,
                                                check_presence}),
    ?assertMatch(KeyHashList, HTFolder2()),
    ok = book_close(Bookie1),
    reset_filestructure().

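%% foldobjects_vs_hashtree_test checks that folding over whole objects, and
%% folding over heads (where the proxy_object carries a fetch fun which can be
%% used with the supplied clone and journal key to retrieve the full object),
%% both produce the same {Bucket, Key, Hash} results as a hashtree_query over
%% the same store.
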
foldobjects_vs_hashtree_test() ->
    RootPath = reset_filestructure(),
    {ok, Bookie1} = book_start([{root_path, RootPath},
                                {max_journalsize, 1000000},
                                {cache_size, 500}]),
    ObjL1 = generate_multiple_objects(800, 1),
    % Put in all the objects with a TTL in the future
    Future = leveled_codec:integer_now() + 300,
    lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                        "Bucket", K, V, S,
                                                        ?STD_TAG,
                                                        Future) end,
                    ObjL1),
    {async, HTFolder1} = book_returnfolder(Bookie1,
                                            {hashtree_query,
                                                ?STD_TAG,
                                                false}),
    KeyHashList1 = lists:usort(HTFolder1()),

    FoldObjectsFun = fun(B, K, V, Acc) ->
                            [{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
    {async, HTFolder2} =
        book_returnfolder(Bookie1,
                            {foldobjects_allkeys, ?STD_TAG, FoldObjectsFun}),
    KeyHashList2 = HTFolder2(),
    ?assertMatch(KeyHashList1, lists:usort(KeyHashList2)),

    FoldHeadsFun =
        fun(B, K, ProxyV, Acc) ->
            {proxy_object,
                _MDBin,
                _Size,
                {FetchFun, Clone, JK}} = ProxyV,
            V = FetchFun(Clone, JK),
            [{B, K, erlang:phash2(term_to_binary(V))}|Acc]
        end,

    {async, HTFolder3} =
        book_returnfolder(Bookie1,
                            {foldheads_allkeys, ?STD_TAG, FoldHeadsFun}),
    KeyHashList3 = HTFolder3(),
    ?assertMatch(KeyHashList1, lists:usort(KeyHashList3)),

    FoldHeadsFun2 =
        fun(B, K, ProxyV, Acc) ->
            {proxy_object,
                MD,
                _Size,
                _Fetcher} = ProxyV,
            {Hash, _Size} = MD,
            [{B, K, Hash}|Acc]
        end,

    {async, HTFolder4} =
        book_returnfolder(Bookie1,
                            {foldheads_allkeys, ?STD_TAG, FoldHeadsFun2}),
    KeyHashList4 = HTFolder4(),
    ?assertMatch(KeyHashList1, lists:usort(KeyHashList4)),

    ok = book_close(Bookie1),
    reset_filestructure().

foldobjects_vs_foldheads_bybucket_test() ->
    RootPath = reset_filestructure(),
    {ok, Bookie1} = book_start([{root_path, RootPath},
                                {max_journalsize, 1000000},
                                {cache_size, 500}]),
    ObjL1 = generate_multiple_objects(400, 1),
    ObjL2 = generate_multiple_objects(400, 1),
    % Put in all the objects with a TTL in the future
    Future = leveled_codec:integer_now() + 300,
    lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                        "BucketA", K, V, S,
                                                        ?STD_TAG,
                                                        Future) end,
                    ObjL1),
    lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
                                                        "BucketB", K, V, S,
                                                        ?STD_TAG,
                                                        Future) end,
                    ObjL2),

    FoldObjectsFun = fun(B, K, V, Acc) ->
                            [{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
    {async, HTFolder1A} =
        book_returnfolder(Bookie1,
                            {foldobjects_bybucket,
                                ?STD_TAG,
                                "BucketA",
                                FoldObjectsFun}),
    KeyHashList1A = HTFolder1A(),
    {async, HTFolder1B} =
        book_returnfolder(Bookie1,
                            {foldobjects_bybucket,
                                ?STD_TAG,
                                "BucketB",
                                FoldObjectsFun}),
    KeyHashList1B = HTFolder1B(),
    ?assertMatch(false,
                    lists:usort(KeyHashList1A) == lists:usort(KeyHashList1B)),

    FoldHeadsFun =
        fun(B, K, ProxyV, Acc) ->
            {proxy_object,
                _MDBin,
                _Size,
                {FetchFun, Clone, JK}} = ProxyV,
            V = FetchFun(Clone, JK),
            [{B, K, erlang:phash2(term_to_binary(V))}|Acc]
        end,

    {async, HTFolder2A} =
        book_returnfolder(Bookie1,
                            {foldheads_bybucket,
                                ?STD_TAG,
                                "BucketA",
                                FoldHeadsFun}),
    KeyHashList2A = HTFolder2A(),
    {async, HTFolder2B} =
        book_returnfolder(Bookie1,
                            {foldheads_bybucket,
                                ?STD_TAG,
                                "BucketB",
                                FoldHeadsFun}),
    KeyHashList2B = HTFolder2B(),
    ?assertMatch(true,
                    lists:usort(KeyHashList1A) == lists:usort(KeyHashList2A)),
    ?assertMatch(true,
                    lists:usort(KeyHashList1B) == lists:usort(KeyHashList2B)),

    ok = book_close(Bookie1),
    reset_filestructure().

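%% scan_table_test exercises scan_table/3 over an ets ledger cache.  The scan
%% should return {KVList, MinSQN, MaxSQN} for the keys falling within the
%% given start and end index keys ({[], infinity, 0} when nothing is in
%% range), ignoring entries for other index terms or other buckets.
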
scan_table_test() ->
    K1 = leveled_codec:to_ledgerkey(<<"B1">>,
                                    <<"K1">>,
                                    ?IDX_TAG,
                                    <<"F1-bin">>,
                                    <<"AA1">>),
    K2 = leveled_codec:to_ledgerkey(<<"B1">>,
                                    <<"K2">>,
                                    ?IDX_TAG,
                                    <<"F1-bin">>,
                                    <<"AA1">>),
    K3 = leveled_codec:to_ledgerkey(<<"B1">>,
                                    <<"K3">>,
                                    ?IDX_TAG,
                                    <<"F1-bin">>,
                                    <<"AB1">>),
    K4 = leveled_codec:to_ledgerkey(<<"B1">>,
                                    <<"K4">>,
                                    ?IDX_TAG,
                                    <<"F1-bin">>,
                                    <<"AA2">>),
    K5 = leveled_codec:to_ledgerkey(<<"B2">>,
                                    <<"K5">>,
                                    ?IDX_TAG,
                                    <<"F1-bin">>,
                                    <<"AA2">>),
    Tab0 = ets:new(mem, [ordered_set]),

    SK_A0 = leveled_codec:to_ledgerkey(<<"B1">>,
                                        null,
                                        ?IDX_TAG,
                                        <<"F1-bin">>,
                                        <<"AA0">>),
    EK_A9 = leveled_codec:to_ledgerkey(<<"B1">>,
                                        null,
                                        ?IDX_TAG,
                                        <<"F1-bin">>,
                                        <<"AA9">>),
    Empty = {[], infinity, 0},
    ?assertMatch(Empty,
                    scan_table(Tab0, SK_A0, EK_A9)),
    ets:insert(Tab0, [{K1, {1, active, no_lookup, null}}]),
    ?assertMatch({[{K1, _}], 1, 1},
                    scan_table(Tab0, SK_A0, EK_A9)),
    ets:insert(Tab0, [{K2, {2, active, no_lookup, null}}]),
    ?assertMatch({[{K1, _}, {K2, _}], 1, 2},
                    scan_table(Tab0, SK_A0, EK_A9)),
    ets:insert(Tab0, [{K3, {3, active, no_lookup, null}}]),
    ?assertMatch({[{K1, _}, {K2, _}], 1, 2},
                    scan_table(Tab0, SK_A0, EK_A9)),
    ets:insert(Tab0, [{K4, {4, active, no_lookup, null}}]),
    ?assertMatch({[{K1, _}, {K2, _}, {K4, _}], 1, 4},
                    scan_table(Tab0, SK_A0, EK_A9)),
    ets:insert(Tab0, [{K5, {5, active, no_lookup, null}}]),
    ?assertMatch({[{K1, _}, {K2, _}, {K4, _}], 1, 4},
                    scan_table(Tab0, SK_A0, EK_A9)).

longrunning_test() ->
    SW = os:timestamp(),
    timer:sleep(100),
    ok = maybe_longrunning(SW, put).

coverage_cheat_test() ->
    {noreply, _State0} = handle_info(timeout, #state{}),
    {ok, _State1} = code_change(null, #state{}, null),
    {noreply, _State2} = handle_cast(null, #state{}).

-endif.