Add dpecs to inker
Give the dialyzer a hand
This commit is contained in:
parent
7ca672d7bc
commit
366253831a
2 changed files with 139 additions and 16 deletions
|
@ -179,7 +179,7 @@ book_start(Opts) ->
|
|||
%% @doc Put an object with an expiry time
|
||||
%%
|
||||
%% Put an item in the store but with a Time To Live - the time when the object
|
||||
%% should expire, in gregorian_sconds (add the required number of seconds to
|
||||
%% should expire, in gregorian_seconds (add the required number of seconds to
|
||||
%% leveled_codec:integer_time/1).
|
||||
%%
|
||||
%% There exists the possibility of per object expiry times, not just whole
|
||||
|
@ -1308,11 +1308,11 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
|
|||
|
||||
|
||||
|
||||
load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
|
||||
load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ->
|
||||
{MinSQN, MaxSQN, OutputTree} = Acc0,
|
||||
{SQN, Type, PK} = KeyInLedger,
|
||||
{SQN, Type, PK} = KeyInJournal,
|
||||
% VBin may already be a term
|
||||
{VBin, VSize} = ExtractFun(ValueInLedger),
|
||||
{VBin, VSize} = ExtractFun(ValueInJournal),
|
||||
{Obj, IndexSpecs} = leveled_codec:split_inkvalue(VBin),
|
||||
case SQN of
|
||||
SQN when SQN < MinSQN ->
|
||||
|
|
|
@ -106,7 +106,7 @@
|
|||
ink_compactionpending/1,
|
||||
ink_getmanifest/1,
|
||||
ink_updatemanifest/3,
|
||||
ink_print_manifest/1,
|
||||
ink_printmanifest/1,
|
||||
ink_close/1,
|
||||
ink_doom/1,
|
||||
build_dummy_journal/0,
|
||||
|
@ -139,43 +139,140 @@
|
|||
source_inker :: pid()}).
|
||||
|
||||
|
||||
-type inker_options() :: #inker_options{}.
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
|
||||
-spec ink_start(inker_options()) -> {ok, pid()}.
|
||||
%% @doc
|
||||
%% Startup an inker process - passing in options.
|
||||
%%
|
||||
%% The first options are root_path and start_snapshot - if the inker is to be a
|
||||
%% snapshot clone of another inker then start_snapshot should be true,
|
||||
%% otherwise the root_path sill be used to find a file structure to provide a
|
||||
%% starting point of state for the inker.
|
||||
%%
|
||||
%% The inker will need ot read and write CDB files (if it is not a snapshot),
|
||||
%% and so a cdb_options record should be passed in as an inker_option to be
|
||||
%% used when opening such files.
|
||||
%%
|
||||
%% The inker will need to know what the reload strategy is, to inform the
|
||||
%% clerk about the rules to enforce during compaction.
|
||||
ink_start(InkerOpts) ->
|
||||
gen_server:start(?MODULE, [InkerOpts], []).
|
||||
|
||||
-spec ink_put(pid(),
|
||||
{atom(), any(), any(), any()}|string(),
|
||||
any(),
|
||||
{list(), integer()|infinity}) ->
|
||||
{ok, integer(), integer()}.
|
||||
%% @doc
|
||||
%% PUT an object into the journal, returning the sequence number for the PUT
|
||||
%% as well as the size of the object (information required by the ledger).
|
||||
%%
|
||||
%% The primary key is expected to be a tuple of the form
|
||||
%% {Tag, Bucket, Key, null}, but unit tests support pure string Keys and so
|
||||
%% these types are also supported.
|
||||
%%
|
||||
%% KeyChanges is a tuple of {KeyChanges, TTL} where the TTL is an
|
||||
%% expiry time (or infinity).
|
||||
ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
|
||||
gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity).
|
||||
|
||||
-spec ink_get(pid(),
|
||||
{atom(), any(), any(), any()}|string(),
|
||||
integer()) ->
|
||||
{{integer(), any()}, {any(), any()}}.
|
||||
%% @doc
|
||||
%% Fetch the object as stored in the Journal. Will not mask errors, should be
|
||||
%% used only in tests
|
||||
ink_get(Pid, PrimaryKey, SQN) ->
|
||||
gen_server:call(Pid, {get, PrimaryKey, SQN}, infinity).
|
||||
|
||||
-spec ink_fetch(pid(),
|
||||
{atom(), any(), any(), any()}|string(),
|
||||
integer()) ->
|
||||
any().
|
||||
%% @doc
|
||||
%% Fetch the value that was stored for a given Key at a particular SQN (i.e.
|
||||
%% this must be a SQN of the write for this key). the full object is returned
|
||||
%% or the atome not_present if there is no such Key stored at that SQN, or if
|
||||
%% fetching the Key prompted some anticipated error (e.g. CRC check failed)
|
||||
ink_fetch(Pid, PrimaryKey, SQN) ->
|
||||
gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity).
|
||||
|
||||
-spec ink_keycheck(pid(),
|
||||
{atom(), any(), any(), any()}|string(),
|
||||
integer()) ->
|
||||
probably|missing.
|
||||
%% @doc
|
||||
%% Quick check to determine if key is probably present. Positive results have
|
||||
%% a very small false positive rate, as can be triggered through a hash
|
||||
%% collision.
|
||||
ink_keycheck(Pid, PrimaryKey, SQN) ->
|
||||
gen_server:call(Pid, {key_check, PrimaryKey, SQN}, infinity).
|
||||
|
||||
-spec ink_registersnapshot(pid(), pid()) -> {list(), pid()}.
|
||||
%% @doc
|
||||
%% Register a snapshot clone for the process, returning the Manifest and the
|
||||
%% pid of the active journal.
|
||||
ink_registersnapshot(Pid, Requestor) ->
|
||||
gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
|
||||
|
||||
-spec ink_releasesnapshot(pid(), pid()) -> ok.
|
||||
%% @doc
|
||||
%% Release a registered snapshot as it is no longer in use. This should be
|
||||
%% called by all terminating snapshots - otherwise space may not be cleared
|
||||
%% following compaction.
|
||||
ink_releasesnapshot(Pid, Snapshot) ->
|
||||
gen_server:cast(Pid, {release_snapshot, Snapshot}).
|
||||
|
||||
-spec ink_confirmdelete(pid(), integer()) -> boolean().
|
||||
%% @doc
|
||||
%% Confirm if a Journal CDB file can be deleted, as it has been set to delete
|
||||
%% and is no longer in use by any snapshots
|
||||
ink_confirmdelete(Pid, ManSQN) ->
|
||||
gen_server:call(Pid, {confirm_delete, ManSQN}).
|
||||
|
||||
-spec ink_close(pid()) -> ok.
|
||||
%% @doc
|
||||
%% Close the inker, prompting all the Journal file processes to be called.
|
||||
ink_close(Pid) ->
|
||||
gen_server:call(Pid, close, infinity).
|
||||
|
||||
-spec ink_doom(pid()) -> {ok, [{string(), string(), string(), string()}]}.
|
||||
%% @doc
|
||||
%% Test function used to close a file, and return all file paths (potentially
|
||||
%% to erase all persisted existence)
|
||||
ink_doom(Pid) ->
|
||||
gen_server:call(Pid, doom, 60000).
|
||||
|
||||
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
|
||||
%% @doc
|
||||
%% Function to prompt load of the Ledger at startup. the Penciller should
|
||||
%% have determined the lowest SQN not present in the Ledger, and the inker
|
||||
%% should fold over the Journal from that point, using the function to load
|
||||
%% penciller with the results.
|
||||
%%
|
||||
%% The load fun should be a five arity function like:
|
||||
%% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun)
|
||||
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
|
||||
gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity).
|
||||
|
||||
-spec ink_compactjournal(pid(), pid(), integer()) -> ok.
|
||||
%% @doc
|
||||
%% Trigger a compaction event. the compaction event will use a sqn check
|
||||
%% against the Ledger to see if a value can be compacted - if the penciller
|
||||
%% believes it to be superseded that it can be compacted.
|
||||
%%
|
||||
%% The inker will get the maximum persisted sequence number from the
|
||||
%% initiate_penciller_snapshot/1 function - and use that as a pre-filter so
|
||||
%% that any value that was written more recently than the last flush to disk
|
||||
%% of the Ledger will not be considered for compaction (as this may be
|
||||
%% required to reload the Ledger on startup).
|
||||
ink_compactjournal(Pid, Bookie, Timeout) ->
|
||||
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
|
||||
CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3,
|
||||
|
@ -197,16 +294,35 @@ ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) ->
|
|||
FilterFun,
|
||||
Timeout},
|
||||
infinity).
|
||||
|
||||
-spec ink_compactioncomplete(pid()) -> ok.
|
||||
%% @doc
|
||||
%% Used by a clerk to state that a compaction process is over, only change
|
||||
%% is to unlock the Inker for further compactions.
|
||||
ink_compactioncomplete(Pid) ->
|
||||
gen_server:call(Pid, compaction_complete, infinity).
|
||||
|
||||
-spec ink_compactionpending(pid()) -> boolean().
|
||||
%% @doc
|
||||
%% Is there ongoing compaction work? No compaction work should be initiated
|
||||
%5 if there is already some compaction work ongoing.
|
||||
ink_compactionpending(Pid) ->
|
||||
gen_server:call(Pid, compaction_pending, infinity).
|
||||
|
||||
|
||||
-spec ink_getmanifest(pid()) -> list().
|
||||
%% @doc
|
||||
%% Allows the clerk to fetch the manifest at the point it starts a compaction
|
||||
%% job
|
||||
ink_getmanifest(Pid) ->
|
||||
gen_server:call(Pid, get_manifest, infinity).
|
||||
|
||||
-spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}.
|
||||
%% @doc
|
||||
%% Add a section of new entries into the manifest, and drop a bunch of deleted
|
||||
%% files out of the manifest. Used to update the manifest after a compaction
|
||||
%% job.
|
||||
%%
|
||||
%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the
|
||||
%% updated manifest
|
||||
ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) ->
|
||||
gen_server:call(Pid,
|
||||
{update_manifest,
|
||||
|
@ -214,7 +330,10 @@ ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) ->
|
|||
DeletedFiles},
|
||||
infinity).
|
||||
|
||||
ink_print_manifest(Pid) ->
|
||||
-spec ink_printmanifest(pid()) -> ok.
|
||||
%% @doc
|
||||
%% Used in tests to print out the manifest
|
||||
ink_printmanifest(Pid) ->
|
||||
gen_server:call(Pid, print_manifest, infinity).
|
||||
|
||||
%%%============================================================================
|
||||
|
@ -853,27 +972,31 @@ compact_journal_test() ->
|
|||
reload_strategy=RStrategy}),
|
||||
{ok, NewSQN1, _ObjSize} = ink_put(Ink1,
|
||||
test_ledgerkey("KeyAA"),
|
||||
"TestValueAA", []),
|
||||
"TestValueAA",
|
||||
{[], infinity}),
|
||||
?assertMatch(NewSQN1, 5),
|
||||
ok = ink_print_manifest(Ink1),
|
||||
ok = ink_printmanifest(Ink1),
|
||||
R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5),
|
||||
?assertMatch(R0, {{5, test_ledgerkey("KeyAA")}, {"TestValueAA", []}}),
|
||||
?assertMatch(R0,
|
||||
{{5, test_ledgerkey("KeyAA")},
|
||||
{"TestValueAA", {[], infinity}}}),
|
||||
FunnyLoop = lists:seq(1, 48),
|
||||
Checker = lists:map(fun(X) ->
|
||||
PK = "KeyZ" ++ integer_to_list(X),
|
||||
{ok, SQN, _} = ink_put(Ink1,
|
||||
test_ledgerkey(PK),
|
||||
crypto:rand_bytes(10000),
|
||||
[]),
|
||||
{[], infinity}),
|
||||
{SQN, test_ledgerkey(PK)}
|
||||
end,
|
||||
FunnyLoop),
|
||||
{ok, NewSQN2, _ObjSize} = ink_put(Ink1,
|
||||
test_ledgerkey("KeyBB"),
|
||||
"TestValueBB", []),
|
||||
"TestValueBB",
|
||||
{[], infinity}),
|
||||
?assertMatch(NewSQN2, 54),
|
||||
ActualManifest = ink_getmanifest(Ink1),
|
||||
ok = ink_print_manifest(Ink1),
|
||||
ok = ink_printmanifest(Ink1),
|
||||
?assertMatch(3, length(ActualManifest)),
|
||||
ok = ink_compactjournal(Ink1,
|
||||
Checker,
|
||||
|
@ -931,7 +1054,7 @@ empty_manifest_test() ->
|
|||
{ok, Ink2} = ink_start(#inker_options{root_path=RootPath,
|
||||
cdb_options=CDBopts}),
|
||||
?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)),
|
||||
{ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", []),
|
||||
{ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}),
|
||||
?assertMatch(2, SQN),
|
||||
?assertMatch(true, Size > 0),
|
||||
{ok, V} = ink_fetch(Ink2, "Key1", 2),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue