Develop 3.1 d30update (#386)

* Mas i370 patch d (#383)

* Refactor penciller memory

In high-volume tests on large key-count clusters, some significant variation in the P0031 time has been seen:

TimeBucket	PatchA
a.0ms_to_1ms	18554
b.1ms_to_2ms	51778
c.2ms_to_3ms	696
d.3ms_to_5ms	220
e.5ms_to_8ms	59
f.8ms_to_13ms	40
g.13ms_to_21ms	364
h.21ms_to_34ms	277
i.34ms_to_55ms	34
j.55ms_to_89ms	17
k.89ms_to_144ms	21
l.144ms_to_233ms	31
m.233ms_to_377ms	45
n.377ms_to_610ms	52
o.610ms_to_987ms	59
p.987ms_to_1597ms	55
q.1597ms_to_2684ms	54
r.2684ms_to_4281ms	29
s.4281ms_to_6965ms	7
t.6965ms_to_11246ms	1

It is unclear why this varies so much.  The time to add to the cache appears to be minimal (though perhaps there is an issue with timing points in the code), whereas the time to add to the index is much more significant and variable.  There is also variable time when the memory is rolled (although the actual activity here appears to be minimal).
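For reference, the buckets in the table follow a Fibonacci-style progression of upper bounds. A minimal Python sketch (illustrative only, not leveled code) of how a timing in ms maps to a bucket:

```python
import bisect

# Upper bounds in ms, following the Fibonacci-style progression in the table
BOUNDS = [1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987,
          1597, 2684, 4281, 6965, 11246]

def bucket(ms):
    """Return the 0-based bucket index (0 = 'a.0ms_to_1ms') for a timing."""
    return bisect.bisect_right(BOUNDS, ms)
```

For example, an 18ms timing lands in bucket 6 (`g.13ms_to_21ms`), while the bulk of the samples above sit in buckets a/b, under 2ms.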

The refactoring here is two-fold:

- tidy and simplify by keeping LoopState managed within handle_call, and add more helpful dialyzer specs;

- change the update to the index to be a simple extension of a list, rather than any conversion.

This alternative version of the pmem index is, in unit tests, orders of magnitude faster to add to - and the same order of magnitude to check.  The anticipation is that it may also be more efficient in terms of memory changes.
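The shape of that change can be sketched as follows (a hedged Python illustration with hypothetical names, not the leveled_pmem API): extending a list of per-push indexes avoids any per-push conversion, while checking remains a scan of the same order of magnitude:

```python
def add_to_index_by_merge(master, pushed):
    """Old style: fold each pushed set of hashes into one shared index.

    The union copies/merges on every push, so the cost grows with the
    size of the master index."""
    return master | pushed

def add_to_index_by_extension(index_list, pushed):
    """New style: the pushed index simply becomes one more list entry."""
    return [pushed] + index_list

def check_index(index_list, h):
    """Checking scans each per-push index - same order of magnitude."""
    return any(h in pushed for pushed in index_list)
```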

* Compress SST index

Reduces the size of the leveled_sst index with two changes:

1 - Where there is a common prefix of tuple elements (e.g. Bucket) across the whole leveled_sst file - only the non-common part is indexed, and a function is used to compare.

2 - There is less "indexing" of the index, i.e. only 1 in 16 keys are passed into the gb_trees part instead of 1 in 4.
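Change (1) can be sketched as below, assuming plain byte-string keys rather than leveled's ledger key tuples (`build_index` and `index_compare` are hypothetical names):

```python
import os

def build_index(keys):
    """Index only the non-common part of each key in the file."""
    pfx = os.path.commonprefix(keys)  # works on a list of bytes
    return pfx, [k[len(pfx):] for k in keys]

def index_compare(pfx, suffix, search_key):
    """Compare a prefix-stripped index entry against a full search key.

    Returns -1/0/1 as the indexed key is less than/equal to/greater
    than the search key."""
    if not search_key.startswith(pfx):
        # Order is decided by the prefix alone
        return -1 if pfx + suffix < search_key else 1
    stripped = search_key[len(pfx):]
    return (suffix > stripped) - (suffix < stripped)
```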

* Immediate hibernate

Reasons for delay in hibernate were not clear.

Straight after creation the process will not be in receipt of messages (it must wait for the manifest to be updated), so it is better to hibernate now.  This also means the log PC023 provides more accurate information.

* Refactor BIC

This patch avoids the following:

- repeated replacement of the same element in the BIC (via get_kvrange), by checking presence via GET before using SET

- Stops re-reading of all elements to discover high modified date

Also, there appears to have been a bug whereby the HMD for the file had to be missing in order to add to the cache.  However, the cache may now be erased without erasing the HMD - meaning the cache could never be rebuilt.

* Use correct size in test results

erts_debug:flat_size/1 returns size in words (i.e. 8 bytes on 64-bit CPU) not bytes
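The correction is a straightforward unit conversion (a sketch; 8 bytes per word assumed for a 64-bit emulator):

```python
WORDSIZE_BYTES = 8  # words are 8 bytes on a 64-bit CPU

def flat_size_bytes(words):
    """Convert an erts_debug:flat_size/1 result (words) into bytes."""
    return words * WORDSIZE_BYTES
```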

* Don't change summary record

As it is persisted as part of the file write, any change to the summary record cannot be rolled back

* Clerk to prompt L0 write

Simplifies the logic so that the clerk requesting work from the penciller prompts L0 writes as well as Manifest changes.

The advantage now is that if the penciller memory is full, and PUT load stops, the clerk should still be able to prompt persistence.  The penciller can therefore make use of dead time this way.

* Add push on journal compact

If there has been a backlog, followed by a quiet period - there may be a large ledger cache left unpushed.  Journal compaction events are about once per hour, so the performance overhead of a false push should be minimal, with the advantage of clearing any backlog before load starts again.

This is only relevant to riak users with very on/off, batch-type workloads.

* Extend tests

To more consistently trigger all overload scenarios

* Fix range keys smaller than prefix

The end key can't be made an empty binary in this case: the true end key may be bigger than any key within the range, but the empty binary will appear to be smaller.

Unit tests and ct tests added to expose the potential issue
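The underlying comparison problem is visible with plain byte strings (an illustrative sketch, not the leveled key format):

```python
# An empty binary sorts BEFORE every other key, so using it as a range's
# end key makes the "end" appear smaller than any key inside the range.
end_key = b""
smallest_real_key = b"\x00"

assert end_key < smallest_real_key
assert all(end_key < k for k in [b"a", b"key1", b"\x00\x00"])
```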

* Tidy-up

- Remove penciller logs which are no longer called
- Get pclerk to wait only MIN_TIMEOUT after doing work, in case there is a backlog
- Remove the update_levelzero_cache function, as it is unique to the handle_call of push_mem and simple enough to be inline
- Align the testutil slow offer with the standard slow offer used

* Tidy-up

Remove pre-otp20 references.

Reinstate the check that the starting pid is still active; this was added to tidy up shutdown.

Resolve failure to run on otp20 due to an `-if` statement.

* Tidy up

Using null rather than {null, Key} is potentially clearer, as it is not a concern what the Key is in this case, and it removes a comparison step from the leveled_codec:endkey_passed/2 function.

There were issues with coverage in eunit tests as the leveled_pclerk shut down.  This prompted a general tidy of leveled_pclerk (remove passing of LoopState into internal functions, and add dialyzer specs).

* Remove R16 relic

* Further testing another issue

The StartKey must always be less than or equal to the prefix when the first N characters are stripped, but this is not true of the EndKey (for the query), which does not have to be between the FirstKey and the LastKey.

If the EndKey of the query does not match the prefix, it must be greater than the Prefix (as otherwise it would not have been greater than the FirstKey), so it is set to null.
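That rule can be sketched as follows (hypothetical helper names, plain byte-string keys assumed):

```python
def strip_endkey(prefix, end_key):
    """Strip the common file prefix from a query EndKey.

    If the EndKey does not carry the prefix, it must sort above every
    key that does (it is already known to be >= FirstKey, which carries
    the prefix), so it maps to None - the "null" that passes every
    upper-bound check."""
    if end_key.startswith(prefix):
        return end_key[len(prefix):]
    return None

def passes_upper_bound(stripped_end, suffix):
    """True if an indexed suffix is within the (stripped) query end."""
    return stripped_end is None or suffix <= stripped_end
```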

* Fix unit test

The unit test had a typo - and the result interpretation contained a misunderstanding.

* Code and spec tidy

Also look to cover, with tests, the situation when the FirstKey is the same as the Prefix.

This is, in theory, not an issue as it is the EndKey for each sublist which is indexed in leveled_tree.  However, guard against it mapping to null here, just in case there are dangers lurking (note that tests will still pass without the `M > N` guard in place).

* Hibernate on BIC complete

There are three situations when the BIC becomes complete:

- In a file created as part of a merge, the BIC is learned in the merge
- After startup, files below L1 learn the block cache through reads that happen to read the block; eventually the whole cache will be read, unless...
- Either before or after the cache is complete, it can get wiped by a timeout after a get_sqn request (e.g. as prompted by a journal compaction) ... it will then be re-filled off the back of get/get-range requests.

In all these situations we want to hibernate after the BIC is filled - to reflect the fact that the LoopState should now be relatively stable, so it is a good point to GC and rationalise the location of data.

Previously only the first case was covered.  Now all three are covered through the bic_complete message.

* Test all index keys have same term

This works functionally, but is not optimised (the term is replicated in the index)

* Summaries with same index term

If the summary index entries all have the same index term - only the object keys need to be indexed.

* Simplify case statements

We either match the pattern of <<Prefix:N, Suffix>> or the answer should be null

* OK for M == N

If M == N for the first key, it will have a suffix of <<>>.  This will match (as expected) a query StartKey of the same size, and be smaller than any query StartKey that has the same prefix.

If the query StartKey does not match the prefix, it will be null - as it must be smaller than the Prefix (as otherwise the query StartKey would be bigger than the LastKey).

The constraint of M > N was introduced before the *_prefix_filter functions were checking the prefix, to avoid issues.  Now the prefix is being checked, then M == N is ok.
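The M == N boundary can be checked with byte-wise comparison (an illustrative sketch):

```python
prefix = b"abc"               # N = 3 bytes of common prefix
key = b"abc"                  # first key with M == N
suffix = key[len(prefix):]    # the stored suffix is empty (<<>> in Erlang)

assert suffix == b""
assert suffix == b"abc"[len(prefix):]   # equals a same-size query StartKey
assert suffix < b"abcd"[len(prefix):]   # below any longer same-prefix StartKey
```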

* Simplify

Correct the test to use a binary field in the range.

To avoid further issues, only apply the filter when everything is of binary() type.

* Add test for head_only mode

When leveled is used as a tictacaae key store (in parallel mode), the keys will be head_only entries.  Double-check that they are handled as expected, like object keys.

* Revert previous change - must support typed buckets

Add assertion to confirm worthwhile optimisation

* Add support for configurable cache multiple (#375)

* Mas i370 patch e (#385)

Improvement to monitoring for efficiency and improved readability of logs and stats.

As part of this, where possible, tried to avoid updating loop state on READ messages in leveled processes (as was the case when tracking stats within each process).

No performance benefits found with change, but improved stats has helped discover other potential gains.
Martin Sumner 2022-12-18 20:18:03 +00:00 committed by GitHub
commit a033e280e6
27 changed files with 3500 additions and 2177 deletions

@@ -233,12 +233,8 @@
% related to specific query, and the StartKey/EndKey
% used to extract this part
persisted_sqn = 0 :: integer(), % The highest SQN persisted
ledger_sqn = 0 :: integer(), % The highest SQN added to L0
root_path = "test" :: string(),
clerk :: pid() | undefined,
levelzero_pending = false :: boolean(),
levelzero_constructor :: pid() | undefined,
@@ -246,14 +242,18 @@
levelzero_size = 0 :: integer(),
levelzero_maxcachesize :: integer() | undefined,
levelzero_cointoss = false :: boolean(),
levelzero_index :: array:array() | undefined | redacted,
levelzero_index ::
leveled_pmem:index_array() | undefined | redacted,
levelzero_astree :: list() | undefined | redacted,
root_path = "test" :: string(),
clerk :: pid() | undefined,
is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(),
snapshot_time :: pos_integer() | undefined,
source_penciller :: pid() | undefined,
bookie_monref :: reference() | undefined,
levelzero_astree :: list() | undefined | redacted,
work_ongoing = false :: boolean(), % i.e. compaction work
work_backlog = false :: boolean(), % i.e. compaction work
@@ -261,39 +261,20 @@
pending_removals = [] :: list(string()),
maybe_release = false :: boolean(),
timings = no_timing :: pcl_timings(),
timings_countdown = 0 :: integer(),
snaptimeout_short :: pos_integer()|undefined,
snaptimeout_long :: pos_integer()|undefined,
sst_options = #sst_options{} :: #sst_options{}}).
monitor = {no_monitor, 0} :: leveled_monitor:monitor(),
sst_options = #sst_options{} :: sst_options()}).
-record(pcl_timings,
{sample_count = 0 :: integer(),
foundmem_time = 0 :: integer(),
found0_time = 0 :: integer(),
found1_time = 0 :: integer(),
found2_time = 0 :: integer(),
found3_time = 0 :: integer(),
foundlower_time = 0 :: integer(),
missed_time = 0 :: integer(),
foundmem_count = 0 :: integer(),
found0_count = 0 :: integer(),
found1_count = 0 :: integer(),
found2_count = 0 :: integer(),
found3_count = 0 :: integer(),
foundlower_count = 0 :: integer(),
missed_count = 0 :: integer()}).
-type penciller_options() :: #penciller_options{}.
-type bookies_memory() :: {tuple()|empty_cache,
% array:array()|empty_array,
any()|empty_array, % Issue of type compatability with OTP16
array:array()|empty_array,
integer()|infinity,
integer()}.
-type pcl_state() :: #state{}.
-type pcl_timings() :: no_timing|#pcl_timings{}.
-type levelzero_cacheentry() :: {pos_integer(), leveled_tree:leveled_tree()}.
-type levelzero_cache() :: list(levelzero_cacheentry()).
-type iterator_entry()
@@ -313,7 +294,7 @@
fun((leveled_codec:ledger_key(),
leveled_codec:ledger_value(),
any()) -> any()).
-type sst_options() :: #sst_options{}.
-export_type([levelzero_cacheentry/0, levelzero_returnfun/0, sqn_check/0]).
@@ -655,73 +636,76 @@ init([LogOpts, PCLopts]) ->
Query,
BookiesMem,
LongRunning),
leveled_log:log("P0001", [self()]),
{ok, State#state{is_snapshot=true,
leveled_log:log(p0001, [self()]),
{ok, State#state{is_snapshot = true,
bookie_monref = BookieMonitor,
source_penciller=SrcPenciller}};
source_penciller = SrcPenciller}};
{_RootPath, _Snapshot=false, _Q, _BM} ->
start_from_file(PCLopts)
end.
handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
From,
_From,
State=#state{is_snapshot=Snap}) when Snap == false ->
% The push_mem process is as follows:
%
% 1 - Receive a cache. The cache has four parts: a tree of keys and
% values, an array of 256 binaries listing the hashes present in the
% tree, a min SQN and a max SQN
%
% 2 - Check to see if there is a levelzero file pending. If so, the
% update must be returned. If not the update can be accepted
%
% 3 - The Penciller can now reply to the Bookie to show if the push has
% been accepted
%
% 4 - Update the cache:
% a) Append the cache to the list
% b) Add each of the 256 hash-listing binaries to the master L0 index array
%
% Check the approximate size of the cache. If it is over the maximum size,
% trigger a background L0 file write and update state of levelzero_pending.
CacheUpdateBlockedByPendingWork
= State#state.levelzero_pending or State#state.work_backlog,
CacheFull = leveled_pmem:cache_full(State#state.levelzero_cache),
case {CacheUpdateBlockedByPendingWork, CacheFull} of
{true, _} ->
leveled_log:log("P0018", [returned,
State#state.levelzero_pending,
State#state.work_backlog]),
% 1. If either the penciller is still waiting on the last L0 file to be
% written, or there is a work backlog - the cache is returned with the
% expectation that PUTs should be slowed. Also if the cache has reached
% the maximum number of lines (by default after 31 pushes from the bookie)
%
% 2. If (1) does not apply, the bookie's cache will be added to the
% penciller's cache.
SW = os:timestamp(),
L0Pending = State#state.levelzero_pending,
WorkBacklog = State#state.work_backlog,
CacheAlreadyFull = leveled_pmem:cache_full(State#state.levelzero_cache),
L0Size = State#state.levelzero_size,
% The clerk is prompted into action as there may be a L0 write required
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
case L0Pending or WorkBacklog or CacheAlreadyFull of
true ->
% Cannot update the cache, or roll the memory so reply `returned`
% The Bookie must now retain the ledger cache and try to push the
% updated cache at a later time
leveled_log:log(
p0018,
[L0Size, L0Pending, WorkBacklog, CacheAlreadyFull]),
{reply, returned, State};
{false, true} ->
leveled_log:log("P0042", [State#state.levelzero_size]),
% The cache is full (the maximum line items have been reached), so
% can't accept any more. However, we need to try and roll memory
% otherwise cache may be permanently full.
gen_server:reply(From, returned),
{UpdState, none} = maybe_roll_memory(State, true, false),
{noreply, UpdState};
{false, false} ->
% leveled_log:log("P0018", [ok, false, false]),
false ->
% Return ok as cache has been updated on State and the Bookie
% should clear its ledger cache which is now with the Penciller
PushedTree =
case is_tuple(LedgerTable) of
true ->
LedgerTable;
false ->
leveled_tree:from_orderedset(LedgerTable,
?CACHE_TYPE)
leveled_tree:from_orderedset(LedgerTable, ?CACHE_TYPE)
end,
% Reply must happen after the table has been converted
gen_server:reply(From, ok),
% Update LevelZero will add to the cache and maybe roll the
% cache from memory to L0 disk if the cache is too big
{noreply,
update_levelzero(State#state.levelzero_size,
{PushedTree, PushedIdx, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache,
State)}
{UpdMaxSQN, NewL0Size, UpdL0Cache} =
leveled_pmem:add_to_cache(
L0Size,
{PushedTree, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache),
UpdL0Index =
leveled_pmem:add_to_index(
PushedIdx,
State#state.levelzero_index,
length(State#state.levelzero_cache) + 1),
leveled_log:log_randomtimer(
p0031, [NewL0Size, true, true, MinSQN, MaxSQN], SW, 0.1),
{reply,
ok,
State#state{
levelzero_cache = UpdL0Cache,
levelzero_size = NewL0Size,
levelzero_index = UpdL0Index,
ledger_sqn = UpdMaxSQN}}
end;
handle_call({fetch, Key, Hash, UseL0Index}, _From, State) ->
L0Idx =
@@ -731,15 +715,12 @@ handle_call({fetch, Key, Hash, UseL0Index}, _From, State) ->
false ->
none
end,
{R, UpdTimings} = timed_fetch_mem(Key,
Hash,
State#state.manifest,
State#state.levelzero_cache,
L0Idx,
State#state.timings),
{UpdTimings0, CountDown} =
update_statetimings(UpdTimings, State#state.timings_countdown),
{reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}};
R =
timed_fetch_mem(
Key, Hash, State#state.manifest,
State#state.levelzero_cache, L0Idx,
State#state.monitor),
{reply, R, State};
handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
{reply,
compare_to_sqn(
@@ -792,10 +773,8 @@ handle_call({fetch_keys,
lists:filter(FilterFun, L0AsList)
end,
leveled_log:log_randomtimer("P0037",
[State#state.levelzero_size],
SW,
0.01),
leveled_log:log_randomtimer(
p0037, [State#state.levelzero_size], SW, 0.01),
%% Rename any reference to loop state that may be used by the function
%% to be returned - https://github.com/martinsumner/leveled/issues/326
@@ -874,10 +853,8 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
EndKey,
State#state.levelzero_cache,
LM1Cache),
leveled_log:log_randomtimer("P0037",
[State#state.levelzero_size],
SW,
0.01),
leveled_log:log_randomtimer(
p0037, [State#state.levelzero_size], SW, 0.01),
{#state{levelzero_astree = L0AsTree,
ledger_sqn = MaxSQN,
persisted_sqn = State#state.persisted_sqn},
@@ -892,15 +869,16 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
{LM1Cache, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache),
L0Index =
LM1Idx =
case BookieIdx of
empty_index ->
State#state.levelzero_index;
leveled_pmem:new_index();
_ ->
leveled_pmem:add_to_index(BookieIdx,
State#state.levelzero_index,
length(L0Cache))
BookieIdx
end,
L0Index =
leveled_pmem:add_to_index(
LM1Idx, State#state.levelzero_index, length(L0Cache)),
{#state{levelzero_cache = L0Cache,
levelzero_index = L0Index,
levelzero_size = UpdSize,
@@ -930,26 +908,28 @@ handle_call(close, _From, State) ->
% The penciller should close each file in the manifest, and call a close
% on the clerk.
ok = leveled_pclerk:clerk_close(State#state.clerk),
leveled_log:log("P0008", [close]),
L0Empty = State#state.levelzero_size == 0,
case (not State#state.levelzero_pending and not L0Empty) of
leveled_log:log(p0008, [close]),
L0Left = State#state.levelzero_size > 0,
case (not State#state.levelzero_pending and L0Left) of
true ->
L0_Left = State#state.levelzero_size > 0,
{UpdState, _L0Bloom} = maybe_roll_memory(State, L0_Left, true),
L0Pid = UpdState#state.levelzero_constructor,
case is_pid(L0Pid) of
true ->
ok = leveled_sst:sst_close(L0Pid);
false ->
leveled_log:log("P0010", [State#state.levelzero_size])
end;
Man0 = State#state.manifest,
{Constructor, _} =
roll_memory(
leveled_pmanifest:get_manifest_sqn(Man0) + 1,
State#state.ledger_sqn,
State#state.root_path,
State#state.levelzero_cache,
length(State#state.levelzero_cache),
State#state.sst_options,
true),
ok = leveled_sst:sst_close(Constructor);
false ->
leveled_log:log("P0010", [State#state.levelzero_size])
leveled_log:log(p0010, [State#state.levelzero_size])
end,
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
{stop, normal, ok, State};
handle_call(doom, _From, State) ->
leveled_log:log("P0030", []),
leveled_log:log(p0030, []),
ok = leveled_pclerk:clerk_close(State#state.clerk),
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
@@ -983,7 +963,7 @@ handle_call(persisted_sqn, _From, State) ->
handle_cast({manifest_change, Manifest}, State) ->
NewManSQN = leveled_pmanifest:get_manifest_sqn(Manifest),
OldManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest),
leveled_log:log("P0041", [OldManSQN, NewManSQN]),
leveled_log:log(p0041, [OldManSQN, NewManSQN]),
% Only safe to update the manifest if the SQN increments
if NewManSQN > OldManSQN ->
ok =
@@ -1008,7 +988,7 @@ handle_cast({manifest_change, Manifest}, State) ->
handle_cast({release_snapshot, Snapshot}, State) ->
Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest,
Snapshot),
leveled_log:log("P0003", [Snapshot]),
leveled_log:log(p0003, [Snapshot]),
{noreply, State#state{manifest=Manifest0}};
handle_cast({confirm_delete, PDFN, FilePid}, State=#state{is_snapshot=Snap})
when Snap == false ->
@@ -1028,7 +1008,7 @@ handle_cast({confirm_delete, PDFN, FilePid}, State=#state{is_snapshot=Snap})
% will be cleared from pending using the maybe_release boolean
case leveled_pmanifest:ready_to_delete(State#state.manifest, PDFN) of
true ->
leveled_log:log("P0005", [PDFN]),
leveled_log:log(p0005, [PDFN]),
ok = leveled_sst:sst_deleteconfirmed(FilePid),
case State#state.work_ongoing of
true ->
@@ -1060,7 +1040,7 @@ handle_cast({confirm_delete, PDFN, FilePid}, State=#state{is_snapshot=Snap})
end
end;
handle_cast({levelzero_complete, FN, StartKey, EndKey, Bloom}, State) ->
leveled_log:log("P0029", []),
leveled_log:log(p0029, []),
ManEntry = #manifest_entry{start_key=StartKey,
end_key=EndKey,
owner=State#state.levelzero_constructor,
@@ -1072,53 +1052,70 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey, Bloom}, State) ->
0,
ManEntry),
% Prompt clerk to ask about work - do this for every L0 roll
UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index),
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
{noreply, State#state{levelzero_cache=[],
levelzero_index=UpdIndex,
levelzero_index=[],
levelzero_pending=false,
levelzero_constructor=undefined,
levelzero_size=0,
manifest=UpdMan,
persisted_sqn=State#state.ledger_sqn}};
handle_cast(work_for_clerk, State) ->
case {State#state.levelzero_pending, State#state.work_ongoing} of
{false, false} ->
% TODO - as part of supervision tree and retry work:
% Need to check for work_ongoing as well as levelzero_pending as
% there may be a race that could lead to the clerk doing the same
% thing twice.
%
% This has implications though if we auto-restart the pclerk in the
% future, without altering this state - it may never be able to
% request work due to ongoing work that crashed the previous clerk
%
% Perhaps the pclerk should not be restarted because of this, and
% the failure should ripple up
{WL, WC} = leveled_pmanifest:check_for_work(State#state.manifest),
case WC of
0 ->
{noreply, State#state{work_backlog=false}};
N ->
Backlog = N > ?WORKQUEUE_BACKLOG_TOLERANCE,
leveled_log:log("P0024", [N, Backlog]),
[TL|_Tail] = WL,
ok =
leveled_pclerk:clerk_push(
State#state.clerk, {TL, State#state.manifest}),
case TL of
0 ->
% Just written a L0 so as LoopState now rewritten,
% garbage collect to free as much as possible as
% soon as possible
garbage_collect();
_ ->
ok
end,
case {State#state.levelzero_pending,
State#state.work_ongoing,
leveled_pmanifest:levelzero_present(State#state.manifest)} of
{false, false, false} ->
% If the penciller memory needs rolling, prompt this now
CacheOverSize =
maybe_cache_too_big(
State#state.levelzero_size,
State#state.levelzero_maxcachesize,
State#state.levelzero_cointoss),
CacheAlreadyFull =
leveled_pmem:cache_full(State#state.levelzero_cache),
case (CacheAlreadyFull or CacheOverSize) of
true ->
% Rolling the memory to create a new Level Zero file
NextSQN =
leveled_pmanifest:get_manifest_sqn(
State#state.manifest) + 1,
{Constructor, none} =
roll_memory(
NextSQN,
State#state.ledger_sqn,
State#state.root_path,
none,
length(State#state.levelzero_cache),
State#state.sst_options,
false),
{noreply,
State#state{work_backlog=Backlog, work_ongoing=true}}
State#state{
levelzero_pending=true,
levelzero_constructor=Constructor}};
false ->
{WL, WC} =
leveled_pmanifest:check_for_work(State#state.manifest),
case WC of
0 ->
% Should do some tidy-up work here?
{noreply, State#state{work_backlog=false}};
N ->
Backlog = N > ?WORKQUEUE_BACKLOG_TOLERANCE,
leveled_log:log(p0024, [N, Backlog]),
[TL|_Tail] = WL,
ok =
leveled_pclerk:clerk_push(
State#state.clerk,
{TL, State#state.manifest}),
{noreply,
State#state{
work_backlog=Backlog, work_ongoing=true}}
end
end;
{false, false, true} ->
ok = leveled_pclerk:clerk_push(
State#state.clerk, {0, State#state.manifest}),
{noreply, State#state{work_ongoing=true}};
_ ->
{noreply, State}
end;
@@ -1157,9 +1154,9 @@ handle_info(_Info, State) ->
{noreply, State}.
terminate(Reason, _State=#state{is_snapshot=Snap}) when Snap == true ->
leveled_log:log("P0007", [Reason]);
leveled_log:log(p0007, [Reason]);
terminate(Reason, _State) ->
leveled_log:log("P0011", [Reason]).
leveled_log:log(p0011, [Reason]).
format_status(normal, [_PDict, State]) ->
State;
@@ -1200,6 +1197,7 @@ start_from_file(PCLopts) ->
RootPath = PCLopts#penciller_options.root_path,
MaxTableSize = PCLopts#penciller_options.max_inmemory_tablesize,
OptsSST = PCLopts#penciller_options.sst_options,
Monitor = PCLopts#penciller_options.monitor,
SnapTimeoutShort = PCLopts#penciller_options.snaptimeout_short,
SnapTimeoutLong = PCLopts#penciller_options.snaptimeout_long,
@@ -1215,10 +1213,11 @@ start_from_file(PCLopts) ->
root_path = RootPath,
levelzero_maxcachesize = MaxTableSize,
levelzero_cointoss = CoinToss,
levelzero_index = leveled_pmem:new_index(),
levelzero_index = [],
snaptimeout_short = SnapTimeoutShort,
snaptimeout_long = SnapTimeoutLong,
sst_options = OptsSST},
sst_options = OptsSST,
monitor = Monitor},
%% Open manifest
Manifest0 = leveled_pmanifest:open_manifest(RootPath),
@@ -1232,15 +1231,15 @@ start_from_file(PCLopts) ->
SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1,
{MaxSQN, Manifest1, FileList} =
leveled_pmanifest:load_manifest(Manifest0, OpenFun, SQNFun),
leveled_log:log("P0014", [MaxSQN]),
leveled_log:log(p0014, [MaxSQN]),
ManSQN = leveled_pmanifest:get_manifest_sqn(Manifest1),
leveled_log:log("P0035", [ManSQN]),
leveled_log:log(p0035, [ManSQN]),
%% Find any L0 files
L0FN = sst_filename(ManSQN + 1, 0, 0),
{State0, FileList0} =
case filelib:is_file(filename:join(sst_rootpath(RootPath), L0FN)) of
true ->
leveled_log:log("P0015", [L0FN]),
leveled_log:log(p0015, [L0FN]),
L0Open = leveled_sst:sst_open(sst_rootpath(RootPath),
L0FN,
OptsSST,
@@ -1257,14 +1256,14 @@ start_from_file(PCLopts) ->
ManSQN + 1,
0,
L0Entry),
leveled_log:log("P0016", [L0SQN]),
leveled_log:log(p0016, [L0SQN]),
LedgerSQN = max(MaxSQN, L0SQN),
{InitState#state{manifest = Manifest2,
ledger_sqn = LedgerSQN,
persisted_sqn = LedgerSQN},
[L0FN|FileList]};
false ->
leveled_log:log("P0017", []),
leveled_log:log(p0017, []),
{InitState#state{manifest = Manifest1,
ledger_sqn = MaxSQN,
persisted_sqn = MaxSQN},
@@ -1330,7 +1329,7 @@ archive_files(RootPath, UsedFileList) ->
true ->
UnusedFiles;
false ->
leveled_log:log("P0040", [FN0]),
leveled_log:log(p0040, [FN0]),
[FN0|UnusedFiles]
end;
_ ->
@@ -1350,143 +1349,76 @@ archive_files(RootPath, UsedFileList) ->
ok.
-spec update_levelzero(integer(), tuple(), integer(), list(), pcl_state())
-> pcl_state().
-spec maybe_cache_too_big(
pos_integer(), pos_integer(), boolean()) -> boolean().
%% @doc
%% Update the in-memory cache of recent changes for the penciller. This is
%% the level zero at the top of the tree.
%% Once the update is made, there needs to be a decision to potentially roll
%% the level-zero memory to an on-disk level zero sst file. This can only
%% happen when the cache has exceeded the size threshold (with some jitter
%% to prevent coordination across multiple leveled instances), and when there
%% is no level zero file already present, and when there is no manifest change
%% pending.
update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
LedgerSQN, L0Cache, State) ->
SW = os:timestamp(), % Time this for logging purposes
Update = leveled_pmem:add_to_cache(L0Size,
{PushedTree, MinSQN, MaxSQN},
LedgerSQN,
L0Cache),
UpdL0Index = leveled_pmem:add_to_index(PushedIdx,
State#state.levelzero_index,
length(L0Cache) + 1),
{UpdMaxSQN, NewL0Size, UpdL0Cache} = Update,
if
UpdMaxSQN >= LedgerSQN ->
UpdState = State#state{levelzero_cache=UpdL0Cache,
levelzero_size=NewL0Size,
levelzero_index=UpdL0Index,
ledger_sqn=UpdMaxSQN},
CacheTooBig =
NewL0Size > State#state.levelzero_maxcachesize,
CacheMuchTooBig =
NewL0Size > min(?SUPER_MAX_TABLE_SIZE,
2 * State#state.levelzero_maxcachesize),
RandomFactor =
case State#state.levelzero_cointoss of
true ->
case leveled_rand:uniform(?COIN_SIDECOUNT) of
1 ->
true;
_ ->
false
end;
false ->
true
end,
JitterCheck = RandomFactor or CacheMuchTooBig,
Due = CacheTooBig and JitterCheck,
{UpdState0, _L0Bloom} = maybe_roll_memory(UpdState, Due, false),
LogSubs = [NewL0Size, Due, State#state.work_ongoing],
case Due of
true ->
leveled_log:log_timer("P0031", LogSubs, SW);
_ ->
ok
end,
UpdState0
end.
%% Is the cache too big - should it be flushed to on-disk Level 0
%% There exists some jitter to prevent all caches from flushing concurrently
%% where there are multiple leveled instances on one machine.
maybe_cache_too_big(NewL0Size, L0MaxSize, CoinToss) ->
CacheTooBig = NewL0Size > L0MaxSize,
CacheMuchTooBig =
NewL0Size > min(?SUPER_MAX_TABLE_SIZE, 2 * L0MaxSize),
RandomFactor =
case CoinToss of
true ->
case leveled_rand:uniform(?COIN_SIDECOUNT) of
1 ->
true;
_ ->
false
end;
false ->
true
end,
CacheTooBig and (RandomFactor or CacheMuchTooBig).
-spec maybe_roll_memory(pcl_state(), boolean(), boolean())
-> {pcl_state(), leveled_ebloom:bloom()|none}.
%% @doc
%% Check that no L0 file is present before rolling memory. Returns a boolean
%% to indicate if memory has been rolled, the Pid of the L0 constructor and
%% The bloom of the L0 file (or none)
maybe_roll_memory(State, false, _SyncRoll) ->
{State, none};
maybe_roll_memory(State, true, SyncRoll) ->
BlockedByL0 = leveled_pmanifest:levelzero_present(State#state.manifest),
PendingManifestChange = State#state.work_ongoing,
% It is critical that memory is not rolled if the manifest is due to be
% updated by a change by the clerk. When that manifest change is made it
% will override the addition of L0 and data will be lost.
case (BlockedByL0 or PendingManifestChange) of
true ->
{State, none};
false ->
{L0Constructor, Bloom} = roll_memory(State, SyncRoll),
{State#state{levelzero_pending=true,
levelzero_constructor=L0Constructor},
Bloom}
end.
-spec roll_memory(pcl_state(), boolean())
-> {pid(), leveled_ebloom:bloom()|none}.
-spec roll_memory(
pos_integer(), non_neg_integer(), string(),
levelzero_cache()|none, pos_integer(),
sst_options(), boolean())
-> {pid(), leveled_ebloom:bloom()|none}.
%% @doc
%% Roll the in-memory cache into a L0 file. If this is done synchronously,
%% will return a bloom representing the contents of the file.
%%
%% Casting a large object (the levelzero cache) to the gen_server did not lead
%% to an immediate return as expected. With 32K keys in the TreeList it could
%% take around 35-40ms.
%% Casting a large object (the levelzero cache) to the SST file does not lead
%% to an immediate return. With 32K keys in the TreeList it could take around
%% 35-40ms due to the overheads of copying.
%%
%% To avoid blocking this gen_server, the SST file can request each item of the
%% To avoid blocking the penciller, the SST file can request each item of the
%% cache one at a time.
%%
%% The Wait is set to false to use a cast when calling this in normal operation
%% whereas a Wait of true is used at shutdown
roll_memory(State, false) ->
ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1,
RootPath = sst_rootpath(State#state.root_path),
FileName = sst_filename(ManSQN, 0, 0),
leveled_log:log("P0019", [FileName, State#state.ledger_sqn]),
roll_memory(NextManSQN, LedgerSQN, RootPath, none, CL, SSTOpts, false) ->
L0Path = sst_rootpath(RootPath),
L0FN = sst_filename(NextManSQN, 0, 0),
leveled_log:log(p0019, [L0FN, LedgerSQN]),
PCL = self(),
FetchFun =
fun(Slot, ReturnFun) -> pcl_fetchlevelzero(PCL, Slot, ReturnFun) end,
R = leveled_sst:sst_newlevelzero(RootPath,
FileName,
length(State#state.levelzero_cache),
FetchFun,
PCL,
State#state.ledger_sqn,
State#state.sst_options),
{ok, Constructor, _} = R,
{ok, Constructor, _} =
leveled_sst:sst_newlevelzero(
L0Path, L0FN, CL, FetchFun, PCL, LedgerSQN, SSTOpts),
{Constructor, none};
roll_memory(NextManSQN, LedgerSQN, RootPath, L0Cache, CL, SSTOpts, true) ->
    L0Path = sst_rootpath(RootPath),
    L0FN = sst_filename(NextManSQN, 0, 0),
    FetchFun = fun(Slot) -> lists:nth(Slot, L0Cache) end,
    KVList = leveled_pmem:to_list(CL, FetchFun),
    {ok, Constructor, _, Bloom} =
        leveled_sst:sst_new(
            L0Path, L0FN, 0, KVList, LedgerSQN, SSTOpts),
    {Constructor, Bloom}.
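The pull-based handoff described in the comments above (the SST file requesting one cache slot at a time, rather than receiving the whole levelzero cache in a single large message) can be sketched in isolation. This is a hedged illustration only: `build_loop/4` and the `{slot_data, _, _}` message shape are hypothetical stand-ins for the `pcl_fetchlevelzero/3` / `sst_newlevelzero` interaction, not the leveled API.

```erlang
%% Hedged sketch: the consumer pulls one slot at a time via FetchFun,
%% which replies asynchronously through a ReturnFun continuation.  Each
%% message stays small, instead of copying a ~32K-key cache in one cast.
%% build_loop/4 and the {slot_data, _, _} message are illustrative only.
build_loop(FetchFun, Slot, MaxSlots, Acc) when Slot =< MaxSlots ->
    Self = self(),
    ReturnFun = fun(SlotData) -> Self ! {slot_data, Slot, SlotData} end,
    FetchFun(Slot, ReturnFun),
    receive
        {slot_data, Slot, SlotData} ->
            build_loop(FetchFun, Slot + 1, MaxSlots, [SlotData|Acc])
    end;
build_loop(_FetchFun, _Slot, _MaxSlots, Acc) ->
    lists:reverse(Acc).
```

A caller owning a cache `CL` might supply `fun(Slot, ReturnFun) -> ReturnFun(lists:nth(Slot, CL)) end` as the FetchFun, so the consumer never holds more than one slot in flight.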
-spec timed_fetch_mem(
    tuple(),
    {integer(), integer()},
    leveled_pmanifest:manifest(), list(),
    leveled_pmem:index_array(),
    leveled_monitor:monitor()) -> leveled_codec:ledger_kv()|not_found.
%% @doc
%% Fetch the result from the penciller, starting by looking in the memory,
%% and if it is not found looking down level by level through the LSM tree.
@@ -1496,12 +1428,13 @@ roll_memory(State, true) ->
%% the cost of requests dropping levels can be monitored.
%%
%% The level at which the result was found is reported to the monitor.
timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Monitor) ->
    SW0 = leveled_monitor:maybe_time(Monitor),
    {R, Level} =
        fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun timed_sst_get/4),
    {TS0, _SW1} = leveled_monitor:step_time(SW0),
    maybelog_fetch_timing(Monitor, Level, TS0, R == not_present),
    R.
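The Monitor argument replaces the old in-process `pcl_timings` sampling. A minimal sketch of the `no_timing` pattern follows, under the simplifying (and hypothetical) assumption that no timestamp is taken at all when no monitor is attached; the real `leveled_monitor:maybe_time/1` and `step_time/1` may behave differently, e.g. sampling probabilistically.

```erlang
%% Hedged sketch of conditional timing.  maybe_time/1 and step_time/1
%% here are local simplifications, not leveled_monitor's actual API:
%% with no monitor attached, no timestamp is ever taken, so the timed
%% path costs almost nothing outside sample windows.
maybe_time({no_monitor, 0}) -> no_timing;
maybe_time({_Pid, _StatsFreq}) -> os:timestamp().

step_time(no_timing) ->
    {no_timing, no_timing};
step_time(TS) ->
    Now = os:timestamp(),
    {timer:now_diff(Now, TS), Now}.
```

The `{Elapsed, Next}` return lets a caller chain several timed steps from one starting timestamp, which is the shape `timed_fetch_mem/6` above relies on.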
-spec fetch_sqn(
@@ -1576,10 +1509,10 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
{T, R} when T < FetchTolerance ->
R;
{T, not_present} ->
leveled_log:log(pc016, [PID, T, Level, not_present]),
not_present;
{T, R} ->
leveled_log:log(pc016, [PID, T, Level, found]),
R
end.
@@ -1929,95 +1862,17 @@ find_nextkey(QueryArray, LCnt,
end.
%%%============================================================================
%%% Timing Functions
%%%============================================================================
-spec maybelog_fetch_timing(
leveled_monitor:monitor(),
memory|leveled_pmanifest:lsm_level(),
leveled_monitor:timing(),
boolean()) -> ok.
maybelog_fetch_timing(_Monitor, _Level, no_timing, _NF) ->
ok;
maybelog_fetch_timing({Pid, _StatsFreq}, _Level, FetchTime, true) ->
leveled_monitor:add_stat(Pid, {pcl_fetch_update, not_found, FetchTime});
maybelog_fetch_timing({Pid, _StatsFreq}, Level, FetchTime, _NF) ->
leveled_monitor:add_stat(Pid, {pcl_fetch_update, Level, FetchTime}).
%%%============================================================================
@@ -2209,13 +2064,15 @@ simple_server_test() ->
?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})),
?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004", null})),
    {ok, PclSnap, null} =
        leveled_bookie:snapshot_store(
            leveled_bookie:empty_ledgercache(),
            PCLr,
            null,
            {no_monitor, 0},
            ledger,
            undefined,
            false),
?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
@@ -2264,13 +2121,15 @@ simple_server_test() ->
1)),
ok = pcl_close(PclSnap),
    {ok, PclSnap2, null} =
        leveled_bookie:snapshot_store(
            leveled_bookie:empty_ledgercache(),
            PCLr,
            null,
            {no_monitor, 0},
            ledger,
            undefined,
            false),
?assertMatch(replaced, pcl_checksequencenumber(PclSnap2,
{o,
@@ -2476,19 +2335,6 @@ slow_fetch_test() ->
?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)),
?assertMatch("value", log_slowfetch(2, "value", "fake", 0, 1)).
coverage_cheat_test() ->
{noreply, _State0} = handle_info(timeout, #state{}),
@@ -2554,10 +2400,15 @@ handle_down_test() ->
loop() ->
receive
{snap, PCLr, TestPid} ->
            {ok, Snap, null} =
                leveled_bookie:snapshot_store(
                    leveled_bookie:empty_ledgercache(),
                    PCLr,
                    null,
                    {no_monitor, 0},
                    ledger,
                    undefined,
                    false),
TestPid ! {self(), {ok, Snap, null}},
loop();
stop ->