diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 970b54c..74fad3e 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -179,7 +179,7 @@ book_start(Opts) -> %% @doc Put an object with an expiry time %% %% Put an item in the store but with a Time To Live - the time when the object -%% should expire, in gregorian_sconds (add the required number of seconds to +%% should expire, in gregorian_seconds (add the required number of seconds to %% leveled_codec:integer_time/1). %% %% There exists the possibility of per object expiry times, not just whole @@ -1308,11 +1308,11 @@ maybe_withjitter(CacheSize, MaxCacheSize) -> -load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> +load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) -> {MinSQN, MaxSQN, OutputTree} = Acc0, - {SQN, Type, PK} = KeyInLedger, + {SQN, Type, PK} = KeyInJournal, % VBin may already be a term - {VBin, VSize} = ExtractFun(ValueInLedger), + {VBin, VSize} = ExtractFun(ValueInJournal), {Obj, IndexSpecs} = leveled_codec:split_inkvalue(VBin), case SQN of SQN when SQN < MinSQN -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 2513a4a..f261b19 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -106,7 +106,7 @@ ink_compactionpending/1, ink_getmanifest/1, ink_updatemanifest/3, - ink_print_manifest/1, + ink_printmanifest/1, ink_close/1, ink_doom/1, build_dummy_journal/0, @@ -139,43 +139,140 @@ source_inker :: pid()}). +-type inker_options() :: #inker_options{}. + + %%%============================================================================ %%% API %%%============================================================================ - + +-spec ink_start(inker_options()) -> {ok, pid()}. +%% @doc +%% Startup an inker process - passing in options. +%% +%% The first options are root_path and start_snapshot - if the inker is to be a +%% snapshot clone of another inker then start_snapshot should be true, +%% otherwise the root_path sill be used to find a file structure to provide a +%% starting point of state for the inker. +%% +%% The inker will need ot read and write CDB files (if it is not a snapshot), +%% and so a cdb_options record should be passed in as an inker_option to be +%% used when opening such files. +%% +%% The inker will need to know what the reload strategy is, to inform the +%% clerk about the rules to enforce during compaction. ink_start(InkerOpts) -> gen_server:start(?MODULE, [InkerOpts], []). +-spec ink_put(pid(), + {atom(), any(), any(), any()}|string(), + any(), + {list(), integer()|infinity}) -> + {ok, integer(), integer()}. +%% @doc +%% PUT an object into the journal, returning the sequence number for the PUT +%% as well as the size of the object (information required by the ledger). +%% +%% The primary key is expected to be a tuple of the form +%% {Tag, Bucket, Key, null}, but unit tests support pure string Keys and so +%% these types are also supported. +%% +%% KeyChanges is a tuple of {KeyChanges, TTL} where the TTL is an +%% expiry time (or infinity). ink_put(Pid, PrimaryKey, Object, KeyChanges) -> gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity). +-spec ink_get(pid(), + {atom(), any(), any(), any()}|string(), + integer()) -> + {{integer(), any()}, {any(), any()}}. +%% @doc +%% Fetch the object as stored in the Journal. Will not mask errors, should be +%% used only in tests ink_get(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {get, PrimaryKey, SQN}, infinity). +-spec ink_fetch(pid(), + {atom(), any(), any(), any()}|string(), + integer()) -> + any(). +%% @doc +%% Fetch the value that was stored for a given Key at a particular SQN (i.e. +%% this must be a SQN of the write for this key). the full object is returned +%% or the atome not_present if there is no such Key stored at that SQN, or if +%% fetching the Key prompted some anticipated error (e.g. CRC check failed) ink_fetch(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity). +-spec ink_keycheck(pid(), + {atom(), any(), any(), any()}|string(), + integer()) -> + probably|missing. +%% @doc +%% Quick check to determine if key is probably present. Positive results have +%% a very small false positive rate, as can be triggered through a hash +%% collision. ink_keycheck(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {key_check, PrimaryKey, SQN}, infinity). +-spec ink_registersnapshot(pid(), pid()) -> {list(), pid()}. +%% @doc +%% Register a snapshot clone for the process, returning the Manifest and the +%% pid of the active journal. ink_registersnapshot(Pid, Requestor) -> gen_server:call(Pid, {register_snapshot, Requestor}, infinity). +-spec ink_releasesnapshot(pid(), pid()) -> ok. +%% @doc +%% Release a registered snapshot as it is no longer in use. This should be +%% called by all terminating snapshots - otherwise space may not be cleared +%% following compaction. ink_releasesnapshot(Pid, Snapshot) -> gen_server:cast(Pid, {release_snapshot, Snapshot}). +-spec ink_confirmdelete(pid(), integer()) -> boolean(). +%% @doc +%% Confirm if a Journal CDB file can be deleted, as it has been set to delete +%% and is no longer in use by any snapshots ink_confirmdelete(Pid, ManSQN) -> gen_server:call(Pid, {confirm_delete, ManSQN}). +-spec ink_close(pid()) -> ok. +%% @doc +%% Close the inker, prompting all the Journal file processes to be called. ink_close(Pid) -> gen_server:call(Pid, close, infinity). +-spec ink_doom(pid()) -> {ok, [{string(), string(), string(), string()}]}. +%% @doc +%% Test function used to close a file, and return all file paths (potentially +%% to erase all persisted existence) ink_doom(Pid) -> gen_server:call(Pid, doom, 60000). +-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. +%% @doc +%% Function to prompt load of the Ledger at startup. the Penciller should +%% have determined the lowest SQN not present in the Ledger, and the inker +%% should fold over the Journal from that point, using the function to load +%% penciller with the results. +%% +%% The load fun should be a five arity function like: +%% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity). +-spec ink_compactjournal(pid(), pid(), integer()) -> ok. +%% @doc +%% Trigger a compaction event. the compaction event will use a sqn check +%% against the Ledger to see if a value can be compacted - if the penciller +%% believes it to be superseded that it can be compacted. +%% +%% The inker will get the maximum persisted sequence number from the +%% initiate_penciller_snapshot/1 function - and use that as a pre-filter so +%% that any value that was written more recently than the last flush to disk +%% of the Ledger will not be considered for compaction (as this may be +%% required to reload the Ledger on startup). ink_compactjournal(Pid, Bookie, Timeout) -> CheckerInitiateFun = fun initiate_penciller_snapshot/1, CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3, @@ -197,16 +294,35 @@ ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) -> FilterFun, Timeout}, infinity). - +-spec ink_compactioncomplete(pid()) -> ok. +%% @doc +%% Used by a clerk to state that a compaction process is over, only change +%% is to unlock the Inker for further compactions. ink_compactioncomplete(Pid) -> gen_server:call(Pid, compaction_complete, infinity). +-spec ink_compactionpending(pid()) -> boolean(). +%% @doc +%% Is there ongoing compaction work? No compaction work should be initiated +%5 if there is already some compaction work ongoing. ink_compactionpending(Pid) -> gen_server:call(Pid, compaction_pending, infinity). - + +-spec ink_getmanifest(pid()) -> list(). +%% @doc +%% Allows the clerk to fetch the manifest at the point it starts a compaction +%% job ink_getmanifest(Pid) -> gen_server:call(Pid, get_manifest, infinity). +-spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}. +%% @doc +%% Add a section of new entries into the manifest, and drop a bunch of deleted +%% files out of the manifest. Used to update the manifest after a compaction +%% job. +%% +%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the +%% updated manifest ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) -> gen_server:call(Pid, {update_manifest, @@ -214,7 +330,10 @@ ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) -> DeletedFiles}, infinity). -ink_print_manifest(Pid) -> +-spec ink_printmanifest(pid()) -> ok. +%% @doc +%% Used in tests to print out the manifest +ink_printmanifest(Pid) -> gen_server:call(Pid, print_manifest, infinity). %%%============================================================================ @@ -853,27 +972,31 @@ compact_journal_test() -> reload_strategy=RStrategy}), {ok, NewSQN1, _ObjSize} = ink_put(Ink1, test_ledgerkey("KeyAA"), - "TestValueAA", []), + "TestValueAA", + {[], infinity}), ?assertMatch(NewSQN1, 5), - ok = ink_print_manifest(Ink1), + ok = ink_printmanifest(Ink1), R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5), - ?assertMatch(R0, {{5, test_ledgerkey("KeyAA")}, {"TestValueAA", []}}), + ?assertMatch(R0, + {{5, test_ledgerkey("KeyAA")}, + {"TestValueAA", {[], infinity}}}), FunnyLoop = lists:seq(1, 48), Checker = lists:map(fun(X) -> PK = "KeyZ" ++ integer_to_list(X), {ok, SQN, _} = ink_put(Ink1, test_ledgerkey(PK), crypto:rand_bytes(10000), - []), + {[], infinity}), {SQN, test_ledgerkey(PK)} end, FunnyLoop), {ok, NewSQN2, _ObjSize} = ink_put(Ink1, test_ledgerkey("KeyBB"), - "TestValueBB", []), + "TestValueBB", + {[], infinity}), ?assertMatch(NewSQN2, 54), ActualManifest = ink_getmanifest(Ink1), - ok = ink_print_manifest(Ink1), + ok = ink_printmanifest(Ink1), ?assertMatch(3, length(ActualManifest)), ok = ink_compactjournal(Ink1, Checker, @@ -931,7 +1054,7 @@ empty_manifest_test() -> {ok, Ink2} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts}), ?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)), - {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", []), + {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}), ?assertMatch(2, SQN), ?assertMatch(true, Size > 0), {ok, V} = ink_fetch(Ink2, "Key1", 2),