%% -------- PENCILLER ---------
%%
%% The penciller is responsible for writing and re-writing the ledger - a
%% persisted, ordered view of non-recent Keys and Metadata which have been
%% added to the store.
%% - The Penciller maintains a manifest of all the files within the current
%% Ledger.
%% - The Penciller provides re-write (compaction) work, to be managed by the
%% Penciller's Clerk.
%% - The Penciller maintains a register of iterators that have requested
%% snapshots of the Ledger.
%% - The Penciller accepts new dumps (in the form of lists of keys) from the
%% Bookie, and calls the Bookie once the process of pencilling this data into
%% the Ledger is complete - at which point the Bookie is free to forget about
%% the data.
%% - The Penciller's persistence of the ledger may not be reliable, in that it
%% may lose data, but only in sequence from a particular sequence number. On
%% startup the Penciller will inform the Bookie of the highest sequence number
%% it has, and the Bookie should load any missing data from that point out of
%% the journal.
%%
%% -------- LEDGER ---------
%%
%% The Ledger is divided into many levels:
%% - L0: New keys are received from the Bookie and merged into a single ETS
%% table, until that table is the size of a SFT file, and it is then persisted
%% as a SFT file at this level. L0 SFT files can be larger than the normal
%% maximum size - so we don't have to consider problems of either having more
%% than one L0 file (and handling what happens on a crash between writing the
%% files, when the second may have overlapping sequence numbers), or having a
%% remainder with overlapping sequence numbers in memory after the file is
%% written. Once the persistence is completed, the ETS table can be erased.
%% There can be only one SFT file at Level 0, so the work to merge that file
%% to the lower level must be the highest priority, as otherwise writes to the
%% ledger will stall when there is next a need to persist.
%% - L1 TO L7: May contain multiple processes managing non-overlapping SFT
%% files. Compaction work should be scheduled if the number of files exceeds
%% the target size of the level, where the target size is 8 ^ n.
%%
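%% As an illustrative sketch, the per-level file count target implied by
%% 8 ^ n (and encoded explicitly in the ?LEVEL_SCALEFACTOR macro below)
%% could be computed as:
%%
%%   target_filecount(0) -> 0;
%%   target_filecount(N) when N < 7 -> trunc(math:pow(8, N));
%%   target_filecount(_N) -> infinity.
%%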
%% The most recent revision of a Key can be found by checking each level until
%% the key is found. To check a level the correct file must be sought from the
%% manifest for that level, and then a call is made to that file. If the Key
%% is not present then every level should be checked.
%%
%% If a compaction change takes the size of a level beyond the target size,
%% then compaction work for that level + 1 should be added to the compaction
%% work queue.
%% Compaction work is fetched by the Penciller's Clerk because:
%% - it has timed out due to a period of inactivity
%% - it has been triggered by a cast to indicate the arrival of high
%% priority compaction work
%% The Penciller's Clerk (which performs the compaction work) will always call
%% the Penciller to find out the highest priority work currently required
%% whenever it has either completed work, or a timeout has occurred since it
%% was informed there was no work to do.
%%
%% When the clerk picks work it will take the current manifest, and the
%% Penciller assumes the manifest sequence number is to be incremented.
%% When the clerk has completed the work it can request that the manifest
%% change be committed by the Penciller. The commit is made through changing
%% the filename of the new manifest - so the Penciller is not held up by the
%% process of writing a file, just altering file system metadata.
%%
%% The manifest is locked by a clerk taking work, or by there being a need to
%% write a file to Level 0. If the manifest is locked, then new keys can still
%% be added in memory - however, the response to that push will be to "pause",
%% that is to say the Penciller will ask the Bookie to slow down.
%%
%% ---------- PUSH ----------
%%
%% The Penciller must support the PUSH of a dump of keys from the Bookie. The
%% call to PUSH should be immediately acknowledged, and then work should be
%% completed to merge the dump into the L0 ETS table.
%%
%% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the
%% conversion of the current ETS table into a SFT file, but not completed this
%% change. This should prompt a stall.
%%
%% ---------- FETCH ----------
%%
%% On request to fetch a key the Penciller should look first in the L0 ETS
%% table, and then look in the SFT files Level by Level, consulting the
%% Manifest to determine which file should be checked at each level.
%%
%% ---------- SNAPSHOT ----------
%%
%% Iterators may request a snapshot of the database. A snapshot is a cloned
%% Penciller seeded not from disk, but by the in-memory ETS table and the
%% in-memory manifest.
%%
%% To provide a snapshot the Penciller must snapshot the ETS table. The
%% snapshot of the ETS table is managed by the Penciller storing a list of the
%% batches of Keys which have been pushed to the Penciller, and it is expected
%% that this will be converted by the clone into a gb_tree. The clone may
%% then update the master Penciller with the gb_tree to be cached and used by
%% other cloned processes.
%%
%% Clones formed to support snapshots are registered by the Penciller, so that
%% SFT files valid at the point of the snapshot are not deleted until either
%% the iterator is completed or has timed out.
%%
%% ---------- ON STARTUP ----------
%%
%% On Startup the Bookie will ask the Penciller to initiate the Ledger first.
%% To initiate the Ledger the Penciller must consult the manifest, and then
%% start a SFT management process for each file in the manifest.
%%
%% The Penciller should then try and read any Level 0 file which has the
%% manifest sequence number one higher than the last stored in the manifest.
%%
%% The Bookie will ask the Inker for any Keys seen beyond that sequence number
%% before the startup of the overall store can be completed.
%%
%% ---------- ON SHUTDOWN ----------
%%
%% On a controlled shutdown the Penciller should attempt to write any in-memory
%% ETS table to a L0 SFT file, assuming one is not already pending. If one is
%% already pending then the Penciller will not persist this part of the Ledger.
%%
%% ---------- FOLDER STRUCTURE ----------
%%
%% The following folders are used by the Penciller:
%% $ROOT/ledger/ledger_manifest/ - used for keeping manifest files
%% $ROOT/ledger/ledger_files/ - containing individual SFT files
%%
%% In larger stores there could be a large number of files in the ledger_files
%% folder - perhaps o(1000). It is assumed that modern file systems should
%% handle this efficiently.
%%
%% ---------- COMPACTION & MANIFEST UPDATES ----------
%%
%% The Penciller can have one and only one Clerk for performing compaction
%% work. When the Clerk has requested and taken work, it should perform the
%% compaction work, starting the new SFT process to manage the new Ledger
%% state and then writing a new manifest file that represents that state,
%% using the next Manifest sequence number as the filename:
%% - nonzero_<ManifestSQN#>.pnd
%%
%% The Penciller on accepting the change should rename the manifest file to -
%% - nonzero_<ManifestSQN#>.crr
%%
%% On startup, the Penciller should look for the nonzero_*.crr file with the
%% highest such manifest sequence number.
%%
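%% As a minimal sketch of that commit-by-rename step (the actual
%% implementation is rename_manifest_files/2 below, built on filepath/3):
%%
%%   commit_manifest(RootPath, ManifestSQN) ->
%%       Base = RootPath ++ "/ledger_manifest/nonzero_"
%%                   ++ integer_to_list(ManifestSQN),
%%       file:rename(Base ++ ".pnd", Base ++ ".crr").
%%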
%% The pace at which the store can accept updates will be dependent on the
%% speed at which the Penciller's Clerk can merge files at lower levels plus
%% the time it takes to merge from Level 0. If a clerk has commenced
%% compaction work at a lower level and then immediately afterwards a L0 SFT
%% file is written, the Penciller will need to wait for this compaction work
%% to complete and the L0 file to be compacted before the ETS table can be
%% allowed to again reach capacity.
%%
%% The writing of L0 files does not require the involvement of the clerk.
%% The L0 files are prompted directly by the penciller when the in-memory ets
%% table has reached capacity. When there is a next push into memory the
%% penciller calls to check that the file is now active (which may pause the
%% acceptance of the push if the write is still ongoing), and if so it can
%% clear the ets table and build a new table starting with the remainder, and
%% the keys from the latest push.
%%
%% ---------- NOTES ON THE USE OF ETS ----------
%%
%% Insertion into ETS is very fast, and so using ETS does not slow the PUT
%% path. However, an ETS table is mutable, so it does complicate the
%% snapshotting of the Ledger.
%%
%% Some alternatives have been considered:
%%
%% A1 - Use gb_trees not ETS table
%% * The speed of inserts is too slow, especially as the Bookie is blocked
%% until the insert is complete. Inserting 32K very simple keys takes 250ms.
%% Only the naive commands can be used, as Keys may already be present - so
%% it is not easy to optimise. There is a lack of bulk operations.
%%
%% A2 - Use some structure other than gb_trees or ETS tables
%% * There is nothing else that will support iterators, so snapshots would
%% either need to do a conversion when they request the snapshot if
%% they need to iterate, or iterate through map functions scanning all the
%% keys. The conversion may not be expensive, as we know loading into an ETS
%% table is fast - but there may be some hidden overheads with creating and
%% destroying many ETS tables.
%%
%% A3 - Keep a parallel list of lists of things that have gone in the ETS
%% table, in the format they arrived in
%% * There is doubling up of memory, and the snapshot must do some work to
%% make use of these lists. This combines the continued use of fast ETS
%% with the solution of A2, at a memory cost.
%%
%% A4 - Try and cache the conversion to be shared between snapshots registered
%% at the same Ledger SQN
%% * This is a riff on A2/A3, but if there are generally o(10) or o(100)
%% seconds between memory pushes, and much more frequent snapshots, this may
%% be more efficient.
%%
%% A5 - Produce a specific snapshot of the ETS table via an iterator on demand
%% for each snapshot
%% * So if a snapshot was required for an iterator, the Penciller would block
%% whilst it iterated over the ETS table first to produce a snapshot-specific
%% immutable view. If the snapshot was required for a long-lived complete view
%% of the database the Penciller would block for a tab2list.
%%
%% A6 - Have snapshots incrementally create and share immutable trees, from a
%% parallel cache of changes
%% * This is a variant on A3. As changes are pushed to the Penciller in the
%% form of lists, the Penciller updates a cache of the lists that are
%% contained in the current ETS table. These lists are returned to the
%% snapshot when the snapshot is registered. It is assumed all snapshots will
%% convert these lists into a gb_tree to use, but following that conversion
%% they can cast to the Penciller to refine the cache, so that the cache will
%% become a gb_tree up to the ledger SQN at which the snapshot is registered,
%% and now only store new lists for subsequent updates. Future snapshot
%% requests (before the ets table is flushed) will now receive the tree (if
%% no changes have been made), or the tree and only the lists needed to
%% incrementally change the tree. If changes are infrequent, each snapshot
%% request will pay the full 20ms to 250ms cost of producing the tree
%% (although perhaps the clerk could also update periodically to avoid this).
%% If changes are frequent, the snapshot will generally not need to do a
%% conversion, or will only be required to do a small conversion.
%%
%% A6 is the preferred option
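%%
%% As an illustrative sketch of the A6 conversion (the real implementation
%% is roll_new_tree/3 below), cached increments roll into a gb_tree with:
%%
%%   roll(Tree, []) -> Tree;
%%   roll(Tree, [{_SQN, KVList}|Rest]) ->
%%       roll(lists:foldl(fun({K, V}, T) -> gb_trees:enter(K, V, T) end,
%%                        Tree,
%%                        KVList),
%%            Rest).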
-module(leveled_penciller).

-behaviour(gen_server).

-include("../include/leveled.hrl").

-export([init/1,
        handle_call/3,
        handle_cast/2,
        handle_info/2,
        terminate/2,
        code_change/3,
        pcl_start/1,
        pcl_pushmem/2,
        pcl_fetch/2,
        pcl_checksequencenumber/3,
        pcl_workforclerk/1,
        pcl_requestmanifestchange/2,
        pcl_confirmdelete/2,
        pcl_prompt/1,
        pcl_close/1,
        pcl_registersnapshot/2,
        pcl_updatesnapshotcache/3,
        pcl_loadsnapshot/2,
        pcl_getstartupsequencenumber/1,
        roll_new_tree/3,
        clean_testdir/1]).

-include_lib("eunit/include/eunit.hrl").

-define(LEVEL_SCALEFACTOR, [{0, 0}, {1, 8}, {2, 64}, {3, 512},
                            {4, 4096}, {5, 32768}, {6, 262144}, {7, infinity}]).
-define(MAX_LEVELS, 8).
-define(MAX_WORK_WAIT, 300).
-define(MANIFEST_FP, "ledger_manifest").
-define(FILES_FP, "ledger_files").
-define(CURRENT_FILEX, "crr").
-define(PENDING_FILEX, "pnd").
-define(MEMTABLE, mem).
-define(MAX_TABLESIZE, 32000).
-define(PROMPT_WAIT_ONL0, 5).
-define(L0PEND_RESET, {false, null, null}).

-record(l0snapshot, {increments = [] :: list(),
                        tree = gb_trees:empty() :: gb_trees:tree(),
                        ledger_sqn = 0 :: integer()}).

-record(state, {manifest = [] :: list(),
                ongoing_work = [] :: list(),
                manifest_sqn = 0 :: integer(),
                ledger_sqn = 0 :: integer(),
                registered_snapshots = [] :: list(),
                unreferenced_files = [] :: list(),
                root_path = "../test" :: string(),
                table_size = 0 :: integer(),
                clerk :: pid(),
                levelzero_pending = ?L0PEND_RESET :: tuple(),
                memtable_copy = #l0snapshot{} :: #l0snapshot{},
                levelzero_snapshot = gb_trees:empty() :: gb_trees:tree(),
                memtable,
                backlog = false :: boolean(),
                memtable_maxsize :: integer(),
                is_snapshot = false :: boolean(),
                snapshot_fully_loaded = false :: boolean(),
                source_penciller :: pid()}).


%%%============================================================================
%%% API
%%%============================================================================

pcl_start(PCLopts) ->
    gen_server:start(?MODULE, [PCLopts], []).

pcl_pushmem(Pid, DumpList) ->
    %% Bookie to dump memory onto penciller
    gen_server:call(Pid, {push_mem, DumpList}, infinity).

pcl_fetch(Pid, Key) ->
    gen_server:call(Pid, {fetch, Key}, infinity).

pcl_checksequencenumber(Pid, Key, SQN) ->
    gen_server:call(Pid, {check_sqn, Key, SQN}, infinity).

pcl_workforclerk(Pid) ->
    gen_server:call(Pid, work_for_clerk, infinity).

pcl_requestmanifestchange(Pid, WorkItem) ->
    gen_server:call(Pid, {manifest_change, WorkItem}, infinity).

pcl_confirmdelete(Pid, FileName) ->
    gen_server:call(Pid, {confirm_delete, FileName}, infinity).

pcl_prompt(Pid) ->
    gen_server:call(Pid, prompt_compaction, infinity).

pcl_getstartupsequencenumber(Pid) ->
    gen_server:call(Pid, get_startup_sqn, infinity).

pcl_registersnapshot(Pid, Snapshot) ->
    gen_server:call(Pid, {register_snapshot, Snapshot}, infinity).

pcl_updatesnapshotcache(Pid, Tree, SQN) ->
    gen_server:cast(Pid, {update_snapshotcache, Tree, SQN}).

pcl_loadsnapshot(Pid, Increment) ->
    gen_server:call(Pid, {load_snapshot, Increment}, infinity).

pcl_close(Pid) ->
    gen_server:call(Pid, close).


%%%============================================================================
%%% gen_server callbacks
%%%============================================================================

init([PCLopts]) ->
    case {PCLopts#penciller_options.root_path,
            PCLopts#penciller_options.start_snapshot} of
        {undefined, true} ->
            SrcPenciller = PCLopts#penciller_options.source_penciller,
            {ok,
                LedgerSQN,
                Manifest,
                MemTableCopy} = pcl_registersnapshot(SrcPenciller, self()),
            {ok, #state{memtable_copy=MemTableCopy,
                        is_snapshot=true,
                        source_penciller=SrcPenciller,
                        manifest=Manifest,
                        ledger_sqn=LedgerSQN}};
            %% Need to do something about timeout
        {_RootPath, false} ->
            start_from_file(PCLopts)
    end.

handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap})
                                                        when Snap == false ->
    % The process for pushing to memory is as follows
    % - Check that the inbound list does not contain any Keys with a lower
    % sequence number than any existing keys (assess_sqn/1)
    % - Check that any file that had been sent to be written to L0 previously
    % is now completed.  If it is, wipe out the in-memory view as this is now
    % safely persisted.  This will block waiting for this to complete if it
    % hasn't (checkready_pushtomem/1).
    % - Quick check to see if there is a need to write a L0 file
    % (quickcheck_pushtomem/3).  If there clearly isn't, then we can reply,
    % and then add to memory in the background before updating the loop state
    % - Push the update into memory (do_pushtomem/4)
    % - If we haven't got through quickcheck, we now need to check if there is
    % a definite need to write a new L0 file (roll_memory/2).  If all clear,
    % this will write the file in the background and allow a response to the
    % user.  If not, the change has still been made but the L0 file will not
    % have been prompted - so the reply does not indicate failure but returns
    % the atom 'pause' to signal a loose desire for back-pressure to be
    % applied.  The only reason in this case why there should be a pause is if
    % the manifest is locked pending completion of a manifest change - so
    % reacting to the pause signal may not be sensible
    StartWatch = os:timestamp(),
    case assess_sqn(DumpList) of
        {MinSQN, MaxSQN} when MaxSQN >= MinSQN,
                                MinSQN >= State#state.ledger_sqn ->
            MaxTableSize = State#state.memtable_maxsize,
            {TableSize0, State1} = checkready_pushtomem(State),
            case quickcheck_pushtomem(DumpList,
                                        TableSize0,
                                        MaxTableSize) of
                {twist, TableSize1} ->
                    gen_server:reply(From, ok),
                    io:format("Reply made on push in ~w microseconds~n",
                                [timer:now_diff(os:timestamp(), StartWatch)]),
                    L0Snap = do_pushtomem(DumpList,
                                            State1#state.memtable,
                                            State1#state.memtable_copy,
                                            MaxSQN),
                    io:format("Push completed in ~w microseconds~n",
                                [timer:now_diff(os:timestamp(), StartWatch)]),
                    {noreply,
                        State1#state{memtable_copy=L0Snap,
                                        table_size=TableSize1,
                                        ledger_sqn=MaxSQN}};
                {maybe_roll, TableSize1} ->
                    L0Snap = do_pushtomem(DumpList,
                                            State1#state.memtable,
                                            State1#state.memtable_copy,
                                            MaxSQN),
                    case roll_memory(State1, MaxTableSize) of
                        {ok, L0Pend, ManSN, TableSize2} ->
                            io:format("Push completed in ~w microseconds~n",
                                        [timer:now_diff(os:timestamp(),
                                                        StartWatch)]),
                            {reply,
                                ok,
                                State1#state{levelzero_pending=L0Pend,
                                                table_size=TableSize2,
                                                manifest_sqn=ManSN,
                                                memtable_copy=L0Snap,
                                                ledger_sqn=MaxSQN,
                                                backlog=false}};
                        {pause, Reason, Details} ->
                            io:format("Excess work due to - " ++ Reason,
                                        Details),
                            {reply,
                                pause,
                                State1#state{backlog=true,
                                                memtable_copy=L0Snap,
                                                table_size=TableSize1,
                                                ledger_sqn=MaxSQN}}
                    end
            end;
        {MinSQN, MaxSQN} ->
            io:format("Mismatch of sequence number expectations with push "
                        ++ "having sequence numbers between ~w and ~w "
                        ++ "but current sequence number is ~w~n",
                        [MinSQN, MaxSQN, State#state.ledger_sqn]),
            {reply, refused, State};
        empty ->
            io:format("Empty request pushed to Penciller~n"),
            {reply, ok, State}
    end;
handle_call({confirm_delete, FileName}, _From, State=#state{is_snapshot=Snap})
                                                        when Snap == false ->
    Reply = confirm_delete(FileName,
                            State#state.unreferenced_files,
                            State#state.registered_snapshots),
    case Reply of
        true ->
            UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
            {reply, true, State#state{unreferenced_files=UF1}};
        _ ->
            {reply, Reply, State}
    end;
handle_call(prompt_compaction, _From, State=#state{is_snapshot=Snap})
                                                        when Snap == false ->
    %% If there is a prompt immediately after a L0 async write event then
    %% there exists the potential for the prompt to stall the database.
    %% Should only accept prompts if there has been a safe wait from the
    %% last L0 write event.
    Proceed = case State#state.levelzero_pending of
                    {true, _Pid, TS} ->
                        TD = timer:now_diff(os:timestamp(), TS),
                        if
                            TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false;
                            true -> true
                        end;
                    ?L0PEND_RESET ->
                        true
                end,
    if
        Proceed ->
            {_TableSize, State1} = checkready_pushtomem(State),
            case roll_memory(State1, State1#state.memtable_maxsize) of
                {ok, L0Pend, MSN, TableSize} ->
                    io:format("Prompted push completed~n"),
                    {reply, ok, State1#state{levelzero_pending=L0Pend,
                                                table_size=TableSize,
                                                manifest_sqn=MSN,
                                                backlog=false}};
                {pause, Reason, Details} ->
                    io:format("Excess work due to - " ++ Reason, Details),
                    {reply, pause, State1#state{backlog=true}}
            end;
        true ->
            {reply, ok, State#state{backlog=false}}
    end;
handle_call({manifest_change, WI}, _From, State=#state{is_snapshot=Snap})
                                                        when Snap == false ->
    {ok, UpdState} = commit_manifest_change(WI, State),
    {reply, ok, UpdState};
handle_call({fetch, Key}, _From, State=#state{is_snapshot=Snap})
                                                        when Snap == false ->
    {reply,
        fetch(Key,
                State#state.manifest,
                State#state.memtable),
        State};
handle_call({check_sqn, Key, SQN}, _From, State=#state{is_snapshot=Snap})
                                                        when Snap == false ->
    {reply,
        compare_to_sqn(fetch(Key,
                                State#state.manifest,
                                State#state.memtable),
                        SQN),
        State};
handle_call({fetch, Key},
                _From,
                State=#state{snapshot_fully_loaded=Ready})
                                                        when Ready == true ->
    {reply,
        fetch_snap(Key,
                    State#state.manifest,
                    State#state.levelzero_snapshot),
        State};
handle_call({check_sqn, Key, SQN},
                _From,
                State=#state{snapshot_fully_loaded=Ready})
                                                        when Ready == true ->
    {reply,
        compare_to_sqn(fetch_snap(Key,
                                    State#state.manifest,
                                    State#state.levelzero_snapshot),
                        SQN),
        State};
handle_call(work_for_clerk, From, State) ->
    {UpdState, Work} = return_work(State, From),
    {reply, {Work, UpdState#state.backlog}, UpdState};
handle_call(get_startup_sqn, _From, State) ->
    {reply, State#state.ledger_sqn, State};
handle_call({register_snapshot, Snapshot}, _From, State) ->
    Rs = [{Snapshot, State#state.ledger_sqn}|State#state.registered_snapshots],
    {reply,
        {ok,
            State#state.ledger_sqn,
            State#state.manifest,
            State#state.memtable_copy},
        State#state{registered_snapshots = Rs}};
handle_call({load_snapshot, Increment}, _From, State) ->
    MemTableCopy = State#state.memtable_copy,
    {Tree0, TreeSQN0} = roll_new_tree(MemTableCopy#l0snapshot.tree,
                                        MemTableCopy#l0snapshot.increments,
                                        MemTableCopy#l0snapshot.ledger_sqn),
    if
        TreeSQN0 > MemTableCopy#l0snapshot.ledger_sqn ->
            pcl_updatesnapshotcache(State#state.source_penciller,
                                    Tree0,
                                    TreeSQN0);
        true ->
            % No new increments were rolled in, so there is nothing to push
            % back to the source Penciller's cache
            ok
    end,
    {Tree1, TreeSQN1} = roll_new_tree(Tree0, [Increment], TreeSQN0),
    io:format("Snapshot loaded to start at SQN~w~n", [TreeSQN1]),
    {reply, ok, State#state{levelzero_snapshot=Tree1,
                            ledger_sqn=TreeSQN1,
                            snapshot_fully_loaded=true}};
handle_call(close, _From, State) ->
    {stop, normal, ok, State}.

handle_cast({update_snapshotcache, Tree, SQN}, State) ->
    MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN),
    {noreply, State#state{memtable_copy=MemTableC}};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State=#state{is_snapshot=Snap}) when Snap == true ->
    ok;
terminate(_Reason, State) ->
    %% When a Penciller shuts down it isn't safe to try and manage the safe
    %% finishing of any outstanding work.  The last committed manifest will
    %% be used.
    %%
    %% Level 0 files lie outside of the manifest, and so if there is no L0
    %% file present it is safe to write the current contents of memory.  If
    %% there is a L0 file present - then the memory can be dropped (it is
    %% recoverable from the ledger, and there should not be a lot to recover
    %% as presumably the ETS table has been recently flushed, hence the
    %% presence of a L0 file).
    %%
    %% The penciller should close each file in the unreferenced files, and
    %% then each file in the manifest, and cast a close on the clerk.
    %% The cast may not succeed as the clerk could be synchronously calling
    %% the penciller looking for a manifest commit
    %%
    leveled_pclerk:clerk_stop(State#state.clerk),
    Dump = ets:tab2list(State#state.memtable),
    case {State#state.levelzero_pending,
            get_item(0, State#state.manifest, []), length(Dump)} of
        {?L0PEND_RESET, [], L} when L > 0 ->
            MSN = State#state.manifest_sqn + 1,
            FileName = State#state.root_path
                        ++ "/" ++ ?FILES_FP ++ "/"
                        ++ integer_to_list(MSN) ++ "_0_0",
            NewSFT = leveled_sft:sft_new(FileName ++ ".pnd",
                                            Dump,
                                            [],
                                            0),
            {ok, L0Pid, {{[], []}, _SK, _HK}} = NewSFT,
            io:format("Dump of memory on close to filename ~s~n",
                        [FileName]),
            leveled_sft:sft_close(L0Pid),
            file:rename(FileName ++ ".pnd", FileName ++ ".sft");
        {?L0PEND_RESET, [], L} when L == 0 ->
            io:format("No keys to dump from memory when closing~n");
        {{true, L0Pid, _TS}, _, _} ->
            leveled_sft:sft_close(L0Pid),
            io:format("No opportunity to persist memory before closing"
                        ++ " with ~w keys discarded~n",
                        [length(Dump)]);
        _ ->
            io:format("No opportunity to persist memory before closing"
                        ++ " with ~w keys discarded~n",
                        [length(Dump)])
    end,
    ok = close_files(0, State#state.manifest),
    lists:foreach(fun({_FN, Pid, _SN}) ->
                        leveled_sft:sft_close(Pid) end,
                    State#state.unreferenced_files),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%%============================================================================
%%% Internal functions
%%%============================================================================
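
%% Recover the Penciller state from disk: find the current manifest with the
%% highest sequence number, open a SFT process for every file it lists, and
%% then look for any L0 file written after that manifest was persisted.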
start_from_file(PCLopts) ->
    RootPath = PCLopts#penciller_options.root_path,
    MaxTableSize = case PCLopts#penciller_options.max_inmemory_tablesize of
                        undefined ->
                            ?MAX_TABLESIZE;
                        M ->
                            M
                    end,
    TID = ets:new(?MEMTABLE, [ordered_set]),
    {ok, Clerk} = leveled_pclerk:clerk_new(self()),
    InitState = #state{memtable=TID,
                        clerk=Clerk,
                        root_path=RootPath,
                        memtable_maxsize=MaxTableSize},

    %% Open manifest
    ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
    {ok, Filenames} = case filelib:is_dir(ManifestPath) of
                            true ->
                                file:list_dir(ManifestPath);
                            false ->
                                {ok, []}
                        end,
    CurrRegex = "nonzero_(?<MSN>[0-9]+)\\." ++ ?CURRENT_FILEX,
    ValidManSQNs = lists:foldl(fun(FN, Acc) ->
                                    case re:run(FN,
                                                CurrRegex,
                                                [{capture, ['MSN'], list}]) of
                                        nomatch ->
                                            Acc;
                                        {match, [Int]} when is_list(Int) ->
                                            Acc ++ [list_to_integer(Int)];
                                        _ ->
                                            Acc
                                    end end,
                                    [],
                                    Filenames),
    TopManSQN = lists:foldl(fun(X, MaxSQN) -> max(X, MaxSQN) end,
                            0,
                            ValidManSQNs),
    io:format("Store to be started based on " ++
                "manifest sequence number of ~w~n", [TopManSQN]),
    case TopManSQN of
        0 ->
            io:format("Sequence number of 0 indicates no valid manifest~n"),
            {ok, InitState};
        _ ->
            {ok, Bin} = file:read_file(filepath(InitState#state.root_path,
                                                TopManSQN,
                                                current_manifest)),
            Manifest = binary_to_term(Bin),
            {UpdManifest, MaxSQN} = open_all_filesinmanifest(Manifest),
            io:format("Maximum sequence number of ~w "
                        ++ "found in nonzero levels~n",
                        [MaxSQN]),

            %% Find any L0 files
            L0FN = filepath(RootPath,
                            TopManSQN + 1,
                            new_merge_files) ++ "_0_0.sft",
            case filelib:is_file(L0FN) of
                true ->
                    io:format("L0 file found ~s~n", [L0FN]),
                    {ok,
                        L0Pid,
                        {L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN),
                    L0SQN = leveled_sft:sft_getmaxsequencenumber(L0Pid),
                    ManifestEntry = #manifest_entry{start_key=L0StartKey,
                                                    end_key=L0EndKey,
                                                    owner=L0Pid,
                                                    filename=L0FN},
                    UpdManifest2 = lists:keystore(0,
                                                    1,
                                                    UpdManifest,
                                                    {0, [ManifestEntry]}),
                    io:format("L0 file had maximum sequence number of ~w~n",
                                [L0SQN]),
                    {ok,
                        InitState#state{manifest=UpdManifest2,
                                        manifest_sqn=TopManSQN,
                                        ledger_sqn=max(MaxSQN, L0SQN)}};
                false ->
                    io:format("No L0 file found~n"),
                    {ok,
                        InitState#state{manifest=UpdManifest,
                                        manifest_sqn=TopManSQN,
                                        ledger_sqn=MaxSQN}}
            end
    end.
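
%% If a L0 file write was pending, confirm with the SFT process that it has
%% completed (a sync call, so it will block), then clear the ETS memtable and
%% reset the memtable copy, adding the new file at Level 0 of the manifest.
%% In all cases prompt the clerk to ask for work.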
checkready_pushtomem(State) ->
    {TableSize, UpdState} = case State#state.levelzero_pending of
        {true, Pid, _TS} ->
            %% Need to handle error scenarios?
            %% N.B. Sync call - so will be ready
            {ok, SrcFN, StartKey, EndKey} = leveled_sft:sft_checkready(Pid),
            true = ets:delete_all_objects(State#state.memtable),
            ManifestEntry = #manifest_entry{start_key=StartKey,
                                            end_key=EndKey,
                                            owner=Pid,
                                            filename=SrcFN},
            {0,
                State#state{manifest=lists:keystore(0,
                                                    1,
                                                    State#state.manifest,
                                                    {0, [ManifestEntry]}),
                            levelzero_pending=?L0PEND_RESET,
                            memtable_copy=#l0snapshot{}}};
        ?L0PEND_RESET ->
            {State#state.table_size, State}
    end,
    %% Prompt clerk to ask about work - do this for every push_mem
    ok = leveled_pclerk:clerk_prompt(UpdState#state.clerk, penciller),
    {TableSize, UpdState}.
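
%% Cheaply estimate whether the inbound dump will take the in-memory table
%% over capacity.  The estimate is approximate, as keys in the dump may
%% already be present in the table.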
quickcheck_pushtomem(DumpList, TableSize, MaxSize) ->
    case TableSize + length(DumpList) of
        ApproxTableSize when ApproxTableSize > MaxSize ->
            {maybe_roll, ApproxTableSize};
        ApproxTableSize ->
            io:format("Table size is approximately ~w~n", [ApproxTableSize]),
            {twist, ApproxTableSize}
    end.
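
%% Insert the dump into the ETS memtable, and record the dump as an increment
%% in the memtable copy used to seed snapshot clones.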
do_pushtomem(DumpList, MemTable, Snapshot, MaxSQN) ->
    SW = os:timestamp(),
    UpdSnapshot = add_increment_to_memcopy(Snapshot, MaxSQN, DumpList),
    ets:insert(MemTable, DumpList),
    io:format("Push into memory timed at ~w microseconds~n",
                [timer:now_diff(os:timestamp(), SW)]),
    UpdSnapshot.
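
%% If the memtable is over capacity prompt an async write of a new L0 SFT
%% file - but only if there is not already a L0 file in the manifest, and the
%% manifest is not locked.  Otherwise signal a pause.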
roll_memory(State, MaxSize) ->
    case ets:info(State#state.memtable, size) of
        Size when Size > MaxSize ->
            L0 = get_item(0, State#state.manifest, []),
            case {L0, manifest_locked(State)} of
                {[], false} ->
                    MSN = State#state.manifest_sqn + 1,
                    FileName = State#state.root_path
                                ++ "/" ++ ?FILES_FP ++ "/"
                                ++ integer_to_list(MSN) ++ "_0_0",
                    Opts = #sft_options{wait=false},
                    {ok, L0Pid} = leveled_sft:sft_new(FileName,
                                                        State#state.memtable,
                                                        [],
                                                        0,
                                                        Opts),
                    {ok, {true, L0Pid, os:timestamp()}, MSN, Size};
                {[], true} ->
                    {pause,
                        "L0 file write blocked by change at sqn=~w~n",
                        [State#state.manifest_sqn]};
                _ ->
                    {pause,
                        "L0 file write blocked by L0 file in manifest~n",
                        []}
            end;
        Size ->
            {ok, ?L0PEND_RESET, State#state.manifest_sqn, Size}
    end.
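
%% Fetch from a snapshot clone - check the immutable gb_tree first, then
%% work down the SFT levels via the manifest.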
fetch_snap(Key, Manifest, Tree) ->
    case gb_trees:lookup(Key, Tree) of
        {value, Value} ->
            {Key, Value};
        none ->
            fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2)
    end.
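
%% Fetch from a live Penciller - check the L0 ETS memtable first, then work
%% down the SFT levels via the manifest.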
fetch(Key, Manifest, TID) ->
    case ets:lookup(TID, Key) of
        [Object] ->
            Object;
        [] ->
            fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2)
    end.

fetch(_Key, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
    not_present;
fetch(Key, Manifest, Level, FetchFun) ->
    LevelManifest = get_item(Level, Manifest, []),
    case lists:foldl(fun(File, Acc) ->
                        case Acc of
                            not_present when
                                    Key >= File#manifest_entry.start_key,
                                    File#manifest_entry.end_key >= Key ->
                                File#manifest_entry.owner;
                            PidFound ->
                                PidFound
                        end end,
                        not_present,
                        LevelManifest) of
        not_present ->
            fetch(Key, Manifest, Level + 1, FetchFun);
        FileToCheck ->
            case FetchFun(FileToCheck, Key) of
                not_present ->
                    fetch(Key, Manifest, Level + 1, FetchFun);
                ObjectFound ->
                    ObjectFound
            end
    end.
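
%% Check that a fetched object is present with a sequence number no higher
%% than the SQN being checked - used to confirm that a Journal key is still
%% current in the Ledger.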
compare_to_sqn(Obj, SQN) ->
    case Obj of
        not_present ->
            false;
        Obj ->
            SQNToCompare = leveled_bookie:strip_to_seqonly(Obj),
            if
                SQNToCompare > SQN ->
                    false;
                true ->
                    true
            end
    end.

%% Manifest lock - don't have two changes to the manifest happening
%% concurrently

manifest_locked(State) ->
    if
        length(State#state.ongoing_work) > 0 ->
            true;
        true ->
            case State#state.levelzero_pending of
                {true, _Pid, _TS} ->
                    true;
                _ ->
                    false
            end
    end.

%% Work out what the current work queue should be
%%
%% The work queue should have lower level work at the front, and no work
%% should be added to the queue if a compaction worker has already been asked
%% to look at work at that level
%%
%% The full queue is calculated for logging purposes only

return_work(State, From) ->
    WorkQueue = assess_workqueue([],
                                    0,
                                    State#state.manifest),
    case length(WorkQueue) of
        L when L > 0 ->
            [{SrcLevel, Manifest}|OtherWork] = WorkQueue,
            io:format("Work at Level ~w to be scheduled for ~w with ~w " ++
                        "queue items outstanding~n",
                        [SrcLevel, From, length(OtherWork)]),
            case {manifest_locked(State), State#state.ongoing_work} of
                {false, _} ->
                    %% No work currently outstanding
                    %% Can allocate work
                    NextSQN = State#state.manifest_sqn + 1,
                    FP = filepath(State#state.root_path,
                                    NextSQN,
                                    new_merge_files),
                    ManFile = filepath(State#state.root_path,
                                        NextSQN,
                                        pending_manifest),
                    WI = #penciller_work{next_sqn=NextSQN,
                                            clerk=From,
                                            src_level=SrcLevel,
                                            manifest=Manifest,
                                            start_time = os:timestamp(),
                                            ledger_filepath = FP,
                                            manifest_file = ManFile},
                    {State#state{ongoing_work=[WI]}, WI};
                {true, [OutstandingWork]} ->
                    %% Still awaiting a response
                    io:format("Ongoing work requested by ~w " ++
                                "but work outstanding from Level ~w " ++
                                "and Clerk ~w at sequence number ~w~n",
                                [From,
                                    OutstandingWork#penciller_work.src_level,
                                    OutstandingWork#penciller_work.clerk,
                                    OutstandingWork#penciller_work.next_sqn]),
                    {State, none};
                {true, _} ->
                    %% Manifest locked
                    io:format("Manifest locked but no work outstanding " ++
                                "with clerk~n"),
                    {State, none}
            end;
        _ ->
            {State, none}
    end.

%% This takes the three parts of a memtable copy - the increments, the tree
%% and the SQN at which the tree was formed - and outputs a new tree

roll_new_tree(Tree, [], HighSQN) ->
    {Tree, HighSQN};
roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN ->
    UpdTree = lists:foldl(fun({K, V}, TreeAcc) ->
                                gb_trees:enter(K, V, TreeAcc) end,
                            Tree,
                            KVList),
    roll_new_tree(UpdTree, TailIncs, SQN);
roll_new_tree(Tree, [_H|TailIncs], HighSQN) ->
    roll_new_tree(Tree, TailIncs, HighSQN).

%% Update the memtable copy if the tree created advances the SQN
cache_tree_in_memcopy(MemCopy, Tree, SQN) ->
    case MemCopy#l0snapshot.ledger_sqn of
        CurrentSQN when SQN > CurrentSQN ->
            % Discard any merged increments
            io:format("Updating cache with new tree at SQN=~w~n", [SQN]),
            Incs = lists:foldl(fun({PushSQN, PushL}, Acc) ->
                                    if
                                        SQN >= PushSQN ->
                                            Acc;
                                        true ->
                                            Acc ++ [{PushSQN, PushL}]
                                    end end,
                                [],
                                MemCopy#l0snapshot.increments),
            #l0snapshot{ledger_sqn = SQN,
                        increments = Incs,
                        tree = Tree};
        _ ->
            MemCopy
    end.
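
%% Append the latest pushed batch to the increment list in the memtable copy,
%% so clones can replay it when building their snapshot trees.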
add_increment_to_memcopy(MemCopy, SQN, KVList) ->
    Incs = MemCopy#l0snapshot.increments ++ [{SQN, KVList}],
    MemCopy#l0snapshot{increments=Incs}.

close_files(?MAX_LEVELS - 1, _Manifest) ->
    ok;
close_files(Level, Manifest) ->
    LevelList = get_item(Level, Manifest, []),
    lists:foreach(fun(F) -> leveled_sft:sft_close(F#manifest_entry.owner) end,
                    LevelList),
    close_files(Level + 1, Manifest).


open_all_filesinmanifest(Manifest) ->
    open_all_filesinmanifest({Manifest, 0}, 0).

open_all_filesinmanifest(Result, ?MAX_LEVELS - 1) ->
    Result;
open_all_filesinmanifest({Manifest, TopSQN}, Level) ->
    LevelList = get_item(Level, Manifest, []),
    %% The Pids in the saved manifest relate to now closed references
    %% Need to roll over the manifest at this level starting new processes to
    %% replace them
    LvlR = lists:foldl(fun(F, {FL, FL_SQN}) ->
                            FN = F#manifest_entry.filename,
                            {ok, P, _Keys} = leveled_sft:sft_open(FN),
                            F_SQN = leveled_sft:sft_getmaxsequencenumber(P),
                            {lists:append(FL,
                                            [F#manifest_entry{owner = P}]),
                                max(FL_SQN, F_SQN)}
                            end,
                            {[], 0},
                            LevelList),
    %% Result is a tuple of the revised file list for this level in the
    %% manifest, and the maximum sequence number seen at this level
    {LvlFL, LvlSQN} = LvlR,
    UpdManifest = lists:keystore(Level, 1, Manifest, {Level, LvlFL}),
    open_all_filesinmanifest({UpdManifest, max(TopSQN, LvlSQN)}, Level + 1).

print_manifest(Manifest) ->
    lists:foreach(fun(L) ->
                        io:format("Manifest at Level ~w~n", [L]),
                        Level = get_item(L, Manifest, []),
                        lists:foreach(fun(M) ->
                                        {_, SB, SK} = M#manifest_entry.start_key,
                                        {_, EB, EK} = M#manifest_entry.end_key,
                                        io:format("Manifest entry of " ++
                                                    "startkey ~s ~s " ++
                                                    "endkey ~s ~s " ++
                                                    "filename=~s~n",
                                                    [SB, SK, EB, EK,
                                                        M#manifest_entry.filename])
                                        end,
                                        Level)
                        end,
                    lists:seq(1, ?MAX_LEVELS - 1)).


assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest) ->
    WorkQ;
assess_workqueue(WorkQ, LevelToAssess, Manifest) ->
    MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0),
    FileCount = length(get_item(LevelToAssess, Manifest, [])),
    NewWQ = maybe_append_work(WorkQ, LevelToAssess, Manifest, MaxFiles,
                                FileCount),
    assess_workqueue(NewWQ, LevelToAssess + 1, Manifest).


maybe_append_work(WorkQ, Level, Manifest,
                    MaxFiles, FileCount)
                        when FileCount > MaxFiles ->
    io:format("Outstanding compaction work items of ~w at level ~w~n",
                [FileCount - MaxFiles, Level]),
    lists:append(WorkQ, [{Level, Manifest}]);
maybe_append_work(WorkQ, _Level, _Manifest,
                    _MaxFiles, _FileCount) ->
    WorkQ.


get_item(Index, List, Default) ->
    case lists:keysearch(Index, 1, List) of
        {value, {Index, Value}} ->
            Value;
        false ->
            Default
    end.

%% Request a manifest change
%% The clerk should have completed the work, and created a new manifest
%% and persisted the new view of the manifest
%%
%% To complete the change of manifest:
%% - the state of the manifest file needs to be changed from pending to current
%% - the list of unreferenced files needs to be updated on State
%% - the current manifest needs to be updated on State
%% - the list of ongoing work needs to be cleared of this item

commit_manifest_change(ReturnedWorkItem, State) ->
    NewMSN = State#state.manifest_sqn + 1,
    [SentWorkItem] = State#state.ongoing_work,
    RootPath = State#state.root_path,
    UnreferencedFiles = State#state.unreferenced_files,

    case {SentWorkItem#penciller_work.next_sqn,
            SentWorkItem#penciller_work.clerk} of
        {NewMSN, _From} ->
            MTime = timer:now_diff(os:timestamp(),
                                    SentWorkItem#penciller_work.start_time),
            io:format("Merge to sqn ~w completed in ~w microseconds " ++
                        "at Level ~w~n",
                        [SentWorkItem#penciller_work.next_sqn,
                            MTime,
                            SentWorkItem#penciller_work.src_level]),
            ok = rename_manifest_files(RootPath, NewMSN),
            FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files,
            UnreferencedFilesUpd = update_deletions(FilesToDelete,
                                                        NewMSN,
                                                        UnreferencedFiles),
            io:format("Merge has been committed at sequence number ~w~n",
                        [NewMSN]),
            NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
            print_manifest(NewManifest),
            {ok, State#state{ongoing_work=[],
                                manifest_sqn=NewMSN,
                                manifest=NewManifest,
                                unreferenced_files=UnreferencedFilesUpd}};
        {MaybeWrongMSN, From} ->
            io:format("Merge commit at sqn ~w not matched to expected" ++
                        " sqn ~w from Clerk ~w~n",
                        [NewMSN, MaybeWrongMSN, From]),
            {error, State}
    end.

rename_manifest_files(RootPath, NewMSN) ->
    file:rename(filepath(RootPath, NewMSN, pending_manifest),
                filepath(RootPath, NewMSN, current_manifest)).

filepath(RootPath, manifest) ->
    RootPath ++ "/" ++ ?MANIFEST_FP;
filepath(RootPath, files) ->
    RootPath ++ "/" ++ ?FILES_FP.

filepath(RootPath, NewMSN, pending_manifest) ->
    filepath(RootPath, manifest) ++ "/" ++ "nonzero_"
                ++ integer_to_list(NewMSN) ++ "." ++ ?PENDING_FILEX;
filepath(RootPath, NewMSN, current_manifest) ->
    filepath(RootPath, manifest) ++ "/" ++ "nonzero_"
                ++ integer_to_list(NewMSN) ++ "." ++ ?CURRENT_FILEX;
filepath(RootPath, NewMSN, new_merge_files) ->
    filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN).

update_deletions([], _NewMSN, UnreferencedFiles) ->
    UnreferencedFiles;
update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) ->
    io:format("Adding cleared file ~s to deletion list~n",
                [ClearedFile#manifest_entry.filename]),
    update_deletions(Tail,
                        MSN,
                        lists:append(UnreferencedFiles,
                                        [{ClearedFile#manifest_entry.filename,
                                            ClearedFile#manifest_entry.owner,
                                            MSN}])).

confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
    case lists:keyfind(Filename, 1, UnreferencedFiles) of
        false ->
            false;
        {Filename, _Pid, MSN} ->
            LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end,
                                    infinity,
                                    RegisteredSnapshots),
            if
                MSN >= LowSQN ->
                    false;
                true ->
                    true
            end
    end.
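
%% Find the minimum and maximum sequence numbers in a list of keys pushed
%% from the Bookie, so the push can be validated against the current ledger
%% sequence number.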
assess_sqn([]) ->
    empty;
assess_sqn(DumpList) ->
    assess_sqn(DumpList, infinity, 0).

assess_sqn([], MinSQN, MaxSQN) ->
    {MinSQN, MaxSQN};
assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) ->
    {_K, SQN} = leveled_bookie:strip_to_keyseqonly(HeadKey),
    assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)).

%%%============================================================================
%%% Test
%%%============================================================================

-ifdef(TEST).

clean_testdir(RootPath) ->
    clean_subdir(filepath(RootPath, manifest)),
    clean_subdir(filepath(RootPath, files)).

clean_subdir(DirPath) ->
    case filelib:is_dir(DirPath) of
        true ->
            {ok, Files} = file:list_dir(DirPath),
            lists:foreach(fun(FN) -> file:delete(filename:join(DirPath, FN)),
                                        io:format("Delete file ~s/~s~n",
                                                    [DirPath, FN])
                            end,
                            Files);
        false ->
            ok
    end.

compaction_work_assessment_test() ->
    L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}],
    L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid},
            {{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}],
    Manifest = [{0, L0}, {1, L1}],
    WorkQ1 = assess_workqueue([], 0, Manifest),
    ?assertMatch(WorkQ1, [{0, Manifest}]),
    L1Alt = lists:append(L1,
                        [{{o, "B5", "K0001"}, {o, "B5", "K9999"}, dummy_pid},
                        {{o, "B6", "K0001"}, {o, "B6", "K9999"}, dummy_pid},
                        {{o, "B7", "K0001"}, {o, "B7", "K9999"}, dummy_pid},
                        {{o, "B8", "K0001"}, {o, "B8", "K9999"}, dummy_pid},
                        {{o, "B9", "K0001"}, {o, "B9", "K9999"}, dummy_pid},
                        {{o, "BA", "K0001"}, {o, "BA", "K9999"}, dummy_pid},
                        {{o, "BB", "K0001"}, {o, "BB", "K9999"}, dummy_pid}]),
    Manifest3 = [{0, []}, {1, L1Alt}],
    WorkQ3 = assess_workqueue([], 0, Manifest3),
    ?assertMatch(WorkQ3, [{1, Manifest3}]).

confirm_delete_test() ->
    Filename = 'test.sft',
    UnreferencedFiles = [{'other.sft', dummy_owner, 15},
                            {Filename, dummy_owner, 10}],
    RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}],
    R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1),
    ?assertMatch(R1, true),
    RegisteredIterators2 = [{dummy_pid, 10}, {dummy_pid, 12}],
    R2 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators2),
    ?assertMatch(R2, false),
    RegisteredIterators3 = [{dummy_pid, 9}, {dummy_pid, 12}],
    R3 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators3),
    ?assertMatch(R3, false).

simple_server_test() ->
    RootPath = "../test/ledger",
    clean_testdir(RootPath),
    {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
                                             max_inmemory_tablesize=1000}),
    Key1 = {{o, "Bucket0001", "Key0001"}, {1, {active, infinity}, null}},
    KL1 = lists:sort(leveled_sft:generate_randomkeys({1000, 2})),
    Key2 = {{o, "Bucket0002", "Key0002"}, {1002, {active, infinity}, null}},
    KL2 = lists:sort(leveled_sft:generate_randomkeys({1000, 1002})),
    Key3 = {{o, "Bucket0003", "Key0003"}, {2002, {active, infinity}, null}},
    KL3 = lists:sort(leveled_sft:generate_randomkeys({1000, 2002})),
    Key4 = {{o, "Bucket0004", "Key0004"}, {3002, {active, infinity}, null}},
    KL4 = lists:sort(leveled_sft:generate_randomkeys({1000, 3002})),
    ok = pcl_pushmem(PCL, [Key1]),
    R1 = pcl_fetch(PCL, {o, "Bucket0001", "Key0001"}),
    ?assertMatch(R1, Key1),
    ok = pcl_pushmem(PCL, KL1),
    R2 = pcl_fetch(PCL, {o, "Bucket0001", "Key0001"}),
    ?assertMatch(R2, Key1),
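    %% pcl_pushmem may return pause when the Penciller is awaiting
    %% persistence of the L0 table - so back off briefly before continuing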
    S1 = pcl_pushmem(PCL, [Key2]),
    if S1 == pause -> timer:sleep(2); true -> ok end,
    R3 = pcl_fetch(PCL, {o, "Bucket0001", "Key0001"}),
    R4 = pcl_fetch(PCL, {o, "Bucket0002", "Key0002"}),
    ?assertMatch(R3, Key1),
    ?assertMatch(R4, Key2),
    S2 = pcl_pushmem(PCL, KL2),
    if S2 == pause -> timer:sleep(1000); true -> ok end,
    S3 = pcl_pushmem(PCL, [Key3]),
    if S3 == pause -> timer:sleep(1000); true -> ok end,
    R5 = pcl_fetch(PCL, {o, "Bucket0001", "Key0001"}),
    R6 = pcl_fetch(PCL, {o, "Bucket0002", "Key0002"}),
    R7 = pcl_fetch(PCL, {o, "Bucket0003", "Key0003"}),
    ?assertMatch(R5, Key1),
    ?assertMatch(R6, Key2),
    ?assertMatch(R7, Key3),
    ok = pcl_close(PCL),
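    %% Restart the Penciller - the startup SQN reveals whether the last push
    %% was persisted before the close, and anything missing is pushed again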
    {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
                                              max_inmemory_tablesize=1000}),
    TopSQN = pcl_getstartupsequencenumber(PCLr),
    Check = case TopSQN of
                2001 ->
                    %% Last push not persisted
                    S3a = pcl_pushmem(PCLr, [Key3]),
                    if S3a == pause -> timer:sleep(1000); true -> ok end,
                    ok;
                2002 ->
                    %% everything got persisted
                    ok;
                _ ->
                    io:format("Unexpected sequence number on restart ~w~n",
                              [TopSQN]),
                    error
            end,
    ?assertMatch(Check, ok),
    R8 = pcl_fetch(PCLr, {o, "Bucket0001", "Key0001"}),
    R9 = pcl_fetch(PCLr, {o, "Bucket0002", "Key0002"}),
    R10 = pcl_fetch(PCLr, {o, "Bucket0003", "Key0003"}),
    ?assertMatch(R8, Key1),
    ?assertMatch(R9, Key2),
    ?assertMatch(R10, Key3),
    S4 = pcl_pushmem(PCLr, KL3),
    if S4 == pause -> timer:sleep(1000); true -> ok end,
    S5 = pcl_pushmem(PCLr, [Key4]),
    if S5 == pause -> timer:sleep(1000); true -> ok end,
    S6 = pcl_pushmem(PCLr, KL4),
    if S6 == pause -> timer:sleep(1000); true -> ok end,
    R11 = pcl_fetch(PCLr, {o, "Bucket0001", "Key0001"}),
    R12 = pcl_fetch(PCLr, {o, "Bucket0002", "Key0002"}),
    R13 = pcl_fetch(PCLr, {o, "Bucket0003", "Key0003"}),
    R14 = pcl_fetch(PCLr, {o, "Bucket0004", "Key0004"}),
    ?assertMatch(R11, Key1),
    ?assertMatch(R12, Key2),
    ?assertMatch(R13, Key3),
    ?assertMatch(R14, Key4),
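    %% Take a snapshot - the snapshot should provide a consistent view of
    %% the Ledger as at the point the snapshot was loaded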
    SnapOpts = #penciller_options{start_snapshot = true,
                                  source_penciller = PCLr},
    {ok, PclSnap} = pcl_start(SnapOpts),
    ok = pcl_loadsnapshot(PclSnap, []),
    ?assertMatch(Key1, pcl_fetch(PclSnap, {o, "Bucket0001", "Key0001"})),
    ?assertMatch(Key2, pcl_fetch(PclSnap, {o, "Bucket0002", "Key0002"})),
    ?assertMatch(Key3, pcl_fetch(PclSnap, {o, "Bucket0003", "Key0003"})),
    ?assertMatch(Key4, pcl_fetch(PclSnap, {o, "Bucket0004", "Key0004"})),
    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
                                               {o, "Bucket0001", "Key0001"},
                                               1)),
    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
                                               {o, "Bucket0002", "Key0002"},
                                               1002)),
    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
                                               {o, "Bucket0003", "Key0003"},
                                               2002)),
    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
                                               {o, "Bucket0004", "Key0004"},
                                               3002)),
    %% Add some more keys and confirm that the check on sequence number
    %% still sees the old version in the previous snapshot, but will see
    %% the new version in a new snapshot
    Key1A = {{o, "Bucket0001", "Key0001"}, {4002, {active, infinity}, null}},
    KL1A = lists:sort(leveled_sft:generate_randomkeys({4002, 2})),
    S7 = pcl_pushmem(PCLr, [Key1A]),
    if S7 == pause -> timer:sleep(1000); true -> ok end,
    S8 = pcl_pushmem(PCLr, KL1A),
    if S8 == pause -> timer:sleep(1000); true -> ok end,
    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
                                               {o, "Bucket0001", "Key0001"},
                                               1)),
    ok = pcl_close(PclSnap),
    {ok, PclSnap2} = pcl_start(SnapOpts),
    ok = pcl_loadsnapshot(PclSnap2, []),
    ?assertMatch(false, pcl_checksequencenumber(PclSnap2,
                                                {o, "Bucket0001", "Key0001"},
                                                1)),
    ?assertMatch(true, pcl_checksequencenumber(PclSnap2,
                                               {o, "Bucket0001", "Key0001"},
                                               4002)),
    ?assertMatch(true, pcl_checksequencenumber(PclSnap2,
                                               {o, "Bucket0002", "Key0002"},
                                               1002)),
    ok = pcl_close(PclSnap2),
    ok = pcl_close(PCLr),
    clean_testdir(RootPath).
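
%% The l0snapshot record accumulates pushed increments until they can be
%% rolled into a single gb_tree.  Rolling should deduplicate keys across
%% increments and report the highest SQN seen.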
memcopy_test() ->
    KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X),
                                "Value" ++ integer_to_list(X) ++ "A"} end,
                     lists:seq(1, 1000)),
    KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X),
                                "Value" ++ integer_to_list(X) ++ "B"} end,
                     lists:seq(1001, 2000)),
    KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X),
                                "Value" ++ integer_to_list(X) ++ "C"} end,
                     lists:seq(1, 1000)),
    MemCopy0 = #l0snapshot{},
    MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1),
    MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2),
    MemCopy3 = add_increment_to_memcopy(MemCopy2, 3000, KVL3),
    {Tree1, HighSQN1} = roll_new_tree(gb_trees:empty(),
                                      MemCopy3#l0snapshot.increments,
                                      0),
    Size1 = gb_trees:size(Tree1),
    ?assertMatch(2000, Size1),
    ?assertMatch(3000, HighSQN1).
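
%% cache_tree_in_memcopy folds a rolled tree back into the l0snapshot,
%% clearing the increments it covers and advancing the cached ledger_sqn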
memcopy_updatecache_test() ->
    KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X),
                                "Value" ++ integer_to_list(X) ++ "A"} end,
                     lists:seq(1, 1000)),
    KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X),
                                "Value" ++ integer_to_list(X) ++ "B"} end,
                     lists:seq(1001, 2000)),
    KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X),
                                "Value" ++ integer_to_list(X) ++ "C"} end,
                     lists:seq(1, 1000)),
    MemCopy0 = #l0snapshot{},
    MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1),
    MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2),
    MemCopy3 = add_increment_to_memcopy(MemCopy2, 3000, KVL3),
    ?assertMatch(0, MemCopy3#l0snapshot.ledger_sqn),
    {Tree1, HighSQN1} = roll_new_tree(gb_trees:empty(),
                                      MemCopy3#l0snapshot.increments,
                                      0),
    MemCopy4 = cache_tree_in_memcopy(MemCopy3, Tree1, HighSQN1),
    ?assertMatch(0, length(MemCopy4#l0snapshot.increments)),
    Size2 = gb_trees:size(MemCopy4#l0snapshot.tree),
    ?assertMatch(2000, Size2),
    ?assertMatch(3000, MemCopy4#l0snapshot.ledger_sqn).

-endif.