commit
92275cfc2f
7 changed files with 613 additions and 71 deletions
|
@ -28,11 +28,22 @@
|
|||
-define(PENDING_FILEX, "pnd").
|
||||
-define(SKIP_WIDTH, 16).
|
||||
|
||||
-type manifest() :: list({integer(), list()}).
|
||||
%% The manifest is divided into blocks by sequence number, with each block
|
||||
%% being a list of manifest entries for that SQN range.
|
||||
-type manifest_entry() :: {integer(), string(), pid()|string(), any()}.
|
||||
%% The Entry should have a pid() as the third element, but a string() may be
|
||||
%% used in unit tests
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
-spec generate_entry(pid()) -> list().
|
||||
%% @doc
|
||||
%% Generate a list with a single iManifest entry for a journal file. Used
|
||||
%% only by the clerk when creating new entries for compacted files.
|
||||
generate_entry(Journal) ->
|
||||
{ok, NewFN} = leveled_cdb:cdb_complete(Journal),
|
||||
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFN),
|
||||
|
@ -44,7 +55,12 @@ generate_entry(Journal) ->
|
|||
leveled_log:log("IC013", [NewFN]),
|
||||
[]
|
||||
end.
|
||||
|
||||
|
||||
-spec add_entry(manifest(), manifest_entry(), boolean()) -> manifest().
|
||||
%% @doc
|
||||
%% Add a new entry to the manifest, if this is the rolling of a new active
|
||||
%% journal the boolean ToEnd can be used to indicate it should be simply
|
||||
%% appended to the end of the manifest.
|
||||
add_entry(Manifest, Entry, ToEnd) ->
|
||||
{SQN, FN, PidR, LastKey} = Entry,
|
||||
StrippedName = filename:rootname(FN),
|
||||
|
@ -57,6 +73,10 @@ add_entry(Manifest, Entry, ToEnd) ->
|
|||
from_list(Man1)
|
||||
end.
|
||||
|
||||
-spec append_lastkey(manifest(), pid(), any()) -> manifest().
|
||||
%% @doc
|
||||
%% On discovery of the last key in the last journal entry, the manifest can
|
||||
%% be updated through this function to have the last key
|
||||
append_lastkey(Manifest, Pid, LastKey) ->
|
||||
[{SQNMarker, SQNL}|ManifestTail] = Manifest,
|
||||
[{E_SQN, E_FN, E_P, E_LK}|SQNL_Tail] = SQNL,
|
||||
|
@ -68,22 +88,35 @@ append_lastkey(Manifest, Pid, LastKey) ->
|
|||
Manifest
|
||||
end.
|
||||
|
||||
-spec remove_entry(manifest(), manifest_entry()) -> manifest().
|
||||
%% @doc
|
||||
%% Remove an entry from a manifest (after compaction)
|
||||
remove_entry(Manifest, Entry) ->
|
||||
{SQN, FN, _PidR, _LastKey} = Entry,
|
||||
leveled_log:log("I0013", [FN]),
|
||||
Man0 = lists:keydelete(SQN, 1, to_list(Manifest)),
|
||||
from_list(Man0).
|
||||
|
||||
-spec find_entry(integer(), manifest()) -> pid()|string().
|
||||
%% @doc
|
||||
%% Given a SQN find the relevant manifest_entry, returning just the pid() of
|
||||
%% the journal file (which may be a string() in unit tests)
|
||||
find_entry(SQN, [{SQNMarker, SubL}|_Tail]) when SQN >= SQNMarker ->
|
||||
find_subentry(SQN, SubL);
|
||||
find_entry(SQN, [_TopEntry|Tail]) ->
|
||||
find_entry(SQN, Tail).
|
||||
|
||||
-spec head_entry(manifest()) -> manifest_entry().
|
||||
%% @doc
|
||||
%% Return the head manifest entry (the most recent journal)
|
||||
head_entry(Manifest) ->
|
||||
[{_SQNMarker, SQNL}|_Tail] = Manifest,
|
||||
[HeadEntry|_SQNL_Tail] = SQNL,
|
||||
HeadEntry.
|
||||
|
||||
|
||||
-spec to_list(manifest()) -> list().
|
||||
%% @doc
|
||||
%% Convert the manifest to a flat list
|
||||
to_list(Manifest) ->
|
||||
FoldFun =
|
||||
fun({_SQNMarker, SubL}, Acc) ->
|
||||
|
@ -91,6 +124,9 @@ to_list(Manifest) ->
|
|||
end,
|
||||
lists:foldl(FoldFun, [], Manifest).
|
||||
|
||||
-spec reader(integer(), string()) -> manifest().
|
||||
%% @doc
|
||||
%% Given a file path and a manifest SQN return the inker manifest
|
||||
reader(SQN, RootPath) ->
|
||||
ManifestPath = leveled_inker:filepath(RootPath, manifest_dir),
|
||||
leveled_log:log("I0015", [ManifestPath, SQN]),
|
||||
|
@ -99,7 +135,10 @@ reader(SQN, RootPath) ->
|
|||
++ ".man")),
|
||||
from_list(lists:reverse(lists:sort(binary_to_term(MBin)))).
|
||||
|
||||
|
||||
-spec writer(manifest(), integer(), string()) -> ok.
|
||||
%% @doc
|
||||
%% Given a manifest and a manifest SQN and a file path, save the manifest to
|
||||
%% disk
|
||||
writer(Manifest, ManSQN, RootPath) ->
|
||||
ManPath = leveled_inker:filepath(RootPath, manifest_dir),
|
||||
NewFN = filename:join(ManPath,
|
||||
|
@ -114,12 +153,18 @@ writer(Manifest, ManSQN, RootPath) ->
|
|||
ok = file:rename(TmpFN, NewFN),
|
||||
ok
|
||||
end.
|
||||
|
||||
|
||||
-spec printer(manifest()) -> ok.
|
||||
%% @doc
|
||||
%% Print the manifest to the log
|
||||
printer(Manifest) ->
|
||||
lists:foreach(fun({SQN, FN, _PID, _LK}) ->
|
||||
leveled_log:log("I0017", [SQN, FN]) end,
|
||||
to_list(Manifest)).
|
||||
|
||||
-spec complete_filex() -> string().
|
||||
%% @doc
|
||||
%% Return the file extension to be used for a completed manifest file
|
||||
complete_filex() ->
|
||||
?MANIFEST_FILEX.
|
||||
|
||||
|
|
|
@ -106,7 +106,7 @@
|
|||
ink_compactionpending/1,
|
||||
ink_getmanifest/1,
|
||||
ink_updatemanifest/3,
|
||||
ink_print_manifest/1,
|
||||
ink_printmanifest/1,
|
||||
ink_close/1,
|
||||
ink_doom/1,
|
||||
build_dummy_journal/0,
|
||||
|
@ -139,43 +139,140 @@
|
|||
source_inker :: pid()}).
|
||||
|
||||
|
||||
-type inker_options() :: #inker_options{}.
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
|
||||
-spec ink_start(inker_options()) -> {ok, pid()}.
|
||||
%% @doc
|
||||
%% Startup an inker process - passing in options.
|
||||
%%
|
||||
%% The first options are root_path and start_snapshot - if the inker is to be a
|
||||
%% snapshot clone of another inker then start_snapshot should be true,
|
||||
%% otherwise the root_path will be used to find a file structure to provide a
|
||||
%% starting point of state for the inker.
|
||||
%%
|
||||
%% The inker will need to read and write CDB files (if it is not a snapshot),
|
||||
%% and so a cdb_options record should be passed in as an inker_option to be
|
||||
%% used when opening such files.
|
||||
%%
|
||||
%% The inker will need to know what the reload strategy is, to inform the
|
||||
%% clerk about the rules to enforce during compaction.
|
||||
ink_start(InkerOpts) ->
|
||||
gen_server:start(?MODULE, [InkerOpts], []).
|
||||
|
||||
-spec ink_put(pid(),
|
||||
{atom(), any(), any(), any()}|string(),
|
||||
any(),
|
||||
{list(), integer()|infinity}) ->
|
||||
{ok, integer(), integer()}.
|
||||
%% @doc
|
||||
%% PUT an object into the journal, returning the sequence number for the PUT
|
||||
%% as well as the size of the object (information required by the ledger).
|
||||
%%
|
||||
%% The primary key is expected to be a tuple of the form
|
||||
%% {Tag, Bucket, Key, null}, but unit tests support pure string Keys and so
|
||||
%% these types are also supported.
|
||||
%%
|
||||
%% KeyChanges is a tuple of {KeyChanges, TTL} where the TTL is an
|
||||
%% expiry time (or infinity).
|
||||
ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
|
||||
gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity).
|
||||
|
||||
-spec ink_get(pid(),
|
||||
{atom(), any(), any(), any()}|string(),
|
||||
integer()) ->
|
||||
{{integer(), any()}, {any(), any()}}.
|
||||
%% @doc
|
||||
%% Fetch the object as stored in the Journal. Will not mask errors, should be
|
||||
%% used only in tests
|
||||
ink_get(Pid, PrimaryKey, SQN) ->
|
||||
gen_server:call(Pid, {get, PrimaryKey, SQN}, infinity).
|
||||
|
||||
-spec ink_fetch(pid(),
|
||||
{atom(), any(), any(), any()}|string(),
|
||||
integer()) ->
|
||||
any().
|
||||
%% @doc
|
||||
%% Fetch the value that was stored for a given Key at a particular SQN (i.e.
|
||||
%% this must be a SQN of the write for this key). the full object is returned
|
||||
%% or the atom not_present if there is no such Key stored at that SQN, or if
|
||||
%% fetching the Key prompted some anticipated error (e.g. CRC check failed)
|
||||
ink_fetch(Pid, PrimaryKey, SQN) ->
|
||||
gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity).
|
||||
|
||||
-spec ink_keycheck(pid(),
|
||||
{atom(), any(), any(), any()}|string(),
|
||||
integer()) ->
|
||||
probably|missing.
|
||||
%% @doc
|
||||
%% Quick check to determine if key is probably present. Positive results have
|
||||
%% a very small false positive rate, as can be triggered through a hash
|
||||
%% collision.
|
||||
ink_keycheck(Pid, PrimaryKey, SQN) ->
|
||||
gen_server:call(Pid, {key_check, PrimaryKey, SQN}, infinity).
|
||||
|
||||
-spec ink_registersnapshot(pid(), pid()) -> {list(), pid()}.
|
||||
%% @doc
|
||||
%% Register a snapshot clone for the process, returning the Manifest and the
|
||||
%% pid of the active journal.
|
||||
ink_registersnapshot(Pid, Requestor) ->
|
||||
gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
|
||||
|
||||
-spec ink_releasesnapshot(pid(), pid()) -> ok.
|
||||
%% @doc
|
||||
%% Release a registered snapshot as it is no longer in use. This should be
|
||||
%% called by all terminating snapshots - otherwise space may not be cleared
|
||||
%% following compaction.
|
||||
ink_releasesnapshot(Pid, Snapshot) ->
|
||||
gen_server:cast(Pid, {release_snapshot, Snapshot}).
|
||||
|
||||
-spec ink_confirmdelete(pid(), integer()) -> boolean().
|
||||
%% @doc
|
||||
%% Confirm if a Journal CDB file can be deleted, as it has been set to delete
|
||||
%% and is no longer in use by any snapshots
|
||||
ink_confirmdelete(Pid, ManSQN) ->
|
||||
gen_server:call(Pid, {confirm_delete, ManSQN}).
|
||||
|
||||
-spec ink_close(pid()) -> ok.
|
||||
%% @doc
|
||||
%% Close the inker, prompting all the Journal file processes to be called.
|
||||
ink_close(Pid) ->
|
||||
gen_server:call(Pid, close, infinity).
|
||||
|
||||
-spec ink_doom(pid()) -> {ok, [{string(), string(), string(), string()}]}.
|
||||
%% @doc
|
||||
%% Test function used to close a file, and return all file paths (potentially
|
||||
%% to erase all persisted existence)
|
||||
ink_doom(Pid) ->
|
||||
gen_server:call(Pid, doom, 60000).
|
||||
|
||||
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
|
||||
%% @doc
|
||||
%% Function to prompt load of the Ledger at startup. the Penciller should
|
||||
%% have determined the lowest SQN not present in the Ledger, and the inker
|
||||
%% should fold over the Journal from that point, using the function to load
|
||||
%% penciller with the results.
|
||||
%%
|
||||
%% The load fun should be a five arity function like:
|
||||
%% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun)
|
||||
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
|
||||
gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity).
|
||||
|
||||
-spec ink_compactjournal(pid(), pid(), integer()) -> ok.
|
||||
%% @doc
|
||||
%% Trigger a compaction event. the compaction event will use a sqn check
|
||||
%% against the Ledger to see if a value can be compacted - if the penciller
|
||||
%% believes it to be superseded then it can be compacted.
|
||||
%%
|
||||
%% The inker will get the maximum persisted sequence number from the
|
||||
%% initiate_penciller_snapshot/1 function - and use that as a pre-filter so
|
||||
%% that any value that was written more recently than the last flush to disk
|
||||
%% of the Ledger will not be considered for compaction (as this may be
|
||||
%% required to reload the Ledger on startup).
|
||||
ink_compactjournal(Pid, Bookie, Timeout) ->
|
||||
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
|
||||
CheckerCloseFun = fun leveled_penciller:pcl_close/1,
|
||||
|
@ -200,16 +297,35 @@ ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) ->
|
|||
FilterFun,
|
||||
Timeout},
|
||||
infinity).
|
||||
|
||||
-spec ink_compactioncomplete(pid()) -> ok.
|
||||
%% @doc
|
||||
%% Used by a clerk to state that a compaction process is over, only change
|
||||
%% is to unlock the Inker for further compactions.
|
||||
ink_compactioncomplete(Pid) ->
|
||||
gen_server:call(Pid, compaction_complete, infinity).
|
||||
|
||||
-spec ink_compactionpending(pid()) -> boolean().
|
||||
%% @doc
|
||||
%% Is there ongoing compaction work? No compaction work should be initiated
|
||||
%% if there is already some compaction work ongoing.
|
||||
ink_compactionpending(Pid) ->
|
||||
gen_server:call(Pid, compaction_pending, infinity).
|
||||
|
||||
|
||||
-spec ink_getmanifest(pid()) -> list().
|
||||
%% @doc
|
||||
%% Allows the clerk to fetch the manifest at the point it starts a compaction
|
||||
%% job
|
||||
ink_getmanifest(Pid) ->
|
||||
gen_server:call(Pid, get_manifest, infinity).
|
||||
|
||||
-spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}.
|
||||
%% @doc
|
||||
%% Add a section of new entries into the manifest, and drop a bunch of deleted
|
||||
%% files out of the manifest. Used to update the manifest after a compaction
|
||||
%% job.
|
||||
%%
|
||||
%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the
|
||||
%% updated manifest
|
||||
ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) ->
|
||||
gen_server:call(Pid,
|
||||
{update_manifest,
|
||||
|
@ -217,7 +333,10 @@ ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) ->
|
|||
DeletedFiles},
|
||||
infinity).
|
||||
|
||||
ink_print_manifest(Pid) ->
|
||||
-spec ink_printmanifest(pid()) -> ok.
|
||||
%% @doc
|
||||
%% Used in tests to print out the manifest
|
||||
ink_printmanifest(Pid) ->
|
||||
gen_server:call(Pid, print_manifest, infinity).
|
||||
|
||||
%%%============================================================================
|
||||
|
@ -857,27 +976,31 @@ compact_journal_test() ->
|
|||
reload_strategy=RStrategy}),
|
||||
{ok, NewSQN1, _ObjSize} = ink_put(Ink1,
|
||||
test_ledgerkey("KeyAA"),
|
||||
"TestValueAA", []),
|
||||
"TestValueAA",
|
||||
{[], infinity}),
|
||||
?assertMatch(NewSQN1, 5),
|
||||
ok = ink_print_manifest(Ink1),
|
||||
ok = ink_printmanifest(Ink1),
|
||||
R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5),
|
||||
?assertMatch(R0, {{5, test_ledgerkey("KeyAA")}, {"TestValueAA", []}}),
|
||||
?assertMatch(R0,
|
||||
{{5, test_ledgerkey("KeyAA")},
|
||||
{"TestValueAA", {[], infinity}}}),
|
||||
FunnyLoop = lists:seq(1, 48),
|
||||
Checker = lists:map(fun(X) ->
|
||||
PK = "KeyZ" ++ integer_to_list(X),
|
||||
{ok, SQN, _} = ink_put(Ink1,
|
||||
test_ledgerkey(PK),
|
||||
crypto:rand_bytes(10000),
|
||||
[]),
|
||||
{[], infinity}),
|
||||
{SQN, test_ledgerkey(PK)}
|
||||
end,
|
||||
FunnyLoop),
|
||||
{ok, NewSQN2, _ObjSize} = ink_put(Ink1,
|
||||
test_ledgerkey("KeyBB"),
|
||||
"TestValueBB", []),
|
||||
"TestValueBB",
|
||||
{[], infinity}),
|
||||
?assertMatch(NewSQN2, 54),
|
||||
ActualManifest = ink_getmanifest(Ink1),
|
||||
ok = ink_print_manifest(Ink1),
|
||||
ok = ink_printmanifest(Ink1),
|
||||
?assertMatch(3, length(ActualManifest)),
|
||||
ok = ink_compactjournal(Ink1,
|
||||
Checker,
|
||||
|
@ -938,7 +1061,7 @@ empty_manifest_test() ->
|
|||
{ok, Ink2} = ink_start(#inker_options{root_path=RootPath,
|
||||
cdb_options=CDBopts}),
|
||||
?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)),
|
||||
{ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", []),
|
||||
{ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}),
|
||||
?assertMatch(2, SQN),
|
||||
?assertMatch(true, Size > 0),
|
||||
{ok, V} = ink_fetch(Ink2, "Key1", 2),
|
||||
|
|
|
@ -243,19 +243,59 @@
|
|||
|
||||
head_timing :: tuple()}).
|
||||
|
||||
-type penciller_options() :: #penciller_options{}.
|
||||
-type bookies_memory() :: {tuple()|empty_cache,
|
||||
array:array()|empty_array,
|
||||
integer()|infinity,
|
||||
integer()}.
|
||||
-type pcl_state() :: #state{}.
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
|
||||
-spec pcl_start(penciller_options()) -> {ok, pid()}.
|
||||
%% @doc
|
||||
%% Start a penciller using a penciller options record. The start_snapshot
|
||||
%% option should be used if this is to be a clone of an existing penciller,
|
||||
%% otherwise the penciller will look in root path for a manifest and
|
||||
%% associated sst files to start-up from a previous persisted state.
|
||||
%%
|
||||
%% When starting a clone a query can also be passed. This prevents the whole
|
||||
%% Level Zero memory space from being copied to the snapshot, instead the
|
||||
%% query is run against the level zero space and just the query results are
|
||||
%% copied into the clone.
|
||||
pcl_start(PCLopts) ->
|
||||
gen_server:start(?MODULE, [PCLopts], []).
|
||||
|
||||
-spec pcl_pushmem(pid(), bookies_memory()) -> ok|returned.
|
||||
%% @doc
|
||||
%% Load the contents of the Bookie's memory of recent additions to the Ledger
|
||||
%% to the Ledger proper.
|
||||
%%
|
||||
%% The load is made up of a cache in the form of a leveled_skiplist tuple (or
|
||||
%% the atom empty_cache if no cache is present), an index of entries in the
|
||||
%% skiplist in the form of leveled_pmem index (or empty_index), the minimum
|
||||
%% sequence number in the cache and the maximum sequence number.
|
||||
%%
|
||||
%% If the penciller does not have capacity for the pushed cache it will
|
||||
%% respond with the atom 'returned'. This is a signal to hold the memory
|
||||
%% at the Bookie, and try again soon. This normally only occurs when there
|
||||
%% is a backlog of merges - so the bookie should backoff for longer each time.
|
||||
pcl_pushmem(Pid, LedgerCache) ->
|
||||
%% Bookie to dump memory onto penciller
|
||||
gen_server:call(Pid, {push_mem, LedgerCache}, infinity).
|
||||
|
||||
-spec pcl_fetchlevelzero(pid(), integer()) -> tuple().
|
||||
%% @doc
|
||||
%% Allows a single slot of the penciller's levelzero cache to be fetched. The
|
||||
%% levelzero cache can be up to 40K keys - sending this to the process that is
|
||||
%% persisting this in a SST file in a single cast will lock the process for
|
||||
%% 30-40ms. This allows that process to fetch this slot by slot, so that
|
||||
%% this is split into a series of smaller events.
|
||||
%%
|
||||
%% The return value will be a leveled_skiplist that forms that part of the
|
||||
%% cache
|
||||
pcl_fetchlevelzero(Pid, Slot) ->
|
||||
%% Timeout to cause crash of L0 file when it can't get the close signal
|
||||
%% as it is deadlocked making this call.
|
||||
|
@ -263,7 +303,17 @@ pcl_fetchlevelzero(Pid, Slot) ->
|
|||
%% If the timeout gets hit outside of close scenario the Penciller will
|
||||
%% be stuck in L0 pending
|
||||
gen_server:call(Pid, {fetch_levelzero, Slot}, 60000).
|
||||
|
||||
|
||||
-spec pcl_fetch(pid(), tuple()) -> {tuple(), tuple()}|not_present.
|
||||
%% @doc
|
||||
%% Fetch a key, return the first (highest SQN) occurrence of that Key along
|
||||
%% with the value.
|
||||
%%
|
||||
%% The Key needs to be hashable (i.e. have a tag which indicates that the key
|
||||
%% can be looked up) - index entries are not hashable for example.
|
||||
%%
|
||||
%% If the hash is already known, call pcl_fetch/3 as magic_hash is a
|
||||
%% relatively expensive hash function
|
||||
pcl_fetch(Pid, Key) ->
|
||||
Hash = leveled_codec:magic_hash(Key),
|
||||
if
|
||||
|
@ -271,19 +321,48 @@ pcl_fetch(Pid, Key) ->
|
|||
gen_server:call(Pid, {fetch, Key, Hash}, infinity)
|
||||
end.
|
||||
|
||||
-spec pcl_fetch(pid(), tuple(), integer()) -> {tuple(), tuple()}|not_present.
|
||||
%% @doc
|
||||
%% Fetch a key, return the first (highest SQN) occurrence of that Key along
|
||||
%% with the value.
|
||||
%%
|
||||
%% Hash should be result of leveled_codec:magic_hash(Key)
|
||||
pcl_fetch(Pid, Key, Hash) ->
|
||||
gen_server:call(Pid, {fetch, Key, Hash}, infinity).
|
||||
|
||||
-spec pcl_fetchkeys(pid(), tuple(), tuple(), fun(), any()) -> any().
|
||||
%% @doc
|
||||
%% Run a range query between StartKey and EndKey (inclusive). This will cover
|
||||
%% all keys in the range - so must only be run against snapshots of the
|
||||
%% penciller to avoid blocking behaviour.
|
||||
%%
|
||||
%% Comparison with the upper-end of the range (EndKey) is done using
|
||||
%% leveled_codec:endkey_passed/2 - so use nulls within the tuple to manage
|
||||
%% the top of the range. Comparison with the start of the range is based on
|
||||
%% Erlang term order.
|
||||
pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
|
||||
gen_server:call(Pid,
|
||||
{fetch_keys, StartKey, EndKey, AccFun, InitAcc, -1},
|
||||
infinity).
|
||||
|
||||
-spec pcl_fetchnextkey(pid(), tuple(), tuple(), fun(), any()) -> any().
|
||||
%% @doc
|
||||
%% Run a range query between StartKey and EndKey (inclusive). This has the
|
||||
%% same constraints as pcl_fetchkeys/5, but will only return the first key
|
||||
%% found in erlang term order.
|
||||
pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
|
||||
gen_server:call(Pid,
|
||||
{fetch_keys, StartKey, EndKey, AccFun, InitAcc, 1},
|
||||
infinity).
|
||||
|
||||
-spec pcl_checksequencenumber(pid(), tuple(), integer()) -> boolean().
|
||||
%% @doc
|
||||
%% Check if the sequence number of the passed key is not replaced by a change
|
||||
%% after the passed sequence number. Will return true if the Key is present
|
||||
%% and either is equal to, or prior to the passed SQN.
|
||||
%%
|
||||
%% If the key is not present, it will be assumed that a higher sequence number
|
||||
%% tombstone once existed, and false will be returned.
|
||||
pcl_checksequencenumber(Pid, Key, SQN) ->
|
||||
Hash = leveled_codec:magic_hash(Key),
|
||||
if
|
||||
|
@ -291,32 +370,80 @@ pcl_checksequencenumber(Pid, Key, SQN) ->
|
|||
gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity)
|
||||
end.
|
||||
|
||||
-spec pcl_workforclerk(pid()) -> ok.
|
||||
%% @doc
|
||||
%% A request from the clerk to check for work. If work is present the
|
||||
%% Penciller will cast back to the clerk, no response is sent to this
|
||||
%% request.
|
||||
pcl_workforclerk(Pid) ->
|
||||
gen_server:cast(Pid, work_for_clerk).
|
||||
|
||||
-spec pcl_manifestchange(pid(), tuple()) -> ok.
|
||||
%% @doc
|
||||
%% Provide a manifest record (i.e. the output of the leveled_pmanifest module)
|
||||
%% that is required to become the new manifest.
|
||||
pcl_manifestchange(Pid, Manifest) ->
|
||||
gen_server:cast(Pid, {manifest_change, Manifest}).
|
||||
|
||||
-spec pcl_confirml0complete(pid(), string(), tuple(), tuple()) -> ok.
|
||||
%% @doc
|
||||
%% Allows a SST writer that has written a L0 file to confirm that the file
|
||||
%% is now complete, so the filename and key ranges can be added to the
|
||||
%% manifest and the file can be used in place of the in-memory levelzero
|
||||
%% cache.
|
||||
pcl_confirml0complete(Pid, FN, StartKey, EndKey) ->
|
||||
gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey}).
|
||||
|
||||
-spec pcl_confirmdelete(pid(), string(), pid()) -> ok.
|
||||
%% @doc
|
||||
%% Poll from a delete_pending file requesting a message if the file is now
|
||||
%% ready for deletion (i.e. all snapshots which depend on the file have
|
||||
%% finished)
|
||||
pcl_confirmdelete(Pid, FileName, FilePid) ->
|
||||
gen_server:cast(Pid, {confirm_delete, FileName, FilePid}).
|
||||
|
||||
-spec pcl_getstartupsequencenumber(pid()) -> integer().
|
||||
%% @doc
|
||||
%% At startup the penciller will get the largest sequence number that is
|
||||
%% within the persisted files. This function allows for this sequence number
|
||||
%% to be fetched - so that it can be used to determine parts of the Ledger
|
||||
%% which may have been lost in the last shutdown (so that the ledger can
|
||||
%% be reloaded from that point in the Journal)
|
||||
pcl_getstartupsequencenumber(Pid) ->
|
||||
gen_server:call(Pid, get_startup_sqn, infinity).
|
||||
|
||||
-spec pcl_registersnapshot(pid(),
|
||||
pid(),
|
||||
no_lookup|{tuple(), tuple()}|undefined,
|
||||
bookies_memory(),
|
||||
boolean())
|
||||
-> {ok, pcl_state()}.
|
||||
%% @doc
|
||||
%% Register a snapshot of the penciller, returning a state record from the
|
||||
%% penciller for the snapshot to use as its LoopData
|
||||
pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem, LR) ->
|
||||
gen_server:call(Pid,
|
||||
{register_snapshot, Snapshot, Query, BookiesMem, LR},
|
||||
infinity).
|
||||
|
||||
-spec pcl_releasesnapshot(pid(), pid()) -> ok.
|
||||
%% @doc
|
||||
%% Inform the primary penciller that a snapshot is finished, so that the
|
||||
%% penciller can allow deletes to proceed if appropriate.
|
||||
pcl_releasesnapshot(Pid, Snapshot) ->
|
||||
gen_server:cast(Pid, {release_snapshot, Snapshot}).
|
||||
|
||||
-spec pcl_close(pid()) -> ok.
|
||||
%% @doc
|
||||
%% Close the penciller neatly, trying to persist to disk anything in the memory
|
||||
pcl_close(Pid) ->
|
||||
gen_server:call(Pid, close, 60000).
|
||||
|
||||
-spec pcl_doom(pid()) -> {ok, list()}.
|
||||
%% @doc
|
||||
%% Close the penciller neatly, trying to persist to disk anything in the memory
|
||||
%% Return a list of filepaths from where files exist for this penciller (should
|
||||
%% the calling process wish to erase the store).
|
||||
pcl_doom(Pid) ->
|
||||
gen_server:call(Pid, doom, 60000).
|
||||
|
||||
|
|
|
@ -73,10 +73,19 @@
|
|||
% Currently the lowest level (the largest number)
|
||||
}).
|
||||
|
||||
-type manifest() :: #manifest{}.
|
||||
-type manifest_entry() :: #manifest_entry{}.
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
-spec new_manifest() -> manifest().
|
||||
%% @doc
|
||||
%% The manifest in this case is a manifest of the ledger. This contains
|
||||
%% information on the layout of the files, but also information of snapshots
|
||||
%% that may have an influence on the manifest as they require files to remain
|
||||
%% after the primary penciller is happy for them to be removed.
|
||||
new_manifest() ->
|
||||
LevelArray0 = array:new([{size, ?MAX_LEVELS + 1}, {default, []}]),
|
||||
SetLowerLevelFun =
|
||||
|
@ -94,6 +103,10 @@ new_manifest() ->
|
|||
basement = 0
|
||||
}.
|
||||
|
||||
-spec open_manifest(string()) -> manifest().
|
||||
%% @doc
|
||||
%% Open a manifest in the appropriate sub-directory of the RootPath, and will
|
||||
%% return an empty manifest if no such manifest is present.
|
||||
open_manifest(RootPath) ->
|
||||
% Open the manifest in the file path which has the highest SQN, and will
|
||||
% open without error
|
||||
|
@ -113,12 +126,22 @@ open_manifest(RootPath) ->
|
|||
[],
|
||||
Filenames))),
|
||||
open_manifestfile(RootPath, ValidManSQNs).
|
||||
|
||||
|
||||
-spec copy_manifest(manifest()) -> manifest().
|
||||
%% @doc
|
||||
%% Used to pass the manifest to a snapshot, removing information not required
|
||||
%% by a snapshot
|
||||
copy_manifest(Manifest) ->
|
||||
% Copy the manifest ensuring anything only the master process should care
|
||||
% about is switched to undefined
|
||||
Manifest#manifest{snapshots = undefined, pending_deletes = undefined}.
|
||||
|
||||
-spec load_manifest(manifest(), fun(), fun()) -> {integer(), manifest()}.
|
||||
%% @doc
|
||||
%% Roll over the manifest starting a process to manage each file in the
|
||||
%% manifest. The PidFun should be able to return the Pid of a file process
|
||||
%% (having started one). The SQNFun will return the max sequence number
|
||||
%% of that file, if passed the Pid that owns it.
|
||||
load_manifest(Manifest, PidFun, SQNFun) ->
|
||||
UpdateLevelFun =
|
||||
fun(LevelIdx, {AccMaxSQN, AccMan}) ->
|
||||
|
@ -130,6 +153,11 @@ load_manifest(Manifest, PidFun, SQNFun) ->
|
|||
lists:foldl(UpdateLevelFun, {0, Manifest},
|
||||
lists:seq(0, Manifest#manifest.basement)).
|
||||
|
||||
-spec close_manifest(manifest(), fun()) -> ok.
|
||||
%% @doc
|
||||
%% Close all the files in the manifest (using CloseEntryFun to call close on
|
||||
%% a file).  First all the files in the active manifest are called, and then
|
||||
%% any files which were pending deletion.
|
||||
close_manifest(Manifest, CloseEntryFun) ->
|
||||
CloseLevelFun =
|
||||
fun(LevelIdx) ->
|
||||
|
@ -144,6 +172,9 @@ close_manifest(Manifest, CloseEntryFun) ->
|
|||
end,
|
||||
lists:foreach(ClosePDFun, dict:to_list(Manifest#manifest.pending_deletes)).
|
||||
|
||||
-spec save_manifest(manifest(), string()) -> ok.
|
||||
%% @doc
|
||||
%% Save the manifest to file (with a checksum)
|
||||
save_manifest(Manifest, RootPath) ->
|
||||
FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest),
|
||||
ManBin = term_to_binary(Manifest#manifest{snapshots = [],
|
||||
|
@ -152,7 +183,15 @@ save_manifest(Manifest, RootPath) ->
|
|||
CRC = erlang:crc32(ManBin),
|
||||
ok = file:write_file(FP, <<CRC:32/integer, ManBin/binary>>).
|
||||
|
||||
|
||||
-spec replace_manifest_entry(manifest(), integer(), integer(),
|
||||
list()|manifest_entry(),
|
||||
list()|manifest_entry()) -> manifest().
|
||||
%% @doc
|
||||
%% Replace a list of manifest entries in the manifest with a new set of entries
|
||||
%% Pass in the new manifest SQN to be used for this manifest. The list of
|
||||
%% entries can just be a single entry
|
||||
%%
|
||||
%% This is generally called on the level being merged down into.
|
||||
replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) ->
|
||||
Levels = Manifest#manifest.levels,
|
||||
Level = array:get(LevelIdx, Levels),
|
||||
|
@ -176,6 +215,11 @@ replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) ->
|
|||
pending_deletes = PendingDeletes}
|
||||
end.
|
||||
|
||||
-spec insert_manifest_entry(manifest(), integer(), integer(),
|
||||
list()|manifest_entry()) -> manifest().
|
||||
%% @doc
|
||||
%% Place a single new manifest entry into a level of the manifest, at a given
|
||||
%% level and manifest sequence number
|
||||
insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
|
||||
Levels = Manifest#manifest.levels,
|
||||
Level = array:get(LevelIdx, Levels),
|
||||
|
@ -186,6 +230,10 @@ insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
|
|||
basement = Basement,
|
||||
manifest_sqn = ManSQN}.
|
||||
|
||||
-spec remove_manifest_entry(manifest(), integer(), integer(),
|
||||
list()|manifest_entry()) -> manifest().
|
||||
%% @doc
|
||||
%% Remove a manifest entry (as it has been merged into the level below)
|
||||
remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
|
||||
Levels = Manifest#manifest.levels,
|
||||
Level = array:get(LevelIdx, Levels),
|
||||
|
@ -207,6 +255,11 @@ remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
|
|||
pending_deletes = PendingDeletes}
|
||||
end.
|
||||
|
||||
-spec switch_manifest_entry(manifest(), integer(), integer(),
|
||||
list()|manifest_entry()) -> manifest().
|
||||
%% @doc
|
||||
%% Switch a manifest entry from this level to the level below (i.e. when there
|
||||
%% are no overlapping manifest entries in the level below)
|
||||
switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) ->
|
||||
% Move to level below - so needs to be removed but not marked as a
|
||||
% pending deletion
|
||||
|
@ -219,9 +272,16 @@ switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) ->
|
|||
SrcLevel + 1,
|
||||
Entry).
|
||||
|
||||
-spec get_manifest_sqn(manifest()) -> integer().
|
||||
%% @doc
|
||||
%% Return the manifest SQN for this manifest
|
||||
get_manifest_sqn(Manifest) ->
|
||||
Manifest#manifest.manifest_sqn.
|
||||
|
||||
-spec key_lookup(manifest(), integer(), tuple()) -> false|manifest_entry().
|
||||
%% @doc
|
||||
%% For a given key find which manifest entry covers that key at that level,
|
||||
%% returning false if there is no covering manifest entry at that level.
|
||||
key_lookup(Manifest, LevelIdx, Key) ->
|
||||
case LevelIdx > Manifest#manifest.basement of
|
||||
true ->
|
||||
|
@ -232,6 +292,10 @@ key_lookup(Manifest, LevelIdx, Key) ->
|
|||
Key)
|
||||
end.
|
||||
|
||||
-spec range_lookup(manifest(), integer(), tuple(), tuple()) -> list().
|
||||
%% @doc
|
||||
%% Return a list of manifest_entry pointers at this level which cover the
|
||||
%% key query range.
|
||||
range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
|
||||
MakePointerFun =
|
||||
fun(M) ->
|
||||
|
@ -239,6 +303,11 @@ range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
|
|||
end,
|
||||
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
|
||||
|
||||
-spec merge_lookup(manifest(), integer(), tuple(), tuple()) -> list().
|
||||
%% @doc
|
||||
%% Return a list of manifest_entry pointers at this level which cover the
|
||||
%% key query range, only all keys in the files should be included in the
|
||||
%% pointers, not just the queries in the range.
|
||||
merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
|
||||
MakePointerFun =
|
||||
fun(M) ->
|
||||
|
@ -247,7 +316,8 @@ merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
|
|||
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
|
||||
|
||||
|
||||
|
||||
-spec mergefile_selector(manifest(), integer()) -> manifest_entry().
|
||||
%% @doc
|
||||
%% An algorithm for discovering which files to merge ....
|
||||
%% We can find the most optimal file:
|
||||
%% - The one with the most overlapping data below?
|
||||
|
@ -267,14 +337,25 @@ mergefile_selector(Manifest, LevelIdx) ->
|
|||
{_SK, ME} = lists:nth(random:uniform(length(Level)), Level),
|
||||
ME.
|
||||
|
||||
%% When the clerk returns an updated manifest to the penciller, the penciller
|
||||
%% should restore its view of the snapshots to that manifest
|
||||
-spec merge_snapshot(manifest(), manifest()) -> manifest().
|
||||
%% @doc
|
||||
%% When the clerk returns an updated manifest to the penciller, the penciller
|
||||
%% should restore its view of the snapshots to that manifest. Snapshots can
|
||||
%% be received in parallel to the manifest being updated, so the updated
|
||||
%% manifest must not trample over any accrued state in the manifest.
|
||||
merge_snapshot(PencillerManifest, ClerkManifest) ->
|
||||
ClerkManifest#manifest{snapshots =
|
||||
PencillerManifest#manifest.snapshots,
|
||||
min_snapshot_sqn =
|
||||
PencillerManifest#manifest.min_snapshot_sqn}.
|
||||
|
||||
-spec add_snapshot(manifest(), pid()|atom(), integer()) -> manifest().
|
||||
%% @doc
|
||||
%% Add a snapshot reference to the manifest, either using the pid or an atom
|
||||
%% known to reference a special process. The timeout should be in seconds, and
|
||||
%% the snapshot will assume to have expired at timeout (and so at that stage
|
||||
%% files which depended on the snapshot will potentially expire, and if the
|
||||
%% clone is still active it may crash)
|
||||
add_snapshot(Manifest, Pid, Timeout) ->
|
||||
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout},
|
||||
SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
|
||||
|
@ -289,6 +370,9 @@ add_snapshot(Manifest, Pid, Timeout) ->
|
|||
min_snapshot_sqn = N0}
|
||||
end.
|
||||
|
||||
-spec release_snapshot(manifest(), pid()|atom()) -> manifest().
|
||||
%% @doc
|
||||
%% When a clone is complete the release should be notified to the manifest.
|
||||
release_snapshot(Manifest, Pid) ->
|
||||
FilterFun =
|
||||
fun({P, SQN, ST, TO}, {Acc, MinSQN, Found}) ->
|
||||
|
@ -324,6 +408,10 @@ release_snapshot(Manifest, Pid) ->
|
|||
min_snapshot_sqn = MinSnapSQN}
|
||||
end.
|
||||
|
||||
-spec ready_to_delete(manifest(), string()) -> {boolean(), manifest()}.
|
||||
%% @doc
|
||||
%% A SST file which is in the delete_pending state can check to see if it is
|
||||
%% ready to delete against the manifest.
|
||||
ready_to_delete(Manifest, Filename) ->
|
||||
{ChangeSQN, _ME} = dict:fetch(Filename, Manifest#manifest.pending_deletes),
|
||||
case Manifest#manifest.min_snapshot_sqn of
|
||||
|
@ -343,6 +431,18 @@ ready_to_delete(Manifest, Filename) ->
|
|||
{false, release_snapshot(Manifest, ?PHANTOM_PID)}
|
||||
end.
|
||||
|
||||
-spec check_for_work(manifest(), list()) -> {list(), integer()}.
|
||||
%% @doc
|
||||
%% Check for compaction work in the manifest - look at levels which contain
|
||||
%% more files than the threshold.
|
||||
%%
|
||||
%% File count determines size in leveled (unlike leveldb which works on the
|
||||
%% total data volume). Files are fixed size in terms of keys, and the size of
|
||||
%% metadata is assumed to be contained and regular and so uninteresting for
|
||||
%% level sizing.
|
||||
%%
|
||||
%% Return a list of levels which are over-sized as well as the total items
|
||||
%% across the manifest which are beyond the size (the total work outstanding).
|
||||
check_for_work(Manifest, Thresholds) ->
|
||||
CheckLevelFun =
|
||||
fun({LevelIdx, MaxCount}, {AccL, AccC}) ->
|
||||
|
@ -362,9 +462,17 @@ check_for_work(Manifest, Thresholds) ->
|
|||
end,
|
||||
lists:foldr(CheckLevelFun, {[], 0}, Thresholds).
|
||||
|
||||
-spec is_basement(manifest(), integer()) -> boolean().
|
||||
%% @doc
|
||||
%% Is this level the lowest in the manifest which contains active files. When
|
||||
%% merging down to the basement level special rules may apply (for example to
|
||||
%% reap tombstones)
|
||||
is_basement(Manifest, Level) ->
|
||||
Level >= Manifest#manifest.basement.
|
||||
|
||||
-spec levelzero_present(manifest()) -> boolean().
|
||||
%% @doc
|
||||
%% Is there a file in level zero (as only one file can be in level zero).
|
||||
levelzero_present(Manifest) ->
|
||||
not is_empty(0, array:get(0, Manifest#manifest.levels)).
|
||||
|
||||
|
@ -688,7 +796,7 @@ initial_setup() ->
|
|||
owner="pid_z6"},
|
||||
|
||||
Man0 = new_manifest(),
|
||||
% insert_manifest_entry(Manifest, ManSQN, Level, Entry)
|
||||
|
||||
Man1 = insert_manifest_entry(Man0, 1, 1, E1),
|
||||
Man2 = insert_manifest_entry(Man1, 1, 1, E2),
|
||||
Man3 = insert_manifest_entry(Man2, 1, 1, E3),
|
||||
|
@ -747,11 +855,9 @@ random_select_test() ->
|
|||
L1File = mergefile_selector(LastManifest, 1),
|
||||
% This blows up if the function is not prepared for the different format
|
||||
% https://github.com/martinsumner/leveled/issues/43
|
||||
L2File = mergefile_selector(LastManifest, 2),
|
||||
_L2File = mergefile_selector(LastManifest, 2),
|
||||
Level1 = array:get(1, LastManifest#manifest.levels),
|
||||
?assertMatch(true, lists:member(L1File, Level1)),
|
||||
?assertMatch(true, is_record(L1File, manifest_entry)),
|
||||
?assertMatch(true, is_record(L2File, manifest_entry)).
|
||||
?assertMatch(true, lists:member(L1File, Level1)).
|
||||
|
||||
keylookup_manifest_test() ->
|
||||
{Man0, Man1, Man2, Man3, _Man4, _Man5, Man6} = initial_setup(),
|
||||
|
@ -965,35 +1071,35 @@ snapshot_release_test() ->
|
|||
filename="Z3",
|
||||
owner="pid_z3"},
|
||||
|
||||
Man7 = add_snapshot(Man6, "pid_a1", 3600),
|
||||
Man7 = add_snapshot(Man6, pid_a1, 3600),
|
||||
Man8 = remove_manifest_entry(Man7, 2, 1, E1),
|
||||
Man9 = add_snapshot(Man8, "pid_a2", 3600),
|
||||
Man9 = add_snapshot(Man8, pid_a2, 3600),
|
||||
Man10 = remove_manifest_entry(Man9, 3, 1, E2),
|
||||
Man11 = add_snapshot(Man10, "pid_a3", 3600),
|
||||
Man11 = add_snapshot(Man10, pid_a3, 3600),
|
||||
Man12 = remove_manifest_entry(Man11, 4, 1, E3),
|
||||
Man13 = add_snapshot(Man12, "pid_a4", 3600),
|
||||
Man13 = add_snapshot(Man12, pid_a4, 3600),
|
||||
|
||||
?assertMatch(false, element(1, ready_to_delete(Man8, "Z1"))),
|
||||
?assertMatch(false, element(1, ready_to_delete(Man10, "Z2"))),
|
||||
?assertMatch(false, element(1, ready_to_delete(Man12, "Z3"))),
|
||||
|
||||
Man14 = release_snapshot(Man13, "pid_a1"),
|
||||
Man14 = release_snapshot(Man13, pid_a1),
|
||||
?assertMatch(false, element(1, ready_to_delete(Man14, "Z2"))),
|
||||
?assertMatch(false, element(1, ready_to_delete(Man14, "Z3"))),
|
||||
{Bool14, Man15} = ready_to_delete(Man14, "Z1"),
|
||||
?assertMatch(true, Bool14),
|
||||
|
||||
%This doesn't change anything - released snapshot not the min
|
||||
Man16 = release_snapshot(Man15, "pid_a4"),
|
||||
Man16 = release_snapshot(Man15, pid_a4),
|
||||
?assertMatch(false, element(1, ready_to_delete(Man16, "Z2"))),
|
||||
?assertMatch(false, element(1, ready_to_delete(Man16, "Z3"))),
|
||||
|
||||
Man17 = release_snapshot(Man16, "pid_a2"),
|
||||
Man17 = release_snapshot(Man16, pid_a2),
|
||||
?assertMatch(false, element(1, ready_to_delete(Man17, "Z3"))),
|
||||
{Bool17, Man18} = ready_to_delete(Man17, "Z2"),
|
||||
?assertMatch(true, Bool17),
|
||||
|
||||
Man19 = release_snapshot(Man18, "pid_a3"),
|
||||
Man19 = release_snapshot(Man18, pid_a3),
|
||||
|
||||
io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]),
|
||||
|
||||
|
@ -1003,11 +1109,11 @@ snapshot_release_test() ->
|
|||
|
||||
snapshot_timeout_test() ->
|
||||
Man6 = element(7, initial_setup()),
|
||||
Man7 = add_snapshot(Man6, "pid_a1", 3600),
|
||||
Man7 = add_snapshot(Man6, pid_a1, 3600),
|
||||
?assertMatch(1, length(Man7#manifest.snapshots)),
|
||||
Man8 = release_snapshot(Man7, "pid_a1"),
|
||||
Man8 = release_snapshot(Man7, pid_a1),
|
||||
?assertMatch(0, length(Man8#manifest.snapshots)),
|
||||
Man9 = add_snapshot(Man8, "pid_a1", 0),
|
||||
Man9 = add_snapshot(Man8, pid_a1, 0),
|
||||
timer:sleep(2001),
|
||||
?assertMatch(1, length(Man9#manifest.snapshots)),
|
||||
Man10 = release_snapshot(Man9, ?PHANTOM_PID),
|
||||
|
|
|
@ -43,11 +43,20 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-type index_array() :: array:array().
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
-spec prepare_for_index(index_array(), integer()|no_lookup) -> index_array().
|
||||
%% @doc
|
||||
%% Add the hash of a key to the index. This is 'prepared' in the sense that
|
||||
%% this index is not used until it is loaded into the main index.
|
||||
%%
|
||||
%% prepare_for_index is called from the Bookie when a key is added to the ledger
|
||||
%% cache, but the index is not used until that ledger cache is in the
|
||||
%% penciller L0 memory
|
||||
prepare_for_index(IndexArray, no_lookup) ->
|
||||
IndexArray;
|
||||
prepare_for_index(IndexArray, Hash) ->
|
||||
|
@ -55,7 +64,55 @@ prepare_for_index(IndexArray, Hash) ->
|
|||
Bin = array:get(Slot, IndexArray),
|
||||
array:set(Slot, <<Bin/binary, 1:1/integer, H0:23/integer>>, IndexArray).
|
||||
|
||||
-spec add_to_index(index_array(), index_array(), integer()) -> index_array().
|
||||
%% @doc
|
||||
%% Expand the penciller's current index array with the details from a new
|
||||
%% ledger cache tree sent from the Bookie. The tree will have a cache slot
|
||||
%% which is the index of this ledger_cache in the list of the ledger_caches
|
||||
add_to_index(LM1Array, L0Index, CacheSlot) when CacheSlot < 128 ->
|
||||
IndexAddFun =
|
||||
fun(Slot, Acc) ->
|
||||
Bin0 = array:get(Slot, Acc),
|
||||
BinLM1 = array:get(Slot, LM1Array),
|
||||
array:set(Slot,
|
||||
<<Bin0/binary,
|
||||
0:1/integer, CacheSlot:7/integer,
|
||||
BinLM1/binary>>,
|
||||
Acc)
|
||||
end,
|
||||
lists:foldl(IndexAddFun, L0Index, lists:seq(0, 255)).
|
||||
|
||||
-spec new_index() -> index_array().
|
||||
%% @doc
|
||||
%% Create a new index array
|
||||
new_index() ->
|
||||
array:new([{size, 256}, {default, <<>>}]).
|
||||
|
||||
-spec clear_index(index_array()) -> index_array().
|
||||
%% @doc
|
||||
%% Clear the index by returning a new (empty) index array
|
||||
clear_index(_L0Index) ->
|
||||
new_index().
|
||||
|
||||
-spec check_index(integer(), index_array()) -> list(integer()).
|
||||
%% @doc
|
||||
%% return a list of positions in the list of cache arrays that may contain the
|
||||
%% key associated with the hash being checked
|
||||
check_index(Hash, L0Index) ->
|
||||
{Slot, H0} = split_hash(Hash),
|
||||
Bin = array:get(Slot, L0Index),
|
||||
find_pos(Bin, H0, [], 0).
|
||||
|
||||
|
||||
-spec add_to_cache(integer(),
|
||||
{tuple(), integer(), integer()},
|
||||
integer(),
|
||||
list()) ->
|
||||
{integer(), integer(), list()}.
|
||||
%% @doc
|
||||
%% The penciller's cache is a list of leveled_trees, this adds a new tree to
|
||||
%% that cache, providing an update to the approximate size of the cache and
|
||||
%% the Ledger's SQN.
|
||||
add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
|
||||
LM1Size = leveled_tree:tsize(LevelMinus1),
|
||||
case LM1Size of
|
||||
|
@ -70,30 +127,14 @@ add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
|
|||
end
|
||||
end.
|
||||
|
||||
add_to_index(LM1Array, L0Index, CacheSlot) when CacheSlot < 128 ->
|
||||
IndexAddFun =
|
||||
fun(Slot, Acc) ->
|
||||
Bin0 = array:get(Slot, Acc),
|
||||
BinLM1 = array:get(Slot, LM1Array),
|
||||
array:set(Slot,
|
||||
<<Bin0/binary,
|
||||
0:1/integer, CacheSlot:7/integer,
|
||||
BinLM1/binary>>,
|
||||
Acc)
|
||||
end,
|
||||
lists:foldl(IndexAddFun, L0Index, lists:seq(0, 255)).
|
||||
|
||||
new_index() ->
|
||||
array:new([{size, 256}, {default, <<>>}]).
|
||||
|
||||
clear_index(_L0Index) ->
|
||||
new_index().
|
||||
|
||||
check_index(Hash, L0Index) ->
|
||||
{Slot, H0} = split_hash(Hash),
|
||||
Bin = array:get(Slot, L0Index),
|
||||
find_pos(Bin, H0, [], 0).
|
||||
|
||||
-spec to_list(integer(), fun()) -> list().
|
||||
%% @doc
|
||||
%% The cache is a list of leveled_trees of length Slots. This will fetch
|
||||
%% each tree in turn by slot ID and then produce a merged/sorted output of
|
||||
%% Keys and Values (to load into a SST file).
|
||||
%%
|
||||
%% Each slot is requested in turn to avoid halting the penciller whilst it
|
||||
%% does a large object copy of the whole cache.
|
||||
to_list(Slots, FetchFun) ->
|
||||
SW = os:timestamp(),
|
||||
SlotList = lists:reverse(lists:seq(1, Slots)),
|
||||
|
@ -107,10 +148,25 @@ to_list(Slots, FetchFun) ->
|
|||
leveled_log:log_timer("PM002", [length(FullList)], SW),
|
||||
FullList.
|
||||
|
||||
|
||||
-spec check_levelzero(tuple(), list(integer()), list())
|
||||
-> {boolean(), tuple|not_found}.
|
||||
%% @doc
|
||||
%% Check for the presence of a given Key in the Level Zero cache, with the
|
||||
%% index array having been checked first for a list of potential positions
|
||||
%% in the list of ledger caches - and then each potential ledger_cache being
|
||||
%% checked (with the most recently received cache being checked first) until a
|
||||
%% match is found.
|
||||
check_levelzero(Key, PosList, TreeList) ->
|
||||
check_levelzero(Key, leveled_codec:magic_hash(Key), PosList, TreeList).
|
||||
|
||||
-spec check_levelzero(tuple(), integer(), list(integer()), list())
|
||||
-> {boolean(), tuple|not_found}.
|
||||
%% @doc
|
||||
%% Check for the presence of a given Key in the Level Zero cache, with the
|
||||
%% index array having been checked first for a list of potential positions
|
||||
%% in the list of ledger caches - and then each potential ledger_cache being
|
||||
%% checked (with the most recently received cache being checked first) until a
|
||||
%% match is found.
|
||||
check_levelzero(_Key, _Hash, _PosList, []) ->
|
||||
{false, not_found};
|
||||
check_levelzero(_Key, _Hash, [], _TreeList) ->
|
||||
|
@ -118,7 +174,11 @@ check_levelzero(_Key, _Hash, [], _TreeList) ->
|
|||
check_levelzero(Key, Hash, PosList, TreeList) ->
|
||||
check_slotlist(Key, Hash, PosList, TreeList).
|
||||
|
||||
|
||||
-spec merge_trees(tuple(), tuple(), list(tuple()), tuple()) -> list().
|
||||
%% @doc
|
||||
%% Return a list of keys and values across the level zero cache (and the
|
||||
%% currently unmerged bookie's ledger cache) that are between StartKey
|
||||
%% and EndKey (inclusive).
|
||||
merge_trees(StartKey, EndKey, TreeList, LevelMinus1) ->
|
||||
lists:foldl(fun(Tree, Acc) ->
|
||||
R = leveled_tree:match_range(StartKey,
|
||||
|
|
|
@ -25,7 +25,9 @@
|
|||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
|
||||
-spec create_bloom(list(integer())) -> binary().
|
||||
%% @doc
|
||||
%% Create a binary bloom filter from a list of hashes
|
||||
create_bloom(HashList) ->
|
||||
case length(HashList) of
|
||||
0 ->
|
||||
|
@ -41,6 +43,9 @@ create_bloom(HashList) ->
|
|||
add_hashlist(HashList, 1, 0, 0)
|
||||
end.
|
||||
|
||||
-spec check_hash(integer(), binary()) -> boolean().
|
||||
%% @doc
|
||||
%% Check for the presence of a given hash within a bloom
|
||||
check_hash(_Hash, <<>>) ->
|
||||
false;
|
||||
check_hash(Hash, BloomBin) ->
|
||||
|
|
|
@ -30,21 +30,48 @@
|
|||
|
||||
-define(SKIP_WIDTH, 16).
|
||||
|
||||
-type tree_type() :: tree|idxt|skpl.
|
||||
-type leveled_tree() :: {tree_type(),
|
||||
integer(), % length
|
||||
any()}.
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
-spec from_orderedset(ets:tab(), tree_type()) -> leveled_tree().
|
||||
%% @doc
|
||||
%% Convert an ETS table of Keys and Values (of table type ordered_set) into a
|
||||
%% leveled_tree of the given type.
|
||||
from_orderedset(Table, Type) ->
|
||||
from_orderedlist(ets:tab2list(Table), Type, ?SKIP_WIDTH).
|
||||
|
||||
-spec from_orderedset(ets:tab(), tree_type(), integer()|auto)
|
||||
-> leveled_tree().
|
||||
%% @doc
|
||||
%% Convert an ETS table of Keys and Values (of table type ordered_set) into a
|
||||
%% leveled_tree of the given type. The SkipWidth is an integer representing
|
||||
%% the underlying list size joined in the tree (the trees are all trees of
|
||||
%% lists of this size). For the skpl type the width can be auto-sized based
|
||||
%% on the length
|
||||
from_orderedset(Table, Type, SkipWidth) ->
|
||||
from_orderedlist(ets:tab2list(Table), Type, SkipWidth).
|
||||
|
||||
|
||||
-spec from_orderedlist(list(tuple()), tree_type()) -> leveled_tree().
|
||||
%% @doc
|
||||
%% Convert a list of Keys and Values (of table type ordered_set) into a
|
||||
%% leveled_tree of the given type.
|
||||
from_orderedlist(OrderedList, Type) ->
|
||||
from_orderedlist(OrderedList, Type, ?SKIP_WIDTH).
|
||||
|
||||
-spec from_orderedlist(list(tuple()), tree_type(), integer()|auto)
|
||||
-> leveled_tree().
|
||||
%% @doc
|
||||
%% Convert a list of Keys and Values (of table type ordered_set) into a
|
||||
%% leveled_tree of the given type. The SkipWidth is an integer representing
|
||||
%% the underlying list size joined in the tree (the trees are all trees of
|
||||
%% lists of this size). For the skpl type the width can be auto-sized based
|
||||
%% on the length
|
||||
from_orderedlist(OrderedList, tree, SkipWidth) ->
|
||||
L = length(OrderedList),
|
||||
{tree, L, tree_fromorderedlist(OrderedList, [], L, SkipWidth)};
|
||||
|
@ -63,7 +90,11 @@ from_orderedlist(OrderedList, skpl, _SkipWidth) ->
|
|||
end,
|
||||
{skpl, L, skpl_fromorderedlist(OrderedList, L, SkipWidth, 2)}.
|
||||
|
||||
|
||||
-spec match(tuple()|integer(), leveled_tree()) -> none|{value, any()}.
|
||||
%% @doc
|
||||
%% Return the value from a tree associated with an exact match for the given
|
||||
%% key. This assumes the tree contains the actual keys and values to be
|
||||
%% matched against, not a manifest representing ranges of keys and values.
|
||||
match(Key, {tree, _L, Tree}) ->
|
||||
Iter = tree_iterator_from(Key, Tree),
|
||||
case tree_next(Iter) of
|
||||
|
@ -84,6 +115,12 @@ match(Key, {skpl, _L, SkipList}) ->
|
|||
SL0 = skpl_getsublist(Key, SkipList),
|
||||
lookup_match(Key, SL0).
|
||||
|
||||
-spec search(tuple()|integer(), leveled_tree(), fun()) -> none|tuple().
|
||||
%% @doc
|
||||
%% Search is used when the tree is a manifest of key ranges and it is necessary
|
||||
%% to find a range which may contain the key. The StartKeyFun is used if the
|
||||
%% values contain extra information that can be used to determine if the key is
|
||||
%% or is not present.
|
||||
search(Key, {tree, _L, Tree}, StartKeyFun) ->
|
||||
Iter = tree_iterator_from(Key, Tree),
|
||||
case tree_next(Iter) of
|
||||
|
@ -126,6 +163,18 @@ search(Key, {skpl, _L, SkipList}, StartKeyFun) ->
|
|||
none
|
||||
end.
|
||||
|
||||
-spec match_range(tuple()|integer()|all,
|
||||
tuple()|integer()|all,
|
||||
leveled_tree())
|
||||
-> list().
|
||||
%% @doc
|
||||
%% Return a range of value between trees from a tree associated with an
|
||||
%% exact match for the given key. This assumes the tree contains the actual
|
||||
%% keys and values to be matched against, not a manifest representing ranges
|
||||
%% of keys and values.
|
||||
%%
|
||||
%% The keyword all can be used as a substitute for the StartKey to remove a
|
||||
%% constraint from the range.
|
||||
match_range(StartRange, EndRange, Tree) ->
|
||||
EndRangeFun =
|
||||
fun(ER, FirstRHSKey, _FirstRHSValue) ->
|
||||
|
@ -133,6 +182,15 @@ match_range(StartRange, EndRange, Tree) ->
|
|||
end,
|
||||
match_range(StartRange, EndRange, Tree, EndRangeFun).
|
||||
|
||||
-spec match_range(tuple()|integer()|all,
|
||||
tuple()|integer()|all,
|
||||
leveled_tree(),
|
||||
fun())
|
||||
-> list().
|
||||
%% @doc
|
||||
%% As match_range/3 but a function can be passed to be used when comparing the
|
||||
%% EndKey with a key in the tree (such as leveled_codec:endkey_passed), where
|
||||
%% Erlang term comparison will not give the desired result.
|
||||
match_range(StartRange, EndRange, {tree, _L, Tree}, EndRangeFun) ->
|
||||
treelookup_range_start(StartRange, EndRange, Tree, EndRangeFun);
|
||||
match_range(StartRange, EndRange, {idxt, _L, Tree}, EndRangeFun) ->
|
||||
|
@ -140,7 +198,18 @@ match_range(StartRange, EndRange, {idxt, _L, Tree}, EndRangeFun) ->
|
|||
match_range(StartRange, EndRange, {skpl, _L, SkipList}, EndRangeFun) ->
|
||||
skpllookup_to_range(StartRange, EndRange, SkipList, EndRangeFun).
|
||||
|
||||
|
||||
-spec search_range(tuple()|integer()|all,
|
||||
tuple()|integer()|all,
|
||||
leveled_tree(),
|
||||
fun())
|
||||
-> list().
|
||||
%% @doc
|
||||
%% Extract a range from a tree, with search used when the tree is a manifest
|
||||
%% of key ranges and it is necessary to find a range which may encapsulate the
|
||||
%% key range.
|
||||
%%
|
||||
%% The StartKeyFun is used if the values contain extra information that can be
|
||||
%% used to determine if the key is or is not present.
|
||||
search_range(StartRange, EndRange, Tree, StartKeyFun) ->
|
||||
EndRangeFun =
|
||||
fun(ER, _FirstRHSKey, FirstRHSValue) ->
|
||||
|
@ -156,7 +225,9 @@ search_range(StartRange, EndRange, Tree, StartKeyFun) ->
|
|||
skpllookup_to_range(StartRange, EndRange, SL, EndRangeFun)
|
||||
end.
|
||||
|
||||
|
||||
-spec to_list(leveled_tree()) -> list().
|
||||
%% @doc
|
||||
%% Collapse the tree back to a list
|
||||
to_list({tree, _L, Tree}) ->
|
||||
FoldFun =
|
||||
fun({_MK, SL}, Acc) ->
|
||||
|
@ -176,10 +247,15 @@ to_list({skpl, _L, SkipList}) ->
|
|||
lists:append(Lv0List).
|
||||
|
||||
|
||||
|
||||
-spec tsize(leveled_tree()) -> integer().
|
||||
%% @doc
|
||||
%% Return the count of items in a tree
|
||||
tsize({_Type, L, _Tree}) ->
|
||||
L.
|
||||
|
||||
-spec empty(tree_type()) -> leveled_tree().
|
||||
%% @doc
|
||||
%% Return an empty tree of the given type
|
||||
empty(tree) ->
|
||||
{tree, 0, empty_tree()};
|
||||
empty(idxt) ->
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue