commit 8792229946
12 changed files with 261 additions and 136 deletions

@@ -30,7 +30,7 @@
 -record(manifest_entry,
                         {start_key :: tuple(),
                         end_key :: tuple(),
-                        owner :: pid(),
+                        owner :: pid()|list(),
                         filename :: string()}).
 
 -record(cdb_options,
@@ -55,7 +55,7 @@
                         max_inmemory_tablesize :: integer(),
                         start_snapshot = false :: boolean(),
                         source_penciller :: pid(),
-                        levelzero_cointoss = false :: boolean}).
+                        levelzero_cointoss = false :: boolean()}).
 
 -record(iclerk_options,
                         {inker :: pid(),

rebar.lock (new file, 1 line)
@@ -0,0 +1 @@
+[].

@@ -4,8 +4,8 @@
 %% - Keys, Metadata and Values are not persisted together - the Keys and
 %% Metadata are kept in a tree-based ledger, whereas the values are stored
 %% only in a sequential Journal.
-%% - Different file formats are used for Journal (based on constant
-%% database), and the ledger (sft, based on sst)
+%% - Different file formats are used for Journal (based on DJ Bernstein
+%% constant database), and the ledger (based on sst)
 %% - It is not intended to be general purpose, but be primarily suited for
 %% use as a Riak backend in specific circumstances (relatively large values,
 %% and frequent use of iterators)

@@ -32,77 +32,6 @@
 %% For the Penciller the manifest maps key ranges to files at each level of
 %% the Ledger.
-%%
-%% -------- PUT --------
-%%
-%% A PUT request consists of
-%% - A Primary Key and a Value
-%% - IndexSpecs - a set of secondary key changes associated with the
-%% transaction
-%%
-%% The Bookie takes the request and passes it first to the Inker to add the
-%% request to the journal.
-%%
-%% The inker will pass the PK/Value/IndexSpecs to the current (append only)
-%% CDB journal file to persist the change.  The call should return either 'ok'
-%% or 'roll'. -'roll' indicates that the CDB file has insufficient capacity for
-%% this write, and a new journal file should be created (with appropriate
-%% manifest changes to be made).
-%%
-%% The inker will return the SQN which the change has been made at, as well as
-%% the object size on disk within the Journal.
-%%
-%% Once the object has been persisted to the Journal, the Ledger can be updated.
-%% The Ledger is updated by the Bookie applying a function (extract_metadata/4)
-%% to the Value to return the Object Metadata, a function to generate a hash
-%% of the Value and also taking the Primary Key, the IndexSpecs, the Sequence
-%% Number in the Journal and the Object Size (returned from the Inker).
-%%
-%% A set of Ledger Key changes are then generated and placed in the Bookie's
-%% Ledger Key cache (a gb_tree).
-%%
-%% The PUT can now be acknowledged.  In the background the Bookie may then
-%% choose to push the cache to the Penciller for eventual persistence within
-%% the ledger.  This push will either be acccepted or returned (if the
-%% Penciller has a backlog of key changes).  The back-pressure should lead to
-%% the Bookie entering into a slow-offer status whereby the next PUT will be
-%% acknowledged by a PAUSE signal - with the expectation that the this will
-%% lead to a back-off behaviour.
-%%
-%% -------- GET, HEAD --------
-%%
-%% The Bookie supports both GET and HEAD requests, with the HEAD request
-%% returning only the metadata and not the actual object value.  The HEAD
-%% requets cna be serviced by reference to the Ledger Cache and the Penciller.
-%%
-%% GET requests first follow the path of a HEAD request, and if an object is
-%% found, then fetch the value from the Journal via the Inker.
-%%
-%% -------- Snapshots/Clones --------
-%%
-%% If there is a snapshot request (e.g. to iterate over the keys) the Bookie
-%% may request a clone of the Penciller, or the Penciller and the Inker.
-%%
-%% The clone is seeded with the manifest.  The clone should be registered with
-%% the real Inker/Penciller, so that the real Inker/Penciller may prevent the
-%% deletion of files still in use by a snapshot clone.
-%%
-%% Iterators should de-register themselves from the Penciller on completion.
-%% Iterators should be automatically release after a timeout period.  A file
-%% can only be deleted from the Ledger if it is no longer in the manifest, and
-%% there are no registered iterators from before the point the file was
-%% removed from the manifest.
-%%
-%% -------- On Startup --------
-%%
-%% On startup the Bookie must restart both the Inker to load the Journal, and
-%% the Penciller to load the Ledger.  Once the Penciller has started, the
-%% Bookie should request the highest sequence number in the Ledger, and then
-%% and try and rebuild any missing information from the Journal.
-%%
-%% To rebuild the Ledger it requests the Inker to scan over the files from
-%% the sequence number and re-generate the Ledger changes - pushing the changes
-%% directly back into the Ledger.
 
 
 
 -module(leveled_bookie).

@@ -174,45 +103,220 @@
 %%% API
 %%%============================================================================
 
+%% @doc Start a Leveled Key/Value store - limited options support.
+%%
+%% The most common startup parameters are extracted out from the options to
+%% provide this startup method.  This will start a KV store from the previous
+%% store at root path - or an empty one if there is no store at the path.
+%%
+%% Fiddling with the LedgerCacheSize and JournalSize may improve performance,
+%% but these are primarily exposed to support special situations (e.g. very
+%% low memory installations); there should not be huge variance in outcomes
+%% from modifying these numbers.
+%%
+%% The sync_strategy determines if the store is going to flush writes to disk
+%% before returning an ack.  There are three settings currently supported:
+%% - sync - sync to disk by passing the sync flag to the file writer (only
+%% works in OTP 18)
+%% - riak_sync - sync to disk by explicitly calling data_sync after the write
+%% - none - leave it to the operating system to control flushing
+%%
+%% On startup the Bookie must restart both the Inker to load the Journal, and
+%% the Penciller to load the Ledger.  Once the Penciller has started, the
+%% Bookie should request the highest sequence number in the Ledger, and then
+%% try to rebuild any missing information from the Journal.
+%%
+%% To rebuild the Ledger it requests the Inker to scan over the files from
+%% the sequence number and re-generate the Ledger changes - pushing the changes
+%% directly back into the Ledger.
 
 book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
     book_start([{root_path, RootPath},
                 {cache_size, LedgerCacheSize},
                 {max_journalsize, JournalSize},
                 {sync_strategy, SyncStrategy}]).
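
As a usage sketch of this simplified API (the path and size values below are illustrative inventions - the diff does not state defaults or units for the cache and journal sizes):

```erlang
%% Minimal round trip, assuming an empty or pre-existing store at the path.
{ok, Bookie} = leveled_bookie:book_start("/var/data/leveled",
                                         2000,       % LedgerCacheSize
                                         100000000,  % JournalSize
                                         none),      % sync_strategy
ok = leveled_bookie:book_close(Bookie).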

+%% @doc Start a Leveled Key/Value store - full options support.
+%%
+%% Allows an options proplist to be passed for setting options.  There are
+%% two primary additional options this allows over book_start/4:
+%% - retain_strategy
+%% - waste_retention_period
+%%
+%% Both of these relate to compaction in the Journal.  The retain_strategy
+%% determines if a skinny record of the object should be retained following
+%% compaction, and how that should be used when recovering lost state in the
+%% Ledger.
+%%
+%% Currently compacted records no longer in use are not removed but moved to
+%% a journal_waste folder, and the waste_retention_period determines how long
+%% this history should be kept for (for example to allow for it to be backed
+%% up before deletion).
+%%
+%% TODO:
+%% The reload_strategy is exposed as currently no firm decision has been made
+%% about how recovery should work.  For instance, if we were to trust everything
+%% as permanent in the Ledger once it is persisted, then there would be no
+%% need to retain a skinny history of key changes in the Journal after
+%% compaction.  If, as an alternative, we assume the Ledger is never permanent,
+%% and retain the skinny history - then backups need only be made against the
+%% Journal.  The skinny history of key changes is primarily related to the
+%% issue of supporting secondary indexes in Riak.
+%%
+%% These two strategies are referred to as recovr (assume we can recover any
+%% deltas from a lost ledger and a lost history through resilience outside of
+%% the store), or retain (retain a history of key changes, even when the object
+%% value has been compacted).  There is a third, unimplemented strategy, which
+%% is recalc - which would require, when reloading the Ledger from the Journal,
+%% recalculating the index changes based on the current state of the Ledger
+%% and the object metadata.
 
 book_start(Opts) ->
     gen_server:start(?MODULE, [Opts], []).
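
The same store could be started with the full proplist form; the first four keys mirror what book_start/4 builds above, while retain_strategy and waste_retention_period follow this doc comment. The value shapes for those last two keys are assumptions - the diff names the options but not their types:

```erlang
%% Sketch: full-options form.  The retain_strategy value shape and the unit
%% of waste_retention_period (assumed seconds) are not confirmed by this diff.
Opts = [{root_path, "/var/data/leveled"},
        {cache_size, 2000},
        {max_journalsize, 100000000},
        {sync_strategy, none},
        {retain_strategy, retain},
        {waste_retention_period, 86400}],
{ok, Bookie} = leveled_bookie:book_start(Opts).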

+%% @doc Put an object with an expiry time
+%%
+%% Put an item in the store but with a Time To Live - the time when the object
+%% should expire, in gregorian seconds (add the required number of seconds to
+%% leveled_codec:integer_time/1).
+%%
+%% There exists the possibility of per object expiry times, not just whole
+%% store expiry times as has traditionally been the feature in Riak.  Care
+%% will need to be taken, if implementing per-object times, about the choice of
+%% reload_strategy.  If expired objects are to be compacted entirely, then the
+%% history of KeyChanges will be lost on reload.
 
-book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_integer(TTL) ->
+book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL)
+                                                    when is_integer(TTL) ->
     book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL).
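
Since the expiry is an absolute time in gregorian seconds, a caller adds the desired lifetime to the current time. leveled_codec:integer_now/0 is what the HEAD handler later in this diff compares expiry against, so it is borrowed here; the bucket, key and value are invented:

```erlang
%% Sketch: put an object that should expire one hour from now.
%% The reply is ok in the normal case; under back-pressure the PUT doc
%% below says to expect a PAUSE-style acknowledgement.
TTL = leveled_codec:integer_now() + 3600,
Ack = leveled_bookie:book_tempput(Bookie,
                                  <<"bucket">>, <<"key1">>, <<"value1">>,
                                  [],  % no index changes
                                  o,   % the Standard tag
                                  TTL).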

+%% @doc - Standard PUT
+%%
+%% A PUT request consists of
+%% - A Primary Key and a Value
+%% - IndexSpecs - a set of secondary key changes associated with the
+%% transaction
+%% - A tag indicating the type of object.  Behaviour for metadata extraction,
+%% and ledger compaction will vary by type.  There are three currently
+%% implemented types: i (Index), o (Standard), o_rkv (Riak).  Keys added with
+%% Index tags are not fetchable (as they will not be hashed), but are
+%% extractable via range query.
+%%
+%% The Bookie takes the request and passes it first to the Inker to add the
+%% request to the journal.
+%%
+%% The inker will pass the PK/Value/IndexSpecs to the current (append only)
+%% CDB journal file to persist the change.  The call should return either 'ok'
+%% or 'roll'.  'roll' indicates that the CDB file has insufficient capacity for
+%% this write, and a new journal file should be created (with appropriate
+%% manifest changes to be made).
+%%
+%% The inker will return the SQN at which the change has been made, as well as
+%% the object size on disk within the Journal.
+%%
+%% Once the object has been persisted to the Journal, the Ledger can be updated.
+%% The Ledger is updated by the Bookie applying a function (extract_metadata/4)
+%% to the Value to return the Object Metadata, a function to generate a hash
+%% of the Value and also taking the Primary Key, the IndexSpecs, the Sequence
+%% Number in the Journal and the Object Size (returned from the Inker).
+%%
+%% A set of Ledger Key changes are then generated and placed in the Bookie's
+%% Ledger Key cache.
+%%
+%% The PUT can now be acknowledged.  In the background the Bookie may then
+%% choose to push the cache to the Penciller for eventual persistence within
+%% the ledger.  This push will either be accepted or returned (if the
+%% Penciller has a backlog of key changes).  The back-pressure should lead to
+%% the Bookie entering into a slow-offer status, whereby the next PUT will be
+%% acknowledged by a PAUSE signal - with the expectation that this will
+%% lead to a back-off behaviour.
 
 book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
     book_put(Pid, Bucket, Key, Object, IndexSpecs, ?STD_TAG).
 
 book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
     book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).
 
+book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
+    gen_server:call(Pid,
+                    {put, Bucket, Key, Object, IndexSpecs, Tag, TTL},
+                    infinity).
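
A worked PUT with secondary index changes may help here; the {add | remove, Field, Term} tuple shape for IndexSpecs is an assumption (this diff describes IndexSpecs only as "a set of secondary key changes"), and the pause return atom is likewise inferred from the PAUSE-signal description above:

```erlang
%% Sketch: write an object and adjust its secondary index entries.
IndexSpecs = [{add, <<"colour_bin">>, <<"blue">>},
              {remove, <<"colour_bin">>, <<"red">>}],
case leveled_bookie:book_put(Bookie, <<"bucket">>, <<"key1">>,
                             <<"value1">>, IndexSpecs) of
    ok ->
        ok;
    pause ->
        %% back-pressure: the slow-offer status described above -
        %% a well-behaved caller backs off before its next PUT
        timer:sleep(10)
end.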

+%% @doc - Standard DELETE
+%%
+%% A thin wrap around the put of a special tombstone object.  There is no
+%% immediate reclaim of space, simply the addition of a more recent tombstone.
 
 book_delete(Pid, Bucket, Key, IndexSpecs) ->
     book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).
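
A delete, then, is just another journal entry; withdrawing the object's index entries travels in the same IndexSpecs argument (tuple shape assumed, as in the PUT sketch above):

```erlang
%% Sketch: tombstone key1 and withdraw its index entry.
_Ack = leveled_bookie:book_delete(Bookie, <<"bucket">>, <<"key1">>,
                                  [{remove, <<"colour_bin">>, <<"blue">>}]).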

+%% @doc - GET and HEAD requests
+%%
+%% The Bookie supports both GET and HEAD requests, with the HEAD request
+%% returning only the metadata and not the actual object value.  The HEAD
+%% request can be serviced by reference to the Ledger Cache and the Penciller.
+%%
+%% GET requests first follow the path of a HEAD request, and if an object is
+%% found, then fetch the value from the Journal via the Inker.
 
 book_get(Pid, Bucket, Key) ->
     book_get(Pid, Bucket, Key, ?STD_TAG).
 
 book_head(Pid, Bucket, Key) ->
     book_head(Pid, Bucket, Key, ?STD_TAG).
 
-book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
-    gen_server:call(Pid,
-                    {put, Bucket, Key, Object, IndexSpecs, Tag, TTL},
-                    infinity).
-
 book_get(Pid, Bucket, Key, Tag) ->
     gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).
 
 book_head(Pid, Bucket, Key, Tag) ->
     gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity).
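
The reply shapes for HEAD are visible in the handler change further down this diff ({ok, Metadata} or not_found); reading GET as following the same convention is an inference rather than something this hunk shows:

```erlang
%% Sketch: HEAD for cheap existence/metadata checks, GET for the value.
case leveled_bookie:book_head(Bookie, <<"bucket">>, <<"key1">>) of
    not_found ->
        not_found;
    {ok, _Metadata} ->
        %% only now pay for the Journal fetch via the Inker
        leveled_bookie:book_get(Bookie, <<"bucket">>, <<"key1">>)
end.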

+%% @doc Snapshots/Clones
+%%
+%% If there is a snapshot request (e.g. to iterate over the keys) the Bookie
+%% may request a clone of the Penciller, or clones of both the Penciller and
+%% the Inker should values also need to be accessed.
+%%
+%% The clone is seeded with the manifest SQN.  The clone should be registered
+%% with the real Inker/Penciller, so that the real Inker/Penciller may prevent
+%% the deletion of files still in use by a snapshot clone.
+%%
+%% Iterators should de-register themselves from the Penciller on completion.
+%% Iterators should be automatically released after a timeout period.  A file
+%% can only be deleted from the Ledger if it is no longer in the manifest, and
+%% there are no registered iterators from before the point the file was
+%% removed from the manifest.
+%%
+%% Clones are simply new gen_servers with copies of the relevant
+%% StateData.
+%%
+%% There are a series of specific folders implemented that provide pre-canned
+%% snapshot functionality:
+%%
+%% {bucket_stats, Bucket} -> return a key count and total object size within
+%% a bucket
+%% {riakbucket_stats, Bucket} -> as above, but for buckets with the Riak Tag
+%% {binary_bucketlist, Tag, {FoldKeysFun, Acc}} -> if we assume buckets and
+%% keys are binaries, provides a fast bucket list function
+%% {index_query,
+%%        Constraint,
+%%        {FoldKeysFun, Acc},
+%%        {IdxField, StartValue, EndValue},
+%%        {ReturnTerms, TermRegex}} -> secondary index query
+%% {keylist, Tag, {FoldKeysFun, Acc}} -> list all keys with tag
+%% {keylist, Tag, Bucket, {FoldKeysFun, Acc}} -> list all keys within given
+%% bucket
+%% {hashtree_query, Tag, JournalCheck} -> return keys and hashes for all
+%% objects with a given tag
+%% {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> fold over all objects
+%% in a given bucket
+%% {foldobjects_byindex,
+%%        Tag,
+%%        Bucket,
+%%        {Field, FromTerm, ToTerm},
+%%        FoldObjectsFun} -> fold over all objects with an entry in a given
+%% range on a given index
 
 book_returnfolder(Pid, FolderType) ->
     gen_server:call(Pid, {return_folder, FolderType}, infinity).
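
In leveled these folder requests come back as deferred work rather than being executed inline; the {async, Folder} reply shape in this sketch is assumed from wider leveled usage, not shown in this diff, as is the three-argument fold fun:

```erlang
%% Sketch: fold over all keys carrying the Standard tag.
FoldKeysFun = fun(Bucket, Key, Acc) -> [{Bucket, Key}|Acc] end,
{async, Folder} = leveled_bookie:book_returnfolder(Bookie,
                                                   {keylist,
                                                    o,
                                                    {FoldKeysFun, []}}),
KeyList = Folder().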

@@ -222,15 +326,29 @@ book_snapshotstore(Pid, Requestor, Timeout) ->
 book_snapshotledger(Pid, Requestor, Timeout) ->
     gen_server:call(Pid, {snapshot, Requestor, ledger, Timeout}, infinity).
 
+%% @doc Call for compaction of the Journal
+%%
+%% The scheduling of Journal compaction is called externally, so it is assumed
+%% in Riak it will be triggered by a vnode callback.
 
 book_compactjournal(Pid, Timeout) ->
     gen_server:call(Pid, {compact_journal, Timeout}, infinity).
 
+%% @doc Check on progress of the last compaction
 
 book_islastcompactionpending(Pid) ->
     gen_server:call(Pid, confirm_compact, infinity).
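
Putting the two calls together, an external scheduler (a Riak vnode callback in the intended deployment) might trigger compaction and then poll for completion. The boolean reply from book_islastcompactionpending/1 is inferred from its name, and the timeout and polling values are invented:

```erlang
%% Sketch: trigger Journal compaction, then poll until no longer pending
%% (reply shapes assumed).
ok = leveled_bookie:book_compactjournal(Bookie, 30000),
WaitForCompaction =
    fun Wait() ->
        case leveled_bookie:book_islastcompactionpending(Bookie) of
            true -> timer:sleep(1000), Wait();
            false -> ok
        end
    end,
ok = WaitForCompaction().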

+%% @doc Clean shutdown
+%%
+%% A clean shutdown will persist all the information in the Penciller memory
+%% before closing, so shutdown is not instantaneous.
 
 book_close(Pid) ->
     gen_server:call(Pid, close, infinity).
 
+%% @doc Close and clean-out files
 
 book_destroy(Pid) ->
     gen_server:call(Pid, destroy, infinity).
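
The practical difference between the two shutdown calls shows up in the test changes at the end of this diff: many_put_fetch_head now finishes with book_destroy to clean the disk, where other tests book_close and restart. A small sketch:

```erlang
%% Sketch: close persists Penciller memory and keeps files for a restart;
%% destroy closes the store and then cleans out its files.
ok = leveled_bookie:book_close(BookieToRestart),
ok = leveled_bookie:book_destroy(BookieToDiscard).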

@@ -353,7 +471,9 @@ handle_call({head, Bucket, Key, Tag}, _From, State) ->
         {_SeqN, {active, TS}, _MH, MD} ->
             case TS >= leveled_codec:integer_now() of
                 true ->
-                    OMD = leveled_codec:build_metadata_object(LedgerKey, MD),
+                    OMD =
+                        leveled_codec:build_metadata_object(LedgerKey,
+                                                            MD),
                     {reply, {ok, OMD}, State};
                 false ->
                     {reply, not_found, State}

@@ -1282,6 +1402,11 @@ foldobjects_vs_hashtree_test() ->
     ok = book_close(Bookie1),
     reset_filestructure().
 
+longrunning_test() ->
+    SW = os:timestamp(),
+    timer:sleep(100),
+    ok = maybe_longrunning(SW, put).
+
 coverage_cheat_test() ->
     {noreply, _State0} = handle_info(timeout, #state{}),
     {ok, _State1} = code_change(null, #state{}, null),

@@ -109,8 +109,7 @@
                 inker :: pid(),
                 deferred_delete = false :: boolean(),
                 waste_path :: string(),
-                sync_strategy = none,
-                put_timing :: tuple()}).
+                sync_strategy = none}).
 
 
 %%%============================================================================

@@ -272,14 +271,12 @@ writer({key_check, Key}, _From, State) ->
         writer,
         State};
 writer({put_kv, Key, Value}, _From, State) ->
-    SW = os:timestamp(),
     Result = put(State#state.handle,
                  Key,
                  Value,
                  {State#state.last_position, State#state.hashtree},
                  State#state.binary_mode,
                  State#state.max_size),
-    T0 = timer:now_diff(os:timestamp(), SW),
     case Result of
         roll ->
             %% Key and value could not be written

@@ -292,15 +289,10 @@ writer({put_kv, Key, Value}, _From, State) ->
         _ ->
             ok
     end,
-    T1 = timer:now_diff(os:timestamp(), SW) - T0,
-    Timings = leveled_log:put_timing(journal,
-                                     State#state.put_timing,
-                                     T0, T1),
     {reply, ok, writer, State#state{handle=UpdHandle,
                                     last_position=NewPosition,
                                     last_key=Key,
-                                    hashtree=HashTree,
-                                    put_timing=Timings}}
+                                    hashtree=HashTree}}
     end;
 writer({mput_kv, []}, _From, State) ->
     {reply, ok, writer, State};

@@ -369,8 +369,6 @@ riak_metadata_from_binary(V1Binary) ->
     <<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
     SibMetaBinList =
         case SibCount of
             0 ->
                 [];
             SC when is_integer(SC) ->
                 get_metadata_from_siblings(SibsBin, SibCount, [])
         end,

@@ -219,5 +219,11 @@ buildrandomfashion_test() ->
     test_testmanifest(Man2),
     ?assertMatch(ManL0, to_list(Man2)).
 
+empty_active_journal_test() ->
+    Path = "../test/journal/journal_files/",
+    ok = filelib:ensure_dir(Path),
+    {ok, ActJ} = leveled_cdb:cdb_open_writer(Path ++ "test_emptyactive_file.pnd"),
+    ?assertMatch([], generate_entry(ActJ)),
+    ?assertMatch(ok, file:delete(Path ++ "test_emptyactive_file.cdb")).
+
 -endif.

@@ -136,8 +136,7 @@
                 clerk :: pid(),
                 compaction_pending = false :: boolean(),
                 is_snapshot = false :: boolean(),
-                source_inker :: pid(),
-                put_timing :: tuple()}).
+                source_inker :: pid()}).
 
 
 %%%============================================================================

@@ -410,22 +409,16 @@ start_from_file(InkOpts) ->
 put_object(LedgerKey, Object, KeyChanges, State) ->
     NewSQN = State#state.journal_sqn + 1,
     ActiveJournal = State#state.active_journaldb,
-    SW= os:timestamp(),
     {JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey,
                                                         NewSQN,
                                                         Object,
                                                         KeyChanges),
-    T0 = timer:now_diff(os:timestamp(), SW),
     case leveled_cdb:cdb_put(ActiveJournal,
                              JournalKey,
                              JournalBin) of
         ok ->
-            T1 = timer:now_diff(os:timestamp(), SW) - T0,
-            UpdPutTimes = leveled_log:put_timing(inker,
-                                                 State#state.put_timing,
-                                                 T0, T1),
             {ok,
-                State#state{journal_sqn=NewSQN, put_timing=UpdPutTimes},
+                State#state{journal_sqn=NewSQN},
                 byte_size(JournalBin)};
         roll ->
             SWroll = os:timestamp(),

@@ -544,24 +537,20 @@ open_all_manifest(Man0, RootPath, CDBOpts) ->
     [{HeadSQN, HeadFN, _IgnorePid, HeadLK}|ManifestTail] = Man1,
     OpenJournalFun =
         fun(ManEntry) ->
-            case ManEntry of
-                {LowSQN, FN, _, LK_RO} ->
-                    CFN = FN ++ "." ++ ?JOURNAL_FILEX,
-                    PFN = FN ++ "." ++ ?PENDING_FILEX,
-                    case filelib:is_file(CFN) of
-                        true ->
-                            {ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN,
-                                                                      LK_RO),
-                            {LowSQN, FN, Pid, LK_RO};
-                        false ->
-                            W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
-                            {ok, Pid} = W,
-                            ok = leveled_cdb:cdb_roll(Pid),
-                            LK_WR = leveled_cdb:cdb_lastkey(Pid),
-                            {LowSQN, FN, Pid, LK_WR}
-                    end;
-                _ ->
-                    ManEntry
+            {LowSQN, FN, _, LK_RO} = ManEntry,
+            CFN = FN ++ "." ++ ?JOURNAL_FILEX,
+            PFN = FN ++ "." ++ ?PENDING_FILEX,
+            case filelib:is_file(CFN) of
+                true ->
+                    {ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN,
+                                                              LK_RO),
+                    {LowSQN, FN, Pid, LK_RO};
+                false ->
+                    W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
+                    {ok, Pid} = W,
+                    ok = leveled_cdb:cdb_roll(Pid),
+                    LK_WR = leveled_cdb:cdb_lastkey(Pid),
+                    {LowSQN, FN, Pid, LK_WR}
             end
         end,
     OpenedTailAsList = lists:map(OpenJournalFun, ManifestTail),

@@ -14,9 +14,9 @@
          get_timing/3,
          sst_timing/3]).
 
--define(PUT_LOGPOINT, 20000).
--define(HEAD_LOGPOINT, 50000).
--define(GET_LOGPOINT, 50000).
+-define(PUT_LOGPOINT, 10000).
+-define(HEAD_LOGPOINT, 20000).
+-define(GET_LOGPOINT, 20000).
 -define(SST_LOGPOINT, 20000).
 -define(LOG_LEVEL, [info, warn, error, critical]).
 -define(SAMPLE_RATE, 16).

@@ -353,15 +353,15 @@ log_timer(LogReference, Subs, StartTime) ->
 put_timing(_Actor, undefined, T0, T1) ->
     {1, {T0, T1}, {T0, T1}};
 put_timing(Actor, {?PUT_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
-    RN = random:uniform(?HEAD_LOGPOINT),
-    case RN > ?HEAD_LOGPOINT div 2 of
+    RN = random:uniform(?PUT_LOGPOINT),
+    case RN > ?PUT_LOGPOINT div 2 of
         true ->
             % log at the timing point less than half the time
             LogRef =
                 case Actor of
-                    bookie -> "B0012";
-                    inker -> "I0019";
-                    journal -> "CDB17"
+                    bookie -> "B0012" %;
+                    % inker -> "I0019";
+                    % journal -> "CDB17"
                 end,
             log(LogRef, [?PUT_LOGPOINT, Total0, Total1, Max0, Max1]),
             put_timing(Actor, undefined, T0, T1);
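
For reference, the accumulator this function threads is the {SampleCount, {TotalT0, TotalT1}, {MaxT0, MaxT1}} triple built by the undefined clause; the fix in this hunk is that the PUT path now samples against ?PUT_LOGPOINT instead of the copy-pasted ?HEAD_LOGPOINT. A hedged sketch of how a caller threads it (timings invented; the intermediate count-incrementing clause is not shown in this diff):

```erlang
%% Sketch: thread the put_timing accumulator across successive PUTs.
Acc0 = undefined,
Acc1 = leveled_log:put_timing(bookie, Acc0, 130, 450),
%% From the first clause above: Acc1 =:= {1, {130, 450}, {130, 450}}.
%% Subsequent calls accumulate totals and maxima until the count reaches
%% ?PUT_LOGPOINT, when B0012 may be logged and the accumulator resets.
Acc2 = leveled_log:put_timing(bookie, Acc1, 200, 300).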

@@ -797,8 +797,11 @@ timed_sst_get(PID, Key, Hash) ->
     SW = os:timestamp(),
     R = leveled_sst:sst_get(PID, Key, Hash),
     T0 = timer:now_diff(os:timestamp(), SW),
+    log_slowfetch(T0, R, PID, ?SLOW_FETCH).
+
+log_slowfetch(T0, R, PID, FetchTolerance) ->
     case {T0, R} of
-        {T, R} when T < ?SLOW_FETCH ->
+        {T, R} when T < FetchTolerance ->
             R;
         {T, not_present} ->
             leveled_log:log("PC016", [PID, T, not_present]),

@@ -807,7 +810,6 @@
             leveled_log:log("PC016", [PID, T, found]),
             R
     end.
 
 
 compare_to_sqn(Obj, SQN) ->
     case Obj of

@@ -1110,6 +1112,13 @@ add_missing_hash({K, {SQN, ST, MD}}) ->
     {K, {SQN, ST, leveled_codec:magic_hash(K), MD}}.
 
 
+clean_dir_test() ->
+    % Pointless gesture to test coverage
+    RootPath = "../test/ledger",
+    ?assertMatch(ok, file:write_file(RootPath ++ "/test.bob", "hello")),
+    ok = clean_subdir(RootPath ++ "/test.bob"),
+    ok = file:delete(RootPath ++ "/test.bob").
+
 simple_server_test() ->
     RootPath = "../test/ledger",
     clean_testdir(RootPath),

@@ -1411,6 +1420,9 @@ create_file_test() ->
     {ok, Bin} = file:read_file("../test/new_file.sst.discarded"),
     ?assertMatch("hello", binary_to_term(Bin)).
 
+slow_fetch_test() ->
+    ?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 1)).
+
 checkready(Pid) ->
     try
         leveled_sst:sst_checkready(Pid)

@@ -606,10 +606,7 @@ filepath(RootPath, NewMSN, current_manifest) ->
     ++ integer_to_list(NewMSN) ++ "." ++ ?MANIFEST_FILEX.
 
 
-open_manifestfile(_RootPath, []) ->
-    leveled_log:log("P0013", []),
-    new_manifest();
-open_manifestfile(_RootPath, [0]) ->
+open_manifestfile(_RootPath, L) when L == [] orelse L == [0] ->
     leveled_log:log("P0013", []),
     new_manifest();
 open_manifestfile(RootPath, [TopManSQN|Rest]) ->

@@ -666,6 +663,7 @@ initial_setup() ->
     Man4 = insert_manifest_entry(Man3, 1, 2, E4),
     Man5 = insert_manifest_entry(Man4, 1, 2, E5),
     Man6 = insert_manifest_entry(Man5, 1, 2, E6),
+    ?assertMatch(Man6, insert_manifest_entry(Man6, 1, 2, [])),
     {Man0, Man1, Man2, Man3, Man4, Man5, Man6}.
 
 changeup_setup(Man6) ->

@@ -1619,6 +1619,8 @@ nonsense_coverage_test() ->
     ?assertMatch({ok, reader, #state{}}, code_change(nonsense,
                                                      reader,
                                                      #state{},
-                                                     nonsense)).
+                                                     nonsense)),
+    ?assertMatch({reply, undefined, reader, #state{}},
+                 handle_sync_event("hello", self(), reader, #state{})).
 
 -endif.

@@ -102,6 +102,7 @@ many_put_fetch_head(_Config) ->
     {ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
     testutil:check_forlist(Bookie3, ChkList2A),
     testutil:check_forobject(Bookie3, TestObject),
+    testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"),
     ok = leveled_bookie:book_destroy(Bookie3).
 
 journal_compaction(_Config) ->

@@ -455,6 +456,7 @@ load_and_count_withdelete(_Config) ->
     ok = leveled_bookie:book_close(Bookie1),
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
     testutil:check_formissingobject(Bookie2, BucketD, KeyD),
+    testutil:check_formissingobject(Bookie2, "Bookie1", "MissingKey0123"),
     {_BSize, 0} = testutil:check_bucket_stats(Bookie2, BucketD),
     ok = leveled_bookie:book_close(Bookie2),
     testutil:reset_filestructure().