spec and doc - leveled_pmanifest

Give the dialyzer some help
This commit is contained in:
martinsumner 2017-05-26 15:11:33 +01:00
parent f23072fb54
commit 0459591fd7

View file

@ -73,10 +73,19 @@
% Currently the lowest level (the largest number) % Currently the lowest level (the largest number)
}). }).
-type manifest() :: #manifest{}.
-type manifest_entry() :: #manifest_entry{}.
%%%============================================================================ %%%============================================================================
%%% API %%% API
%%%============================================================================ %%%============================================================================
-spec new_manifest() -> manifest().
%% @doc
%% The manifest in this case is a manifest of the ledger. This contains
%% information on the layout of the files, but also information of snapshots
%% that may have an influence on the manifest as they require files to remain
%% after the primary penciller is happy for them to be removed.
new_manifest() -> new_manifest() ->
LevelArray0 = array:new([{size, ?MAX_LEVELS + 1}, {default, []}]), LevelArray0 = array:new([{size, ?MAX_LEVELS + 1}, {default, []}]),
SetLowerLevelFun = SetLowerLevelFun =
@ -94,6 +103,10 @@ new_manifest() ->
basement = 0 basement = 0
}. }.
-spec open_manifest(string()) -> manifest().
%% @doc
%% Open a manifest in the appropriate sub-directory of the RootPath, and will
%% return an empty manifest if no such manifest is present.
open_manifest(RootPath) -> open_manifest(RootPath) ->
% Open the manifest in the file path which has the highest SQN, and will % Open the manifest in the file path which has the highest SQN, and will
% open without error % open without error
@ -113,12 +126,22 @@ open_manifest(RootPath) ->
[], [],
Filenames))), Filenames))),
open_manifestfile(RootPath, ValidManSQNs). open_manifestfile(RootPath, ValidManSQNs).
-spec copy_manifest(manifest()) -> manifest().
%% @doc
%% Used to pass the manifest to a snapshot, removing information not required
%% by a snapshot
copy_manifest(Manifest) -> copy_manifest(Manifest) ->
% Copy the manifest ensuring anything only the master process should care % Copy the manifest ensuring anything only the master process should care
% about is switched to undefined % about is switched to undefined
Manifest#manifest{snapshots = undefined, pending_deletes = undefined}. Manifest#manifest{snapshots = undefined, pending_deletes = undefined}.
-spec load_manifest(manifest(), fun(), fun()) -> {integer(), manifest()}.
%% @doc
%% Roll over the manifest starting a process to manage each file in the
%% manifest. The PidFun should be able to return the Pid of a file process
%% (having started one). The SQNFun will return the max sequence number
%% of that file, if passed the Pid that owns it.
load_manifest(Manifest, PidFun, SQNFun) -> load_manifest(Manifest, PidFun, SQNFun) ->
UpdateLevelFun = UpdateLevelFun =
fun(LevelIdx, {AccMaxSQN, AccMan}) -> fun(LevelIdx, {AccMaxSQN, AccMan}) ->
@ -130,6 +153,11 @@ load_manifest(Manifest, PidFun, SQNFun) ->
lists:foldl(UpdateLevelFun, {0, Manifest}, lists:foldl(UpdateLevelFun, {0, Manifest},
lists:seq(0, Manifest#manifest.basement)). lists:seq(0, Manifest#manifest.basement)).
-spec close_manifest(manifest(), fun()) -> ok.
%% @doc
%% Close all the files in the manifest (using CloseEntryFun to call close on
%% a file). Firts all the files in the active manifest are called, and then
%% any files which were pending deletion.
close_manifest(Manifest, CloseEntryFun) -> close_manifest(Manifest, CloseEntryFun) ->
CloseLevelFun = CloseLevelFun =
fun(LevelIdx) -> fun(LevelIdx) ->
@ -144,6 +172,9 @@ close_manifest(Manifest, CloseEntryFun) ->
end, end,
lists:foreach(ClosePDFun, dict:to_list(Manifest#manifest.pending_deletes)). lists:foreach(ClosePDFun, dict:to_list(Manifest#manifest.pending_deletes)).
-spec save_manifest(manifest(), string()) -> ok.
%% @doc
%% Save the manifest to file (with a checksum)
save_manifest(Manifest, RootPath) -> save_manifest(Manifest, RootPath) ->
FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest), FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest),
ManBin = term_to_binary(Manifest#manifest{snapshots = [], ManBin = term_to_binary(Manifest#manifest{snapshots = [],
@ -152,7 +183,15 @@ save_manifest(Manifest, RootPath) ->
CRC = erlang:crc32(ManBin), CRC = erlang:crc32(ManBin),
ok = file:write_file(FP, <<CRC:32/integer, ManBin/binary>>). ok = file:write_file(FP, <<CRC:32/integer, ManBin/binary>>).
-spec replace_manifest_entry(manifest(), integer(), integer(),
list()|manifest_entry(),
list()|manifest_entry()) -> manifest().
%% @doc
%% Replace a list of manifest entries in the manifest with a new set of entries
%% Pass in the new manifest SQN to be used for this manifest. The list of
%% entries can just be a single entry
%%
%% This is generally called on the level being merged down into.
replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) -> replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) ->
Levels = Manifest#manifest.levels, Levels = Manifest#manifest.levels,
Level = array:get(LevelIdx, Levels), Level = array:get(LevelIdx, Levels),
@ -176,6 +215,11 @@ replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) ->
pending_deletes = PendingDeletes} pending_deletes = PendingDeletes}
end. end.
-spec insert_manifest_entry(manifest(), integer(), integer(),
list()|manifest_entry()) -> manifest().
%% @doc
%% Place a single new manifest entry into a level of the manifest, at a given
%% level and manifest sequence number
insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
Levels = Manifest#manifest.levels, Levels = Manifest#manifest.levels,
Level = array:get(LevelIdx, Levels), Level = array:get(LevelIdx, Levels),
@ -186,6 +230,10 @@ insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
basement = Basement, basement = Basement,
manifest_sqn = ManSQN}. manifest_sqn = ManSQN}.
-spec remove_manifest_entry(manifest(), integer(), integer(),
list()|manifest_entry()) -> manifest().
%% @doc
%% Remove a manifest entry (as it has been merged into the level below)
remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
Levels = Manifest#manifest.levels, Levels = Manifest#manifest.levels,
Level = array:get(LevelIdx, Levels), Level = array:get(LevelIdx, Levels),
@ -207,6 +255,11 @@ remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
pending_deletes = PendingDeletes} pending_deletes = PendingDeletes}
end. end.
-spec switch_manifest_entry(manifest(), integer(), integer(),
list()|manifest_entry()) -> manifest().
%% @doc
%% Switch a manifest etry from this level to the level below (i.e when there
%% are no overlapping manifest entries in the level below)
switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) -> switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) ->
% Move to level below - so needs to be removed but not marked as a % Move to level below - so needs to be removed but not marked as a
% pending deletion % pending deletion
@ -219,9 +272,16 @@ switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) ->
SrcLevel + 1, SrcLevel + 1,
Entry). Entry).
-spec get_manifest_sqn(manifest()) -> integer().
%% @doc
%% Return the manifest SQN for this manifest
get_manifest_sqn(Manifest) -> get_manifest_sqn(Manifest) ->
Manifest#manifest.manifest_sqn. Manifest#manifest.manifest_sqn.
-spec key_lookup(manifest(), integer(), tuple()) -> false|manifest_entry().
%% @doc
%% For a given key find which manifest entry covers that key at that level,
%% returning false if there is no covering manifest entry at that level.
key_lookup(Manifest, LevelIdx, Key) -> key_lookup(Manifest, LevelIdx, Key) ->
case LevelIdx > Manifest#manifest.basement of case LevelIdx > Manifest#manifest.basement of
true -> true ->
@ -232,6 +292,10 @@ key_lookup(Manifest, LevelIdx, Key) ->
Key) Key)
end. end.
-spec range_lookup(manifest(), integer(), tuple(), tuple()) -> list().
%% @doc
%% Return a list of manifest_entry pointers at this level which cover the
%% key query range.
range_lookup(Manifest, LevelIdx, StartKey, EndKey) -> range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
MakePointerFun = MakePointerFun =
fun(M) -> fun(M) ->
@ -239,6 +303,11 @@ range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
end, end,
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
-spec merge_lookup(manifest(), integer(), tuple(), tuple()) -> list().
%% @doc
%% Return a list of manifest_entry pointers at this level which cover the
%% key query range, only all keys in the files should be included in the
%% pointers, not just the queries in the range.
merge_lookup(Manifest, LevelIdx, StartKey, EndKey) -> merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
MakePointerFun = MakePointerFun =
fun(M) -> fun(M) ->
@ -247,7 +316,8 @@ merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
-spec mergefile_selector(manifest(), integer()) -> manifest_entry().
%% @doc
%% An algorithm for discovering which files to merge .... %% An algorithm for discovering which files to merge ....
%% We can find the most optimal file: %% We can find the most optimal file:
%% - The one with the most overlapping data below? %% - The one with the most overlapping data below?
@ -267,14 +337,25 @@ mergefile_selector(Manifest, LevelIdx) ->
{_SK, ME} = lists:nth(random:uniform(length(Level)), Level), {_SK, ME} = lists:nth(random:uniform(length(Level)), Level),
ME. ME.
%% When the cllerk returns an update manifest to the penciller, the penciller -spec merge_snapshot(manifest(), manifest()) -> manifest().
%% should restore its view of the snapshots to that manifest %% @doc
%% When the clerk returns an updated manifest to the penciller, the penciller
%% should restore its view of the snapshots to that manifest. Snapshots can
%% be received in parallel to the manifest ebing updated, so the updated
%% manifest must not trample over any accrued state in the manifest.
merge_snapshot(PencillerManifest, ClerkManifest) -> merge_snapshot(PencillerManifest, ClerkManifest) ->
ClerkManifest#manifest{snapshots = ClerkManifest#manifest{snapshots =
PencillerManifest#manifest.snapshots, PencillerManifest#manifest.snapshots,
min_snapshot_sqn = min_snapshot_sqn =
PencillerManifest#manifest.min_snapshot_sqn}. PencillerManifest#manifest.min_snapshot_sqn}.
-spec add_snapshot(manifest(), pid()|atom(), integer()) -> manifest().
%% @doc
%% Add a snapshot reference to the manifest, withe rusing the pid or an atom
%% known to reference a special process. The timeout should be in seconds, and
%% the snapshot will assume to have expired at timeout (and so at that stage
%% files which depended on the snapshot will potentially expire, and if the
%% clone is still active it may crash)
add_snapshot(Manifest, Pid, Timeout) -> add_snapshot(Manifest, Pid, Timeout) ->
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout}, SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout},
SnapList0 = [SnapEntry|Manifest#manifest.snapshots], SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
@ -289,6 +370,9 @@ add_snapshot(Manifest, Pid, Timeout) ->
min_snapshot_sqn = N0} min_snapshot_sqn = N0}
end. end.
-spec release_snapshot(manifest(), pid()|atom()) -> manifest().
%% @doc
%% When a clone is complete the release should be notified to the manifest.
release_snapshot(Manifest, Pid) -> release_snapshot(Manifest, Pid) ->
FilterFun = FilterFun =
fun({P, SQN, ST, TO}, {Acc, MinSQN, Found}) -> fun({P, SQN, ST, TO}, {Acc, MinSQN, Found}) ->
@ -324,6 +408,10 @@ release_snapshot(Manifest, Pid) ->
min_snapshot_sqn = MinSnapSQN} min_snapshot_sqn = MinSnapSQN}
end. end.
-spec ready_to_delete(manifest(), string()) -> {boolean(), manifest()}.
%% @doc
%% A SST file which is in the delete_pending state can check to see if it is
%% ready to delete against the manifest.
ready_to_delete(Manifest, Filename) -> ready_to_delete(Manifest, Filename) ->
{ChangeSQN, _ME} = dict:fetch(Filename, Manifest#manifest.pending_deletes), {ChangeSQN, _ME} = dict:fetch(Filename, Manifest#manifest.pending_deletes),
case Manifest#manifest.min_snapshot_sqn of case Manifest#manifest.min_snapshot_sqn of
@ -343,6 +431,18 @@ ready_to_delete(Manifest, Filename) ->
{false, release_snapshot(Manifest, ?PHANTOM_PID)} {false, release_snapshot(Manifest, ?PHANTOM_PID)}
end. end.
-spec check_for_work(manifest(), list()) -> {list(), integer()}.
%% @doc
%% Check for compaction work in the manifest - look at levels which contain
%% more files in the threshold.
%%
%% File count determines size in leveled (unlike leveldb which works on the
%% total data volume). Files are fixed size in terms of keys, and the size of
%% metadata is assumed to be contianed and regular and so uninteresting for
%% level sizing.
%%
%% Return a list of levels which are over-sized as well as the total items
%% across the manifest which are beyond the size (the total work outstanding).
check_for_work(Manifest, Thresholds) -> check_for_work(Manifest, Thresholds) ->
CheckLevelFun = CheckLevelFun =
fun({LevelIdx, MaxCount}, {AccL, AccC}) -> fun({LevelIdx, MaxCount}, {AccL, AccC}) ->
@ -362,9 +462,17 @@ check_for_work(Manifest, Thresholds) ->
end, end,
lists:foldr(CheckLevelFun, {[], 0}, Thresholds). lists:foldr(CheckLevelFun, {[], 0}, Thresholds).
-spec is_basement(manifest(), integer()) -> boolean().
%% @doc
%% Is this level the lowest in the manifest which contains active files. When
%% merging down to the basement level special rules may apply (for example to
%% reap tombstones)
is_basement(Manifest, Level) -> is_basement(Manifest, Level) ->
Level >= Manifest#manifest.basement. Level >= Manifest#manifest.basement.
-spec levelzero_present(manifest()) -> boolean().
%% @doc
%% Is there a file in level zero (as only one file only can be in level zero).
levelzero_present(Manifest) -> levelzero_present(Manifest) ->
not is_empty(0, array:get(0, Manifest#manifest.levels)). not is_empty(0, array:get(0, Manifest#manifest.levels)).
@ -688,7 +796,7 @@ initial_setup() ->
owner="pid_z6"}, owner="pid_z6"},
Man0 = new_manifest(), Man0 = new_manifest(),
% insert_manifest_entry(Manifest, ManSQN, Level, Entry)
Man1 = insert_manifest_entry(Man0, 1, 1, E1), Man1 = insert_manifest_entry(Man0, 1, 1, E1),
Man2 = insert_manifest_entry(Man1, 1, 1, E2), Man2 = insert_manifest_entry(Man1, 1, 1, E2),
Man3 = insert_manifest_entry(Man2, 1, 1, E3), Man3 = insert_manifest_entry(Man2, 1, 1, E3),
@ -747,11 +855,9 @@ random_select_test() ->
L1File = mergefile_selector(LastManifest, 1), L1File = mergefile_selector(LastManifest, 1),
% This blows up if the function is not prepared for the different format % This blows up if the function is not prepared for the different format
% https://github.com/martinsumner/leveled/issues/43 % https://github.com/martinsumner/leveled/issues/43
L2File = mergefile_selector(LastManifest, 2), _L2File = mergefile_selector(LastManifest, 2),
Level1 = array:get(1, LastManifest#manifest.levels), Level1 = array:get(1, LastManifest#manifest.levels),
?assertMatch(true, lists:member(L1File, Level1)), ?assertMatch(true, lists:member(L1File, Level1)).
?assertMatch(true, is_record(L1File, manifest_entry)),
?assertMatch(true, is_record(L2File, manifest_entry)).
keylookup_manifest_test() -> keylookup_manifest_test() ->
{Man0, Man1, Man2, Man3, _Man4, _Man5, Man6} = initial_setup(), {Man0, Man1, Man2, Man3, _Man4, _Man5, Man6} = initial_setup(),
@ -965,35 +1071,35 @@ snapshot_release_test() ->
filename="Z3", filename="Z3",
owner="pid_z3"}, owner="pid_z3"},
Man7 = add_snapshot(Man6, "pid_a1", 3600), Man7 = add_snapshot(Man6, pid_a1, 3600),
Man8 = remove_manifest_entry(Man7, 2, 1, E1), Man8 = remove_manifest_entry(Man7, 2, 1, E1),
Man9 = add_snapshot(Man8, "pid_a2", 3600), Man9 = add_snapshot(Man8, pid_a2, 3600),
Man10 = remove_manifest_entry(Man9, 3, 1, E2), Man10 = remove_manifest_entry(Man9, 3, 1, E2),
Man11 = add_snapshot(Man10, "pid_a3", 3600), Man11 = add_snapshot(Man10, pid_a3, 3600),
Man12 = remove_manifest_entry(Man11, 4, 1, E3), Man12 = remove_manifest_entry(Man11, 4, 1, E3),
Man13 = add_snapshot(Man12, "pid_a4", 3600), Man13 = add_snapshot(Man12, pid_a4, 3600),
?assertMatch(false, element(1, ready_to_delete(Man8, "Z1"))), ?assertMatch(false, element(1, ready_to_delete(Man8, "Z1"))),
?assertMatch(false, element(1, ready_to_delete(Man10, "Z2"))), ?assertMatch(false, element(1, ready_to_delete(Man10, "Z2"))),
?assertMatch(false, element(1, ready_to_delete(Man12, "Z3"))), ?assertMatch(false, element(1, ready_to_delete(Man12, "Z3"))),
Man14 = release_snapshot(Man13, "pid_a1"), Man14 = release_snapshot(Man13, pid_a1),
?assertMatch(false, element(1, ready_to_delete(Man14, "Z2"))), ?assertMatch(false, element(1, ready_to_delete(Man14, "Z2"))),
?assertMatch(false, element(1, ready_to_delete(Man14, "Z3"))), ?assertMatch(false, element(1, ready_to_delete(Man14, "Z3"))),
{Bool14, Man15} = ready_to_delete(Man14, "Z1"), {Bool14, Man15} = ready_to_delete(Man14, "Z1"),
?assertMatch(true, Bool14), ?assertMatch(true, Bool14),
%This doesn't change anything - released snaphsot not the min %This doesn't change anything - released snaphsot not the min
Man16 = release_snapshot(Man15, "pid_a4"), Man16 = release_snapshot(Man15, pid_a4),
?assertMatch(false, element(1, ready_to_delete(Man16, "Z2"))), ?assertMatch(false, element(1, ready_to_delete(Man16, "Z2"))),
?assertMatch(false, element(1, ready_to_delete(Man16, "Z3"))), ?assertMatch(false, element(1, ready_to_delete(Man16, "Z3"))),
Man17 = release_snapshot(Man16, "pid_a2"), Man17 = release_snapshot(Man16, pid_a2),
?assertMatch(false, element(1, ready_to_delete(Man17, "Z3"))), ?assertMatch(false, element(1, ready_to_delete(Man17, "Z3"))),
{Bool17, Man18} = ready_to_delete(Man17, "Z2"), {Bool17, Man18} = ready_to_delete(Man17, "Z2"),
?assertMatch(true, Bool17), ?assertMatch(true, Bool17),
Man19 = release_snapshot(Man18, "pid_a3"), Man19 = release_snapshot(Man18, pid_a3),
io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]), io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]),
@ -1003,11 +1109,11 @@ snapshot_release_test() ->
snapshot_timeout_test() -> snapshot_timeout_test() ->
Man6 = element(7, initial_setup()), Man6 = element(7, initial_setup()),
Man7 = add_snapshot(Man6, "pid_a1", 3600), Man7 = add_snapshot(Man6, pid_a1, 3600),
?assertMatch(1, length(Man7#manifest.snapshots)), ?assertMatch(1, length(Man7#manifest.snapshots)),
Man8 = release_snapshot(Man7, "pid_a1"), Man8 = release_snapshot(Man7, pid_a1),
?assertMatch(0, length(Man8#manifest.snapshots)), ?assertMatch(0, length(Man8#manifest.snapshots)),
Man9 = add_snapshot(Man8, "pid_a1", 0), Man9 = add_snapshot(Man8, pid_a1, 0),
timer:sleep(2001), timer:sleep(2001),
?assertMatch(1, length(Man9#manifest.snapshots)), ?assertMatch(1, length(Man9#manifest.snapshots)),
Man10 = release_snapshot(Man9, ?PHANTOM_PID), Man10 = release_snapshot(Man9, ?PHANTOM_PID),