diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl index 8fc4dba..29259cf 100644 --- a/src/leveled_pmanifest.erl +++ b/src/leveled_pmanifest.erl @@ -73,10 +73,19 @@ % Currently the lowest level (the largest number) }). +-type manifest() :: #manifest{}. +-type manifest_entry() :: #manifest_entry{}. + %%%============================================================================ %%% API %%%============================================================================ +-spec new_manifest() -> manifest(). +%% @doc +%% The manifest in this case is a manifest of the ledger. This contains +%% information on the layout of the files, but also information of snapshots +%% that may have an influence on the manifest as they require files to remain +%% after the primary penciller is happy for them to be removed. new_manifest() -> LevelArray0 = array:new([{size, ?MAX_LEVELS + 1}, {default, []}]), SetLowerLevelFun = @@ -94,6 +103,10 @@ new_manifest() -> basement = 0 }. +-spec open_manifest(string()) -> manifest(). +%% @doc +%% Open a manifest in the appropriate sub-directory of the RootPath, and will +%% return an empty manifest if no such manifest is present. open_manifest(RootPath) -> % Open the manifest in the file path which has the highest SQN, and will % open without error @@ -113,12 +126,22 @@ open_manifest(RootPath) -> [], Filenames))), open_manifestfile(RootPath, ValidManSQNs). - + +-spec copy_manifest(manifest()) -> manifest(). +%% @doc +%% Used to pass the manifest to a snapshot, removing information not required +%% by a snapshot copy_manifest(Manifest) -> % Copy the manifest ensuring anything only the master process should care % about is switched to undefined Manifest#manifest{snapshots = undefined, pending_deletes = undefined}. +-spec load_manifest(manifest(), fun(), fun()) -> {integer(), manifest()}. +%% @doc +%% Roll over the manifest starting a process to manage each file in the +%% manifest. The PidFun should be able to return the Pid of a file process +%% (having started one). The SQNFun will return the max sequence number +%% of that file, if passed the Pid that owns it. load_manifest(Manifest, PidFun, SQNFun) -> UpdateLevelFun = fun(LevelIdx, {AccMaxSQN, AccMan}) -> @@ -130,6 +153,11 @@ load_manifest(Manifest, PidFun, SQNFun) -> lists:foldl(UpdateLevelFun, {0, Manifest}, lists:seq(0, Manifest#manifest.basement)). +-spec close_manifest(manifest(), fun()) -> ok. +%% @doc +%% Close all the files in the manifest (using CloseEntryFun to call close on +%% a file). Firts all the files in the active manifest are called, and then +%% any files which were pending deletion. close_manifest(Manifest, CloseEntryFun) -> CloseLevelFun = fun(LevelIdx) -> @@ -144,6 +172,9 @@ close_manifest(Manifest, CloseEntryFun) -> end, lists:foreach(ClosePDFun, dict:to_list(Manifest#manifest.pending_deletes)). +-spec save_manifest(manifest(), string()) -> ok. +%% @doc +%% Save the manifest to file (with a checksum) save_manifest(Manifest, RootPath) -> FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest), ManBin = term_to_binary(Manifest#manifest{snapshots = [], @@ -152,7 +183,15 @@ save_manifest(Manifest, RootPath) -> CRC = erlang:crc32(ManBin), ok = file:write_file(FP, <>). - +-spec replace_manifest_entry(manifest(), integer(), integer(), + list()|manifest_entry(), + list()|manifest_entry()) -> manifest(). +%% @doc +%% Replace a list of manifest entries in the manifest with a new set of entries +%% Pass in the new manifest SQN to be used for this manifest. The list of +%% entries can just be a single entry +%% +%% This is generally called on the level being merged down into. replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), @@ -176,6 +215,11 @@ replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) -> pending_deletes = PendingDeletes} end. +-spec insert_manifest_entry(manifest(), integer(), integer(), + list()|manifest_entry()) -> manifest(). +%% @doc +%% Place a single new manifest entry into a level of the manifest, at a given +%% level and manifest sequence number insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), @@ -186,6 +230,10 @@ insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> basement = Basement, manifest_sqn = ManSQN}. +-spec remove_manifest_entry(manifest(), integer(), integer(), + list()|manifest_entry()) -> manifest(). +%% @doc +%% Remove a manifest entry (as it has been merged into the level below) remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), @@ -207,6 +255,11 @@ remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> pending_deletes = PendingDeletes} end. +-spec switch_manifest_entry(manifest(), integer(), integer(), + list()|manifest_entry()) -> manifest(). +%% @doc +%% Switch a manifest etry from this level to the level below (i.e when there +%% are no overlapping manifest entries in the level below) switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) -> % Move to level below - so needs to be removed but not marked as a % pending deletion @@ -219,9 +272,16 @@ switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) -> SrcLevel + 1, Entry). +-spec get_manifest_sqn(manifest()) -> integer(). +%% @doc +%% Return the manifest SQN for this manifest get_manifest_sqn(Manifest) -> Manifest#manifest.manifest_sqn. +-spec key_lookup(manifest(), integer(), tuple()) -> false|manifest_entry(). +%% @doc +%% For a given key find which manifest entry covers that key at that level, +%% returning false if there is no covering manifest entry at that level. key_lookup(Manifest, LevelIdx, Key) -> case LevelIdx > Manifest#manifest.basement of true -> @@ -232,6 +292,10 @@ key_lookup(Manifest, LevelIdx, Key) -> Key) end. +-spec range_lookup(manifest(), integer(), tuple(), tuple()) -> list(). +%% @doc +%% Return a list of manifest_entry pointers at this level which cover the +%% key query range. range_lookup(Manifest, LevelIdx, StartKey, EndKey) -> MakePointerFun = fun(M) -> @@ -239,6 +303,11 @@ range_lookup(Manifest, LevelIdx, StartKey, EndKey) -> end, range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). +-spec merge_lookup(manifest(), integer(), tuple(), tuple()) -> list(). +%% @doc +%% Return a list of manifest_entry pointers at this level which cover the +%% key query range, only all keys in the files should be included in the +%% pointers, not just the queries in the range. merge_lookup(Manifest, LevelIdx, StartKey, EndKey) -> MakePointerFun = fun(M) -> @@ -247,7 +316,8 @@ merge_lookup(Manifest, LevelIdx, StartKey, EndKey) -> range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). - +-spec mergefile_selector(manifest(), integer()) -> manifest_entry(). +%% @doc %% An algorithm for discovering which files to merge .... %% We can find the most optimal file: %% - The one with the most overlapping data below? @@ -267,14 +337,25 @@ mergefile_selector(Manifest, LevelIdx) -> {_SK, ME} = lists:nth(random:uniform(length(Level)), Level), ME. -%% When the cllerk returns an update manifest to the penciller, the penciller -%% should restore its view of the snapshots to that manifest +-spec merge_snapshot(manifest(), manifest()) -> manifest(). +%% @doc +%% When the clerk returns an updated manifest to the penciller, the penciller +%% should restore its view of the snapshots to that manifest. Snapshots can +%% be received in parallel to the manifest ebing updated, so the updated +%% manifest must not trample over any accrued state in the manifest. merge_snapshot(PencillerManifest, ClerkManifest) -> ClerkManifest#manifest{snapshots = PencillerManifest#manifest.snapshots, min_snapshot_sqn = PencillerManifest#manifest.min_snapshot_sqn}. +-spec add_snapshot(manifest(), pid()|atom(), integer()) -> manifest(). +%% @doc +%% Add a snapshot reference to the manifest, withe rusing the pid or an atom +%% known to reference a special process. The timeout should be in seconds, and +%% the snapshot will assume to have expired at timeout (and so at that stage +%% files which depended on the snapshot will potentially expire, and if the +%% clone is still active it may crash) add_snapshot(Manifest, Pid, Timeout) -> SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout}, SnapList0 = [SnapEntry|Manifest#manifest.snapshots], @@ -289,6 +370,9 @@ add_snapshot(Manifest, Pid, Timeout) -> min_snapshot_sqn = N0} end. +-spec release_snapshot(manifest(), pid()|atom()) -> manifest(). +%% @doc +%% When a clone is complete the release should be notified to the manifest. release_snapshot(Manifest, Pid) -> FilterFun = fun({P, SQN, ST, TO}, {Acc, MinSQN, Found}) -> @@ -324,6 +408,10 @@ release_snapshot(Manifest, Pid) -> min_snapshot_sqn = MinSnapSQN} end. +-spec ready_to_delete(manifest(), string()) -> {boolean(), manifest()}. +%% @doc +%% A SST file which is in the delete_pending state can check to see if it is +%% ready to delete against the manifest. ready_to_delete(Manifest, Filename) -> {ChangeSQN, _ME} = dict:fetch(Filename, Manifest#manifest.pending_deletes), case Manifest#manifest.min_snapshot_sqn of @@ -343,6 +431,18 @@ ready_to_delete(Manifest, Filename) -> {false, release_snapshot(Manifest, ?PHANTOM_PID)} end. +-spec check_for_work(manifest(), list()) -> {list(), integer()}. +%% @doc +%% Check for compaction work in the manifest - look at levels which contain +%% more files in the threshold. +%% +%% File count determines size in leveled (unlike leveldb which works on the +%% total data volume). Files are fixed size in terms of keys, and the size of +%% metadata is assumed to be contianed and regular and so uninteresting for +%% level sizing. +%% +%% Return a list of levels which are over-sized as well as the total items +%% across the manifest which are beyond the size (the total work outstanding). check_for_work(Manifest, Thresholds) -> CheckLevelFun = fun({LevelIdx, MaxCount}, {AccL, AccC}) -> @@ -362,9 +462,17 @@ check_for_work(Manifest, Thresholds) -> end, lists:foldr(CheckLevelFun, {[], 0}, Thresholds). +-spec is_basement(manifest(), integer()) -> boolean(). +%% @doc +%% Is this level the lowest in the manifest which contains active files. When +%% merging down to the basement level special rules may apply (for example to +%% reap tombstones) is_basement(Manifest, Level) -> Level >= Manifest#manifest.basement. +-spec levelzero_present(manifest()) -> boolean(). +%% @doc +%% Is there a file in level zero (as only one file only can be in level zero). levelzero_present(Manifest) -> not is_empty(0, array:get(0, Manifest#manifest.levels)). @@ -688,7 +796,7 @@ initial_setup() -> owner="pid_z6"}, Man0 = new_manifest(), - % insert_manifest_entry(Manifest, ManSQN, Level, Entry) + Man1 = insert_manifest_entry(Man0, 1, 1, E1), Man2 = insert_manifest_entry(Man1, 1, 1, E2), Man3 = insert_manifest_entry(Man2, 1, 1, E3), @@ -747,11 +855,9 @@ random_select_test() -> L1File = mergefile_selector(LastManifest, 1), % This blows up if the function is not prepared for the different format % https://github.com/martinsumner/leveled/issues/43 - L2File = mergefile_selector(LastManifest, 2), + _L2File = mergefile_selector(LastManifest, 2), Level1 = array:get(1, LastManifest#manifest.levels), - ?assertMatch(true, lists:member(L1File, Level1)), - ?assertMatch(true, is_record(L1File, manifest_entry)), - ?assertMatch(true, is_record(L2File, manifest_entry)). + ?assertMatch(true, lists:member(L1File, Level1)). keylookup_manifest_test() -> {Man0, Man1, Man2, Man3, _Man4, _Man5, Man6} = initial_setup(), @@ -965,35 +1071,35 @@ snapshot_release_test() -> filename="Z3", owner="pid_z3"}, - Man7 = add_snapshot(Man6, "pid_a1", 3600), + Man7 = add_snapshot(Man6, pid_a1, 3600), Man8 = remove_manifest_entry(Man7, 2, 1, E1), - Man9 = add_snapshot(Man8, "pid_a2", 3600), + Man9 = add_snapshot(Man8, pid_a2, 3600), Man10 = remove_manifest_entry(Man9, 3, 1, E2), - Man11 = add_snapshot(Man10, "pid_a3", 3600), + Man11 = add_snapshot(Man10, pid_a3, 3600), Man12 = remove_manifest_entry(Man11, 4, 1, E3), - Man13 = add_snapshot(Man12, "pid_a4", 3600), + Man13 = add_snapshot(Man12, pid_a4, 3600), ?assertMatch(false, element(1, ready_to_delete(Man8, "Z1"))), ?assertMatch(false, element(1, ready_to_delete(Man10, "Z2"))), ?assertMatch(false, element(1, ready_to_delete(Man12, "Z3"))), - Man14 = release_snapshot(Man13, "pid_a1"), + Man14 = release_snapshot(Man13, pid_a1), ?assertMatch(false, element(1, ready_to_delete(Man14, "Z2"))), ?assertMatch(false, element(1, ready_to_delete(Man14, "Z3"))), {Bool14, Man15} = ready_to_delete(Man14, "Z1"), ?assertMatch(true, Bool14), %This doesn't change anything - released snaphsot not the min - Man16 = release_snapshot(Man15, "pid_a4"), + Man16 = release_snapshot(Man15, pid_a4), ?assertMatch(false, element(1, ready_to_delete(Man16, "Z2"))), ?assertMatch(false, element(1, ready_to_delete(Man16, "Z3"))), - Man17 = release_snapshot(Man16, "pid_a2"), + Man17 = release_snapshot(Man16, pid_a2), ?assertMatch(false, element(1, ready_to_delete(Man17, "Z3"))), {Bool17, Man18} = ready_to_delete(Man17, "Z2"), ?assertMatch(true, Bool17), - Man19 = release_snapshot(Man18, "pid_a3"), + Man19 = release_snapshot(Man18, pid_a3), io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]), @@ -1003,11 +1109,11 @@ snapshot_release_test() -> snapshot_timeout_test() -> Man6 = element(7, initial_setup()), - Man7 = add_snapshot(Man6, "pid_a1", 3600), + Man7 = add_snapshot(Man6, pid_a1, 3600), ?assertMatch(1, length(Man7#manifest.snapshots)), - Man8 = release_snapshot(Man7, "pid_a1"), + Man8 = release_snapshot(Man7, pid_a1), ?assertMatch(0, length(Man8#manifest.snapshots)), - Man9 = add_snapshot(Man8, "pid_a1", 0), + Man9 = add_snapshot(Man8, pid_a1, 0), timer:sleep(2001), ?assertMatch(1, length(Man9#manifest.snapshots)), Man10 = release_snapshot(Man9, ?PHANTOM_PID),