%% -------- Overview ---------
%%
%% The eleveleddb store is based on the LSM-tree, similar to leveldb, except that:
%% - Keys, Metadata and Values are not persisted together - the Keys and
%% Metadata are kept in a tree-based ledger, whereas the values are stored
%% only in a sequential Journal.
%% - Different file formats are used for the Journal (based on constant
%% database), and the ledger (sft, based on sst)
%% - It is not intended to be general purpose, but is primarily suited for
%% use as a Riak backend in specific circumstances (relatively large values,
%% and frequent use of iterators)
%% - The Journal is an extended nursery log in leveldb terms. It is keyed
%% on the sequence number of the write
%% - The ledger is a merge tree, where the key is the actual object key, and
%% the value is the metadata of the object including the sequence number
%%
%%
%% -------- Actors ---------
%%
%% The store is fronted by a Bookie, who takes support from different actors:
%% - An Inker who persists new data into the journal, and returns items from
%% the journal based on sequence number
%% - A Penciller who periodically redraws the ledger, which associates keys with
%% sequence numbers and other metadata, as well as secondary keys (for index
%% queries)
%% - One or more Clerks, who may be used by either the inker or the penciller
%% to fulfill background tasks
%%
%% Both the Inker and the Penciller maintain a manifest of the files which
%% represent the current state of the Journal and the Ledger respectively.
%% For the Inker the manifest maps ranges of sequence numbers to cdb files.
%% For the Penciller the manifest maps key ranges to files at each level of
%% the Ledger.
%%
%% -------- PUT --------
%%
%% A PUT request consists of
%% - A Primary Key and a Value
%% - IndexSpecs - a set of secondary key changes associated with the
%% transaction
%%
%% The Bookie takes the request and passes it first to the Inker to add the
%% request to the journal.
%%
%% The inker will pass the PK/Value/IndexSpecs to the current (append only)
%% CDB journal file to persist the change. The call should return either 'ok'
%% or 'roll'. 'roll' indicates that the CDB file has insufficient capacity for
%% this write, and a new journal file should be created (with appropriate
%% manifest changes to be made).
%%
%% The inker will return the SQN which the change has been made at, as well as
%% the object size on disk within the Journal.
%%
%% Once the object has been persisted to the Journal, the Ledger can be updated.
%% The Ledger is updated by the Bookie applying a function (extract_metadata/4)
%% to the Value to return the Object Metadata, a function to generate a hash
%% of the Value and also taking the Primary Key, the IndexSpecs, the Sequence
%% Number in the Journal and the Object Size (returned from the Inker).
%%
%% A set of Ledger Key changes are then generated and placed in the Bookie's
%% Ledger Key cache (a skiplist).
%%
%% The PUT can now be acknowledged. In the background the Bookie may then
%% choose to push the cache to the Penciller for eventual persistence within
%% the ledger. This push will either be accepted or returned (if the
%% Penciller has a backlog of key changes). The back-pressure should lead to
%% the Bookie entering into a slow-offer status, whereby the next PUT will be
%% acknowledged by a PAUSE signal - with the expectation that this will
%% lead to a back-off behaviour.
%%
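%% As an illustrative sketch only (the bucket, key, value and index names are
%% hypothetical, and book_put/5 applies the default ?STD_TAG), a PUT through
%% the public API looks as follows:
%%
%%   {ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/leveled"}]),
%%   IndexSpecs = [{add, "idx1_bin", "someterm"}],
%%   ok = leveled_bookie:book_put(Bookie, "Bucket1", "Key1", <<"Value1">>,
%%                                IndexSpecs),
%%
%% Under sustained back-pressure the reply may be pause rather than ok, and
%% the caller is expected to back off before the next PUT.
%%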
%% -------- GET, HEAD --------
%%
%% The Bookie supports both GET and HEAD requests, with the HEAD request
%% returning only the metadata and not the actual object value. The HEAD
%% request can be serviced by reference to the Ledger Cache and the Penciller.
%%
%% GET requests first follow the path of a HEAD request, and if an object is
%% found, then fetch the value from the Journal via the Inker.
%%
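%% A minimal sketch of the read path (reusing the hypothetical values from the
%% PUT example above; with the default ?STD_TAG the stored value is returned
%% as-is):
%%
%%   {ok, <<"Value1">>} = leveled_bookie:book_get(Bookie, "Bucket1", "Key1"),
%%   {ok, _ObjectMetadata} = leveled_bookie:book_head(Bookie, "Bucket1", "Key1"),
%%   not_found = leveled_bookie:book_get(Bookie, "Bucket1", "MissingKey"),
%%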
%% -------- Snapshots/Clones --------
%%
%% If there is a snapshot request (e.g. to iterate over the keys) the Bookie
%% may request a clone of the Penciller, or of both the Penciller and the Inker.
%%
%% The clone is seeded with the manifest. The clone should be registered with
%% the real Inker/Penciller, so that the real Inker/Penciller may prevent the
%% deletion of files still in use by a snapshot clone.
%%
%% Iterators should de-register themselves from the Penciller on completion.
%% Iterators should be automatically released after a timeout period. A file
%% can only be deleted from the Ledger if it is no longer in the manifest, and
%% there are no registered iterators from before the point the file was
%% removed from the manifest.
%%
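%% Folds over a snapshot are requested via book_returnfolder/2, which returns
%% a fun to be evaluated by the caller (a sketch using the bucket_stats
%% folder, as exercised in the tests below; the bucket name is hypothetical):
%%
%%   {async, Folder} = leveled_bookie:book_returnfolder(Bookie,
%%                                                      {bucket_stats, "Bucket1"}),
%%   {_TotalSize, _KeyCount} = Folder(),
%%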
%% -------- On Startup --------
%%
%% On startup the Bookie must restart both the Inker to load the Journal, and
%% the Penciller to load the Ledger. Once the Penciller has started, the
%% Bookie should request the highest sequence number in the Ledger, and then
%% try to rebuild any missing information from the Journal.
%%
%% To rebuild the Ledger it requests the Inker to scan over the files from
%% that sequence number and re-generate the Ledger changes - pushing the changes
%% directly back into the Ledger.

-module(leveled_bookie).

-behaviour(gen_server).

-include("include/leveled.hrl").

-export([init/1,
        handle_call/3,
        handle_cast/2,
        handle_info/2,
        terminate/2,
        code_change/3,
        book_start/1,
        book_start/3,
        book_put/5,
        book_put/6,
        book_tempput/7,
        book_delete/4,
        book_get/3,
        book_get/4,
        book_head/3,
        book_head/4,
        book_returnfolder/2,
        book_snapshotstore/3,
        book_snapshotledger/3,
        book_compactjournal/2,
        book_islastcompactionpending/1,
        book_close/1,
        book_destroy/1]).

-export([get_opt/2,
        get_opt/3]).

-include_lib("eunit/include/eunit.hrl").

-define(CACHE_SIZE, 2000).
-define(JOURNAL_FP, "journal").
-define(LEDGER_FP, "ledger").
-define(SNAPSHOT_TIMEOUT, 300000).
-define(CHECKJOURNAL_PROB, 0.2).

-record(state, {inker :: pid(),
                penciller :: pid(),
                cache_size :: integer(),
                ledger_cache :: list(), % a skiplist
                is_snapshot :: boolean(),
                slow_offer = false :: boolean()}).


%%%============================================================================
%%% API
%%%============================================================================

book_start(RootPath, LedgerCacheSize, JournalSize) ->
    book_start([{root_path, RootPath},
                {cache_size, LedgerCacheSize},
                {max_journalsize, JournalSize}]).

book_start(Opts) ->
    gen_server:start(?MODULE, [Opts], []).

book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_integer(TTL) ->
    book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL).

book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
    book_put(Pid, Bucket, Key, Object, IndexSpecs, ?STD_TAG).

book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
    book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).

book_delete(Pid, Bucket, Key, IndexSpecs) ->
    book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).

book_get(Pid, Bucket, Key) ->
    book_get(Pid, Bucket, Key, ?STD_TAG).

book_head(Pid, Bucket, Key) ->
    book_head(Pid, Bucket, Key, ?STD_TAG).

book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
    gen_server:call(Pid,
                    {put, Bucket, Key, Object, IndexSpecs, Tag, TTL},
                    infinity).

book_get(Pid, Bucket, Key, Tag) ->
    gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).

book_head(Pid, Bucket, Key, Tag) ->
    gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity).

book_returnfolder(Pid, FolderType) ->
    gen_server:call(Pid, {return_folder, FolderType}, infinity).

book_snapshotstore(Pid, Requestor, Timeout) ->
    gen_server:call(Pid, {snapshot, Requestor, store, Timeout}, infinity).

book_snapshotledger(Pid, Requestor, Timeout) ->
    gen_server:call(Pid, {snapshot, Requestor, ledger, Timeout}, infinity).

book_compactjournal(Pid, Timeout) ->
    gen_server:call(Pid, {compact_journal, Timeout}, infinity).

book_islastcompactionpending(Pid) ->
    gen_server:call(Pid, confirm_compact, infinity).

book_close(Pid) ->
    gen_server:call(Pid, close, infinity).

book_destroy(Pid) ->
    gen_server:call(Pid, destroy, infinity).


%%%============================================================================
%%% gen_server callbacks
%%%============================================================================

init([Opts]) ->
    case get_opt(snapshot_bookie, Opts) of
        undefined ->
            % Start from file not snapshot
            {InkerOpts, PencillerOpts} = set_options(Opts),
            {Inker, Penciller} = startup(InkerOpts, PencillerOpts),
            CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE),
            leveled_log:log("B0001", [Inker, Penciller]),
            {ok, #state{inker=Inker,
                        penciller=Penciller,
                        cache_size=CacheSize,
                        ledger_cache=leveled_skiplist:empty(),
                        is_snapshot=false}};
        Bookie ->
            {ok,
                {Penciller, LedgerCache},
                Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
            ok = leveled_penciller:pcl_loadsnapshot(Penciller,
                                                    leveled_skiplist:empty()),
            leveled_log:log("B0002", [Inker, Penciller]),
            {ok, #state{penciller=Penciller,
                        inker=Inker,
                        ledger_cache=LedgerCache,
                        is_snapshot=true}}
    end.


handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
    LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
    {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
                                                LedgerKey,
                                                Object,
                                                {IndexSpecs, TTL}),
    Changes = preparefor_ledgercache(no_type_assigned,
                                        LedgerKey,
                                        SQN,
                                        Object,
                                        ObjSize,
                                        {IndexSpecs, TTL}),
    Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
    % If the previous push to memory was returned then punish this PUT with a
    % delay. If the back-pressure in the Penciller continues, these delays
    % will become more frequent
    case State#state.slow_offer of
        true ->
            gen_server:reply(From, pause);
        false ->
            gen_server:reply(From, ok)
    end,
    case maybepush_ledgercache(State#state.cache_size,
                                Cache0,
                                State#state.penciller) of
        {ok, NewCache} ->
            {noreply, State#state{ledger_cache=NewCache, slow_offer=false}};
        {returned, NewCache} ->
            {noreply, State#state{ledger_cache=NewCache, slow_offer=true}}
    end;
handle_call({get, Bucket, Key, Tag}, _From, State) ->
|
|
|
|
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
|
|
|
|
case fetch_head(LedgerKey,
|
|
|
|
State#state.penciller,
|
|
|
|
State#state.ledger_cache) of
|
2016-09-15 10:53:24 +01:00
|
|
|
not_present ->
|
|
|
|
{reply, not_found, State};
|
|
|
|
Head ->
|
2016-10-13 21:02:15 +01:00
|
|
|
{Seqn, Status, _MD} = leveled_codec:striphead_to_details(Head),
|
2016-09-15 10:53:24 +01:00
|
|
|
case Status of
|
2016-10-16 15:41:09 +01:00
|
|
|
tomb ->
|
2016-09-15 10:53:24 +01:00
|
|
|
{reply, not_found, State};
|
2016-10-31 12:12:06 +00:00
|
|
|
{active, TS} ->
|
|
|
|
Active = TS >= leveled_codec:integer_now(),
|
|
|
|
case {Active,
|
|
|
|
fetch_value(LedgerKey, Seqn, State#state.inker)} of
|
2016-10-31 20:58:19 +00:00
|
|
|
{_, not_present} ->
|
|
|
|
{reply, not_found, State};
|
2016-10-31 12:12:06 +00:00
|
|
|
{true, Object} ->
|
|
|
|
{reply, {ok, Object}, State};
|
|
|
|
_ ->
|
|
|
|
{reply, not_found, State}
|
2016-09-15 10:53:24 +01:00
|
|
|
end
|
|
|
|
end
|
|
|
|
end;
|
2016-10-14 18:43:16 +01:00
|
|
|
handle_call({head, Bucket, Key, Tag}, _From, State) ->
|
|
|
|
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
|
|
|
|
case fetch_head(LedgerKey,
|
|
|
|
State#state.penciller,
|
|
|
|
State#state.ledger_cache) of
|
2016-09-15 10:53:24 +01:00
|
|
|
not_present ->
|
|
|
|
{reply, not_found, State};
|
|
|
|
Head ->
|
2016-10-13 21:02:15 +01:00
|
|
|
{_Seqn, Status, MD} = leveled_codec:striphead_to_details(Head),
|
2016-09-15 10:53:24 +01:00
|
|
|
case Status of
|
2016-10-16 15:41:09 +01:00
|
|
|
tomb ->
|
2016-09-15 10:53:24 +01:00
|
|
|
{reply, not_found, State};
|
2016-10-31 12:12:06 +00:00
|
|
|
{active, TS} ->
|
|
|
|
case TS >= leveled_codec:integer_now() of
|
|
|
|
true ->
|
|
|
|
OMD = leveled_codec:build_metadata_object(LedgerKey, MD),
|
|
|
|
{reply, {ok, OMD}, State};
|
|
|
|
false ->
|
|
|
|
{reply, not_found, State}
|
|
|
|
end
|
2016-09-15 10:53:24 +01:00
|
|
|
end
|
2016-09-15 15:14:49 +01:00
|
|
|
end;
|
2016-10-07 10:04:48 +01:00
|
|
|
handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
|
2016-10-31 17:26:28 +00:00
|
|
|
Reply = snapshot_store(State, SnapType),
|
|
|
|
{reply, Reply, State};
|
2016-10-12 17:12:49 +01:00
|
|
|
handle_call({return_folder, FolderType}, _From, State) ->
|
|
|
|
case FolderType of
|
2016-10-31 12:12:06 +00:00
|
|
|
{bucket_stats, Bucket} ->
|
|
|
|
{reply,
|
2016-10-31 17:26:28 +00:00
|
|
|
bucket_stats(State, Bucket, ?STD_TAG),
|
2016-10-31 12:12:06 +00:00
|
|
|
State};
|
2016-10-14 18:43:16 +01:00
|
|
|
{riakbucket_stats, Bucket} ->
|
2016-10-12 17:12:49 +01:00
|
|
|
{reply,
|
2016-10-31 17:26:28 +00:00
|
|
|
bucket_stats(State, Bucket, ?RIAK_TAG),
|
2016-10-18 01:59:03 +01:00
|
|
|
State};
|
2016-11-20 21:21:31 +00:00
|
|
|
{binary_bucketlist, Tag, {FoldKeysFun, Acc}} ->
|
|
|
|
{reply,
|
|
|
|
binary_bucketlist(State, Tag, {FoldKeysFun, Acc}),
|
|
|
|
State};
|
2016-10-18 01:59:03 +01:00
|
|
|
{index_query,
|
2016-11-18 15:53:22 +00:00
|
|
|
Constraint,
|
2016-11-18 11:53:14 +00:00
|
|
|
{FoldKeysFun, Acc},
|
2016-10-18 01:59:03 +01:00
|
|
|
{IdxField, StartValue, EndValue},
|
|
|
|
{ReturnTerms, TermRegex}} ->
|
|
|
|
{reply,
|
2016-10-31 17:26:28 +00:00
|
|
|
index_query(State,
|
2016-11-18 15:53:22 +00:00
|
|
|
Constraint,
|
2016-11-18 11:53:14 +00:00
|
|
|
{FoldKeysFun, Acc},
|
2016-10-18 01:59:03 +01:00
|
|
|
{IdxField, StartValue, EndValue},
|
|
|
|
{ReturnTerms, TermRegex}),
|
2016-10-23 22:45:43 +01:00
|
|
|
State};
|
2016-11-18 15:53:22 +00:00
|
|
|
{keylist, Tag, {FoldKeysFun, Acc}} ->
|
2016-10-31 17:26:28 +00:00
|
|
|
{reply,
|
2016-11-18 15:53:22 +00:00
|
|
|
allkey_query(State, Tag, {FoldKeysFun, Acc}),
|
2016-10-31 17:26:28 +00:00
|
|
|
State};
|
2016-11-18 15:53:22 +00:00
|
|
|
{keylist, Tag, Bucket, {FoldKeysFun, Acc}} ->
|
2016-11-17 15:55:29 +00:00
|
|
|
{reply,
|
2016-11-18 15:53:22 +00:00
|
|
|
bucketkey_query(State, Tag, Bucket, {FoldKeysFun, Acc}),
|
2016-11-17 15:55:29 +00:00
|
|
|
State};
|
2016-10-31 16:02:32 +00:00
|
|
|
{hashtree_query, Tag, JournalCheck} ->
|
2016-10-31 17:26:28 +00:00
|
|
|
{reply,
|
|
|
|
hashtree_query(State, Tag, JournalCheck),
|
2016-11-02 15:38:51 +00:00
|
|
|
State};
|
|
|
|
{foldobjects_allkeys, Tag, FoldObjectsFun} ->
|
|
|
|
{reply,
|
|
|
|
foldobjects_allkeys(State, Tag, FoldObjectsFun),
|
2016-11-04 11:01:37 +00:00
|
|
|
State};
|
|
|
|
{foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} ->
|
|
|
|
{reply,
|
|
|
|
foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun),
|
|
|
|
State};
|
|
|
|
{foldobjects_byindex,
|
|
|
|
Tag,
|
|
|
|
Bucket,
|
|
|
|
{Field, FromTerm, ToTerm},
|
|
|
|
FoldObjectsFun} ->
|
|
|
|
{reply,
|
|
|
|
foldobjects_byindex(State,
|
|
|
|
Tag, Bucket,
|
|
|
|
Field, FromTerm, ToTerm,
|
|
|
|
FoldObjectsFun),
|
2016-10-31 17:26:28 +00:00
|
|
|
State}
|
2016-11-04 11:01:37 +00:00
|
|
|
|
2016-10-12 17:12:49 +01:00
|
|
|
end;
|
2016-10-03 23:34:28 +01:00
|
|
|
handle_call({compact_journal, Timeout}, _From, State) ->
|
|
|
|
ok = leveled_inker:ink_compactjournal(State#state.inker,
|
2016-10-05 18:28:31 +01:00
|
|
|
self(),
|
2016-10-03 23:34:28 +01:00
|
|
|
Timeout),
|
|
|
|
{reply, ok, State};
|
2016-10-26 20:39:16 +01:00
|
|
|
handle_call(confirm_compact, _From, State) ->
|
|
|
|
{reply, leveled_inker:ink_compactionpending(State#state.inker), State};
|
2016-09-15 15:14:49 +01:00
|
|
|
handle_call(close, _From, State) ->
|
2016-11-21 12:34:40 +00:00
|
|
|
{stop, normal, ok, State};
|
2016-11-21 12:35:20 +00:00
|
|
|
handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
|
2016-11-21 12:34:40 +00:00
|
|
|
{stop, destroy, ok, State}.
|
2016-09-08 14:21:30 +01:00
|
|
|
|
|
|
|
handle_cast(_Msg, State) ->
|
|
|
|
{noreply, State}.
|
|
|
|
|
|
|
|
handle_info(_Info, State) ->
|
|
|
|
{noreply, State}.
|
|
|
|
|
2016-11-21 12:34:40 +00:00
|
|
|
terminate(destroy, State) ->
|
|
|
|
leveled_log:log("B0011", []),
|
|
|
|
{ok, InkPathList} = leveled_inker:ink_doom(State#state.inker),
|
|
|
|
{ok, PCLPathList} = leveled_penciller:pcl_doom(State#state.penciller),
|
|
|
|
lists:foreach(fun(DirPath) -> delete_path(DirPath) end, InkPathList),
|
|
|
|
lists:foreach(fun(DirPath) -> delete_path(DirPath) end, PCLPathList),
|
|
|
|
ok;
|
2016-09-15 15:14:49 +01:00
|
|
|
terminate(Reason, State) ->
|
2016-11-02 18:14:46 +00:00
|
|
|
leveled_log:log("B0003", [Reason]),
|
2016-11-03 16:05:43 +00:00
|
|
|
ok = leveled_inker:ink_close(State#state.inker),
|
2016-09-15 15:14:49 +01:00
|
|
|
ok = leveled_penciller:pcl_close(State#state.penciller).
|
2016-09-08 14:21:30 +01:00
|
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
|
{ok, State}.
|
|
|
|
|
|
|
|
|
|
|
|
%%%============================================================================
|
|
|
|
%%% Internal functions
|
|
|
|
%%%============================================================================
|
|
|
|
|
2016-10-31 17:26:28 +00:00
|
|
|
bucket_stats(State, Bucket, Tag) ->
|
|
|
|
{ok,
|
|
|
|
{LedgerSnapshot, LedgerCache},
|
|
|
|
_JournalSnapshot} = snapshot_store(State, ledger),
|
2016-10-12 17:12:49 +01:00
|
|
|
Folder = fun() ->
|
2016-11-25 14:50:13 +00:00
|
|
|
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
|
2016-10-12 17:12:49 +01:00
|
|
|
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
2016-10-27 20:56:18 +01:00
|
|
|
LedgerCache),
|
2016-10-14 18:43:16 +01:00
|
|
|
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
|
|
|
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
2016-10-31 12:12:06 +00:00
|
|
|
AccFun = accumulate_size(),
|
2016-10-12 17:12:49 +01:00
|
|
|
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
|
|
|
StartKey,
|
|
|
|
EndKey,
|
2016-10-31 12:12:06 +00:00
|
|
|
AccFun,
|
2016-10-12 17:12:49 +01:00
|
|
|
{0, 0}),
|
|
|
|
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
|
|
|
Acc
|
|
|
|
end,
|
|
|
|
{async, Folder}.
|
|
|
|
|
2016-11-20 21:21:31 +00:00
|
|
|
|
2016-11-21 14:12:17 +00:00
|
|
|
binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
|
2016-11-20 21:21:31 +00:00
|
|
|
% List buckets for tag, assuming bucket names are all binary type
|
|
|
|
{ok,
|
|
|
|
{LedgerSnapshot, LedgerCache},
|
|
|
|
_JournalSnapshot} = snapshot_store(State, ledger),
|
|
|
|
Folder = fun() ->
|
2016-11-25 14:50:13 +00:00
|
|
|
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
|
2016-11-20 21:21:31 +00:00
|
|
|
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
|
|
|
LedgerCache),
|
|
|
|
BucketAcc = get_nextbucket(null,
|
|
|
|
Tag,
|
|
|
|
LedgerSnapshot,
|
|
|
|
[]),
|
|
|
|
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
2016-11-21 14:12:17 +00:00
|
|
|
lists:foldl(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end,
|
2016-11-20 21:21:31 +00:00
|
|
|
InitAcc,
|
|
|
|
BucketAcc)
|
|
|
|
end,
|
|
|
|
{async, Folder}.
|
|
|
|
|
|
|
|
get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) ->
|
|
|
|
StartKey = leveled_codec:to_ledgerkey(NextBucket, null, Tag),
|
|
|
|
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
|
|
|
ExtractFun = fun(LK, _V, _Acc) -> leveled_codec:from_ledgerkey(LK) end,
|
|
|
|
BK = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
|
|
|
|
StartKey,
|
|
|
|
EndKey,
|
|
|
|
ExtractFun,
|
|
|
|
null),
|
|
|
|
case BK of
|
|
|
|
null ->
|
|
|
|
leveled_log:log("B0008",[]),
|
|
|
|
BKList;
|
|
|
|
{B, K} when is_binary(B) ->
|
|
|
|
leveled_log:log("B0009",[B]),
|
|
|
|
get_nextbucket(<<B/binary, 0>>,
|
|
|
|
Tag,
|
|
|
|
LedgerSnapshot,
|
|
|
|
[{B, K}|BKList]);
|
|
|
|
NB ->
|
|
|
|
leveled_log:log("B0010",[NB]),
|
|
|
|
[]
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
2016-10-31 17:26:28 +00:00
|
|
|
index_query(State,
|
2016-11-18 15:53:22 +00:00
|
|
|
Constraint,
|
2016-11-18 11:53:14 +00:00
|
|
|
{FoldKeysFun, InitAcc},
|
2016-10-18 01:59:03 +01:00
|
|
|
{IdxField, StartValue, EndValue},
|
|
|
|
{ReturnTerms, TermRegex}) ->
|
2016-10-31 17:26:28 +00:00
|
|
|
{ok,
|
|
|
|
{LedgerSnapshot, LedgerCache},
|
|
|
|
_JournalSnapshot} = snapshot_store(State, ledger),
|
2016-11-18 15:53:22 +00:00
|
|
|
{Bucket, StartObjKey} =
|
|
|
|
case Constraint of
|
|
|
|
{B, SK} ->
|
|
|
|
{B, SK};
|
|
|
|
B ->
|
|
|
|
{B, null}
|
|
|
|
end,
|
2016-10-18 01:59:03 +01:00
|
|
|
Folder = fun() ->
|
2016-11-25 14:50:13 +00:00
|
|
|
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
|
2016-10-18 01:59:03 +01:00
|
|
|
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
2016-10-27 20:56:18 +01:00
|
|
|
LedgerCache),
|
2016-11-18 15:53:22 +00:00
|
|
|
StartKey = leveled_codec:to_ledgerkey(Bucket,
|
|
|
|
StartObjKey,
|
|
|
|
?IDX_TAG,
|
|
|
|
IdxField,
|
|
|
|
StartValue),
|
|
|
|
EndKey = leveled_codec:to_ledgerkey(Bucket,
|
|
|
|
null,
|
|
|
|
?IDX_TAG,
|
|
|
|
IdxField,
|
|
|
|
EndValue),
|
2016-10-18 01:59:03 +01:00
|
|
|
AddFun = case ReturnTerms of
|
|
|
|
true ->
|
2016-11-18 11:53:14 +00:00
|
|
|
fun add_terms/2;
|
2016-10-18 01:59:03 +01:00
|
|
|
_ ->
|
2016-11-18 11:53:14 +00:00
|
|
|
fun add_keys/2
|
2016-10-18 01:59:03 +01:00
|
|
|
end,
|
2016-11-18 11:53:14 +00:00
|
|
|
AccFun = accumulate_index(TermRegex, AddFun, FoldKeysFun),
|
2016-10-18 01:59:03 +01:00
|
|
|
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
|
|
|
StartKey,
|
|
|
|
EndKey,
|
|
|
|
AccFun,
|
2016-11-18 11:53:14 +00:00
|
|
|
InitAcc),
|
2016-10-18 01:59:03 +01:00
|
|
|
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
|
|
|
Acc
|
|
|
|
end,
|
|
|
|
{async, Folder}.
|
|
|
|
|
2016-10-31 16:02:32 +00:00
|
|
|
|
2016-10-31 17:26:28 +00:00
|
|
|
hashtree_query(State, Tag, JournalCheck) ->
|
|
|
|
SnapType = case JournalCheck of
|
2016-10-31 16:02:32 +00:00
|
|
|
false ->
|
2016-10-31 17:26:28 +00:00
|
|
|
ledger;
|
|
|
|
check_presence ->
|
|
|
|
store
|
2016-10-31 16:02:32 +00:00
|
|
|
end,
|
2016-10-31 17:26:28 +00:00
|
|
|
{ok,
|
|
|
|
{LedgerSnapshot, LedgerCache},
|
|
|
|
JournalSnapshot} = snapshot_store(State, SnapType),
|
2016-10-31 16:02:32 +00:00
|
|
|
Folder = fun() ->
|
2016-11-25 14:50:13 +00:00
|
|
|
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
|
2016-10-31 16:02:32 +00:00
|
|
|
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
|
|
|
LedgerCache),
|
|
|
|
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
|
|
|
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
2016-10-31 17:26:28 +00:00
|
|
|
AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
|
2016-10-31 16:02:32 +00:00
|
|
|
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
|
|
|
StartKey,
|
|
|
|
EndKey,
|
|
|
|
AccFun,
|
|
|
|
[]),
|
|
|
|
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
2016-10-31 18:51:23 +00:00
|
|
|
case JournalCheck of
|
|
|
|
false ->
|
|
|
|
ok;
|
|
|
|
check_presence ->
|
|
|
|
leveled_inker:ink_close(JournalSnapshot)
|
|
|
|
end,
|
2016-10-31 16:02:32 +00:00
|
|
|
Acc
|
|
|
|
end,
|
|
|
|
{async, Folder}.
|
|
|
|
|
2016-11-02 15:38:51 +00:00
|
|
|
|
|
|
|
foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
|
2016-11-04 11:01:37 +00:00
|
|
|
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
|
|
|
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
|
|
|
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
|
|
|
|
|
|
|
|
foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) ->
|
|
|
|
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
|
|
|
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
|
|
|
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
|
|
|
|
|
|
|
|
foldobjects_byindex(State, Tag, Bucket, Field, FromTerm, ToTerm, FoldObjectsFun) ->
|
|
|
|
StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field,
|
|
|
|
FromTerm),
|
|
|
|
EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field,
|
|
|
|
ToTerm),
|
|
|
|
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
|
|
|
|
|
|
|
|
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
|
2016-11-02 15:38:51 +00:00
|
|
|
{ok,
|
|
|
|
{LedgerSnapshot, LedgerCache},
|
|
|
|
JournalSnapshot} = snapshot_store(State, store),
|
2016-11-04 14:23:37 +00:00
|
|
|
{FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of
|
|
|
|
true ->
|
|
|
|
FoldObjectsFun;
|
|
|
|
false ->
|
|
|
|
{FoldObjectsFun, []}
|
|
|
|
end,
|
2016-11-02 15:38:51 +00:00
|
|
|
Folder = fun() ->
|
2016-11-25 14:50:13 +00:00
|
|
|
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
|
2016-11-02 15:38:51 +00:00
|
|
|
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
|
|
|
LedgerCache),
|
2016-11-04 14:23:37 +00:00
|
|
|
AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag),
|
2016-11-02 15:38:51 +00:00
|
|
|
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
|
|
|
StartKey,
|
|
|
|
EndKey,
|
|
|
|
AccFun,
|
2016-11-04 14:23:37 +00:00
|
|
|
InitAcc),
|
2016-11-02 15:38:51 +00:00
|
|
|
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
|
|
|
ok = leveled_inker:ink_close(JournalSnapshot),
|
|
|
|
Acc
|
|
|
|
end,
|
|
|
|
{async, Folder}.
|
|
|
|
|
2016-11-04 11:01:37 +00:00
|
|
|
|
2016-11-18 15:53:22 +00:00
|
|
|
bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
|
2016-10-31 17:26:28 +00:00
|
|
|
{ok,
|
|
|
|
{LedgerSnapshot, LedgerCache},
|
|
|
|
_JournalSnapshot} = snapshot_store(State, ledger),
|
2016-10-23 22:45:43 +01:00
|
|
|
Folder = fun() ->
|
2016-11-25 14:50:13 +00:00
|
|
|
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
|
2016-10-23 22:45:43 +01:00
|
|
|
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
2016-10-27 20:56:18 +01:00
|
|
|
LedgerCache),
|
2016-11-17 15:55:29 +00:00
|
|
|
SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
|
|
|
EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
|
2016-11-18 15:53:22 +00:00
|
|
|
AccFun = accumulate_keys(FoldKeysFun),
|
2016-10-23 22:45:43 +01:00
|
|
|
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
|
|
|
SK,
|
|
|
|
EK,
|
2016-10-31 12:12:06 +00:00
|
|
|
AccFun,
|
2016-11-18 15:53:22 +00:00
|
|
|
InitAcc),
|
2016-10-23 22:45:43 +01:00
|
|
|
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
|
|
|
lists:reverse(Acc)
|
|
|
|
end,
|
|
|
|
{async, Folder}.
|
|
|
|
|
2016-11-18 15:53:22 +00:00
|
|
|
allkey_query(State, Tag, {FoldKeysFun, InitAcc}) ->
|
|
|
|
bucketkey_query(State, Tag, null, {FoldKeysFun, InitAcc}).
|
2016-11-17 15:55:29 +00:00
|
|
|
|
2016-10-18 01:59:03 +01:00
|
|
|
|
2016-10-31 17:26:28 +00:00
|
|
|
snapshot_store(State, SnapType) ->
|
|
|
|
PCLopts = #penciller_options{start_snapshot=true,
|
|
|
|
source_penciller=State#state.penciller},
|
|
|
|
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
|
|
|
|
case SnapType of
|
|
|
|
store ->
|
|
|
|
InkerOpts = #inker_options{start_snapshot=true,
|
|
|
|
source_inker=State#state.inker},
|
|
|
|
{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
|
|
|
|
{ok, {LedgerSnapshot, State#state.ledger_cache},
|
|
|
|
JournalSnapshot};
|
|
|
|
ledger ->
|
|
|
|
{ok, {LedgerSnapshot, State#state.ledger_cache},
|
|
|
|
null}
|
2016-11-03 16:05:43 +00:00
|
|
|
end.
|
2016-10-03 23:34:28 +01:00
|
|
|
|
2016-09-15 10:53:24 +01:00
|
|
|
set_options(Opts) ->
|
2016-11-02 12:58:27 +00:00
|
|
|
MaxJournalSize = get_opt(max_journalsize, Opts, 10000000000),
|
2016-10-25 23:13:14 +01:00
|
|
|
|
2016-11-14 11:17:14 +00:00
|
|
|
WRP = get_opt(waste_retention_period, Opts),
|
|
|
|
|
2016-11-02 12:58:27 +00:00
|
|
|
AltStrategy = get_opt(reload_strategy, Opts, []),
|
2016-10-25 23:13:14 +01:00
|
|
|
ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy),
|
2016-11-02 12:58:27 +00:00
|
|
|
|
|
|
|
PCLL0CacheSize = get_opt(max_pencillercachesize, Opts),
|
|
|
|
RootPath = get_opt(root_path, Opts),
|
2016-11-14 11:17:14 +00:00
|
|
|
|
2016-11-02 12:58:27 +00:00
|
|
|
JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
|
|
|
|
LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
|
|
|
|
    ok = filelib:ensure_dir(JournalFP),
|
|
|
|
    ok = filelib:ensure_dir(LedgerFP),
|
|
|
|
|
2016-10-25 23:13:14 +01:00
|
|
|
{#inker_options{root_path = JournalFP,
|
|
|
|
reload_strategy = ReloadStrategy,
|
2016-11-02 12:58:27 +00:00
|
|
|
max_run_length = get_opt(max_run_length, Opts),
|
2016-11-14 11:17:14 +00:00
|
|
|
waste_retention_period = WRP,
|
2016-10-08 22:15:48 +01:00
|
|
|
cdb_options = #cdb_options{max_size=MaxJournalSize,
|
|
|
|
binary_mode=true}},
|
2016-10-31 01:33:33 +00:00
|
|
|
#penciller_options{root_path = LedgerFP,
|
|
|
|
max_inmemory_tablesize = PCLL0CacheSize}}.
|
2016-09-08 14:21:30 +01:00
|
|
|
|
|
|
|
startup(InkerOpts, PencillerOpts) ->
|
|
|
|
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
|
|
|
|
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
|
|
|
|
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
|
2016-11-02 18:14:46 +00:00
|
|
|
leveled_log:log("B0005", [LedgerSQN]),
|
2016-09-15 18:38:23 +01:00
|
|
|
ok = leveled_inker:ink_loadpcl(Inker,
|
|
|
|
LedgerSQN + 1,
|
|
|
|
fun load_fun/5,
|
|
|
|
Penciller),
|
2016-09-08 14:21:30 +01:00
|
|
|
{Inker, Penciller}.
|
|
|
|
|
|
|
|
|
2016-10-19 00:10:48 +01:00
|
|
|
fetch_head(Key, Penciller, LedgerCache) ->
|
2016-11-25 14:50:13 +00:00
|
|
|
case leveled_skiplist:lookup(Key, LedgerCache) of
|
2016-09-15 10:53:24 +01:00
|
|
|
{value, Head} ->
|
|
|
|
Head;
|
|
|
|
none ->
|
|
|
|
case leveled_penciller:pcl_fetch(Penciller, Key) of
|
|
|
|
{Key, Head} ->
|
|
|
|
Head;
|
|
|
|
not_present ->
|
|
|
|
not_present
|
|
|
|
end
|
|
|
|
end.
|
|
|
|
|
|
|
|
fetch_value(Key, SQN, Inker) ->
|
|
|
|
case leveled_inker:ink_fetch(Inker, Key, SQN) of
|
|
|
|
{ok, Value} ->
|
|
|
|
Value;
|
|
|
|
not_present ->
|
|
|
|
not_present
|
|
|
|
end.
|
|
|
|
|
2016-10-23 22:45:43 +01:00
|
|
|
|
2016-10-31 12:12:06 +00:00
|
|
|
accumulate_size() ->
|
|
|
|
Now = leveled_codec:integer_now(),
|
|
|
|
AccFun = fun(Key, Value, {Size, Count}) ->
|
|
|
|
case leveled_codec:is_active(Key, Value, Now) of
|
|
|
|
true ->
|
|
|
|
{Size + leveled_codec:get_size(Key, Value),
|
|
|
|
Count + 1};
|
|
|
|
false ->
|
|
|
|
{Size, Count}
|
|
|
|
end
|
|
|
|
end,
|
|
|
|
AccFun.
|
|
|
|
|
2016-10-31 17:26:28 +00:00
|
|
|
accumulate_hashes(JournalCheck, InkerClone) ->
|
2016-10-31 16:02:32 +00:00
|
|
|
Now = leveled_codec:integer_now(),
|
2016-10-31 17:26:28 +00:00
|
|
|
AccFun = fun(LK, V, KHList) ->
|
|
|
|
case leveled_codec:is_active(LK, V, Now) of
|
2016-10-31 16:02:32 +00:00
|
|
|
true ->
|
2016-10-31 17:26:28 +00:00
|
|
|
{B, K, H} = leveled_codec:get_keyandhash(LK, V),
|
2016-10-31 18:51:23 +00:00
|
|
|
Check = random:uniform() < ?CHECKJOURNAL_PROB,
|
|
|
|
case {JournalCheck, Check} of
|
|
|
|
{check_presence, true} ->
|
2016-10-31 17:26:28 +00:00
|
|
|
case check_presence(LK, V, InkerClone) of
|
|
|
|
true ->
|
|
|
|
[{B, K, H}|KHList];
|
|
|
|
false ->
|
|
|
|
KHList
|
2016-10-31 18:51:23 +00:00
|
|
|
end;
|
|
|
|
_ ->
|
|
|
|
[{B, K, H}|KHList]
|
2016-10-31 17:26:28 +00:00
|
|
|
end;
|
2016-10-31 16:02:32 +00:00
|
|
|
false ->
|
|
|
|
KHList
|
|
|
|
end
|
|
|
|
end,
|
|
|
|
AccFun.
|
|
|
|
|
2016-11-04 11:01:37 +00:00
|
|
|
accumulate_objects(FoldObjectsFun, InkerClone, Tag) ->
|
2016-11-02 15:38:51 +00:00
|
|
|
Now = leveled_codec:integer_now(),
|
|
|
|
AccFun = fun(LK, V, Acc) ->
|
|
|
|
case leveled_codec:is_active(LK, V, Now) of
|
|
|
|
true ->
|
|
|
|
SQN = leveled_codec:strip_to_seqonly({LK, V}),
|
2016-11-04 11:01:37 +00:00
|
|
|
{B, K} = case leveled_codec:from_ledgerkey(LK) of
|
|
|
|
{B0, K0} -> {B0, K0};
|
|
|
|
{B0, K0, _T0} -> {B0, K0}
|
|
|
|
end,
|
|
|
|
QK = leveled_codec:to_ledgerkey(B, K, Tag),
|
|
|
|
R = leveled_inker:ink_fetch(InkerClone, QK, SQN),
|
2016-11-02 15:38:51 +00:00
|
|
|
case R of
|
|
|
|
{ok, Value} ->
|
|
|
|
FoldObjectsFun(B, K, Value, Acc);
|
|
|
|
not_present ->
|
|
|
|
Acc
|
|
|
|
end;
|
|
|
|
false ->
|
|
|
|
Acc
|
|
|
|
end
|
|
|
|
end,
|
|
|
|
AccFun.
|
|
|
|
|
2016-11-04 11:01:37 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
2016-10-31 17:26:28 +00:00
|
|
|
check_presence(Key, Value, InkerClone) ->
|
|
|
|
{LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
|
|
|
|
case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
|
|
|
|
probably ->
|
|
|
|
true;
|
|
|
|
missing ->
|
|
|
|
false
|
|
|
|
end.
|
|
|
|
|
2016-11-18 15:53:22 +00:00
|
|
|
accumulate_keys(FoldKeysFun) ->
|
2016-10-31 12:12:06 +00:00
|
|
|
Now = leveled_codec:integer_now(),
|
2016-11-18 15:53:22 +00:00
|
|
|
AccFun = fun(Key, Value, Acc) ->
|
2016-10-31 12:12:06 +00:00
|
|
|
case leveled_codec:is_active(Key, Value, Now) of
|
|
|
|
true ->
|
2016-11-18 15:53:22 +00:00
|
|
|
{B, K} = leveled_codec:from_ledgerkey(Key),
|
|
|
|
FoldKeysFun(B, K, Acc);
|
2016-10-31 12:12:06 +00:00
|
|
|
false ->
|
2016-11-18 15:53:22 +00:00
|
|
|
Acc
|
2016-10-31 12:12:06 +00:00
|
|
|
end
|
|
|
|
end,
|
|
|
|
AccFun.
|
2016-10-18 01:59:03 +01:00
|
|
|
|
2016-11-18 11:53:14 +00:00
|
|
|
add_keys(ObjKey, _IdxValue) ->
|
|
|
|
ObjKey.
|
2016-10-18 01:59:03 +01:00
|
|
|
|
2016-11-18 11:53:14 +00:00
|
|
|
add_terms(ObjKey, IdxValue) ->
|
|
|
|
{IdxValue, ObjKey}.
|
2016-10-18 01:59:03 +01:00
|
|
|
|
2016-11-18 11:53:14 +00:00
|
|
|
accumulate_index(TermRe, AddFun, FoldKeysFun) ->
|
2016-10-31 12:12:06 +00:00
|
|
|
Now = leveled_codec:integer_now(),
|
2016-10-18 01:59:03 +01:00
|
|
|
case TermRe of
|
|
|
|
undefined ->
|
|
|
|
fun(Key, Value, Acc) ->
|
2016-10-31 12:12:06 +00:00
|
|
|
case leveled_codec:is_active(Key, Value, Now) of
|
2016-10-18 01:59:03 +01:00
|
|
|
true ->
|
2016-11-18 11:53:14 +00:00
|
|
|
{Bucket,
|
2016-10-18 01:59:03 +01:00
|
|
|
ObjKey,
|
|
|
|
IdxValue} = leveled_codec:from_ledgerkey(Key),
|
2016-11-18 11:53:14 +00:00
|
|
|
FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc);
|
2016-10-18 01:59:03 +01:00
|
|
|
false ->
|
|
|
|
Acc
|
|
|
|
end end;
|
|
|
|
TermRe ->
|
|
|
|
fun(Key, Value, Acc) ->
|
2016-10-31 12:12:06 +00:00
|
|
|
case leveled_codec:is_active(Key, Value, Now) of
|
2016-10-18 01:59:03 +01:00
|
|
|
true ->
|
2016-11-18 11:53:14 +00:00
|
|
|
{Bucket,
|
2016-10-18 01:59:03 +01:00
|
|
|
ObjKey,
|
|
|
|
IdxValue} = leveled_codec:from_ledgerkey(Key),
|
|
|
|
case re:run(IdxValue, TermRe) of
|
|
|
|
nomatch ->
|
|
|
|
Acc;
|
|
|
|
_ ->
|
2016-11-18 11:53:14 +00:00
|
|
|
FoldKeysFun(Bucket,
|
|
|
|
AddFun(ObjKey, IdxValue),
|
|
|
|
Acc)
|
2016-10-18 01:59:03 +01:00
|
|
|
end;
|
|
|
|
false ->
|
|
|
|
Acc
|
|
|
|
end end
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
2016-10-31 12:12:06 +00:00
|
|
|
preparefor_ledgercache(?INKT_KEYD,
|
|
|
|
LedgerKey, SQN, _Obj, _Size, {IndexSpecs, TTL}) ->
|
2016-10-25 23:13:14 +01:00
|
|
|
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
|
2016-10-31 12:12:06 +00:00
|
|
|
leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL);
|
|
|
|
preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
|
2016-10-25 23:13:14 +01:00
|
|
|
{Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(LedgerKey,
|
2016-10-14 18:43:16 +01:00
|
|
|
SQN,
|
|
|
|
Obj,
|
2016-10-31 12:12:06 +00:00
|
|
|
Size,
|
|
|
|
TTL),
|
|
|
|
[PrimaryChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
|
|
|
|
Bucket,
|
|
|
|
Key,
|
|
|
|
SQN,
|
|
|
|
TTL).
|
2016-09-15 10:53:24 +01:00
|
|
|
|
2016-10-25 23:13:14 +01:00
|
|
|
|
2016-09-15 10:53:24 +01:00
|
|
|
addto_ledgercache(Changes, Cache) ->
|
2016-11-25 14:50:13 +00:00
|
|
|
lists:foldl(fun({K, V}, Acc) -> leveled_skiplist:enter(K, V, Acc) end,
|
2016-10-19 00:10:48 +01:00
|
|
|
Cache,
|
|
|
|
Changes).
|
2016-09-15 10:53:24 +01:00
|
|
|
|
|
|
|
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
|
2016-11-25 14:50:13 +00:00
|
|
|
CacheSize = leveled_skiplist:size(Cache),
|
2016-10-20 02:23:45 +01:00
|
|
|
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
|
2016-09-15 10:53:24 +01:00
|
|
|
if
|
2016-10-20 02:23:45 +01:00
|
|
|
TimeToPush ->
|
2016-10-27 20:56:18 +01:00
|
|
|
case leveled_penciller:pcl_pushmem(Penciller, Cache) of
|
2016-09-15 10:53:24 +01:00
|
|
|
ok ->
|
2016-11-25 14:50:13 +00:00
|
|
|
{ok, leveled_skiplist:empty()};
|
2016-10-30 18:25:30 +00:00
|
|
|
returned ->
|
2016-11-05 14:31:10 +00:00
|
|
|
{returned, Cache}
|
2016-09-15 10:53:24 +01:00
|
|
|
end;
|
|
|
|
true ->
|
2016-10-20 02:23:45 +01:00
|
|
|
{ok, Cache}
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
2016-11-03 19:39:23 +00:00
|
|
|
maybe_withjitter(CacheSize, MaxCacheSize) ->
|
2016-10-20 02:23:45 +01:00
|
|
|
if
|
|
|
|
CacheSize > MaxCacheSize ->
|
2016-11-05 14:31:10 +00:00
|
|
|
R = random:uniform(7 * MaxCacheSize),
|
2016-10-20 02:23:45 +01:00
|
|
|
if
|
2016-11-05 14:31:10 +00:00
|
|
|
(CacheSize - MaxCacheSize) > R ->
|
2016-10-20 02:23:45 +01:00
|
|
|
true;
|
|
|
|
true ->
|
|
|
|
false
|
|
|
|
end;
|
|
|
|
true ->
|
|
|
|
false
|
2016-09-15 10:53:24 +01:00
|
|
|
end.
|
|
|
|
|
2016-11-03 09:19:02 +00:00
|
|
|
|
|
|
|
|
2016-09-15 18:38:23 +01:00
|
|
|
load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
|
2016-10-20 02:23:45 +01:00
|
|
|
{MinSQN, MaxSQN, OutputTree} = Acc0,
|
2016-10-25 23:13:14 +01:00
|
|
|
{SQN, Type, PK} = KeyInLedger,
|
2016-10-08 22:15:48 +01:00
|
|
|
% VBin may already be a term
|
|
|
|
{VBin, VSize} = ExtractFun(ValueInLedger),
|
2016-10-25 23:13:14 +01:00
|
|
|
{Obj, IndexSpecs} = leveled_codec:split_inkvalue(VBin),
|
2016-09-15 10:53:24 +01:00
|
|
|
case SQN of
|
|
|
|
SQN when SQN < MinSQN ->
|
|
|
|
{loop, Acc0};
|
2016-10-07 10:04:48 +01:00
|
|
|
SQN when SQN < MaxSQN ->
|
2016-10-25 23:13:14 +01:00
|
|
|
Changes = preparefor_ledgercache(Type, PK, SQN,
|
|
|
|
Obj, VSize, IndexSpecs),
|
2016-10-20 02:23:45 +01:00
|
|
|
{loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
|
2016-10-07 10:04:48 +01:00
|
|
|
MaxSQN ->
|
2016-11-02 18:14:46 +00:00
|
|
|
leveled_log:log("B0006", [SQN]),
|
2016-10-25 23:13:14 +01:00
|
|
|
Changes = preparefor_ledgercache(Type, PK, SQN,
|
|
|
|
Obj, VSize, IndexSpecs),
|
2016-10-20 02:23:45 +01:00
|
|
|
{stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
|
2016-09-15 10:53:24 +01:00
|
|
|
SQN when SQN > MaxSQN ->
|
2016-11-02 18:14:46 +00:00
|
|
|
leveled_log:log("B0007", [MaxSQN, SQN]),
|
2016-09-15 10:53:24 +01:00
|
|
|
{stop, Acc0}
|
|
|
|
end.
|
|
|
|
|
2016-09-08 14:21:30 +01:00
|
|
|
|
2016-11-02 12:58:27 +00:00
|
|
|
get_opt(Key, Opts) ->
|
|
|
|
get_opt(Key, Opts, undefined).
|
|
|
|
|
|
|
|
get_opt(Key, Opts, Default) ->
|
|
|
|
case proplists:get_value(Key, Opts) of
|
|
|
|
undefined ->
|
2016-11-18 21:35:45 +00:00
|
|
|
Default;
|
2016-11-02 12:58:27 +00:00
|
|
|
Value ->
|
|
|
|
Value
|
|
|
|
end.
|
|
|
|
|
2016-11-21 12:34:40 +00:00
|
|
|
delete_path(DirPath) ->
|
|
|
|
ok = filelib:ensure_dir(DirPath),
|
|
|
|
{ok, Files} = file:list_dir(DirPath),
|
|
|
|
[file:delete(filename:join([DirPath, File])) || File <- Files],
|
|
|
|
file:del_dir(DirPath).
|
2016-11-02 12:58:27 +00:00
|
|
|
|
2016-09-08 14:21:30 +01:00
|
|
|
%%%============================================================================
|
|
|
|
%%% Test
|
|
|
|
%%%============================================================================
|
|
|
|
|
|
|
|
-ifdef(TEST).
|
|
|
|
|
2016-09-15 15:14:49 +01:00
|
|
|
reset_filestructure() ->
|
|
|
|
RootPath = "../test",
|
|
|
|
leveled_inker:clean_testdir(RootPath ++ "/" ++ ?JOURNAL_FP),
|
|
|
|
leveled_penciller:clean_testdir(RootPath ++ "/" ++ ?LEDGER_FP),
|
|
|
|
RootPath.
|
2016-09-19 15:31:26 +01:00
|
|
|
|
2016-10-14 18:43:16 +01:00
|
|
|
|
2016-09-19 15:31:26 +01:00
|
|
|
generate_multiple_objects(Count, KeyNumber) ->
|
|
|
|
generate_multiple_objects(Count, KeyNumber, []).
|
2016-10-31 12:12:06 +00:00
|
|
|
|
2016-09-19 15:31:26 +01:00
|
|
|
generate_multiple_objects(0, _KeyNumber, ObjL) ->
|
|
|
|
ObjL;
|
|
|
|
generate_multiple_objects(Count, KeyNumber, ObjL) ->
|
2016-10-31 12:12:06 +00:00
|
|
|
Key = "Key" ++ integer_to_list(KeyNumber),
|
|
|
|
Value = crypto:rand_bytes(256),
|
|
|
|
IndexSpec = [{add, "idx1_bin", "f" ++ integer_to_list(KeyNumber rem 10)}],
|
|
|
|
generate_multiple_objects(Count - 1,
|
|
|
|
KeyNumber + 1,
|
|
|
|
ObjL ++ [{Key, Value, IndexSpec}]).
|
|
|
|
|
|
|
|
|
|
|
|
generate_multiple_robjects(Count, KeyNumber) ->
|
|
|
|
generate_multiple_robjects(Count, KeyNumber, []).
|
|
|
|
|
|
|
|
generate_multiple_robjects(0, _KeyNumber, ObjL) ->
|
|
|
|
ObjL;
|
|
|
|
generate_multiple_robjects(Count, KeyNumber, ObjL) ->
|
2016-09-19 15:31:26 +01:00
|
|
|
Obj = {"Bucket",
|
|
|
|
"Key" ++ integer_to_list(KeyNumber),
|
2016-09-19 15:56:35 +01:00
|
|
|
crypto:rand_bytes(1024),
|
2016-09-19 15:31:26 +01:00
|
|
|
[],
|
|
|
|
[{"MDK", "MDV" ++ integer_to_list(KeyNumber)},
|
|
|
|
{"MDK2", "MDV" ++ integer_to_list(KeyNumber)}]},
|
|
|
|
{B1, K1, V1, Spec1, MD} = Obj,
|
|
|
|
Content = #r_content{metadata=MD, value=V1},
|
|
|
|
Obj1 = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
|
2016-10-31 12:12:06 +00:00
|
|
|
generate_multiple_robjects(Count - 1, KeyNumber + 1, ObjL ++ [{Obj1, Spec1}]).
|
2016-09-19 15:31:26 +01:00
|
|
|
|
|
|
|
|
2016-09-15 15:14:49 +01:00
|
|
|
single_key_test() ->
|
|
|
|
RootPath = reset_filestructure(),
|
2016-11-02 12:58:27 +00:00
|
|
|
{ok, Bookie1} = book_start([{root_path, RootPath}]),
|
2016-09-15 15:14:49 +01:00
|
|
|
{B1, K1, V1, Spec1, MD} = {"Bucket1",
|
|
|
|
"Key1",
|
|
|
|
"Value1",
|
|
|
|
[],
|
|
|
|
{"MDK1", "MDV1"}},
|
|
|
|
Content = #r_content{metadata=MD, value=V1},
|
|
|
|
Object = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
|
2016-11-07 10:11:57 +00:00
|
|
|
ok = book_put(Bookie1, B1, K1, Object, Spec1, ?RIAK_TAG),
|
|
|
|
{ok, F1} = book_get(Bookie1, B1, K1, ?RIAK_TAG),
|
2016-09-15 15:14:49 +01:00
|
|
|
?assertMatch(F1, Object),
|
2016-09-15 18:38:23 +01:00
|
|
|
ok = book_close(Bookie1),
|
2016-11-02 12:58:27 +00:00
|
|
|
{ok, Bookie2} = book_start([{root_path, RootPath}]),
|
2016-11-07 10:11:57 +00:00
|
|
|
{ok, F2} = book_get(Bookie2, B1, K1, ?RIAK_TAG),
|
2016-09-15 18:38:23 +01:00
|
|
|
?assertMatch(F2, Object),
|
|
|
|
ok = book_close(Bookie2),
|
2016-09-15 15:14:49 +01:00
|
|
|
reset_filestructure().
|
|
|
|
|
2016-09-19 15:31:26 +01:00
|
|
|
multi_key_test() ->
|
|
|
|
RootPath = reset_filestructure(),
|
2016-11-02 12:58:27 +00:00
|
|
|
{ok, Bookie1} = book_start([{root_path, RootPath}]),
|
2016-09-19 15:31:26 +01:00
|
|
|
{B1, K1, V1, Spec1, MD1} = {"Bucket",
|
|
|
|
"Key1",
|
|
|
|
"Value1",
|
|
|
|
[],
|
|
|
|
{"MDK1", "MDV1"}},
|
|
|
|
C1 = #r_content{metadata=MD1, value=V1},
|
|
|
|
Obj1 = #r_object{bucket=B1, key=K1, contents=[C1], vclock=[{'a',1}]},
|
|
|
|
{B2, K2, V2, Spec2, MD2} = {"Bucket",
|
|
|
|
"Key2",
|
|
|
|
"Value2",
|
|
|
|
[],
|
|
|
|
{"MDK2", "MDV2"}},
|
|
|
|
C2 = #r_content{metadata=MD2, value=V2},
|
|
|
|
Obj2 = #r_object{bucket=B2, key=K2, contents=[C2], vclock=[{'a',1}]},
|
2016-11-07 10:11:57 +00:00
|
|
|
ok = book_put(Bookie1, B1, K1, Obj1, Spec1, ?RIAK_TAG),
|
2016-11-14 12:43:45 +00:00
|
|
|
ObjL1 = generate_multiple_robjects(20, 3),
|
2016-09-19 15:31:26 +01:00
|
|
|
SW1 = os:timestamp(),
|
2016-11-07 10:11:57 +00:00
|
|
|
lists:foreach(fun({O, S}) ->
|
|
|
|
{B, K} = leveled_codec:riakto_keydetails(O),
|
|
|
|
ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG)
|
|
|
|
end,
|
|
|
|
ObjL1),
|
2016-11-14 12:43:45 +00:00
|
|
|
io:format("PUT of 20 objects completed in ~w microseconds~n",
|
2016-09-19 15:31:26 +01:00
|
|
|
[timer:now_diff(os:timestamp(),SW1)]),
|
2016-11-07 10:11:57 +00:00
|
|
|
ok = book_put(Bookie1, B2, K2, Obj2, Spec2, ?RIAK_TAG),
|
|
|
|
{ok, F1A} = book_get(Bookie1, B1, K1, ?RIAK_TAG),
|
2016-09-19 15:31:26 +01:00
|
|
|
?assertMatch(F1A, Obj1),
|
2016-11-07 10:11:57 +00:00
|
|
|
{ok, F2A} = book_get(Bookie1, B2, K2, ?RIAK_TAG),
|
2016-09-19 15:31:26 +01:00
|
|
|
?assertMatch(F2A, Obj2),
|
2016-11-14 12:43:45 +00:00
|
|
|
ObjL2 = generate_multiple_robjects(20, 23),
|
2016-09-19 15:31:26 +01:00
|
|
|
SW2 = os:timestamp(),
|
2016-11-07 10:11:57 +00:00
|
|
|
lists:foreach(fun({O, S}) ->
|
|
|
|
{B, K} = leveled_codec:riakto_keydetails(O),
|
|
|
|
ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG)
|
|
|
|
end,
|
|
|
|
ObjL2),
|
2016-11-14 12:43:45 +00:00
|
|
|
io:format("PUT of 20 objects completed in ~w microseconds~n",
|
2016-09-19 15:31:26 +01:00
|
|
|
[timer:now_diff(os:timestamp(),SW2)]),
|
2016-11-07 10:11:57 +00:00
|
|
|
{ok, F1B} = book_get(Bookie1, B1, K1, ?RIAK_TAG),
|
2016-09-19 15:31:26 +01:00
|
|
|
?assertMatch(F1B, Obj1),
|
2016-11-07 10:11:57 +00:00
|
|
|
{ok, F2B} = book_get(Bookie1, B2, K2, ?RIAK_TAG),
|
2016-09-19 15:31:26 +01:00
|
|
|
?assertMatch(F2B, Obj2),
|
|
|
|
ok = book_close(Bookie1),
|
2016-10-27 20:56:18 +01:00
|
|
|
% Now reopen the file, and confirm that a fetch is still possible
|
2016-11-02 12:58:27 +00:00
|
|
|
{ok, Bookie2} = book_start([{root_path, RootPath}]),
|
2016-11-07 10:11:57 +00:00
|
|
|
{ok, F1C} = book_get(Bookie2, B1, K1, ?RIAK_TAG),
|
2016-09-19 15:31:26 +01:00
|
|
|
?assertMatch(F1C, Obj1),
|
2016-11-07 10:11:57 +00:00
|
|
|
{ok, F2C} = book_get(Bookie2, B2, K2, ?RIAK_TAG),
|
2016-09-19 15:31:26 +01:00
|
|
|
?assertMatch(F2C, Obj2),
|
2016-11-14 12:43:45 +00:00
|
|
|
ObjL3 = generate_multiple_robjects(20, 43),
|
2016-09-19 15:31:26 +01:00
|
|
|
SW3 = os:timestamp(),
|
2016-11-07 10:11:57 +00:00
|
|
|
lists:foreach(fun({O, S}) ->
|
|
|
|
{B, K} = leveled_codec:riakto_keydetails(O),
|
|
|
|
ok = book_put(Bookie2, B, K, O, S, ?RIAK_TAG)
|
|
|
|
end,
|
|
|
|
ObjL3),
|
2016-11-14 12:43:45 +00:00
|
|
|
io:format("PUT of 20 objects completed in ~w microseconds~n",
|
2016-09-19 15:31:26 +01:00
|
|
|
[timer:now_diff(os:timestamp(),SW3)]),
|
2016-11-07 10:11:57 +00:00
|
|
|
{ok, F1D} = book_get(Bookie2, B1, K1, ?RIAK_TAG),
|
2016-09-19 15:31:26 +01:00
|
|
|
?assertMatch(F1D, Obj1),
|
2016-11-07 10:11:57 +00:00
|
|
|
{ok, F2D} = book_get(Bookie2, B2, K2, ?RIAK_TAG),
|
2016-09-19 15:31:26 +01:00
|
|
|
?assertMatch(F2D, Obj2),
|
|
|
|
ok = book_close(Bookie2),
|
2016-09-19 15:56:35 +01:00
|
|
|
reset_filestructure().
|
2016-10-05 18:28:31 +01:00
|
|
|
|
2016-10-31 12:12:06 +00:00
|
|
|
ttl_test() ->
|
|
|
|
RootPath = reset_filestructure(),
|
2016-11-02 12:58:27 +00:00
|
|
|
{ok, Bookie1} = book_start([{root_path, RootPath}]),
|
2016-10-31 12:12:06 +00:00
|
|
|
ObjL1 = generate_multiple_objects(100, 1),
|
|
|
|
% Put in all the objects with a TTL in the future
|
|
|
|
Future = leveled_codec:integer_now() + 300,
|
|
|
|
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
|
|
|
|
"Bucket", K, V, S,
|
|
|
|
?STD_TAG,
|
|
|
|
Future) end,
|
|
|
|
ObjL1),
|
|
|
|
lists:foreach(fun({K, V, _S}) ->
|
|
|
|
{ok, V} = book_get(Bookie1, "Bucket", K, ?STD_TAG)
|
|
|
|
end,
|
|
|
|
ObjL1),
|
|
|
|
lists:foreach(fun({K, _V, _S}) ->
|
|
|
|
{ok, _} = book_head(Bookie1, "Bucket", K, ?STD_TAG)
|
|
|
|
end,
|
|
|
|
ObjL1),
|
|
|
|
|
|
|
|
ObjL2 = generate_multiple_objects(100, 101),
|
|
|
|
Past = leveled_codec:integer_now() - 300,
|
|
|
|
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
|
|
|
|
"Bucket", K, V, S,
|
|
|
|
?STD_TAG,
|
|
|
|
Past) end,
|
|
|
|
ObjL2),
|
|
|
|
lists:foreach(fun({K, _V, _S}) ->
|
|
|
|
not_found = book_get(Bookie1, "Bucket", K, ?STD_TAG)
|
|
|
|
end,
|
|
|
|
ObjL2),
|
|
|
|
lists:foreach(fun({K, _V, _S}) ->
|
|
|
|
not_found = book_head(Bookie1, "Bucket", K, ?STD_TAG)
|
|
|
|
end,
|
|
|
|
ObjL2),
|
|
|
|
|
|
|
|
{async, BucketFolder} = book_returnfolder(Bookie1,
|
|
|
|
{bucket_stats, "Bucket"}),
|
|
|
|
{_Size, Count} = BucketFolder(),
|
|
|
|
?assertMatch(100, Count),
|
2016-11-18 11:53:14 +00:00
|
|
|
FoldKeysFun = fun(_B, Item, FKFAcc) -> FKFAcc ++ [Item] end,
|
2016-10-31 15:13:11 +00:00
|
|
|
{async,
|
|
|
|
IndexFolder} = book_returnfolder(Bookie1,
|
|
|
|
{index_query,
|
|
|
|
"Bucket",
|
2016-11-18 11:53:14 +00:00
|
|
|
{FoldKeysFun, []},
|
2016-10-31 15:13:11 +00:00
|
|
|
{"idx1_bin", "f8", "f9"},
|
|
|
|
{false, undefined}}),
|
|
|
|
KeyList = IndexFolder(),
|
|
|
|
?assertMatch(20, length(KeyList)),
|
|
|
|
|
|
|
|
{ok, Regex} = re:compile("f8"),
|
|
|
|
{async,
|
|
|
|
IndexFolderTR} = book_returnfolder(Bookie1,
|
|
|
|
{index_query,
|
|
|
|
"Bucket",
|
2016-11-18 11:53:14 +00:00
|
|
|
{FoldKeysFun, []},
|
2016-10-31 15:13:11 +00:00
|
|
|
{"idx1_bin", "f8", "f9"},
|
|
|
|
{true, Regex}}),
|
|
|
|
TermKeyList = IndexFolderTR(),
|
|
|
|
?assertMatch(10, length(TermKeyList)),
|
|
|
|
|
2016-10-31 12:12:06 +00:00
|
|
|
ok = book_close(Bookie1),
|
2016-11-02 12:58:27 +00:00
|
|
|
{ok, Bookie2} = book_start([{root_path, RootPath}]),
|
2016-10-31 15:13:11 +00:00
|
|
|
|
|
|
|
{async,
|
|
|
|
IndexFolderTR2} = book_returnfolder(Bookie2,
|
|
|
|
{index_query,
|
|
|
|
"Bucket",
|
2016-11-18 11:53:14 +00:00
|
|
|
{FoldKeysFun, []},
|
2016-10-31 15:13:11 +00:00
|
|
|
{"idx1_bin", "f7", "f9"},
|
|
|
|
{false, Regex}}),
|
|
|
|
KeyList2 = IndexFolderTR2(),
|
|
|
|
?assertMatch(10, length(KeyList2)),
|
|
|
|
|
|
|
|
lists:foreach(fun({K, _V, _S}) ->
|
|
|
|
not_found = book_get(Bookie2, "Bucket", K, ?STD_TAG)
|
|
|
|
end,
|
|
|
|
ObjL2),
|
|
|
|
lists:foreach(fun({K, _V, _S}) ->
|
|
|
|
not_found = book_head(Bookie2, "Bucket", K, ?STD_TAG)
|
|
|
|
end,
|
|
|
|
ObjL2),
|
|
|
|
|
|
|
|
ok = book_close(Bookie2),
|
2016-10-31 12:12:06 +00:00
|
|
|
reset_filestructure().
|
|
|
|
|
2016-10-31 16:02:32 +00:00
|
|
|
hashtree_query_test() ->
|
|
|
|
RootPath = reset_filestructure(),
|
2016-11-02 12:58:27 +00:00
|
|
|
{ok, Bookie1} = book_start([{root_path, RootPath},
|
|
|
|
{max_journalsize, 1000000},
|
|
|
|
{cache_size, 500}]),
|
2016-10-31 16:02:32 +00:00
|
|
|
ObjL1 = generate_multiple_objects(1200, 1),
|
|
|
|
% Put in all the objects with a TTL in the future
|
|
|
|
Future = leveled_codec:integer_now() + 300,
|
|
|
|
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
|
|
|
|
"Bucket", K, V, S,
|
|
|
|
?STD_TAG,
|
|
|
|
Future) end,
|
|
|
|
ObjL1),
|
|
|
|
ObjL2 = generate_multiple_objects(20, 1201),
|
|
|
|
% Put in a few objects with a TTL in the past
|
|
|
|
Past = leveled_codec:integer_now() - 300,
|
|
|
|
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
|
|
|
|
"Bucket", K, V, S,
|
|
|
|
?STD_TAG,
|
|
|
|
Past) end,
|
|
|
|
ObjL2),
|
|
|
|
% Scan the store for the Bucket, Keys and Hashes
|
|
|
|
{async, HTFolder} = book_returnfolder(Bookie1,
|
|
|
|
{hashtree_query,
|
|
|
|
?STD_TAG,
|
|
|
|
false}),
|
|
|
|
KeyHashList = HTFolder(),
|
|
|
|
lists:foreach(fun({B, _K, H}) ->
|
|
|
|
?assertMatch("Bucket", B),
|
|
|
|
?assertMatch(true, is_integer(H))
|
|
|
|
end,
|
|
|
|
KeyHashList),
|
|
|
|
?assertMatch(1200, length(KeyHashList)),
|
|
|
|
ok = book_close(Bookie1),
|
2016-11-02 12:58:27 +00:00
|
|
|
{ok, Bookie2} = book_start([{root_path, RootPath},
|
|
|
|
{max_journalsize, 200000},
|
|
|
|
{cache_size, 500}]),
|
2016-10-31 16:02:32 +00:00
|
|
|
{async, HTFolder2} = book_returnfolder(Bookie2,
|
|
|
|
{hashtree_query,
|
|
|
|
?STD_TAG,
|
|
|
|
false}),
|
|
|
|
?assertMatch(KeyHashList, HTFolder2()),
|
|
|
|
ok = book_close(Bookie2),
|
|
|
|
reset_filestructure().
|
|
|
|
|
2016-10-31 18:51:23 +00:00
|
|
|
hashtree_query_withjournalcheck_test() ->
|
|
|
|
RootPath = reset_filestructure(),
|
2016-11-02 12:58:27 +00:00
|
|
|
{ok, Bookie1} = book_start([{root_path, RootPath},
|
|
|
|
{max_journalsize, 1000000},
|
|
|
|
{cache_size, 500}]),
|
2016-10-31 18:51:23 +00:00
|
|
|
ObjL1 = generate_multiple_objects(800, 1),
|
|
|
|
% Put in all the objects with a TTL in the future
|
|
|
|
Future = leveled_codec:integer_now() + 300,
|
|
|
|
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
|
|
|
|
"Bucket", K, V, S,
|
|
|
|
?STD_TAG,
|
|
|
|
Future) end,
|
|
|
|
ObjL1),
|
|
|
|
{async, HTFolder1} = book_returnfolder(Bookie1,
|
|
|
|
{hashtree_query,
|
|
|
|
?STD_TAG,
|
|
|
|
false}),
|
|
|
|
KeyHashList = HTFolder1(),
|
|
|
|
{async, HTFolder2} = book_returnfolder(Bookie1,
|
|
|
|
{hashtree_query,
|
|
|
|
?STD_TAG,
|
|
|
|
check_presence}),
|
|
|
|
?assertMatch(KeyHashList, HTFolder2()),
|
|
|
|
ok = book_close(Bookie1),
|
|
|
|
reset_filestructure().
|
|
|
|
|
2016-11-02 15:38:51 +00:00
|
|
|
foldobjects_vs_hashtree_test() ->
|
|
|
|
RootPath = reset_filestructure(),
|
|
|
|
{ok, Bookie1} = book_start([{root_path, RootPath},
|
|
|
|
{max_journalsize, 1000000},
|
|
|
|
{cache_size, 500}]),
|
|
|
|
ObjL1 = generate_multiple_objects(800, 1),
|
|
|
|
% Put in all the objects with a TTL in the future
|
|
|
|
Future = leveled_codec:integer_now() + 300,
|
|
|
|
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
|
|
|
|
"Bucket", K, V, S,
|
|
|
|
?STD_TAG,
|
|
|
|
Future) end,
|
|
|
|
ObjL1),
|
|
|
|
{async, HTFolder1} = book_returnfolder(Bookie1,
|
|
|
|
{hashtree_query,
|
|
|
|
?STD_TAG,
|
|
|
|
false}),
|
|
|
|
KeyHashList1 = lists:usort(HTFolder1()),
|
|
|
|
io:format("First item ~w~n", [lists:nth(1, KeyHashList1)]),
|
|
|
|
FoldObjectsFun = fun(B, K, V, Acc) ->
|
|
|
|
[{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
|
|
|
|
{async, HTFolder2} = book_returnfolder(Bookie1,
|
|
|
|
{foldobjects_allkeys,
|
|
|
|
?STD_TAG,
|
|
|
|
FoldObjectsFun}),
|
|
|
|
KeyHashList2 = HTFolder2(),
|
|
|
|
?assertMatch(KeyHashList1, lists:usort(KeyHashList2)),
|
|
|
|
|
|
|
|
ok = book_close(Bookie1),
|
|
|
|
reset_filestructure().
|
|
|
|
|
2016-11-14 20:43:38 +00:00
|
|
|
coverage_cheat_test() ->
|
|
|
|
{noreply, _State0} = handle_info(timeout, #state{}),
|
|
|
|
{ok, _State1} = code_change(null, #state{}, null),
|
|
|
|
{noreply, _State2} = handle_cast(null, #state{}).
|
|
|
|
|
2016-11-02 15:38:51 +00:00
|
|
|
|
2016-11-05 15:59:31 +00:00
|
|
|
-endif.
|