leveled/src/leveled_pmanifest.erl
Martin Sumner aaeac7ba36
Mas d34 i453 eqwalizer (#454)
* Add eqwalizer and clear for codec & sst

The eqwalizer errors highlighted the need in several places for type clarification.

Within tests there are some issues where a type is assumed, and so ignore has been used to handle this rather than write more complex code to be explicit about the assumption.

The handling of arrays isn't great by eqwalizer - to be specific about the content of array causes issues when initialising an array.  Perhaps a type (map maybe) where one can be more explicit about types might be a better option (even if there is a minimal performance impact).

The use of a ?TOMB_COUNT defined option complicated the code much more with eqwalizer.  So for now, there is no developer option to disable ?TOMB_COUNT.

Test fixes required where strings have been used for buckets/keys not binaries.

The leveled_sst statem needs a different state record for starting when compared to other modes.  The state record has been divided up to reflect this, to make type management easier.  The impact on performance needs to be tested.

* Update ct tests to support binary keys/buckets only

* Eqwalizer for leveled_cdb and leveled_tictac

As array is used in leveled_tictac - there is the same issue as with leveled_sst

* Remove redundant indirection of leveled_rand

A legacy of pre-20 OTP

* More modules eqwalized

ebloom/log/util/monitor

* Eqwalize further modules

elp eqwalize leveled_codec; elp eqwalize leveled_sst; elp eqwalize leveled_cdb; elp eqwalize leveled_tictac; elp eqwalize leveled_log; elp eqwalize leveled_monitor; elp eqwalize leveled_head; elp eqwalize leveled_ebloom; elp eqwalize leveled_iclerk

All concurrently OK

* Refactor unit tests to use binary() no string() in key

Previously string() was allowed just to avoid having to change all these tests.  Go through the pain now, as part of eqwalizing.

* Add fixes for penciller, inker

Add a new ?IS_DEF macro to replace =/= undefined.

Now more explicit about primary, object and query keys

* Further fixes

Need to clarify functions used by runner - where keys , query keys and object keys are used

* Further eqwalisation

* Eqwalize leveled_pmanifest

Also make implementation independent of choice of dict - i.e. one can save a manifest using dict for blooms/pending_deletions and then open a manifest with code that uses a different type.  Allow for slow dict to be replaced with map.

Would not be backwards compatible though, without further thought - i.e. if you upgrade then downgrade.

Redundant code created by leveled_sst refactoring removed.

* Fix backwards compatibility issues

* Manifest Entry to belong to leveled_pmanifest

There are two manifests - leveled_pmanifest and leveled_imanifest.  Both have manifest_entry() type objects, but these types are different.  To avoid confusion don't include the pmanifest manifest_entry() within the global include file - be specific that it belongs to the leveled_pmanifest module

* Ignore elp file - large binary

* Update src/leveled_pmem.erl

Remove unnecessary empty list from type definition

Co-authored-by: Thomas Arts <thomas.arts@quviq.com>

---------

Co-authored-by: Thomas Arts <thomas.arts@quviq.com>
2024-11-13 13:37:13 +00:00

1766 lines
62 KiB
Erlang

%% -------- PENCILLER MANIFEST ---------
%%
%% The manifest is an ordered set of files for each level to be used to find
%% which file is relevant for a given key or range lookup at a given level.
%%
%% This implementation is incomplete, in that it just uses a plain list at
%% each level. This is fine for short-lived volume tests, but as the deeper
%% levels are used there will be an exponential penalty.
%%
%% The original intention was to swap out this implementation for a
%% multi-version ETS table - but that became complex. So one of two changes
%% are pending:
%% - Use a single version ETS cache for lower levels (and not allow snapshots to
%% access the cache)
%% - Use a skiplist like enhanced list at lower levels.
-module(leveled_pmanifest).
% Test uses extracted/edited array related to specific issue, which is not in
% an expected format
-eqwalizer({nowarn_function, potential_issue_test/0}).
-include("leveled.hrl").
-export([
new_manifest/0,
open_manifest/1,
copy_manifest/1,
load_manifest/3,
close_manifest/2,
save_manifest/2,
query_manifest/3,
get_manifest_sqn/1,
key_lookup/3,
range_lookup/4,
merge_lookup/4,
insert_manifest_entry/4,
remove_manifest_entry/4,
replace_manifest_entry/5,
switch_manifest_entry/4,
mergefile_selector/3,
add_snapshot/3,
release_snapshot/2,
merge_snapshot/2,
ready_to_delete/2,
clear_pending/3,
check_for_work/1,
is_basement/2,
levelzero_present/1,
check_bloom/3,
report_manifest_level/2,
snapshot_pids/1,
get_sstpids/1
]).
-export(
[
new_entry/5,
entry_startkey/1,
entry_endkey/1,
entry_filename/1,
entry_owner/1,
is_entry/1
]
).
-export([
filepath/2
]).
-define(MANIFEST_FILEX, "man").
-define(PENDING_FILEX, "pnd").
-define(MANIFEST_FP, "ledger_manifest").
-define(LEVEL_SCALEFACTOR,
[{0, 0},
{1, 4}, {2, 16}, {3, 64}, % Factor of 4
{4, 384}, {5, 2304}, % Factor of 6
{6, 18432}, % Factor of 8
{7, infinity}]).
% As an alternative to going up by a factor of 8 at each level,
% increase by a factor of 4 at young levels - to make early
% compaction jobs shorter.
%
% There are 32K keys per files => with 4096 files there are 100M
% keys supported,
% 600M keys is supported before hitting the infinite level.
% At o(10) trillion keys behaviour may become increasingly
% difficult to predict.
-if(?OTP_RELEASE >= 25).
-if(length(?LEVEL_SCALEFACTOR) /= ?MAX_LEVELS).
-error("length ?LEVEL_SCALEFACTOR differs from ?MAX_LEVELS").
-endif.
-endif.
-define(TREE_TYPE, idxt).
-define(TREE_WIDTH, 8).
-define(PHANTOM_PID, r2d_fail).
-define(MANIFESTS_TO_RETAIN, 5).
-define(GROOM_SAMPLE, 16).
-record(manifest,
{
levels :: array:array(dynamic()),
% an array of lists or trees representing the manifest, where the
% list is created using the to_list function on leveled_tree
manifest_sqn = 0 :: non_neg_integer(),
% The current manifest SQN
snapshots = [] :: list(snapshot()),
% A list of snapshots (i.e. clones)
min_snapshot_sqn = 0 :: integer(),
% The smallest snapshot manifest SQN in the snapshot list
pending_deletes = new_pending_deletions() :: pending_deletions(),
basement :: non_neg_integer(),
% Currently the lowest level (the largest number)
blooms = new_blooms() :: blooms()
}).
-record(manifest_entry,
{
start_key :: leveled_codec:object_key(),
end_key :: leveled_codec:object_key(),
owner :: pid(),
filename :: string(),
bloom = none :: leveled_ebloom:bloom() | none
}
).
-type snapshot() ::
{pid(), non_neg_integer(), pos_integer(), pos_integer()}.
-type manifest() :: #manifest{}.
-type manifest_entry() :: #manifest_entry{}.
-type manifest_owner() :: pid().
-type lsm_level() :: 0..7.
-type pending_deletions() :: dict:dict().
-type blooms() :: dict:dict().
-type selector_strategy() ::
random|{grooming, fun((list(manifest_entry())) -> manifest_entry())}.
-export_type([manifest/0, manifest_entry/0, manifest_owner/0, lsm_level/0]).
%%%============================================================================
%%% API
%%%============================================================================
-spec new_manifest() -> manifest().
%% @doc
%% Build an empty manifest of the ledger. The manifest describes the layout
%% of the files, and also tracks snapshots, as a snapshot may require a file
%% to be retained after the primary penciller is happy for it to be removed.
%%
%% The level array has ?MAX_LEVELS + 1 slots. Levels 0 and 1 are left as the
%% array default (an empty list), whereas levels 2 to ?MAX_LEVELS are each
%% initialised with an empty leveled_tree of type ?TREE_TYPE.
new_manifest() ->
    ListLevels = array:new([{size, ?MAX_LEVELS + 1}, {default, []}]),
    Levels =
        lists:foldl(
            fun(TreeIdx, LevelAcc) ->
                array:set(TreeIdx, leveled_tree:empty(?TREE_TYPE), LevelAcc)
            end,
            ListLevels,
            lists:seq(2, ?MAX_LEVELS)
        ),
    #manifest{
        basement = 0,
        snapshots = [],
        manifest_sqn = 0,
        levels = Levels
    }.
-spec open_manifest(string()) -> manifest().
%% @doc
%% Open a manifest from the manifest sub-directory of the RootPath. The
%% directory is listed, and every file name matching
%% "nonzero_<digits>." ++ ?MANIFEST_FILEX contributes a candidate manifest SQN.
%% The candidates are handed to open_manifestfile/2 highest SQN first, which
%% is expected to return an empty manifest if no manifest can be opened.
%%
%% Crashes (badmatch) if the manifest directory cannot be listed.
open_manifest(RootPath) ->
    ManifestPath = filepath(RootPath, manifest),
    {ok, Filenames} = file:list_dir(ManifestPath),
    % Compile the pattern once rather than on every file name.
    % NOTE(review): the pattern is not anchored, so a name which merely
    % contains a matching section will also be accepted - confirm if intended
    {ok, CurrRegex} =
        re:compile("nonzero_(?<MSN>[0-9]+)\\." ++ ?MANIFEST_FILEX),
    ExtractSQNFun =
        fun(FN, Acc) ->
            case re:run(FN, CurrRegex, [{capture, ['MSN'], list}]) of
                nomatch ->
                    Acc;
                {match, [Int]} when is_list(Int) ->
                    % Prepend - the accumulation order is irrelevant as the
                    % result is sorted, and appending would be quadratic
                    [list_to_integer(Int)|Acc]
            end
        end,
    % Highest SQN first, so the most recent manifest is tried first
    ValidManSQNs =
        lists:reverse(lists:sort(lists:foldl(ExtractSQNFun, [], Filenames))),
    open_manifestfile(RootPath, ValidManSQNs).
-spec copy_manifest(manifest()) -> manifest().
%% @doc
%% Produce the copy of the manifest to be handed to a snapshot. State which
%% only the master process should care about (the snapshot list and the
%% pending deletions) is reset to empty; everything else is passed through.
copy_manifest(Manifest) ->
    NoPendingDeletes = new_pending_deletions(),
    Manifest#manifest{pending_deletes = NoPendingDeletes, snapshots = []}.
-spec load_manifest(
    manifest(),
    fun((file:name_all(), 1..7) -> {pid(), leveled_ebloom:bloom()}),
    fun((pid()) -> pos_integer()))
        -> {integer(), manifest(), list()}.
%% @doc
%% Walk the manifest, starting a process to manage each file within it.
%% LoadFun is given a file name and level and must return the pid of the
%% (started) file process along with its bloom. SQNFun returns the max
%% sequence number of a file when passed the pid which owns it.
%%
%% Returns {MaxSQN, UpdatedManifest, FileList} - the highest SQN seen, the
%% manifest with reloaded levels and blooms, and the file names accumulated
%% level by level.
%%
%% Levels are loaded from the basement upwards to level 0: the page cache is
%% loaded with each file, and it is preferable to have the higher levels in
%% the cache if memory is insufficient to hold every level.
load_manifest(Manifest, LoadFun, SQNFun) ->
    LoadLevelFun =
        fun(Idx, {MaxSQN, ManAcc, FilesAcc}) ->
            Levels0 = ManAcc#manifest.levels,
            {Loaded, LevelSQN, LevelFiles, LevelBlooms} =
                load_level(Idx, array:get(Idx, Levels0), LoadFun, SQNFun),
            % Index every bloom returned for the level by its owning pid
            Blooms1 =
                lists:foldl(
                    fun({Owner, Bloom}, BloomAcc) ->
                        dict:store(Owner, Bloom, BloomAcc)
                    end,
                    ManAcc#manifest.blooms,
                    LevelBlooms
                ),
            ManAcc1 =
                ManAcc#manifest{
                    levels = array:set(Idx, Loaded, Levels0),
                    blooms = Blooms1
                },
            {max(MaxSQN, LevelSQN), ManAcc1, FilesAcc ++ LevelFiles}
        end,
    DeepestFirst = lists:reverse(lists:seq(0, Manifest#manifest.basement)),
    lists:foldl(LoadLevelFun, {0, Manifest, []}, DeepestFirst).
-spec close_manifest(
    manifest(),
    fun((any()) -> ok)) -> ok.
%% @doc
%% Close all the files in the manifest, using CloseEntryFun to call close on
%% a file. First each level of the active manifest (from level 0 down to the
%% basement) is closed via close_level/3, and then CloseEntryFun is applied
%% to the entry of every file which is pending deletion.
close_manifest(Manifest, CloseEntryFun) ->
    Levels = Manifest#manifest.levels,
    lists:foreach(
        fun(Idx) ->
            close_level(Idx, array:get(Idx, Levels), CloseEntryFun)
        end,
        lists:seq(0, Manifest#manifest.basement)
    ),
    PendingEntries = dict:to_list(Manifest#manifest.pending_deletes),
    lists:foreach(
        fun({_Filename, {_ChangeSQN, Entry}}) ->
            CloseEntryFun(Entry)
        end,
        PendingEntries
    ).
-spec save_manifest(manifest(), string()) -> ok.
%% @doc
%% Persist the manifest to file, prefixed with a 32-bit CRC of the serialised
%% term. Only the durable parts are saved: the snapshots, pending deletes,
%% minimum snapshot SQN and blooms are all reset before serialisation.
%%
%% The file is written via leveled_util:safe_rename/4 from the pending path
%% to the current path for this manifest SQN, and then older manifests are
%% garbage collected.
save_manifest(Manifest, RootPath) ->
    ManSQN = Manifest#manifest.manifest_sqn,
    PendingPath = filepath(RootPath, ManSQN, pending_manifest),
    CurrentPath = filepath(RootPath, ManSQN, current_manifest),
    Persistable =
        Manifest#manifest{
            snapshots = [],
            pending_deletes = new_pending_deletions(),
            min_snapshot_sqn = 0,
            blooms = new_blooms()
        },
    Payload = term_to_binary(Persistable),
    Checksum = erlang:crc32(Payload),
    ok =
        leveled_util:safe_rename(
            PendingPath,
            CurrentPath,
            <<Checksum:32/integer, Payload/binary>>,
            true
        ),
    % If a manifest is corrupted the previous one will be tried, so the
    % previous one is not deleted straight away. Manifests are retained until
    % enough have been kept to make the probability of all being
    % independently corrupted through separate events negligible.
    GC_SQN = ManSQN - ?MANIFESTS_TO_RETAIN,
    % Sometimes a SQN is skipped, so the one before may need clearing too
    lists:foreach(
        fun(SQN) -> ok = remove_manifest(RootPath, SQN) end,
        [GC_SQN, GC_SQN - 1]
    ).
-spec remove_manifest(string(), integer()) -> ok.
%% @doc
%% Delete the current_manifest file for the given SQN if such a file exists.
%% A missing file is not an error; a failed delete crashes (badmatch).
remove_manifest(RootPath, GC_SQN) ->
    ManifestFile = filepath(RootPath, GC_SQN, current_manifest),
    Exists = filelib:is_file(ManifestFile),
    if
        Exists =:= true ->
            ok = file:delete(ManifestFile);
        true ->
            ok
    end.
-spec report_manifest_level(
    manifest(), non_neg_integer()) ->
        {non_neg_integer(),
            non_neg_integer(),
            {string(), pid(), non_neg_integer()} |
                undefined,
            non_neg_integer(),
            non_neg_integer(),
            non_neg_integer(),
            non_neg_integer()}.
%% @doc
%% Report on a level in the manifest. The result tuple holds, in order:
%% - How many files are in the level
%% - The average memory occupied by the owning process of a file in the level
%% - The file with the largest memory footprint {Filename, Pid, Memory}, or
%%   undefined when the level is empty
%% - The per-file averages of the heap_block_size, heap_size, recent_size and
%%   bin_vheap_size values from each owner's garbage_collection_info
%%
%% All averages use integer division. An empty level reports zero throughout.
report_manifest_level(Manifest, LevelIdx) ->
    Level = array:get(LevelIdx, Manifest#manifest.levels),
    % A level is stored either as a plain list or as a leveled_tree
    {FileCount, Entries} =
        if
            is_list(Level) ->
                {length(Level), Level};
            true ->
                {leveled_tree:tsize(Level), leveled_tree:to_list(Level)}
        end,
    case FileCount of
        0 ->
            {0, 0, undefined, 0, 0, 0, 0};
        _ ->
            SumFun =
                fun(MaybeME, {MemSum, Biggest, HBSSum, HSSum, RSSum, BVSum}) ->
                    ME = get_manifest_entry(MaybeME),
                    Owner = ME#manifest_entry.owner,
                    {memory, OwnerMem} = process_info(Owner, memory),
                    Biggest1 =
                        case Biggest of
                            {_FN, _Pid, Larger} when Larger > OwnerMem ->
                                Biggest;
                            _Otherwise ->
                                {ME#manifest_entry.filename, Owner, OwnerMem}
                        end,
                    {garbage_collection_info, GCInfo} =
                        process_info(Owner, garbage_collection_info),
                    HeapBlockSize =
                        proplists:get_value(heap_block_size, GCInfo),
                    HeapSize = proplists:get_value(heap_size, GCInfo),
                    RecentSize = proplists:get_value(recent_size, GCInfo),
                    BinVHeapSize =
                        proplists:get_value(bin_vheap_size, GCInfo),
                    {MemSum + OwnerMem,
                        Biggest1,
                        HBSSum + HeapBlockSize,
                        HSSum + HeapSize,
                        RSSum + RecentSize,
                        BVSum + BinVHeapSize}
                end,
            {TotalMem, BiggestMem, TotalHBS, TotalHS, TotalRS, TotalBVHS} =
                lists:foldl(SumFun, {0, undefined, 0, 0, 0, 0}, Entries),
            {FileCount,
                TotalMem div FileCount,
                BiggestMem,
                TotalHBS div FileCount,
                TotalHS div FileCount,
                TotalRS div FileCount,
                TotalBVHS div FileCount}
    end.
-spec replace_manifest_entry(
    manifest(),
    integer(),
    integer(),
    list()|manifest_entry(),
    list()|manifest_entry()) -> manifest().
%% @doc
%% Swap a set of manifest entries at a level for a new set of entries, and
%% stamp the manifest with the new manifest SQN passed in. Either set may be
%% a list of entries or just a single entry.
%%
%% The removed entries are recorded as pending deletes against ManSQN, and
%% the blooms are updated to reflect the removals and additions. Should the
%% level end up empty the basement is recalculated from the levels, otherwise
%% the basement is the deeper of this level and the existing basement.
%%
%% This is generally called on the level being merged down into.
replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) ->
    Levels0 = Manifest#manifest.levels,
    Level0 = array:get(LevelIdx, Levels0),
    {Blooms1, BloomlessAdditions} =
        update_blooms(Removals, Additions, Manifest#manifest.blooms),
    Level1 = replace_entry(LevelIdx, Level0, Removals, BloomlessAdditions),
    leveled_log:log(pc019, ["insert", LevelIdx, Level1]),
    Pending1 =
        update_pendingdeletes(
            ManSQN, Removals, Manifest#manifest.pending_deletes),
    Levels1 = array:set(LevelIdx, Level1, Levels0),
    Basement1 =
        case is_empty(LevelIdx, Level1) of
            true ->
                get_basement(Levels1);
            false ->
                max(LevelIdx, Manifest#manifest.basement)
        end,
    Manifest#manifest{
        levels = Levels1,
        basement = Basement1,
        manifest_sqn = ManSQN,
        pending_deletes = Pending1,
        blooms = Blooms1
    }.
-spec insert_manifest_entry(
    manifest(), integer(), integer(), list()|manifest_entry()) -> manifest().
%% @doc
%% Add a new manifest entry to the given level of the manifest, stamping the
%% manifest with the given manifest sequence number. The blooms are updated
%% via update_blooms/3, and the entry it returns is the one added to the
%% level. The basement becomes the deeper of this level and the existing
%% basement.
insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
    Levels0 = Manifest#manifest.levels,
    Level0 = array:get(LevelIdx, Levels0),
    {Blooms1, BloomlessEntry} =
        update_blooms([], Entry, Manifest#manifest.blooms),
    Level1 = add_entry(LevelIdx, Level0, BloomlessEntry),
    leveled_log:log(pc019, ["insert", LevelIdx, Level1]),
    Levels1 = array:set(LevelIdx, Level1, Levels0),
    Manifest#manifest{
        blooms = Blooms1,
        manifest_sqn = ManSQN,
        basement = max(LevelIdx, Manifest#manifest.basement),
        levels = Levels1
    }.
-spec remove_manifest_entry(
    manifest(), integer(), integer(), list()|manifest_entry()) -> manifest().
%% @doc
%% Remove a manifest entry from a level (as it has been merged into the level
%% below). The entry is dropped from the blooms and recorded as a pending
%% delete against ManSQN. The basement is only recalculated when the removal
%% leaves the level empty; otherwise it is left as it was.
remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
    Levels0 = Manifest#manifest.levels,
    Level0 = array:get(LevelIdx, Levels0),
    {Blooms1, []} =
        update_blooms(Entry, [], Manifest#manifest.blooms),
    Level1 = remove_entry(LevelIdx, Level0, Entry),
    leveled_log:log(pc019, ["remove", LevelIdx, Level1]),
    Pending1 =
        update_pendingdeletes(
            ManSQN, Entry, Manifest#manifest.pending_deletes),
    Levels1 = array:set(LevelIdx, Level1, Levels0),
    Basement1 =
        case is_empty(LevelIdx, Level1) of
            true ->
                get_basement(Levels1);
            false ->
                Manifest#manifest.basement
        end,
    Manifest#manifest{
        levels = Levels1,
        basement = Basement1,
        manifest_sqn = ManSQN,
        pending_deletes = Pending1,
        blooms = Blooms1
    }.
-spec switch_manifest_entry(
    manifest(), integer(), integer(), list()|manifest_entry()) -> manifest().
%% @doc
%% Switch a manifest entry from this level to the level below (i.e. when
%% there are no overlapping manifest entries in the level below).
%%
%% The entry is removed from the source level without being marked as a
%% pending deletion, as the file lives on, and is then inserted at
%% SrcLevel + 1 using insert_manifest_entry/4.
switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) ->
    Levels0 = Manifest#manifest.levels,
    SrcLevel1 = remove_entry(SrcLevel, array:get(SrcLevel, Levels0), Entry),
    ManifestLessEntry =
        Manifest#manifest{levels = array:set(SrcLevel, SrcLevel1, Levels0)},
    insert_manifest_entry(ManifestLessEntry, ManSQN, SrcLevel + 1, Entry).
-spec get_manifest_sqn(manifest()) -> integer().
%% @doc
%% Read the manifest sequence number held by this manifest
get_manifest_sqn(Manifest) ->
    ManSQN = Manifest#manifest.manifest_sqn,
    ManSQN.
-spec key_lookup(
    manifest(), integer(), leveled_codec:ledger_key()) ->
        false|manifest_owner().
%% @doc
%% Find which manifest entry covers the given key at the given level,
%% returning false if there is no covering manifest entry at that level.
%% A level below the basement is never searched - the answer is always false.
key_lookup(Manifest, LevelIdx, Key) ->
    Basement = Manifest#manifest.basement,
    if
        LevelIdx > Basement ->
            false;
        true ->
            Level = array:get(LevelIdx, Manifest#manifest.levels),
            key_lookup_level(LevelIdx, Level, Key)
    end.
-spec query_manifest(
    manifest(),
    leveled_codec:ledger_key(),
    leveled_codec:ledger_key())
        -> list(
            {lsm_level(),
                list({next, manifest_entry(), leveled_codec:ledger_key()})}).
%% @doc
%% Run range_lookup/4 for the key range against each level from 0 to
%% ?MAX_LEVELS - 1. The result has a {Level, Pointers} tuple for every level
%% with at least one pointer; levels without pointers are left out. The
%% deepest level comes first in the result.
query_manifest(Manifest, StartKey, EndKey) ->
    % Evaluate in ascending level order, then flip so the deepest is first
    lists:reverse(
        [
            {Level, Pointers}
         || Level <- lists:seq(0, ?MAX_LEVELS - 1),
            Pointers <- [range_lookup(Manifest, Level, StartKey, EndKey)],
            Pointers =/= []
        ]
    ).
-spec range_lookup(
    manifest(),
    integer(),
    leveled_codec:ledger_key(),
    leveled_codec:ledger_key())
        -> list({next, manifest_entry(), leveled_codec:ledger_key()}).
%% @doc
%% Return a list of manifest_entry pointers at this level which cover the
%% key query range. Each pointer is a {next, Entry, StartKey} tuple, carrying
%% the start key of the query.
range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
    range_lookup_int(
        Manifest,
        LevelIdx,
        StartKey,
        EndKey,
        fun(Entry) -> {next, Entry, StartKey} end
    ).
-spec merge_lookup(
    manifest(),
    integer(),
    leveled_codec:ledger_key(),
    leveled_codec:ledger_key()) -> list({next, manifest_entry(), all}).
%% @doc
%% Return a list of manifest_entry pointers at this level which cover the
%% key query range. Unlike range_lookup/4 every pointer is {next, Entry, all},
%% as all the keys in each file are to be included, not just those which fall
%% within the range.
merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
    range_lookup_int(
        Manifest,
        LevelIdx,
        StartKey,
        EndKey,
        fun(Entry) -> {next, Entry, all} end
    ).
-spec mergefile_selector(
    manifest(), integer(), selector_strategy()) -> manifest_entry().
%% @doc
%% An algorithm for discovering which files to merge ....
%% We can find the most optimal file:
%% - The one with the most overlapping data below?
%% - The one that overlaps with the fewest files below?
%% - The smallest file?
%% We could try and be fair in some way (merge oldest first)
%% Ultimately, there is a lack of certainty that being fair or optimal is
%% genuinely better - eventually every file has to be compacted.
%%
%% Hence selection is based on random choice:
%% - At levels 0 and 1 (which are held as lists) an entry is always picked at
%%   random, whatever the strategy;
%% - With the `random` strategy an entry is picked at random from the level;
%% - With `{grooming, ScoringFun}` a random sample of up to ?GROOM_SAMPLE
%%   distinct entries is drawn, and ScoringFun chooses from that sample.
%%
%% The level must not be empty, as otherwise a merge would not have been
%% chosen at this level (rand:uniform/1 will crash on an empty level).
mergefile_selector(Manifest, LevelIdx, _Strategy) when LevelIdx =< 1 ->
    Level = array:get(LevelIdx, Manifest#manifest.levels),
    lists:nth(rand:uniform(length(Level)), Level);
mergefile_selector(Manifest, LevelIdx, random) ->
    Level =
        leveled_tree:to_list(
            array:get(LevelIdx, Manifest#manifest.levels)),
    {_SK, ME} = lists:nth(rand:uniform(length(Level)), Level),
    ME;
mergefile_selector(Manifest, LevelIdx, {grooming, ScoringFun}) ->
    Level =
        leveled_tree:to_list(
            array:get(LevelIdx, Manifest#manifest.levels)),
    % length/1 is O(n), so measure the level once, not once per sample
    LevelSize = length(Level),
    SelectorFun =
        fun(_I, Acc) ->
            {_SK, ME} = lists:nth(rand:uniform(LevelSize), Level),
            [ME|Acc]
        end,
    % Note that the sample may hold fewer than ?GROOM_SAMPLE entries, as
    % usort removes an entry which has been picked more than once
    Sample =
        lists:usort(lists:foldl(SelectorFun, [], lists:seq(1, ?GROOM_SAMPLE))),
    ScoringFun(Sample).
-spec merge_snapshot(manifest(), manifest()) -> manifest().
%% @doc
%% When the clerk returns an updated manifest to the penciller, the penciller
%% should restore its own view of the snapshots onto that manifest. Snapshots
%% can be received in parallel to the manifest being updated, so the updated
%% manifest must not trample over any accrued snapshot state. The snapshots
%% and min_snapshot_sqn are taken from the penciller's manifest, everything
%% else from the clerk's manifest.
merge_snapshot(PencillerManifest, ClerkManifest) ->
    Snapshots = PencillerManifest#manifest.snapshots,
    MinSnapSQN = PencillerManifest#manifest.min_snapshot_sqn,
    ClerkManifest#manifest{
        min_snapshot_sqn = MinSnapSQN,
        snapshots = Snapshots
    }.
-spec add_snapshot(manifest(), pid(), integer()) -> manifest().
%% @doc
%% Add a snapshot reference to the manifest, registered against the current
%% manifest SQN and the time now. The timeout should be in seconds, and the
%% snapshot will be assumed to have expired at timeout (and so at that stage
%% files which depended on the snapshot will potentially expire, and if the
%% clone is still active it may crash).
%%
%% A min_snapshot_sqn of 0 is treated as there being no minimum as yet, in
%% which case the minimum becomes the current manifest SQN. Otherwise the
%% minimum is the lower of the existing minimum and the current manifest SQN.
add_snapshot(Manifest, Pid, Timeout) ->
    ManSQN = Manifest#manifest.manifest_sqn,
    Snapshots =
        [{Pid, ManSQN, seconds_now(), Timeout}|Manifest#manifest.snapshots],
    MinSnapSQN =
        case Manifest#manifest.min_snapshot_sqn of
            0 ->
                ManSQN;
            CurrentMin ->
                min(CurrentMin, ManSQN)
        end,
    Manifest#manifest{snapshots = Snapshots, min_snapshot_sqn = MinSnapSQN}.
-spec release_snapshot(manifest(), pid()|atom()) -> manifest().
%% @doc
%% When a clone is complete the release should be notified to the manifest.
%%
%% The snapshot registered against Pid is dropped. Any other snapshot whose
%% start time plus timeout has already passed is also dropped (logged as
%% p0038). The minimum snapshot SQN is recalculated across the snapshots
%% which remain, and is reset to 0 when none remain. If no snapshot was
%% registered against Pid then this is logged as p0039.
release_snapshot(Manifest, Pid) ->
    SiftFun =
        fun({P, _SQN, _ST, _TO}, {Kept, MinSQN, _Found}) when P =:= Pid ->
                % The snapshot being released
                {Kept, MinSQN, true};
            ({P, SQN, ST, TO} = Snap, {Kept, MinSQN, Found}) ->
                Expired = seconds_now() > (ST + TO),
                if
                    Expired ->
                        leveled_log:log(p0038, [P, SQN, ST, TO]),
                        {Kept, MinSQN, Found};
                    true ->
                        {[Snap|Kept], min(SQN, MinSQN), Found}
                end
        end,
    % The atom infinity sorts higher than any integer, so the first retained
    % snapshot always lowers the minimum
    {Remaining, MinSnapSQN, Released} =
        lists:foldl(
            SiftFun,
            {[], infinity, false},
            Manifest#manifest.snapshots
        ),
    case Released of
        true ->
            ok;
        false ->
            leveled_log:log(p0039, [Pid, length(Remaining), MinSnapSQN])
    end,
    case Remaining of
        [] ->
            Manifest#manifest{snapshots = [], min_snapshot_sqn = 0};
        [_|_] when is_integer(MinSnapSQN) ->
            leveled_log:log(p0004, [Remaining]),
            Manifest#manifest{
                snapshots = Remaining, min_snapshot_sqn = MinSnapSQN
            }
    end.
%% @doc
%% A SST file which is in the delete_pending state can check against the
%% manifest to see if it is ready to delete. It is ready when it is listed in
%% the pending deletes and either there are no snapshots (the minimum
%% snapshot SQN is 0), or the minimum snapshot SQN has reached the manifest
%% SQN at which the file was removed.
%% This does not update the manifest, a call is required to clear_pending to
%% remove the file from the manifest's list of pending_deletes.
-spec ready_to_delete(manifest(), string()) -> boolean().
ready_to_delete(Manifest, Filename) ->
    MinSnapSQN = Manifest#manifest.min_snapshot_sqn,
    case dict:find(Filename, Manifest#manifest.pending_deletes) of
        error ->
            % Not a pending delete
            false;
        {ok, _} when MinSnapSQN =:= 0 ->
            % no snapshots
            true;
        {ok, {ChangeSQN, _ME}} ->
            % True only if every snapshot is looking at a version of history
            % after this was removed
            MinSnapSQN >= ChangeSQN;
        {ok, _} ->
            false
    end.
-spec clear_pending(manifest(), list(string()), boolean()) -> manifest().
%% @doc
%% Erase each of the given file names from the manifest's pending deletes.
%% When MaybeRelease is true release_snapshot/2 is then called with
%% ?PHANTOM_PID: if the penciller got a confirm_delete that resulted in no
%% pending removals, then maybe a snapshot needs to be timed out via release.
clear_pending(Manifest, Filenames, MaybeRelease)
        when is_list(Filenames), is_boolean(MaybeRelease) ->
    PDs =
        lists:foldl(
            fun dict:erase/2,
            Manifest#manifest.pending_deletes,
            Filenames
        ),
    Cleared = Manifest#manifest{pending_deletes = PDs},
    case MaybeRelease of
        true ->
            release_snapshot(Cleared, ?PHANTOM_PID);
        false ->
            Cleared
    end.
-spec check_for_work(manifest()) -> {list(), integer()}.
%% @doc
%% Check for compaction work in the manifest - look for levels which contain
%% more files than the threshold set for the level in ?LEVEL_SCALEFACTOR.
%%
%% File count determines size in leveled (unlike leveldb which works on the
%% total data volume). Files are fixed size in terms of keys, and the size of
%% metadata is assumed to be contained and regular and so uninteresting for
%% level sizing.
%%
%% Return a list of levels which are over-sized as well as the total items
%% across the manifest which are beyond the size (the total work outstanding).
%% Levels beyond the basement are never considered.
check_for_work(Manifest) ->
    Basement = Manifest#manifest.basement,
    Levels = Manifest#manifest.levels,
    AssessFun =
        fun({LevelIdx, _MaxCount}, {_OverL, _Excess} = Acc)
                when LevelIdx > Basement ->
                Acc;
            ({LevelIdx, MaxCount}, {OverL, Excess} = Acc) ->
                case size(LevelIdx, array:get(LevelIdx, Levels)) of
                    FileCount when FileCount > MaxCount ->
                        {[LevelIdx|OverL], Excess + FileCount - MaxCount};
                    _WithinLimit ->
                        Acc
                end
        end,
    lists:foldr(AssessFun, {[], 0}, ?LEVEL_SCALEFACTOR).
-spec is_basement(manifest(), integer()) -> boolean().
%% @doc
%% Is this level at (or beyond) the basement - the lowest level in the
%% manifest which contains active files. When merging down to the basement
%% level special rules may apply (for example to reap tombstones)
is_basement(Manifest, Level) ->
    Basement = Manifest#manifest.basement,
    Basement =< Level.
-spec levelzero_present(manifest()) -> boolean().
%% @doc
%% Is there a file in level zero (only a single file can be in level zero).
levelzero_present(Manifest) ->
    LevelZero = array:get(0, Manifest#manifest.levels),
    not is_empty(0, LevelZero).
-spec check_bloom(manifest(), pid(), {integer(), integer()}) -> boolean().
%% @doc
%% Check to see if a hash may be present in the file owned by FP, using the
%% bloom filter the manifest holds for that file. If the manifest holds no
%% binary bloom for the file then true is returned, as presence cannot be
%% ruled out.
check_bloom(Manifest, FP, Hash) ->
    case dict:find(FP, Manifest#manifest.blooms) of
        {ok, Bloom} when is_binary(Bloom) ->
            leveled_ebloom:check_hash(Hash, Bloom);
        {ok, _NotABinaryBloom} ->
            true;
        error ->
            true
    end.
-spec snapshot_pids(manifest()) -> list(pid()).
%% @doc
%% Return the pid (the first element) of every snapshot registered with the
%% manifest - these are to be shutdown on shutdown
snapshot_pids(Manifest) ->
    [element(1, Snapshot) || Snapshot <- Manifest#manifest.snapshots].
-spec get_sstpids(manifest()) -> list(pid()).
%% @doc
%% Return a list of all the SST PIDs in the current manifest - the owner of
%% every manifest entry at every level from level 0 down to the basement.
get_sstpids(Manifest) ->
    Levels = Manifest#manifest.levels,
    OwnersAtLevelFun =
        fun(Idx) ->
            Stored = array:get(Idx, Levels),
            % Levels beyond level 1 are trees and need converting to a list
            Entries =
                if
                    Idx > 1 ->
                        leveled_tree:to_list(Stored);
                    true ->
                        Stored
                end,
            [
                (get_manifest_entry(MaybeME))#manifest_entry.owner
             || MaybeME <- Entries
            ]
        end,
    lists:flatmap(OwnersAtLevelFun, lists:seq(0, Manifest#manifest.basement)).
%%%============================================================================
%%% Manifest Entry
%%%============================================================================
-spec new_entry(
    leveled_codec:object_key(),
    leveled_codec:object_key(),
    pid(),
    string(),
    leveled_ebloom:bloom()|none) -> manifest_entry().
%% @doc
%% Construct a manifest entry from the key range of a file, the pid of the
%% process which owns the file, the file name and the bloom (or none).
new_entry(StartKey, EndKey, Owner, FileName, Bloom) ->
    #manifest_entry{
        bloom = Bloom,
        filename = FileName,
        owner = Owner,
        end_key = EndKey,
        start_key = StartKey
    }.
-spec is_entry(any()) -> boolean().
%% @doc
%% Is the term a manifest_entry record
is_entry(#manifest_entry{}) ->
    true;
is_entry(_Other) ->
    false.
-spec entry_startkey(manifest_entry()) -> leveled_codec:object_key().
%% @doc
%% The start key recorded in the manifest entry
entry_startkey(#manifest_entry{start_key = StartKey}) ->
    StartKey.
-spec entry_endkey(manifest_entry()) -> leveled_codec:object_key().
%% @doc
%% The end key recorded in the manifest entry
entry_endkey(#manifest_entry{end_key = EndKey}) ->
    EndKey.
-spec entry_owner(manifest_entry()) -> pid().
%% @doc
%% The owner (pid) recorded in the manifest entry
entry_owner(#manifest_entry{owner = Owner}) ->
    Owner.
-spec entry_filename(manifest_entry()) -> string().
%% @doc
%% The filename recorded in the manifest entry.  Only matches when the
%% filename passes the ?IS_DEF guard.
entry_filename(#manifest_entry{filename = FileName})
        when ?IS_DEF(FileName) ->
    FileName.
%%%============================================================================
%%% Internal Functions
%%%============================================================================
-spec get_manifest_entry(
    {tuple(), manifest_entry()}|manifest_entry()) -> manifest_entry().
%% @doc
%% Manifest levels can hold entries in two forms: a bare manifest entry, or a
%% two-element tuple pairing an end key with the entry.  Use this to get the
%% manifest entry regardless of which form has been passed.
get_manifest_entry(MaybeKeyedEntry) ->
    case MaybeKeyedEntry of
        {_EndKey, Entry} ->
            Entry;
        Entry ->
            Entry
    end.
%% All these internal functions that work on a level are also passed LevelIdx
%% even if this is not presently relevant.  Currently levels 0 and 1 are held
%% as lists, whereas lower levels are held as trees (via leveled_tree) to
%% improve fetch efficiency
%% Start every file in a level using LoadFun, and return a tuple of:
%% - the level with the owner of each entry updated to the started process;
%% - the highest SQN reported by SQNFun across the files (0 if none);
%% - the list of filenames in the level;
%% - a list of {Owner, Bloom} pairs as returned by LoadFun.
load_level(LevelIdx, Level, LoadFun, SQNFun) ->
    % The work required for any entry, regardless of the form of the level
    StartEntry =
        fun(Entry) ->
            FileName = Entry#manifest_entry.filename,
            {Owner, Bloom} = LoadFun(FileName, LevelIdx),
            EntrySQN = SQNFun(Owner),
            {Entry#manifest_entry{owner = Owner},
                EntrySQN,
                FileName,
                {Owner, Bloom}}
        end,
    ListFolder =
        fun(Entry, {EntryAcc, SQNAcc, FileAcc, BloomAcc}) ->
            {Started, StartedSQN, StartedFN, OwnerBloom} = StartEntry(Entry),
            {[Started|EntryAcc],
                max(StartedSQN, SQNAcc),
                [StartedFN|FileAcc],
                [OwnerBloom|BloomAcc]}
        end,
    TreeFolder =
        fun({EndKey, Entry}, {EntryAcc, SQNAcc, FileAcc, BloomAcc}) ->
            {Started, StartedSQN, StartedFN, OwnerBloom} = StartEntry(Entry),
            {[{EndKey, Started}|EntryAcc],
                max(StartedSQN, SQNAcc),
                [StartedFN|FileAcc],
                [OwnerBloom|BloomAcc]}
        end,
    InitAcc = {[], 0, [], []},
    if
        LevelIdx =< 1 ->
            lists:foldr(ListFolder, InitAcc, Level);
        true ->
            % Lower levels are trees - flatten, load, and then rebuild
            {TreeEntries, TreeMaxSQN, TreeFiles, TreeBlooms} =
                lists:foldr(TreeFolder, InitAcc, leveled_tree:to_list(Level)),
            {leveled_tree:from_orderedlist(
                    TreeEntries, ?TREE_TYPE, ?TREE_WIDTH),
                TreeMaxSQN,
                TreeFiles,
                TreeBlooms}
    end.
%% Apply CloseEntryFun to every member of the level.  Levels 0 and 1 are
%% lists of entries; lower levels are converted from a tree to a list first
%% (so the fun will be passed the list elements of the tree).
close_level(LevelIdx, Level, CloseEntryFun) ->
    Members =
        case LevelIdx =< 1 of
            true ->
                Level;
            false ->
                leveled_tree:to_list(Level)
        end,
    lists:foreach(CloseEntryFun, Members).
%% Is the level empty?  An empty list is empty at any level index; a
%% non-empty list at levels 0 or 1 is not empty; anything else is treated as
%% a tree, which is empty when its size is zero.
is_empty(LevelIdx, Level) ->
    if
        Level =:= [] ->
            true;
        LevelIdx =< 1 ->
            false;
        true ->
            leveled_tree:tsize(Level) == 0
    end.
%% The number of entries in a level - the length of the list for levels 0
%% and 1, the tree size for lower levels
size(LevelIdx, Level) ->
    case LevelIdx =< 1 of
        true ->
            length(Level);
        false ->
            leveled_tree:tsize(Level)
    end.
%% Return a predicate for use with lists:splitwith/2 to find the position of
%% an entry in a level.  For levels 0 and 1 the predicate is true for
%% manifest entries with a start key before StartKey.  For lower levels, it
%% is true for {EndKey, Entry} pairs with an end key before EndKey.
pred_fun(LevelIdx, StartKey, EndKey) ->
    case LevelIdx =< 1 of
        true ->
            fun(Entry) ->
                StartKey > Entry#manifest_entry.start_key
            end;
        false ->
            fun({EntryEndKey, _Entry}) ->
                EndKey > EntryEndKey
            end
    end.
%% Insert a list of entries into a level.  The insert position is determined
%% using only the first of the entries - the entries are added as a single
%% contiguous section at that position.  Adding an empty list of entries
%% leaves the level unchanged.
add_entry(_LevelIdx, Level, []) ->
    Level;
add_entry(LevelIdx, Level, [First|_] = Entries) ->
    Precedes =
        pred_fun(
            LevelIdx,
            First#manifest_entry.start_key,
            First#manifest_entry.end_key
        ),
    if
        LevelIdx =< 1 ->
            {Before, After} = lists:splitwith(Precedes, Level),
            Before ++ Entries ++ After;
        true ->
            {Before, After} =
                lists:splitwith(Precedes, leveled_tree:to_list(Level)),
            % Members of the tree are keyed by the end key of the entry
            KeyedEntries = [{E#manifest_entry.end_key, E} || E <- Entries],
            leveled_tree:from_orderedlist(
                Before ++ KeyedEntries ++ After,
                ?TREE_TYPE,
                ?TREE_WIDTH
            )
    end.
%% Remove an entry (or a list of entries) from a level.  It is assumed that
%% a list of removals is a sorted sublist of the level - so only the first
%% removal and the number of removals are needed to cut out the section.
remove_entry(LevelIdx, Level, Entries) ->
    {SectionLength, FirstEntry} = measure_removals(Entries),
    remove_section(LevelIdx, Level, FirstEntry, SectionLength).
%% Return {Count, First} for the removals - which may be either a list of
%% removals, or a single removal (which has a count of 1)
measure_removals(Removals) when is_list(Removals) ->
    {length(Removals), lists:nth(1, Removals)};
measure_removals(SingleRemoval) ->
    {1, SingleRemoval}.
%% Cut a section of SectionLength entries out of the level, with the start
%% of the section being found using the keys of FirstEntry
remove_section(LevelIdx, Level, FirstEntry, SectionLength) ->
    Precedes =
        pred_fun(
            LevelIdx,
            FirstEntry#manifest_entry.start_key,
            FirstEntry#manifest_entry.end_key
        ),
    if
        LevelIdx =< 1 ->
            {Before, FromSection} = lists:splitwith(Precedes, Level),
            AfterSection = lists:nthtail(SectionLength, FromSection),
            Before ++ AfterSection;
        true ->
            {Before, FromSection} =
                lists:splitwith(Precedes, leveled_tree:to_list(Level)),
            AfterSection = lists:nthtail(SectionLength, FromSection),
            leveled_tree:from_orderedlist(
                Before ++ AfterSection, ?TREE_TYPE, ?TREE_WIDTH)
    end.
%% Swap a section of the level (the Removals) for a list of Additions.  The
%% section to be cut is located using the first removal, and has a length
%% equal to the number of removals.
replace_entry(LevelIdx, Level, Removals, Additions) ->
    {SectionLength, FirstEntry} = measure_removals(Removals),
    Precedes =
        pred_fun(
            LevelIdx,
            FirstEntry#manifest_entry.start_key,
            FirstEntry#manifest_entry.end_key
        ),
    case LevelIdx =< 1 of
        true ->
            {Before, FromSection} = lists:splitwith(Precedes, Level),
            AfterSection = lists:nthtail(SectionLength, FromSection),
            Before ++ Additions ++ AfterSection;
        false ->
            {Before, FromSection} =
                lists:splitwith(Precedes, leveled_tree:to_list(Level)),
            % In a tree-based level there may be nothing at or beyond the
            % position of the first removal, in which case nothing is cut
            AfterSection =
                case FromSection of
                    [] ->
                        [];
                    _ ->
                        lists:nthtail(SectionLength, FromSection)
                end,
            KeyedAdditions =
                lists:map(
                    fun(Entry) -> {Entry#manifest_entry.end_key, Entry} end,
                    Additions
                ),
            leveled_tree:from_orderedlist(
                Before ++ KeyedAdditions ++ AfterSection,
                ?TREE_TYPE,
                ?TREE_WIDTH
            )
    end.
%% Record each removed entry (a single entry or a list of entries) in the
%% dictionary of pending deletes - keyed by filename, and stored along with
%% the manifest SQN at which the removal was made
update_pendingdeletes(ManSQN, Removals, PendingDeletes) ->
    RemovalList =
        if
            is_list(Removals) ->
                Removals;
            true ->
                [Removals]
        end,
    lists:foldl(
        fun(Entry, PendingAcc) ->
            dict:store(
                Entry#manifest_entry.filename, {ManSQN, Entry}, PendingAcc)
        end,
        PendingDeletes,
        RemovalList
    ).
-spec update_blooms(
    list()|manifest_entry(),
    list()|manifest_entry(),
    blooms())
        -> {blooms(), list()}.
%% @doc
%%
%% The blooms are a Pid -> Bloom mapping for every Pid, and this needs to
%% be updated to represent the changes.  However, the bloom would bloat out
%% the stored manifest, so the bloom must be stripped from the manifest entry
%% as part of this process.  Returns the updated blooms, and the list of
%% additions with the bloom in each entry set to none.
update_blooms(Removals, Additions, Blooms) ->
    % Both removals and additions may be a single entry or a list
    AsList =
        fun(Entries) when is_list(Entries) ->
                Entries;
            (SingleEntry) ->
                [SingleEntry]
        end,
    AdditionList = AsList(Additions),
    RemovalList = AsList(Removals),
    BloomsAfterRemovals =
        lists:foldl(
            fun(Removed, BloomAcc) ->
                dict:erase(Removed#manifest_entry.owner, BloomAcc)
            end,
            Blooms,
            RemovalList
        ),
    BloomsAfterAdditions =
        lists:foldl(
            fun(Added, BloomAcc) ->
                dict:store(
                    Added#manifest_entry.owner,
                    Added#manifest_entry.bloom,
                    BloomAcc
                )
            end,
            BloomsAfterRemovals,
            AdditionList
        ),
    {BloomsAfterAdditions,
        [Added#manifest_entry{bloom = none} || Added <- AdditionList]}.
%% Find the owner of the file within the level whose key range covers the
%% Key - returning false if there is no such file.  Levels 0 and 1 are lists
%% which are scanned in order; lower levels are searched via leveled_tree.
key_lookup_level(LevelIdx, [], _Key) when LevelIdx =< 1 ->
    false;
key_lookup_level(LevelIdx, [Entry|Rest], Key) when LevelIdx =< 1 ->
    EntryEndKey = Entry#manifest_entry.end_key,
    EntryStartKey = Entry#manifest_entry.start_key,
    if
        EntryEndKey < Key ->
            % This entry ends before the key, so move on to the next entry
            key_lookup_level(LevelIdx, Rest, Key);
        Key < EntryStartKey ->
            % The key falls in a gap before the start of this entry
            false;
        true ->
            Entry#manifest_entry.owner
    end;
key_lookup_level(_LevelIdx, Level, Key) ->
    GetStartKey = fun(Entry) -> Entry#manifest_entry.start_key end,
    case leveled_tree:search(Key, Level, GetStartKey) of
        {_EndKey, FoundEntry} ->
            FoundEntry#manifest_entry.owner;
        none ->
            false
    end.
%% Find the entries in a level which may contain keys in the range, and
%% convert each one using MakePointerFun.  There is nothing to find at any
%% level beneath the basement.
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun) ->
    EntriesInRange =
        case LevelIdx =< Manifest#manifest.basement of
            true ->
                Level = array:get(LevelIdx, Manifest#manifest.levels),
                range_lookup_level(LevelIdx, Level, StartKey, EndKey);
            false ->
                []
        end,
    [MakePointerFun(Entry) || Entry <- EntriesInRange].
%% Return the manifest entries in the level which overlap with the query
%% range.  For levels 0 and 1 the leading entries which end before the query
%% start key are skipped, and then entries are taken until
%% leveled_codec:endkey_passed/2 reports true for the query end key and the
%% start key of the entry.  Lower levels are range-searched using
%% leveled_tree, with the end keys then dropped from the results.
range_lookup_level(LevelIdx, Level, QStartKey, QEndKey) when LevelIdx =< 1 ->
    EndsBeforeQuery =
        fun(Entry) ->
            Entry#manifest_entry.end_key < QStartKey
        end,
    StartsWithinQuery =
        fun(Entry) ->
            not
                leveled_codec:endkey_passed(
                    QEndKey, Entry#manifest_entry.start_key)
        end,
    lists:takewhile(
        StartsWithinQuery, lists:dropwhile(EndsBeforeQuery, Level));
range_lookup_level(_LevelIdx, Level, QStartKey, QEndKey) ->
    GetStartKey = fun(Entry) -> Entry#manifest_entry.start_key end,
    KeyedEntries =
        leveled_tree:search_range(QStartKey, QEndKey, Level, GetStartKey),
    {_EndKeys, Entries} = lists:unzip(KeyedEntries),
    Entries.
%% The basement is the highest level index (from 0 to ?MAX_LEVELS) at which
%% the level is not empty - or 0 if every level is empty
get_basement(Levels) ->
    ActiveLevels =
        [LevelIdx ||
            LevelIdx <- lists:seq(0, ?MAX_LEVELS),
            not is_empty(LevelIdx, array:get(LevelIdx, Levels))],
    lists:max([0|ActiveLevels]).
%% The path to the folder holding manifest files beneath the RootPath (with
%% a trailing "/").  An attempt is made to ensure the parent directories
%% exist, but the outcome of that attempt is not checked.
filepath(RootPath, manifest) ->
    ManifestFolder = lists:append([RootPath, "/", ?MANIFEST_FP, "/"]),
    filelib:ensure_dir(ManifestFolder),
    ManifestFolder.
%% The path of the manifest file for a given manifest SQN - the extension of
%% the file depends on whether the manifest is current or pending
filepath(RootPath, NewMSN, ManifestType)
        when ManifestType =:= current_manifest;
            ManifestType =:= pending_manifest ->
    Extension =
        case ManifestType of
            current_manifest ->
                ?MANIFEST_FILEX;
            pending_manifest ->
                ?PENDING_FILEX
        end,
    lists:append(
        [filepath(RootPath, manifest),
            "nonzero_",
            integer_to_list(NewMSN),
            ".",
            Extension]
    ).
%% Open the manifest file with the highest manifest SQN from the list of
%% SQNs.  If there are no SQNs to try (or only SQN 0) then a new manifest is
%% returned.
%%
%% Each file is expected to be a 32-bit CRC followed by the binary of the
%% manifest term.  A file which fails the CRC check, or which is too short to
%% contain a CRC, is considered to be corrupt: this is logged and the file
%% for the next SQN in the list is tried instead.
%%
%% The pending deletes and blooms are reset to be empty in the manifest that
%% is returned.
open_manifestfile(_RootPath, L) when L == [] orelse L == [0] ->
    leveled_log:log(p0013, []),
    new_manifest();
open_manifestfile(RootPath, [TopManSQN|Rest]) ->
    CurrManFile = filepath(RootPath, TopManSQN, current_manifest),
    {ok, FileBin} = file:read_file(CurrManFile),
    case verify_manifest_binary(FileBin) of
        {ok, BinaryOfTerm} ->
            leveled_log:log(p0012, [TopManSQN]),
            Manifest = binary_to_term(BinaryOfTerm),
            Manifest#manifest{
                pending_deletes = new_pending_deletions(),
                blooms = new_blooms()
            };
        {error, Reason} ->
            leveled_log:log(p0033, [CurrManFile, Reason]),
            open_manifestfile(RootPath, Rest)
    end.

-spec verify_manifest_binary(binary()) ->
    {ok, binary()}|{error, string()}.
%% Split the CRC from the front of the file contents, and check it against
%% the remainder of the binary.  A binary of fewer than 4 bytes cannot hold
%% the CRC - so this is an error rather than a reason to crash.
verify_manifest_binary(<<CRC:32/integer, BinaryOfTerm/binary>>) ->
    case erlang:crc32(BinaryOfTerm) of
        CRC ->
            {ok, BinaryOfTerm};
        _ ->
            {error, "crc wonky"}
    end;
verify_manifest_binary(_TooShort) ->
    {error, "file too short"}.
%% The current OS time as a whole number of seconds (the microseconds part
%% of the timestamp is discarded)
seconds_now() ->
    {MegaSeconds, Seconds, _MicroSeconds} = os:timestamp(),
    Seconds + (MegaSeconds * 1000000).
%% An empty dictionary to hold blooms
new_blooms() ->
    dict:new().
%% An empty dictionary to hold pending deletions
new_pending_deletions() ->
    dict:new().
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
%% Default test setup - every insert is made with the same manifest SQN
initial_setup() ->
    ChangeType = single_change,
    initial_setup(ChangeType).
%% Build the six manifest entries used by the tests (files Z1 to Z6), and
%% hand them on to initial_setup/7 to build the manifests
initial_setup(Changes) ->
    % The entries differ only by key range, filename and owner
    NewEntry =
        fun(StartKey, EndKey, FileName, OwnerStr) ->
            #manifest_entry{
                start_key = StartKey,
                end_key = EndKey,
                filename = FileName,
                owner = list_to_pid(OwnerStr),
                bloom = none
            }
        end,
    IdxKey =
        fun(Field, Key) ->
            {i, <<"Bucket1">>, {<<"Idx1">>, Field}, Key}
        end,
    ObjKey =
        fun(Key) ->
            {o, <<"Bucket1">>, Key, null}
        end,
    E1 =
        NewEntry(
            IdxKey(<<"Fld1">>, <<"K8">>),
            IdxKey(<<"Fld9">>, <<"K93">>),
            "Z1",
            "<0.101.0>"
        ),
    E2 =
        NewEntry(
            IdxKey(<<"Fld9">>, <<"K97">>),
            ObjKey(<<"K71">>),
            "Z2",
            "<0.102.0>"
        ),
    E3 =
        NewEntry(
            ObjKey(<<"K75">>),
            ObjKey(<<"K993">>),
            "Z3",
            "<0.103.0>"
        ),
    E4 =
        NewEntry(
            IdxKey(<<"Fld1">>, <<"K8">>),
            IdxKey(<<"Fld7">>, <<"K93">>),
            "Z4",
            "<0.104.0>"
        ),
    E5 =
        NewEntry(
            IdxKey(<<"Fld7">>, <<"K97">>),
            ObjKey(<<"K78">>),
            "Z5",
            "<0.105.0>"
        ),
    E6 =
        NewEntry(
            ObjKey(<<"K81">>),
            ObjKey(<<"K996">>),
            "Z6",
            "<0.106.0>"
        ),
    initial_setup(Changes, E1, E2, E3, E4, E5, E6).
%% Insert E1 to E3 into level 1 and E4 to E6 into level 2, one entry at a
%% time, returning a tuple of the manifest before any insert along with the
%% manifest after each insert.  With single_change every insert uses manifest
%% SQN 1; with multi_change the manifest SQN increments on each insert.
initial_setup(ChangeType, E1, E2, E3, E4, E5, E6)
        when ChangeType =:= single_change; ChangeType =:= multi_change ->
    ManSQNs =
        case ChangeType of
            single_change ->
                [1, 1, 1, 1, 1, 1];
            multi_change ->
                [1, 2, 3, 4, 5, 6]
        end,
    Inserts =
        lists:zip3(ManSQNs, [1, 1, 1, 2, 2, 2], [E1, E2, E3, E4, E5, E6]),
    Man0 = new_manifest(),
    {[Man1, Man2, Man3, Man4, Man5, Man6], _Final} =
        lists:mapfoldl(
            fun({ManSQN, Level, Entry}, ManAcc) ->
                UpdMan = insert_manifest_entry(ManAcc, ManSQN, Level, Entry),
                {UpdMan, UpdMan}
            end,
            Man0,
            Inserts
        ),
    % Inserting an empty list of entries should not change the manifest
    LastSQN = lists:last(ManSQNs),
    ?assertMatch(Man6, insert_manifest_entry(Man6, LastSQN, 2, [])),
    {Man0, Man1, Man2, Man3, Man4, Man5, Man6}.
%% Change the manifest from initial_setup by removing the three level 1
%% entries (files Z1 to Z3), and then inserting four new level 1 entries
%% (files Y1 to Y4) - with all changes made at manifest SQN 2.  Returns a
%% tuple of the manifests after each change.
changeup_setup(Man6) ->
    NewEntry =
        fun(StartKey, EndKey, FileName, OwnerStr) ->
            #manifest_entry{
                start_key = StartKey,
                end_key = EndKey,
                filename = FileName,
                owner = list_to_pid(OwnerStr),
                bloom = none
            }
        end,
    IdxKey =
        fun(Field, Key) ->
            {i, <<"Bucket1">>, {<<"Idx1">>, Field}, Key}
        end,
    ObjKey =
        fun(Key) ->
            {o, <<"Bucket1">>, Key, null}
        end,
    % The entries to be removed
    E1 =
        NewEntry(
            IdxKey(<<"Fld1">>, <<"K8">>),
            IdxKey(<<"Fld9">>, <<"K93">>),
            "Z1",
            "<0.101.0>"
        ),
    E2 =
        NewEntry(
            IdxKey(<<"Fld9">>, <<"K97">>),
            ObjKey(<<"K71">>),
            "Z2",
            "<0.102.0>"
        ),
    E3 =
        NewEntry(
            ObjKey(<<"K75">>),
            ObjKey(<<"K993">>),
            "Z3",
            "<0.103.0>"
        ),
    % The entries to be inserted
    E1_2 =
        NewEntry(
            IdxKey(<<"Fld4">>, <<"K8">>),
            IdxKey(<<"Fld9">>, <<"K62">>),
            "Y1",
            "<0.201.0>"
        ),
    E2_2 =
        NewEntry(
            IdxKey(<<"Fld9">>, <<"K67">>),
            ObjKey(<<"K45">>),
            "Y2",
            "<0.202.0>"
        ),
    E3_2 =
        NewEntry(
            ObjKey(<<"K47">>),
            ObjKey(<<"K812">>),
            "Y3",
            "<0.203.0>"
        ),
    E4_2 =
        NewEntry(
            ObjKey(<<"K815">>),
            ObjKey(<<"K998">>),
            "Y4",
            "<0.204.0>"
        ),
    % Arguments are (Manifest, ManSQN, Level, Entry)
    Man7 = remove_manifest_entry(Man6, 2, 1, E1),
    Man8 = remove_manifest_entry(Man7, 2, 1, E2),
    Man9 = remove_manifest_entry(Man8, 2, 1, E3),
    Man10 = insert_manifest_entry(Man9, 2, 1, E1_2),
    Man11 = insert_manifest_entry(Man10, 2, 1, E2_2),
    Man12 = insert_manifest_entry(Man11, 2, 1, E3_2),
    Man13 = insert_manifest_entry(Man12, 2, 1, E4_2),
    {Man7, Man8, Man9, Man10, Man11, Man12, Man13}.
%% Select a file to merge at random from both a list-based level (1) and a
%% tree-based level (2)
random_select_test() ->
    {_, _, _, _, _, _, LastManifest} = initial_setup(),
    L1File = mergefile_selector(LastManifest, 1, random),
    % This blows up if the function is not prepared for the different format
    % https://github.com/martinsumner/leveled/issues/43
    _L2File = mergefile_selector(LastManifest, 2, random),
    Level1 = array:get(1, LastManifest#manifest.levels),
    ?assert(lists:member(L1File, Level1)).
%% Save more manifests than should be retained, and confirm that only
%% ?MANIFESTS_TO_RETAIN files are then present in the manifest folder
manifest_gc_test() ->
    RootPath = "test/test_area/",
    ok = filelib:ensure_dir(RootPath),
    ok = leveled_penciller:clean_testdir(RootPath),
    Manifests = tuple_to_list(initial_setup(multi_change)),
    lists:foreach(
        fun(Manifest) -> save_manifest(Manifest, RootPath) end,
        Manifests
    ),
    {ok, FNs} = file:list_dir(filepath(RootPath, manifest)),
    io:format("FNs ~w~n", [FNs]),
    ?assert(length(Manifests) > ?MANIFESTS_TO_RETAIN),
    ?assertEqual(?MANIFESTS_TO_RETAIN, length(FNs)).
%% Check key lookups at levels 1 and 2 against the manifests built by
%% initial_setup.  The same checks are repeated after changeup_setup to
%% confirm the original manifests still return the same results, before
%% checking lookups against the changed manifest.
keylookup_manifest_test() ->
    {Man0, Man1, Man2, Man3, _Man4, _Man5, Man6} = initial_setup(),
    LK1_1 = {o, <<"Bucket1">>, <<"K711">>, null},
    LK1_2 = {o, <<"Bucket1">>, <<"K70">>, null},
    LK1_3 = {o, <<"Bucket1">>, <<"K71">>, null},
    LK1_4 = {o, <<"Bucket1">>, <<"K75">>, null},
    LK1_5 = {o, <<"Bucket1">>, <<"K76">>, null},
    CheckMisses =
        fun() ->
            lists:foreach(
                fun(Man) ->
                    ?assertEqual(false, key_lookup(Man, 1, LK1_1))
                end,
                [Man0, Man1, Man2, Man3, Man6]
            )
        end,
    CheckMisses(),
    PZ2 = list_to_pid("<0.102.0>"),
    PZ3 = list_to_pid("<0.103.0>"),
    PZ5 = list_to_pid("<0.105.0>"),
    CheckHits =
        fun() ->
            lists:foreach(
                fun({ExpectedPid, Level, Key}) ->
                    ?assertEqual(ExpectedPid, key_lookup(Man6, Level, Key))
                end,
                [{PZ2, 1, LK1_2},
                    {PZ2, 1, LK1_3},
                    {PZ3, 1, LK1_4},
                    {PZ3, 1, LK1_5},
                    {PZ5, 2, LK1_2},
                    {PZ5, 2, LK1_3},
                    {PZ5, 2, LK1_4},
                    {PZ5, 2, LK1_5}]
            )
        end,
    CheckHits(),

    {_Man7, _Man8, _Man9, _Man10, _Man11, _Man12, Man13} =
        changeup_setup(Man6),

    % Results from the original manifests should not have changed
    CheckMisses(),
    CheckHits(),

    PY3 = list_to_pid("<0.203.0>"),
    ?assertEqual(PY3, key_lookup(Man13, 1, LK1_4)),
    ?assertEqual(PZ5, key_lookup(Man13, 2, LK1_4)).
%% Save manifests at SQN 1 and SQN 2, then flip a random bit in the file for
%% SQN 2 - and confirm that opening the manifest then reverts to SQN 1.  A
%% series of replace, remove and insert changes are then made to the opened
%% manifest, with the manifest SQN and key lookups checked along the way.
ext_keylookup_manifest_test() ->
    NewEntry =
        fun(StartKey, EndKey, FileName, Owner) ->
            #manifest_entry{
                start_key = StartKey,
                end_key = EndKey,
                filename = FileName,
                owner = Owner,
                bloom = none
            }
        end,
    IdxKey =
        fun(Field, Key) ->
            {i, <<"Bucket1">>, {<<"Idx1">>, Field}, Key}
        end,
    ObjKey =
        fun(Key) ->
            {o, <<"Bucket1">>, Key, null}
        end,

    RP = "test/test_area",
    ok = leveled_penciller:clean_testdir(RP),
    {_Man0, _Man1, _Man2, _Man3, _Man4, _Man5, Man6} = initial_setup(),
    save_manifest(Man6, RP),

    % No bloom is set on this entry, so it is not built using NewEntry
    E7 =
        #manifest_entry{
            start_key = ObjKey(<<"K997">>),
            end_key = ObjKey(<<"K999">>),
            filename = "Z7",
            owner = list_to_pid("<0.107.0>")
        },
    Man7 = insert_manifest_entry(Man6, 2, 2, E7),
    save_manifest(Man7, RP),
    ManOpen1 = open_manifest(RP),
    ?assertEqual(2, get_manifest_sqn(ManOpen1)),

    % Corrupt the file for manifest SQN 2 by flipping a single bit
    Man7FN = filepath(RP, 2, current_manifest),
    Man7FNAlt = filename:rootname(Man7FN) ++ ".pnd",
    {ok, BytesCopied} = file:copy(Man7FN, Man7FNAlt),
    {ok, Bin} = file:read_file(Man7FN),
    ?assertEqual(BytesCopied, byte_size(Bin)),
    RandPos = rand:uniform(bit_size(Bin) - 1),
    <<Pre:RandPos/bitstring, BitToFlip:1/integer, Rest/bitstring>> = Bin,
    Flipped = BitToFlip bxor 1,
    CorruptBin =
        <<Pre:RandPos/bitstring, Flipped:1/integer, Rest/bitstring>>,
    ok = file:write_file(Man7FN, CorruptBin),

    ?assertEqual(2, get_manifest_sqn(Man7)),

    ManOpen2 = open_manifest(RP),
    ?assertEqual(1, get_manifest_sqn(ManOpen2)),

    E1 =
        NewEntry(
            IdxKey(<<"Fld1">>, <<"K8">>),
            IdxKey(<<"Fld9">>, <<"K93">>),
            "Z1",
            list_to_pid("<0.101.0>")
        ),
    E2 =
        NewEntry(
            IdxKey(<<"Fld9">>, <<"K97">>),
            ObjKey(<<"K71">>),
            "Z2",
            list_to_pid("<0.102.0>")
        ),
    E3 =
        NewEntry(
            ObjKey(<<"K75">>),
            ObjKey(<<"K993">>),
            "Z3",
            list_to_pid("<0.103.0>")
        ),
    E1_2 =
        NewEntry(
            IdxKey(<<"Fld4">>, <<"K8">>),
            IdxKey(<<"Fld9">>, <<"K62">>),
            "Y1",
            list_to_pid("<0.201.0>")
        ),
    E2_2 =
        NewEntry(
            IdxKey(<<"Fld9">>, <<"K67">>),
            ObjKey(<<"K45">>),
            "Y2",
            list_to_pid("<0.202.0>")
        ),
    E3_2 =
        NewEntry(
            ObjKey(<<"K47">>),
            ObjKey(<<"K812">>),
            "Y3",
            list_to_pid("<0.203.0>")
        ),
    E4_2 =
        NewEntry(
            ObjKey(<<"K815">>),
            ObjKey(<<"K998">>),
            "Y4",
            list_to_pid("<0.104.0>")
        ),

    Man8 = replace_manifest_entry(ManOpen2, 2, 1, E1, E1_2),
    Man9 = remove_manifest_entry(Man8, 2, 1, [E2, E3]),
    Man10 = insert_manifest_entry(Man9, 2, 1, [E2_2, E3_2, E4_2]),
    ?assertEqual(2, get_manifest_sqn(Man10)),

    LK1_4 = ObjKey(<<"K75">>),

    PY3 = list_to_pid("<0.203.0>"),
    PZ5 = list_to_pid("<0.105.0>"),

    ?assertEqual(PY3, key_lookup(Man10, 1, LK1_4)),
    ?assertEqual(PZ5, key_lookup(Man10, 2, LK1_4)),

    E5 =
        NewEntry(
            IdxKey(<<"Fld7">>, <<"K97">>),
            ObjKey(<<"K78">>),
            "Z5",
            PZ5
        ),
    E6 =
        NewEntry(
            ObjKey(<<"K81">>),
            ObjKey(<<"K996">>),
            "Z6",
            list_to_pid("<0.106.0>")
        ),

    Man11 = remove_manifest_entry(Man10, 3, 2, [E5, E6]),
    ?assertEqual(3, get_manifest_sqn(Man11)),
    ?assertEqual(false, key_lookup(Man11, 2, LK1_4)),

    Man12 = replace_manifest_entry(Man11, 4, 2, E2_2, E5),
    ?assertEqual(4, get_manifest_sqn(Man12)),
    ?assertEqual(PZ5, key_lookup(Man12, 2, LK1_4)).
%% Check the owners of the files returned by range lookups at levels 1 and
%% 2, both for the manifest from initial_setup and for the manifest as
%% changed by changeup_setup
rangequery_manifest_test() ->
    {_Man0, _Man1, _Man2, _Man3, _Man4, _Man5, Man6} = initial_setup(),
    % The owners of the entries within the pointers returned for a range
    OwnersInRange =
        fun(Manifest, Level, StartKey, EndKey) ->
            lists:map(
                fun({next, ME, _SK}) -> ME#manifest_entry.owner end,
                range_lookup(Manifest, Level, StartKey, EndKey)
            )
        end,

    PZ1 = list_to_pid("<0.101.0>"),
    PZ3 = list_to_pid("<0.103.0>"),
    PZ5 = list_to_pid("<0.105.0>"),
    PZ6 = list_to_pid("<0.106.0>"),

    PY1 = list_to_pid("<0.201.0>"),
    PY3 = list_to_pid("<0.203.0>"),
    PY4 = list_to_pid("<0.204.0>"),

    SK1 = {o, <<"Bucket1">>, <<"K711">>, null},
    EK1 = {o, <<"Bucket1">>, <<"K999">>, null},
    ?assertEqual([PZ3], OwnersInRange(Man6, 1, SK1, EK1)),
    ?assertEqual([PZ5, PZ6], OwnersInRange(Man6, 2, SK1, EK1)),

    SK2 = {i, <<"Bucket1">>, {<<"Idx1">>, <<"Fld8">>}, null},
    EK2 = {i, <<"Bucket1">>, {<<"Idx1">>, <<"Fld8">>}, null},
    ?assertEqual([PZ1], OwnersInRange(Man6, 1, SK2, EK2)),
    ?assertEqual([PZ5], OwnersInRange(Man6, 2, SK2, EK2)),

    SK3 = {o, <<"Bucket1">>, <<"K994">>, null},
    EK3 = {o, <<"Bucket1">>, <<"K995">>, null},
    ?assertEqual([], OwnersInRange(Man6, 1, SK3, EK3)),
    ?assertEqual([PZ6], OwnersInRange(Man6, 2, SK3, EK3)),

    {_Man7, _Man8, _Man9, _Man10, _Man11, _Man12, Man13} =
        changeup_setup(Man6),

    % Results from the original manifest should not have changed
    ?assertEqual([PZ3], OwnersInRange(Man6, 1, SK1, EK1)),
    ?assertEqual([PZ1], OwnersInRange(Man6, 1, SK2, EK2)),
    ?assertEqual([], OwnersInRange(Man6, 1, SK3, EK3)),

    ?assertEqual([PY3, PY4], OwnersInRange(Man13, 1, SK1, EK1)),
    ?assertEqual([PY1], OwnersInRange(Man13, 1, SK2, EK2)),
    ?assertEqual([PY4], OwnersInRange(Man13, 1, SK3, EK3)).
%% Level zero is not present in a new manifest, but is present once an entry
%% has been inserted at level 0
levelzero_present_test() ->
    EmptyManifest = new_manifest(),
    ?assertNot(levelzero_present(EmptyManifest)),
    E0 =
        #manifest_entry{
            start_key = {i, <<"Bucket1">>, {<<"Idx1">>, <<"Fld1">>}, <<"K8">>},
            end_key = {o, <<"Bucket1">>, <<"Key996">>, null},
            filename = "Z0",
            owner = list_to_pid("<0.101.0>"),
            bloom = none
        },
    % Arguments are (Manifest, ManSQN, Level, Entry)
    WithLevelZero = insert_manifest_entry(EmptyManifest, 1, 0, E0),
    ?assert(levelzero_present(WithLevelZero)).
%% Test helper to combine the check that a file is ready to delete with the
%% clearing of pending deletes - returning both the result of the check and
%% the manifest after clear_pending/3 has been called
ready_to_delete_combined(Manifest, Filename) ->
    IsReady = ready_to_delete(Manifest, Filename),
    UpdManifest =
        case IsReady of
            true ->
                clear_pending(Manifest, [Filename], false);
            false ->
                clear_pending(Manifest, [], true)
        end,
    {IsReady, UpdManifest}.
%% Interleave the registering of four snapshots with the removal of three
%% files (at increasing manifest SQNs), and then check when each file
%% becomes ready to delete as the snapshots are released
snapshot_release_test() ->
    [PidA1, PidA2, PidA3, PidA4] =
        [spawn(fun() -> ok end) || _N <- lists:seq(1, 4)],
    {_, _, _, _, _, _, Man6} = initial_setup(),
    NewEntry =
        fun(StartKey, EndKey, FileName, OwnerStr) ->
            #manifest_entry{
                start_key = StartKey,
                end_key = EndKey,
                filename = FileName,
                owner = list_to_pid(OwnerStr),
                bloom = none
            }
        end,
    E1 =
        NewEntry(
            {i, <<"Bucket1">>, {<<"Idx1">>, <<"Fld1">>}, <<"K8">>},
            {i, <<"Bucket1">>, {<<"Idx1">>, <<"Fld9">>}, <<"K93">>},
            "Z1",
            "<0.101.0>"
        ),
    E2 =
        NewEntry(
            {i, <<"Bucket1">>, {<<"Idx1">>, <<"Fld9">>}, <<"K97">>},
            {o, <<"Bucket1">>, <<"K71">>, null},
            "Z2",
            "<0.102.0>"
        ),
    E3 =
        NewEntry(
            {o, <<"Bucket1">>, <<"K75">>, null},
            {o, <<"Bucket1">>, <<"K993">>, null},
            "Z3",
            "<0.103.0>"
        ),
    AssertNotReady =
        fun(Manifest, FileName) ->
            ?assertMatch(
                {false, _}, ready_to_delete_combined(Manifest, FileName))
        end,

    Man7 = add_snapshot(Man6, PidA1, 3600),
    Man8 = remove_manifest_entry(Man7, 2, 1, E1),
    Man9 = add_snapshot(Man8, PidA2, 3600),
    Man10 = remove_manifest_entry(Man9, 3, 1, E2),
    Man11 = add_snapshot(Man10, PidA3, 3600),
    Man12 = remove_manifest_entry(Man11, 4, 1, E3),
    Man13 = add_snapshot(Man12, PidA4, 3600),

    AssertNotReady(Man8, "Z1"),
    AssertNotReady(Man10, "Z2"),
    AssertNotReady(Man12, "Z3"),

    Man14 = release_snapshot(Man13, PidA1),
    AssertNotReady(Man14, "Z2"),
    AssertNotReady(Man14, "Z3"),
    {Bool14, Man15} = ready_to_delete_combined(Man14, "Z1"),
    ?assert(Bool14),

    % This doesn't change anything - released snapshot not the min
    Man16 = release_snapshot(Man15, PidA4),
    AssertNotReady(Man16, "Z2"),
    AssertNotReady(Man16, "Z3"),

    Man17 = release_snapshot(Man16, PidA2),
    AssertNotReady(Man17, "Z3"),
    {Bool17, Man18} = ready_to_delete_combined(Man17, "Z2"),
    ?assert(Bool17),

    Man19 = release_snapshot(Man18, PidA3),
    io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]),
    {Bool19, _Man20} = ready_to_delete_combined(Man19, "Z3"),
    ?assert(Bool19).
%% A snapshot can be released explicitly.  A snapshot added with a timeout of
%% 1 is still registered after a sleep of 2001ms, but is no longer registered
%% after release_snapshot/2 has been called for the ?PHANTOM_PID.
snapshot_timeout_test() ->
    SnapPid = spawn(fun() -> ok end),
    {_, _, _, _, _, _, Man6} = initial_setup(),
    SnapshotCount =
        fun(Manifest) ->
            length(Manifest#manifest.snapshots)
        end,
    Man7 = add_snapshot(Man6, SnapPid, 3600),
    ?assertEqual(1, SnapshotCount(Man7)),
    Man8 = release_snapshot(Man7, SnapPid),
    ?assertEqual(0, SnapshotCount(Man8)),
    Man9 = add_snapshot(Man8, SnapPid, 1),
    timer:sleep(2001),
    ?assertEqual(1, SnapshotCount(Man9)),
    Man10 = release_snapshot(Man9, ?PHANTOM_PID),
    ?assertEqual(0, SnapshotCount(Man10)).
%% Run range lookups against a manifest that is written out as a raw term
%% rather than being built through the functions in this module.
%% The term is a manifest tuple holding an array of levels: level 0 is an
%% empty list, level 1 is a list of three manifest entries, and level 2 is an
%% idxt tree containing a single entry.
%% NOTE(review): the remaining idxt terms appear to be empty trees for the
%% lower levels - confirm against leveled_tree.
%% A lookup across {o_rkv, <<"Bucket">>, null, null} is expected to return
%% three results from level 1 and one result from level 2.
potential_issue_test() ->
Manifest =
{manifest,
% The levels - written in the internal representation of an array
{array,9,0,[],
{
% Level 0
[],
% Level 1 - a list of manifest entries
[
{manifest_entry,
{o_rkv, <<"Bucket">>, <<"Key10">>, null},
{o_rkv, <<"Bucket">>, <<"Key12949">>,null},
list_to_pid("<0.313.0>"),
"./16_1_0.sst",
none
},
{manifest_entry,
{o_rkv, <<"Bucket">>, <<"Key129490">>, null},
{o_rkv, <<"Bucket">>, <<"Key158981">>, null},
list_to_pid("<0.315.0>"),
"./16_1_1.sst",
none
},
{manifest_entry,
{o_rkv, <<"Bucket">>, <<"Key158982">>, null},
{o_rkv, <<"Bucket">>, <<"Key188472">>, null},
list_to_pid("<0.316.0>"),
"./16_1_2.sst",
none
}
],
% Level 2 - a tree with one entry, which starts in <<"Bucket">> but has
% an end key in <<"Bucket1">>
{
idxt,
1,
{
{
[
{{o_rkv, <<"Bucket1">>, <<"Key1">>, null},
{
manifest_entry,
{o_rkv, <<"Bucket">>, <<"Key9083">>, null},
{o_rkv, <<"Bucket1">>, <<"Key1">>, null},
list_to_pid("<0.320.0>"),
"./16_1_6.sst",
none
}
}
]
},
{1, {{o_rkv, <<"Bucket1">> ,<<"Key1">> ,null},1,nil,nil}}}},
{idxt,0,{{},{0,nil}}},
{idxt,0,{{},{0,nil}}},
{idxt,0,{{},{0,nil}}},
{idxt,0,{{},{0,nil}}},
{idxt,0,{{},{0,nil}}},
{idxt,0,{{},{0,nil}}},
[]}},
% NOTE(review): the meaning of the remaining elements depends on the field
% order of the manifest record, which is not defined in this section
19,
[],
0,
new_pending_deletions(),
2,
new_blooms()},
% The start key and end key of each query are the same
Range1 =
range_lookup(
Manifest,
1,
{o_rkv, <<"Bucket">>, null, null},
{o_rkv, <<"Bucket">>, null, null}
),
Range2 =
range_lookup(
Manifest,
2,
{o_rkv, <<"Bucket">>, null, null},
{o_rkv, <<"Bucket">>, null, null}
),
io:format("Range in Level 1 ~w~n", [Range1]),
io:format("Range in Level 2 ~w~n", [Range2]),
?assertMatch(3, length(Range1)),
?assertMatch(1, length(Range2)).
-endif.