diff --git a/src/leveled_imanifest.erl b/src/leveled_imanifest.erl index 3583b2b..d6b68c4 100644 --- a/src/leveled_imanifest.erl +++ b/src/leveled_imanifest.erl @@ -28,11 +28,22 @@ -define(PENDING_FILEX, "pnd"). -define(SKIP_WIDTH, 16). +-type manifest() :: list({integer(), list()}). +%% The manifest is divided into blocks by sequence number, with each block +%% being a list of manifest entries for that SQN range. +-type manifest_entry() :: {integer(), string(), pid()|string(), any()}. +%% The Entry should have a pid() as the third element, but a string() may be +%% used in unit tests + %%%============================================================================ %%% API %%%============================================================================ +-spec generate_entry(pid()) -> list(). +%% @doc +%% Generate a list with a single iManifest entry for a journal file. Used +%% only by the clerk when creating new entries for compacted files. generate_entry(Journal) -> {ok, NewFN} = leveled_cdb:cdb_complete(Journal), {ok, PidR} = leveled_cdb:cdb_open_reader(NewFN), @@ -44,7 +55,12 @@ generate_entry(Journal) -> leveled_log:log("IC013", [NewFN]), [] end. - + +-spec add_entry(manifest(), manifest_entry(), boolean()) -> manifest(). +%% @doc +%% Add a new entry to the manifest, if this is the rolling of a new active +%% journal the boolean ToEnd can be used to indicate it should be simply +%% appended to the end of the manifest. add_entry(Manifest, Entry, ToEnd) -> {SQN, FN, PidR, LastKey} = Entry, StrippedName = filename:rootname(FN), @@ -57,6 +73,10 @@ add_entry(Manifest, Entry, ToEnd) -> from_list(Man1) end. +-spec append_lastkey(manifest(), pid(), any()) -> manifest(). 
+%% @doc +%% On discovery of the last key in the last journal entry, the manifest can +%% be updated through this function to have the last key append_lastkey(Manifest, Pid, LastKey) -> [{SQNMarker, SQNL}|ManifestTail] = Manifest, [{E_SQN, E_FN, E_P, E_LK}|SQNL_Tail] = SQNL, @@ -68,22 +88,35 @@ append_lastkey(Manifest, Pid, LastKey) -> Manifest end. +-spec remove_entry(manifest(), manifest_entry()) -> manifest(). +%% @doc +%% Remove an entry from a manifest (after compaction) remove_entry(Manifest, Entry) -> {SQN, FN, _PidR, _LastKey} = Entry, leveled_log:log("I0013", [FN]), Man0 = lists:keydelete(SQN, 1, to_list(Manifest)), from_list(Man0). +-spec find_entry(integer(), manifest()) -> pid()|string(). +%% @doc +%% Given a SQN find the relevant manifest_entry, returning just the pid() of +%% the journal file (which may be a string() in unit tests) find_entry(SQN, [{SQNMarker, SubL}|_Tail]) when SQN >= SQNMarker -> find_subentry(SQN, SubL); find_entry(SQN, [_TopEntry|Tail]) -> find_entry(SQN, Tail). +-spec head_entry(manifest()) -> manifest_entry(). +%% @doc +%% Return the head manifest entry (the most recent journal) head_entry(Manifest) -> [{_SQNMarker, SQNL}|_Tail] = Manifest, [HeadEntry|_SQNL_Tail] = SQNL, HeadEntry. - + +-spec to_list(manifest()) -> list(). +%% @doc +%% Convert the manifest to a flat list to_list(Manifest) -> FoldFun = fun({_SQNMarker, SubL}, Acc) -> @@ -91,6 +124,9 @@ to_list(Manifest) -> end, lists:foldl(FoldFun, [], Manifest). +-spec reader(integer(), string()) -> manifest(). +%% @doc +%% Given a file path and a manifest SQN return the inker manifest reader(SQN, RootPath) -> ManifestPath = leveled_inker:filepath(RootPath, manifest_dir), leveled_log:log("I0015", [ManifestPath, SQN]), @@ -99,7 +135,10 @@ reader(SQN, RootPath) -> ++ ".man")), from_list(lists:reverse(lists:sort(binary_to_term(MBin)))). - +-spec writer(manifest(), integer(), string()) -> ok. 
+%% @doc +%% Given a manifest and a manifest SQN and a file path, save the manifest to +%% disk writer(Manifest, ManSQN, RootPath) -> ManPath = leveled_inker:filepath(RootPath, manifest_dir), NewFN = filename:join(ManPath, @@ -114,12 +153,18 @@ writer(Manifest, ManSQN, RootPath) -> ok = file:rename(TmpFN, NewFN), ok end. - + +-spec printer(manifest()) -> ok. +%% @doc +%% Print the manifest to the log printer(Manifest) -> lists:foreach(fun({SQN, FN, _PID, _LK}) -> leveled_log:log("I0017", [SQN, FN]) end, to_list(Manifest)). +-spec complete_filex() -> string(). +%% @doc +%% Return the file extension to be used for a completed manifest file complete_filex() -> ?MANIFEST_FILEX. diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 15aa494..69d9a3f 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -106,7 +106,7 @@ ink_compactionpending/1, ink_getmanifest/1, ink_updatemanifest/3, - ink_print_manifest/1, + ink_printmanifest/1, ink_close/1, ink_doom/1, build_dummy_journal/0, @@ -139,43 +139,140 @@ source_inker :: pid()}). +-type inker_options() :: #inker_options{}. + + %%%============================================================================ %%% API %%%============================================================================ - + +-spec ink_start(inker_options()) -> {ok, pid()}. +%% @doc +%% Startup an inker process - passing in options. +%% +%% The first options are root_path and start_snapshot - if the inker is to be a +%% snapshot clone of another inker then start_snapshot should be true, +%% otherwise the root_path will be used to find a file structure to provide a +%% starting point of state for the inker. +%% +%% The inker will need to read and write CDB files (if it is not a snapshot), +%% and so a cdb_options record should be passed in as an inker_option to be +%% used when opening such files. +%% +%% The inker will need to know what the reload strategy is, to inform the +%% clerk about the rules to enforce during compaction. 
ink_start(InkerOpts) -> gen_server:start(?MODULE, [InkerOpts], []). +-spec ink_put(pid(), + {atom(), any(), any(), any()}|string(), + any(), + {list(), integer()|infinity}) -> + {ok, integer(), integer()}. +%% @doc +%% PUT an object into the journal, returning the sequence number for the PUT +%% as well as the size of the object (information required by the ledger). +%% +%% The primary key is expected to be a tuple of the form +%% {Tag, Bucket, Key, null}, but unit tests support pure string Keys and so +%% these types are also supported. +%% +%% KeyChanges is a tuple of {KeyChanges, TTL} where the TTL is an +%% expiry time (or infinity). ink_put(Pid, PrimaryKey, Object, KeyChanges) -> gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity). +-spec ink_get(pid(), + {atom(), any(), any(), any()}|string(), + integer()) -> + {{integer(), any()}, {any(), any()}}. +%% @doc +%% Fetch the object as stored in the Journal. Will not mask errors, should be +%% used only in tests ink_get(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {get, PrimaryKey, SQN}, infinity). +-spec ink_fetch(pid(), + {atom(), any(), any(), any()}|string(), + integer()) -> + any(). +%% @doc +%% Fetch the value that was stored for a given Key at a particular SQN (i.e. +%% this must be a SQN of the write for this key). The full object is returned +%% or the atom not_present if there is no such Key stored at that SQN, or if +%% fetching the Key prompted some anticipated error (e.g. CRC check failed) ink_fetch(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity). +-spec ink_keycheck(pid(), + {atom(), any(), any(), any()}|string(), + integer()) -> + probably|missing. +%% @doc +%% Quick check to determine if key is probably present. Positive results have +%% a very small false positive rate, as can be triggered through a hash +%% collision. ink_keycheck(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {key_check, PrimaryKey, SQN}, infinity). 
+-spec ink_registersnapshot(pid(), pid()) -> {list(), pid()}. +%% @doc +%% Register a snapshot clone for the process, returning the Manifest and the +%% pid of the active journal. ink_registersnapshot(Pid, Requestor) -> gen_server:call(Pid, {register_snapshot, Requestor}, infinity). +-spec ink_releasesnapshot(pid(), pid()) -> ok. +%% @doc +%% Release a registered snapshot as it is no longer in use. This should be +%% called by all terminating snapshots - otherwise space may not be cleared +%% following compaction. ink_releasesnapshot(Pid, Snapshot) -> gen_server:cast(Pid, {release_snapshot, Snapshot}). +-spec ink_confirmdelete(pid(), integer()) -> boolean(). +%% @doc +%% Confirm if a Journal CDB file can be deleted, as it has been set to delete +%% and is no longer in use by any snapshots ink_confirmdelete(Pid, ManSQN) -> gen_server:call(Pid, {confirm_delete, ManSQN}). +-spec ink_close(pid()) -> ok. +%% @doc +%% Close the inker, prompting all the Journal file processes to be called. ink_close(Pid) -> gen_server:call(Pid, close, infinity). +-spec ink_doom(pid()) -> {ok, [{string(), string(), string(), string()}]}. +%% @doc +%% Test function used to close a file, and return all file paths (potentially +%% to erase all persisted existence) ink_doom(Pid) -> gen_server:call(Pid, doom, 60000). +-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. +%% @doc +%% Function to prompt load of the Ledger at startup. the Penciller should +%% have determined the lowest SQN not present in the Ledger, and the inker +%% should fold over the Journal from that point, using the function to load +%% penciller with the results. +%% +%% The load fun should be a five arity function like: +%% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity). +-spec ink_compactjournal(pid(), pid(), integer()) -> ok. +%% @doc +%% Trigger a compaction event. 
The compaction event will use a sqn check +%% against the Ledger to see if a value can be compacted - if the penciller +%% believes it to be superseded then it can be compacted. +%% +%% The inker will get the maximum persisted sequence number from the +%% initiate_penciller_snapshot/1 function - and use that as a pre-filter so +%% that any value that was written more recently than the last flush to disk +%% of the Ledger will not be considered for compaction (as this may be +%% required to reload the Ledger on startup). ink_compactjournal(Pid, Bookie, Timeout) -> CheckerInitiateFun = fun initiate_penciller_snapshot/1, CheckerCloseFun = fun leveled_penciller:pcl_close/1, @@ -200,16 +297,35 @@ ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) -> FilterFun, Timeout}, infinity). - +-spec ink_compactioncomplete(pid()) -> ok. +%% @doc +%% Used by a clerk to state that a compaction process is over, only change +%% is to unlock the Inker for further compactions. ink_compactioncomplete(Pid) -> gen_server:call(Pid, compaction_complete, infinity). +-spec ink_compactionpending(pid()) -> boolean(). +%% @doc +%% Is there ongoing compaction work? No compaction work should be initiated +%% if there is already some compaction work ongoing. ink_compactionpending(Pid) -> gen_server:call(Pid, compaction_pending, infinity). - + +-spec ink_getmanifest(pid()) -> list(). +%% @doc +%% Allows the clerk to fetch the manifest at the point it starts a compaction +%% job ink_getmanifest(Pid) -> gen_server:call(Pid, get_manifest, infinity). +-spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}. +%% @doc +%% Add a section of new entries into the manifest, and drop a bunch of deleted +%% files out of the manifest. Used to update the manifest after a compaction +%% job. 
+%% +%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the +%% updated manifest ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) -> gen_server:call(Pid, {update_manifest, @@ -217,7 +333,10 @@ ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) -> DeletedFiles}, infinity). -ink_print_manifest(Pid) -> +-spec ink_printmanifest(pid()) -> ok. +%% @doc +%% Used in tests to print out the manifest +ink_printmanifest(Pid) -> gen_server:call(Pid, print_manifest, infinity). %%%============================================================================ @@ -857,27 +976,31 @@ compact_journal_test() -> reload_strategy=RStrategy}), {ok, NewSQN1, _ObjSize} = ink_put(Ink1, test_ledgerkey("KeyAA"), - "TestValueAA", []), + "TestValueAA", + {[], infinity}), ?assertMatch(NewSQN1, 5), - ok = ink_print_manifest(Ink1), + ok = ink_printmanifest(Ink1), R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5), - ?assertMatch(R0, {{5, test_ledgerkey("KeyAA")}, {"TestValueAA", []}}), + ?assertMatch(R0, + {{5, test_ledgerkey("KeyAA")}, + {"TestValueAA", {[], infinity}}}), FunnyLoop = lists:seq(1, 48), Checker = lists:map(fun(X) -> PK = "KeyZ" ++ integer_to_list(X), {ok, SQN, _} = ink_put(Ink1, test_ledgerkey(PK), crypto:rand_bytes(10000), - []), + {[], infinity}), {SQN, test_ledgerkey(PK)} end, FunnyLoop), {ok, NewSQN2, _ObjSize} = ink_put(Ink1, test_ledgerkey("KeyBB"), - "TestValueBB", []), + "TestValueBB", + {[], infinity}), ?assertMatch(NewSQN2, 54), ActualManifest = ink_getmanifest(Ink1), - ok = ink_print_manifest(Ink1), + ok = ink_printmanifest(Ink1), ?assertMatch(3, length(ActualManifest)), ok = ink_compactjournal(Ink1, Checker, @@ -938,7 +1061,7 @@ empty_manifest_test() -> {ok, Ink2} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts}), ?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)), - {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", []), + {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}), ?assertMatch(2, SQN), ?assertMatch(true, 
Size > 0), {ok, V} = ink_fetch(Ink2, "Key1", 2), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 22d8b11..5724f88 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -243,19 +243,59 @@ head_timing :: tuple()}). +-type penciller_options() :: #penciller_options{}. +-type bookies_memory() :: {tuple()|empty_cache, + array:array()|empty_array, + integer()|infinity, + integer()}. +-type pcl_state() :: #state{}. %%%============================================================================ %%% API %%%============================================================================ - +-spec pcl_start(penciller_options()) -> {ok, pid()}. +%% @doc +%% Start a penciller using a penciller options record. The start_snapshot +%% option should be used if this is to be a clone of an existing penciller, +%% otherwise the penciller will look in root path for a manifest and +%% associated sst files to start-up from a previous persisted state. +%% +%% When starting a clone a query can also be passed. This prevents the whole +%% Level Zero memory space from being copied to the snapshot, instead the +%% query is run against the level zero space and just the query results are +%% copied into the clone. pcl_start(PCLopts) -> gen_server:start(?MODULE, [PCLopts], []). +-spec pcl_pushmem(pid(), bookies_memory()) -> ok|returned. +%% @doc +%% Load the contents of the Bookie's memory of recent additions to the Ledger +%% to the Ledger proper. +%% +%% The load is made up of a cache in the form of a leveled_skiplist tuple (or +%% the atom empty_cache if no cache is present), an index of entries in the +%% skiplist in the form of leveled_pmem index (or empty_index), the minimum +%% sequence number in the cache and the maximum sequence number. +%% +%% If the penciller does not have capacity for the pushed cache it will +%% respond with the atom 'returned'. This is a signal to hold the memory +%% at the Bookie, and try again soon. 
This normally only occurs when there +%% is a backlog of merges - so the bookie should backoff for longer each time. pcl_pushmem(Pid, LedgerCache) -> %% Bookie to dump memory onto penciller gen_server:call(Pid, {push_mem, LedgerCache}, infinity). +-spec pcl_fetchlevelzero(pid(), integer()) -> tuple(). +%% @doc +%% Allows a single slot of the penciller's levelzero cache to be fetched. The +%% levelzero cache can be up to 40K keys - sending this to the process that is +%% persisting this in a SST file in a single cast will lock the process for +%% 30-40ms. This allows that process to fetch this slot by slot, so that +%% this is split into a series of smaller events. +%% +%% The return value will be a leveled_skiplist that forms that part of the +%% cache pcl_fetchlevelzero(Pid, Slot) -> %% Timeout to cause crash of L0 file when it can't get the close signal %% as it is deadlocked making this call. @@ -263,7 +303,17 @@ pcl_fetchlevelzero(Pid, Slot) -> %% If the timeout gets hit outside of close scenario the Penciller will %% be stuck in L0 pending gen_server:call(Pid, {fetch_levelzero, Slot}, 60000). - + +-spec pcl_fetch(pid(), tuple()) -> {tuple(), tuple()}|not_present. +%% @doc +%% Fetch a key, return the first (highest SQN) occurrence of that Key along +%% with the value. +%% +%% The Key needs to be hashable (i.e. have a tag which indicates that the key +%% can be looked up) - index entries are not hashable for example. +%% +%% If the hash is already known, call pcl_fetch/3 as magic_hash is a +%% relatively expensive hash function pcl_fetch(Pid, Key) -> Hash = leveled_codec:magic_hash(Key), if @@ -271,19 +321,48 @@ pcl_fetch(Pid, Key) -> gen_server:call(Pid, {fetch, Key, Hash}, infinity) end. +-spec pcl_fetch(pid(), tuple(), integer()) -> {tuple(), tuple()}|not_present. +%% @doc +%% Fetch a key, return the first (highest SQN) occurrence of that Key along +%% with the value. 
+%% +%% Hash should be result of leveled_codec:magic_hash(Key) pcl_fetch(Pid, Key, Hash) -> gen_server:call(Pid, {fetch, Key, Hash}, infinity). +-spec pcl_fetchkeys(pid(), tuple(), tuple(), fun(), any()) -> any(). +%% @doc +%% Run a range query between StartKey and EndKey (inclusive). This will cover +%% all keys in the range - so must only be run against snapshots of the +%% penciller to avoid blocking behaviour. +%% +%% Comparison with the upper-end of the range (EndKey) is done using +%% leveled_codec:endkey_passed/2 - so use nulls within the tuple to manage +%% the top of the range. Comparison with the start of the range is based on +%% Erlang term order. pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) -> gen_server:call(Pid, {fetch_keys, StartKey, EndKey, AccFun, InitAcc, -1}, infinity). +-spec pcl_fetchnextkey(pid(), tuple(), tuple(), fun(), any()) -> any(). +%% @doc +%% Run a range query between StartKey and EndKey (inclusive). This has the +%% same constraints as pcl_fetchkeys/5, but will only return the first key +%% found in erlang term order. pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> gen_server:call(Pid, {fetch_keys, StartKey, EndKey, AccFun, InitAcc, 1}, infinity). +-spec pcl_checksequencenumber(pid(), tuple(), integer()) -> boolean(). +%% @doc +%% Check if the sequence number of the passed key is not replaced by a change +%% after the passed sequence number. Will return true if the Key is present +%% and either is equal to, or prior to the passed SQN. +%% +%% If the key is not present, it will be assumed that a higher sequence number +%% tombstone once existed, and false will be returned. pcl_checksequencenumber(Pid, Key, SQN) -> Hash = leveled_codec:magic_hash(Key), if @@ -291,32 +370,80 @@ pcl_checksequencenumber(Pid, Key, SQN) -> gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity) end. +-spec pcl_workforclerk(pid()) -> ok. +%% @doc +%% A request from the clerk to check for work. 
If work is present the +%% Penciller will cast back to the clerk, no response is sent to this +%% request. pcl_workforclerk(Pid) -> gen_server:cast(Pid, work_for_clerk). +-spec pcl_manifestchange(pid(), tuple()) -> ok. +%% @doc +%% Provide a manifest record (i.e. the output of the leveled_pmanifest module) +%% that is required to become the new manifest. pcl_manifestchange(Pid, Manifest) -> gen_server:cast(Pid, {manifest_change, Manifest}). +-spec pcl_confirml0complete(pid(), string(), tuple(), tuple()) -> ok. +%% @doc +%% Allows a SST writer that has written a L0 file to confirm that the file +%% is now complete, so the filename and key ranges can be added to the +%% manifest and the file can be used in place of the in-memory levelzero +%% cache. pcl_confirml0complete(Pid, FN, StartKey, EndKey) -> gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey}). +-spec pcl_confirmdelete(pid(), string(), pid()) -> ok. +%% @doc +%% Poll from a delete_pending file requesting a message if the file is now +%% ready for deletion (i.e. all snapshots which depend on the file have +%% finished) pcl_confirmdelete(Pid, FileName, FilePid) -> gen_server:cast(Pid, {confirm_delete, FileName, FilePid}). +-spec pcl_getstartupsequencenumber(pid()) -> integer(). +%% @doc +%% At startup the penciller will get the largest sequence number that is +%% within the persisted files. This function allows for this sequence number +%% to be fetched - so that it can be used to determine parts of the Ledger +%% which may have been lost in the last shutdown (so that the ledger can +%% be reloaded from that point in the Journal) pcl_getstartupsequencenumber(Pid) -> gen_server:call(Pid, get_startup_sqn, infinity). +-spec pcl_registersnapshot(pid(), + pid(), + no_lookup|{tuple(), tuple()}|undefined, + bookies_memory(), + boolean()) + -> {ok, pcl_state()}. 
+%% @doc +%% Register a snapshot of the penciller, returning a state record from the +%% penciller for the snapshot to use as its LoopData pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem, LR) -> gen_server:call(Pid, {register_snapshot, Snapshot, Query, BookiesMem, LR}, infinity). +-spec pcl_releasesnapshot(pid(), pid()) -> ok. +%% @doc +%% Inform the primary penciller that a snapshot is finished, so that the +%% penciller can allow deletes to proceed if appropriate. pcl_releasesnapshot(Pid, Snapshot) -> gen_server:cast(Pid, {release_snapshot, Snapshot}). +-spec pcl_close(pid()) -> ok. +%% @doc +%% Close the penciller neatly, trying to persist to disk anything in the memory pcl_close(Pid) -> gen_server:call(Pid, close, 60000). +-spec pcl_doom(pid()) -> {ok, list()}. +%% @doc +%% Close the penciller neatly, trying to persist to disk anything in the memory. +%% Return a list of filepaths from where files exist for this penciller (should +%% the calling process wish to erase the store). pcl_doom(Pid) -> gen_server:call(Pid, doom, 60000). diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl index 8fc4dba..29259cf 100644 --- a/src/leveled_pmanifest.erl +++ b/src/leveled_pmanifest.erl @@ -73,10 +73,19 @@ % Currently the lowest level (the largest number) }). +-type manifest() :: #manifest{}. +-type manifest_entry() :: #manifest_entry{}. + %%%============================================================================ %%% API %%%============================================================================ +-spec new_manifest() -> manifest(). +%% @doc +%% The manifest in this case is a manifest of the ledger. This contains +%% information on the layout of the files, but also information of snapshots +%% that may have an influence on the manifest as they require files to remain +%% after the primary penciller is happy for them to be removed. 
new_manifest() -> LevelArray0 = array:new([{size, ?MAX_LEVELS + 1}, {default, []}]), SetLowerLevelFun = @@ -94,6 +103,10 @@ new_manifest() -> basement = 0 }. +-spec open_manifest(string()) -> manifest(). +%% @doc +%% Open a manifest in the appropriate sub-directory of the RootPath, and will +%% return an empty manifest if no such manifest is present. open_manifest(RootPath) -> % Open the manifest in the file path which has the highest SQN, and will % open without error @@ -113,12 +126,22 @@ open_manifest(RootPath) -> [], Filenames))), open_manifestfile(RootPath, ValidManSQNs). - + +-spec copy_manifest(manifest()) -> manifest(). +%% @doc +%% Used to pass the manifest to a snapshot, removing information not required +%% by a snapshot copy_manifest(Manifest) -> % Copy the manifest ensuring anything only the master process should care % about is switched to undefined Manifest#manifest{snapshots = undefined, pending_deletes = undefined}. +-spec load_manifest(manifest(), fun(), fun()) -> {integer(), manifest()}. +%% @doc +%% Roll over the manifest starting a process to manage each file in the +%% manifest. The PidFun should be able to return the Pid of a file process +%% (having started one). The SQNFun will return the max sequence number +%% of that file, if passed the Pid that owns it. load_manifest(Manifest, PidFun, SQNFun) -> UpdateLevelFun = fun(LevelIdx, {AccMaxSQN, AccMan}) -> @@ -130,6 +153,11 @@ load_manifest(Manifest, PidFun, SQNFun) -> lists:foldl(UpdateLevelFun, {0, Manifest}, lists:seq(0, Manifest#manifest.basement)). +-spec close_manifest(manifest(), fun()) -> ok. +%% @doc +%% Close all the files in the manifest (using CloseEntryFun to call close on +%% a file). First all the files in the active manifest are called, and then +%% any files which were pending deletion. 
close_manifest(Manifest, CloseEntryFun) -> CloseLevelFun = fun(LevelIdx) -> @@ -144,6 +172,9 @@ close_manifest(Manifest, CloseEntryFun) -> end, lists:foreach(ClosePDFun, dict:to_list(Manifest#manifest.pending_deletes)). +-spec save_manifest(manifest(), string()) -> ok. +%% @doc +%% Save the manifest to file (with a checksum) save_manifest(Manifest, RootPath) -> FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest), ManBin = term_to_binary(Manifest#manifest{snapshots = [], @@ -152,7 +183,15 @@ save_manifest(Manifest, RootPath) -> CRC = erlang:crc32(ManBin), ok = file:write_file(FP, <>). - +-spec replace_manifest_entry(manifest(), integer(), integer(), + list()|manifest_entry(), + list()|manifest_entry()) -> manifest(). +%% @doc +%% Replace a list of manifest entries in the manifest with a new set of entries +%% Pass in the new manifest SQN to be used for this manifest. The list of +%% entries can just be a single entry +%% +%% This is generally called on the level being merged down into. replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), @@ -176,6 +215,11 @@ replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) -> pending_deletes = PendingDeletes} end. +-spec insert_manifest_entry(manifest(), integer(), integer(), + list()|manifest_entry()) -> manifest(). +%% @doc +%% Place a single new manifest entry into a level of the manifest, at a given +%% level and manifest sequence number insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), @@ -186,6 +230,10 @@ insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> basement = Basement, manifest_sqn = ManSQN}. +-spec remove_manifest_entry(manifest(), integer(), integer(), + list()|manifest_entry()) -> manifest(). 
+%% @doc +%% Remove a manifest entry (as it has been merged into the level below) remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), @@ -207,6 +255,11 @@ remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> pending_deletes = PendingDeletes} end. +-spec switch_manifest_entry(manifest(), integer(), integer(), + list()|manifest_entry()) -> manifest(). +%% @doc +%% Switch a manifest entry from this level to the level below (i.e. when there +%% are no overlapping manifest entries in the level below) switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) -> % Move to level below - so needs to be removed but not marked as a % pending deletion @@ -219,9 +272,16 @@ switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) -> SrcLevel + 1, Entry). +-spec get_manifest_sqn(manifest()) -> integer(). +%% @doc +%% Return the manifest SQN for this manifest get_manifest_sqn(Manifest) -> Manifest#manifest.manifest_sqn. +-spec key_lookup(manifest(), integer(), tuple()) -> false|manifest_entry(). +%% @doc +%% For a given key find which manifest entry covers that key at that level, +%% returning false if there is no covering manifest entry at that level. key_lookup(Manifest, LevelIdx, Key) -> case LevelIdx > Manifest#manifest.basement of true -> @@ -232,6 +292,10 @@ key_lookup(Manifest, LevelIdx, Key) -> Key) end. +-spec range_lookup(manifest(), integer(), tuple(), tuple()) -> list(). +%% @doc +%% Return a list of manifest_entry pointers at this level which cover the +%% key query range. range_lookup(Manifest, LevelIdx, StartKey, EndKey) -> MakePointerFun = fun(M) -> @@ -239,6 +303,11 @@ range_lookup(Manifest, LevelIdx, StartKey, EndKey) -> end, range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). +-spec merge_lookup(manifest(), integer(), tuple(), tuple()) -> list(). 
+%% @doc +%% Return a list of manifest_entry pointers at this level which cover the +%% key query range, only all keys in the files should be included in the +%% pointers, not just the queries in the range. merge_lookup(Manifest, LevelIdx, StartKey, EndKey) -> MakePointerFun = fun(M) -> @@ -247,7 +316,8 @@ merge_lookup(Manifest, LevelIdx, StartKey, EndKey) -> range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun). - +-spec mergefile_selector(manifest(), integer()) -> manifest_entry(). +%% @doc %% An algorithm for discovering which files to merge .... %% We can find the most optimal file: %% - The one with the most overlapping data below? @@ -267,14 +337,25 @@ mergefile_selector(Manifest, LevelIdx) -> {_SK, ME} = lists:nth(random:uniform(length(Level)), Level), ME. -%% When the cllerk returns an update manifest to the penciller, the penciller -%% should restore its view of the snapshots to that manifest +-spec merge_snapshot(manifest(), manifest()) -> manifest(). +%% @doc +%% When the clerk returns an updated manifest to the penciller, the penciller +%% should restore its view of the snapshots to that manifest. Snapshots can +%% be received in parallel to the manifest being updated, so the updated +%% manifest must not trample over any accrued state in the manifest. merge_snapshot(PencillerManifest, ClerkManifest) -> ClerkManifest#manifest{snapshots = PencillerManifest#manifest.snapshots, min_snapshot_sqn = PencillerManifest#manifest.min_snapshot_sqn}. +-spec add_snapshot(manifest(), pid()|atom(), integer()) -> manifest(). +%% @doc +%% Add a snapshot reference to the manifest, either using the pid or an atom +%% known to reference a special process. 
The timeout should be in seconds, and +%% the snapshot will be assumed to have expired at timeout (and so at that stage +%% files which depended on the snapshot will potentially expire, and if the +%% clone is still active it may crash) add_snapshot(Manifest, Pid, Timeout) -> SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout}, SnapList0 = [SnapEntry|Manifest#manifest.snapshots], @@ -289,6 +370,9 @@ add_snapshot(Manifest, Pid, Timeout) -> min_snapshot_sqn = N0} end. +-spec release_snapshot(manifest(), pid()|atom()) -> manifest(). +%% @doc +%% When a clone is complete the release should be notified to the manifest. release_snapshot(Manifest, Pid) -> FilterFun = fun({P, SQN, ST, TO}, {Acc, MinSQN, Found}) -> @@ -324,6 +408,10 @@ release_snapshot(Manifest, Pid) -> min_snapshot_sqn = MinSnapSQN} end. +-spec ready_to_delete(manifest(), string()) -> {boolean(), manifest()}. +%% @doc +%% A SST file which is in the delete_pending state can check to see if it is +%% ready to delete against the manifest. ready_to_delete(Manifest, Filename) -> {ChangeSQN, _ME} = dict:fetch(Filename, Manifest#manifest.pending_deletes), case Manifest#manifest.min_snapshot_sqn of @@ -343,6 +431,18 @@ ready_to_delete(Manifest, Filename) -> {false, release_snapshot(Manifest, ?PHANTOM_PID)} end. +-spec check_for_work(manifest(), list()) -> {list(), integer()}. +%% @doc +%% Check for compaction work in the manifest - look at levels which contain +%% more files than the threshold. +%% +%% File count determines size in leveled (unlike leveldb which works on the +%% total data volume). Files are fixed size in terms of keys, and the size of +%% metadata is assumed to be contained and regular and so uninteresting for +%% level sizing. +%% +%% Return a list of levels which are over-sized as well as the total items +%% across the manifest which are beyond the size (the total work outstanding). 
check_for_work(Manifest, Thresholds) -> CheckLevelFun = fun({LevelIdx, MaxCount}, {AccL, AccC}) -> @@ -362,9 +462,17 @@ check_for_work(Manifest, Thresholds) -> end, lists:foldr(CheckLevelFun, {[], 0}, Thresholds). +-spec is_basement(manifest(), integer()) -> boolean(). +%% @doc +%% Is this level the lowest in the manifest which contains active files. When +%% merging down to the basement level special rules may apply (for example to +%% reap tombstones) is_basement(Manifest, Level) -> Level >= Manifest#manifest.basement. +-spec levelzero_present(manifest()) -> boolean(). +%% @doc +%% Is there a file in level zero (as only one file can be in level zero). levelzero_present(Manifest) -> not is_empty(0, array:get(0, Manifest#manifest.levels)). @@ -688,7 +796,7 @@ initial_setup() -> owner="pid_z6"}, Man0 = new_manifest(), - % insert_manifest_entry(Manifest, ManSQN, Level, Entry) + Man1 = insert_manifest_entry(Man0, 1, 1, E1), Man2 = insert_manifest_entry(Man1, 1, 1, E2), Man3 = insert_manifest_entry(Man2, 1, 1, E3), @@ -747,11 +855,9 @@ random_select_test() -> L1File = mergefile_selector(LastManifest, 1), % This blows up if the function is not prepared for the different format % https://github.com/martinsumner/leveled/issues/43 - L2File = mergefile_selector(LastManifest, 2), + _L2File = mergefile_selector(LastManifest, 2), Level1 = array:get(1, LastManifest#manifest.levels), - ?assertMatch(true, lists:member(L1File, Level1)), - ?assertMatch(true, is_record(L1File, manifest_entry)), - ?assertMatch(true, is_record(L2File, manifest_entry)). + ?assertMatch(true, lists:member(L1File, Level1)).
keylookup_manifest_test() -> {Man0, Man1, Man2, Man3, _Man4, _Man5, Man6} = initial_setup(), @@ -965,35 +1071,35 @@ snapshot_release_test() -> filename="Z3", owner="pid_z3"}, - Man7 = add_snapshot(Man6, "pid_a1", 3600), + Man7 = add_snapshot(Man6, pid_a1, 3600), Man8 = remove_manifest_entry(Man7, 2, 1, E1), - Man9 = add_snapshot(Man8, "pid_a2", 3600), + Man9 = add_snapshot(Man8, pid_a2, 3600), Man10 = remove_manifest_entry(Man9, 3, 1, E2), - Man11 = add_snapshot(Man10, "pid_a3", 3600), + Man11 = add_snapshot(Man10, pid_a3, 3600), Man12 = remove_manifest_entry(Man11, 4, 1, E3), - Man13 = add_snapshot(Man12, "pid_a4", 3600), + Man13 = add_snapshot(Man12, pid_a4, 3600), ?assertMatch(false, element(1, ready_to_delete(Man8, "Z1"))), ?assertMatch(false, element(1, ready_to_delete(Man10, "Z2"))), ?assertMatch(false, element(1, ready_to_delete(Man12, "Z3"))), - Man14 = release_snapshot(Man13, "pid_a1"), + Man14 = release_snapshot(Man13, pid_a1), ?assertMatch(false, element(1, ready_to_delete(Man14, "Z2"))), ?assertMatch(false, element(1, ready_to_delete(Man14, "Z3"))), {Bool14, Man15} = ready_to_delete(Man14, "Z1"), ?assertMatch(true, Bool14), %This doesn't change anything - released snaphsot not the min - Man16 = release_snapshot(Man15, "pid_a4"), + Man16 = release_snapshot(Man15, pid_a4), ?assertMatch(false, element(1, ready_to_delete(Man16, "Z2"))), ?assertMatch(false, element(1, ready_to_delete(Man16, "Z3"))), - Man17 = release_snapshot(Man16, "pid_a2"), + Man17 = release_snapshot(Man16, pid_a2), ?assertMatch(false, element(1, ready_to_delete(Man17, "Z3"))), {Bool17, Man18} = ready_to_delete(Man17, "Z2"), ?assertMatch(true, Bool17), - Man19 = release_snapshot(Man18, "pid_a3"), + Man19 = release_snapshot(Man18, pid_a3), io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]), @@ -1003,11 +1109,11 @@ snapshot_release_test() -> snapshot_timeout_test() -> Man6 = element(7, initial_setup()), - Man7 = add_snapshot(Man6, "pid_a1", 3600), + Man7 = add_snapshot(Man6, 
pid_a1, 3600), ?assertMatch(1, length(Man7#manifest.snapshots)), - Man8 = release_snapshot(Man7, "pid_a1"), + Man8 = release_snapshot(Man7, pid_a1), ?assertMatch(0, length(Man8#manifest.snapshots)), - Man9 = add_snapshot(Man8, "pid_a1", 0), + Man9 = add_snapshot(Man8, pid_a1, 0), timer:sleep(2001), ?assertMatch(1, length(Man9#manifest.snapshots)), Man10 = release_snapshot(Man9, ?PHANTOM_PID), diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index 97e4d5c..0d4c7cf 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -43,11 +43,20 @@ -include_lib("eunit/include/eunit.hrl"). +-type index_array() :: array:array(). %%%============================================================================ %%% API %%%============================================================================ +-spec prepare_for_index(index_array(), integer()|no_lookup) -> index_array(). +%% @doc +%% Add the hash of a key to the index. This is 'prepared' in the sense that +%% this index is not used until it is loaded into the main index. +%% +%% prepare_for_index is called from the Bookie when a key is added to the +%% ledger cache, but the index is not used until that ledger cache is in the +%% penciller L0 memory prepare_for_index(IndexArray, no_lookup) -> IndexArray; prepare_for_index(IndexArray, Hash) -> @@ -55,7 +64,55 @@ prepare_for_index(IndexArray, Hash) -> Bin = array:get(Slot, IndexArray), array:set(Slot, <>, IndexArray). +-spec add_to_index(index_array(), index_array(), integer()) -> index_array(). +%% @doc +%% Expand the penciller's current index array with the details from a new +%% ledger cache tree sent from the Bookie.
The tree will have a cache slot +%% which is the index of this ledger_cache in the list of the ledger_caches +add_to_index(LM1Array, L0Index, CacheSlot) when CacheSlot < 128 -> + IndexAddFun = + fun(Slot, Acc) -> + Bin0 = array:get(Slot, Acc), + BinLM1 = array:get(Slot, LM1Array), + array:set(Slot, + <>, + Acc) + end, + lists:foldl(IndexAddFun, L0Index, lists:seq(0, 255)). +-spec new_index() -> index_array(). +%% @doc +%% Create a new index array +new_index() -> + array:new([{size, 256}, {default, <<>>}]). + +-spec clear_index(index_array()) -> index_array(). +%% @doc +%% Clear the index by returning a new (empty) index array +clear_index(_L0Index) -> + new_index(). + +-spec check_index(integer(), index_array()) -> list(integer()). +%% @doc +%% Return a list of positions in the list of cache arrays that may contain the +%% key associated with the hash being checked +check_index(Hash, L0Index) -> + {Slot, H0} = split_hash(Hash), + Bin = array:get(Slot, L0Index), + find_pos(Bin, H0, [], 0). + + +-spec add_to_cache(integer(), + {tuple(), integer(), integer()}, + integer(), + list()) -> + {integer(), integer(), list()}. +%% @doc +%% The penciller's cache is a list of leveled_trees, this adds a new tree to +%% that cache, providing an update to the approximate size of the cache and +%% the Ledger's SQN. add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) -> LM1Size = leveled_tree:tsize(LevelMinus1), case LM1Size of @@ -70,30 +127,14 @@ add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) -> end end. -add_to_index(LM1Array, L0Index, CacheSlot) when CacheSlot < 128 -> - IndexAddFun = - fun(Slot, Acc) -> - Bin0 = array:get(Slot, Acc), - BinLM1 = array:get(Slot, LM1Array), - array:set(Slot, - <>, - Acc) - end, - lists:foldl(IndexAddFun, L0Index, lists:seq(0, 255)). - -new_index() -> - array:new([{size, 256}, {default, <<>>}]). - -clear_index(_L0Index) -> - new_index().
- -check_index(Hash, L0Index) -> - {Slot, H0} = split_hash(Hash), - Bin = array:get(Slot, L0Index), - find_pos(Bin, H0, [], 0). - +-spec to_list(integer(), fun()) -> list(). +%% @doc +%% The cache is a list of leveled_trees of length Slots. This will fetch +%% each tree in turn by slot ID and then produce a merged/sorted output of +%% Keys and Values (to load into a SST file). +%% +%% Each slot is requested in turn to avoid halting the penciller whilst it +%% does a large object copy of the whole cache. to_list(Slots, FetchFun) -> SW = os:timestamp(), SlotList = lists:reverse(lists:seq(1, Slots)), @@ -107,10 +148,25 @@ to_list(Slots, FetchFun) -> leveled_log:log_timer("PM002", [length(FullList)], SW), FullList. - +-spec check_levelzero(tuple(), list(integer()), list()) + -> {boolean(), tuple|not_found}. +%% @doc +%% Check for the presence of a given Key in the Level Zero cache, with the +%% index array having been checked first for a list of potential positions +%% in the list of ledger caches - and then each potential ledger_cache being +%% checked (with the most recently received cache being checked first) until a +%% match is found. check_levelzero(Key, PosList, TreeList) -> check_levelzero(Key, leveled_codec:magic_hash(Key), PosList, TreeList). +-spec check_levelzero(tuple(), integer(), list(integer()), list()) + -> {boolean(), tuple|not_found}. +%% @doc +%% Check for the presence of a given Key in the Level Zero cache, with the +%% index array having been checked first for a list of potential positions +%% in the list of ledger caches - and then each potential ledger_cache being +%% checked (with the most recently received cache being checked first) until a +%% match is found. check_levelzero(_Key, _Hash, _PosList, []) -> {false, not_found}; check_levelzero(_Key, _Hash, [], _TreeList) -> @@ -118,7 +174,11 @@ check_levelzero(_Key, _Hash, [], _TreeList) -> check_levelzero(Key, Hash, PosList, TreeList) -> check_slotlist(Key, Hash, PosList, TreeList). 
- +-spec merge_trees(tuple(), tuple(), list(tuple()), tuple()) -> list(). +%% @doc +%% Return a list of keys and values across the level zero cache (and the +%% currently unmerged bookie's ledger cache) that are between StartKey +%% and EndKey (inclusive). merge_trees(StartKey, EndKey, TreeList, LevelMinus1) -> lists:foldl(fun(Tree, Acc) -> R = leveled_tree:match_range(StartKey, diff --git a/src/leveled_tinybloom.erl b/src/leveled_tinybloom.erl index 23ff343..9d0ae32 100644 --- a/src/leveled_tinybloom.erl +++ b/src/leveled_tinybloom.erl @@ -25,7 +25,9 @@ %%% API %%%============================================================================ - +-spec create_bloom(list(integer())) -> binary(). +%% @doc +%% Create a binary bloom filter from a list of hashes create_bloom(HashList) -> case length(HashList) of 0 -> @@ -41,6 +43,9 @@ create_bloom(HashList) -> add_hashlist(HashList, 1, 0, 0) end. +-spec check_hash(integer(), binary()) -> boolean(). +%% @doc +%% Check for the presence of a given hash within a bloom check_hash(_Hash, <<>>) -> false; check_hash(Hash, BloomBin) -> diff --git a/src/leveled_tree.erl b/src/leveled_tree.erl index ba07a3f..9670e24 100644 --- a/src/leveled_tree.erl +++ b/src/leveled_tree.erl @@ -30,21 +30,48 @@ -define(SKIP_WIDTH, 16). +-type tree_type() :: tree|idxt|skpl. +-type leveled_tree() :: {tree_type(), + integer(), % length + any()}. %%%============================================================================ %%% API %%%============================================================================ +-spec from_orderedset(ets:tab(), tree_type()) -> leveled_tree(). +%% @doc +%% Convert an ETS table of Keys and Values (of table type ordered_set) into a +%% leveled_tree of the given type. from_orderedset(Table, Type) -> from_orderedlist(ets:tab2list(Table), Type, ?SKIP_WIDTH). +-spec from_orderedset(ets:tab(), tree_type(), integer()|auto) + -> leveled_tree().
+%% @doc +%% Convert an ETS table of Keys and Values (of table type ordered_set) into a +%% leveled_tree of the given type. The SkipWidth is an integer representing +%% the underlying list size joined in the tree (the trees are all trees of +%% lists of this size). For the skpl type the width can be auto-sized based +%% on the length from_orderedset(Table, Type, SkipWidth) -> from_orderedlist(ets:tab2list(Table), Type, SkipWidth). - +-spec from_orderedlist(list(tuple()), tree_type()) -> leveled_tree(). +%% @doc +%% Convert an ordered list of Keys and Values into a leveled_tree of the +%% given type. from_orderedlist(OrderedList, Type) -> from_orderedlist(OrderedList, Type, ?SKIP_WIDTH). +-spec from_orderedlist(list(tuple()), tree_type(), integer()|auto) + -> leveled_tree(). +%% @doc +%% Convert an ordered list of Keys and Values into a leveled_tree of the +%% given type. The SkipWidth is an integer representing +%% the underlying list size joined in the tree (the trees are all trees of +%% lists of this size). For the skpl type the width can be auto-sized based +%% on the length from_orderedlist(OrderedList, tree, SkipWidth) -> L = length(OrderedList), {tree, L, tree_fromorderedlist(OrderedList, [], L, SkipWidth)}; @@ -63,7 +90,11 @@ from_orderedlist(OrderedList, skpl, _SkipWidth) -> end, {skpl, L, skpl_fromorderedlist(OrderedList, L, SkipWidth, 2)}. - +-spec match(tuple()|integer(), leveled_tree()) -> none|{value, any()}. +%% @doc +%% Return the value from a tree associated with an exact match for the given +%% key. This assumes the tree contains the actual keys and values to be +%% matched against, not a manifest representing ranges of keys and values. match(Key, {tree, _L, Tree}) -> Iter = tree_iterator_from(Key, Tree), case tree_next(Iter) of @@ -84,6 +115,12 @@ match(Key, {skpl, _L, SkipList}) -> SL0 = skpl_getsublist(Key, SkipList), lookup_match(Key, SL0).
+-spec search(tuple()|integer(), leveled_tree(), fun()) -> none|tuple(). +%% @doc +%% Search is used when the tree is a manifest of key ranges and it is necessary +%% to find a range which may contain the key. The StartKeyFun is used if the +%% values contain extra information that can be used to determine if the key is +%% or is not present. search(Key, {tree, _L, Tree}, StartKeyFun) -> Iter = tree_iterator_from(Key, Tree), case tree_next(Iter) of @@ -126,6 +163,18 @@ search(Key, {skpl, _L, SkipList}, StartKeyFun) -> none end. +-spec match_range(tuple()|integer()|all, + tuple()|integer()|all, + leveled_tree()) + -> list(). +%% @doc +%% Return the keys and values from a tree which fall within the range from +%% StartRange to EndRange. This assumes the tree contains the actual +%% keys and values to be matched against, not a manifest representing ranges +%% of keys and values. +%% +%% The keyword all can be used as a substitute for the StartKey to remove a +%% constraint from the range. match_range(StartRange, EndRange, Tree) -> EndRangeFun = fun(ER, FirstRHSKey, _FirstRHSValue) -> @@ -133,6 +182,15 @@ match_range(StartRange, EndRange, Tree) -> end, match_range(StartRange, EndRange, Tree, EndRangeFun). +-spec match_range(tuple()|integer()|all, + tuple()|integer()|all, + leveled_tree(), + fun()) + -> list(). +%% @doc +%% As match_range/3 but a function can be passed to be used when comparing the +%% EndKey with a key in the tree (such as leveled_codec:endkey_passed), where +%% Erlang term comparison will not give the desired result. match_range(StartRange, EndRange, {tree, _L, Tree}, EndRangeFun) -> treelookup_range_start(StartRange, EndRange, Tree, EndRangeFun); match_range(StartRange, EndRange, {idxt, _L, Tree}, EndRangeFun) -> @@ -140,7 +198,18 @@ match_range(StartRange, EndRange, {idxt, _L, Tree}, EndRangeFun) -> match_range(StartRange, EndRange, {skpl, _L, SkipList}, EndRangeFun) -> skpllookup_to_range(StartRange, EndRange, SkipList, EndRangeFun).
- +-spec search_range(tuple()|integer()|all, + tuple()|integer()|all, + leveled_tree(), + fun()) + -> list(). +%% @doc +%% Extract a range from a tree, with search used when the tree is a manifest +%% of key ranges and it is necessary to find a range which may encapsulate the +%% key range. +%% +%% The StartKeyFun is used if the values contain extra information that can be +%% used to determine if the key is or is not present. search_range(StartRange, EndRange, Tree, StartKeyFun) -> EndRangeFun = fun(ER, _FirstRHSKey, FirstRHSValue) -> @@ -156,7 +225,9 @@ search_range(StartRange, EndRange, Tree, StartKeyFun) -> skpllookup_to_range(StartRange, EndRange, SL, EndRangeFun) end. - +-spec to_list(leveled_tree()) -> list(). +%% @doc +%% Collapse the tree back to a list to_list({tree, _L, Tree}) -> FoldFun = fun({_MK, SL}, Acc) -> @@ -176,10 +247,15 @@ to_list({skpl, _L, SkipList}) -> lists:append(Lv0List). - +-spec tsize(leveled_tree()) -> integer(). +%% @doc +%% Return the count of items in a tree tsize({_Type, L, _Tree}) -> L. +-spec empty(tree_type()) -> leveled_tree(). +%% @doc +%% Return an empty tree of the given type empty(tree) -> {tree, 0, empty_tree()}; empty(idxt) ->