Merge branch 'master' into mas-aae-segementfoldplus

commit 4c05dc79f9
6 changed files with 295 additions and 129 deletions
src/leveled_bookie.erl

@@ -104,11 +104,15 @@
 
 -type book_state() :: #state{}.
+-type sync_mode() :: sync|none|riak_sync.
+-type ledger_cache() :: #ledger_cache{}.
 
 %%%============================================================================
 %%% API
 %%%============================================================================
 
+-spec book_start(string(), integer(), integer(), sync_mode()) -> {ok, pid()}.
+
 %% @doc Start a Leveled Key/Value store - limited options support.
 %%
 %% The most common startup parameters are extracted out from the options to
@@ -142,6 +146,8 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
                 {max_journalsize, JournalSize},
                 {sync_strategy, SyncStrategy}]).
 
+-spec book_start(list(tuple())) -> {ok, pid()}.
+
 %% @doc Start a Leveled Key/Value store - full options support.
 %%
 %% Allows an options proplist to be passed for setting options. There are
@@ -152,7 +158,7 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 %% - compression_point
 %%
 %% Both of the first two options relate to compaction in the Journal. The
-%% retain_strategydetermines if a skinny record of the object should be
+%% retain_strategy determines if a skinny record of the object should be
 %% retained following compaction, and how that should be used when recovering
 %% lost state in the Ledger.
 %%
@@ -165,7 +171,9 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 %% Currently compacted records no longer in use are not removed but moved to
 %% a journal_waste folder, and the waste_retention_period determines how long
 %% this history should be kept for (for example to allow for it to be backed
-%% up before deletion).
+%% up before deletion). If the waste_retention_period (in seconds) is
+%% undefined, then there will be no holding of this waste - unused files will
+%% be immediately deleted.
 %%
 %% Compression method and point allow Leveled to be switched from using bif
 %% based compression (zlib) to using nif based compression (lz4). The
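An illustrative sketch of how the retention option combines at startup (the path and value below are hypothetical, not taken from the commit); with the option left undefined, files removed by compaction are deleted at once:

    %% Sketch only: park journal compaction waste for one day before deletion
    {ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/leveled"},
                                                {waste_retention_period, 86400}]),
    ok = leveled_bookie:book_close(Bookie).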
@@ -195,6 +203,10 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 book_start(Opts) ->
     gen_server:start(?MODULE, [Opts], []).
 
+
+-spec book_tempput(pid(), any(), any(), any(), list(), atom(), integer()) ->
+                        ok|pause.
+
 %% @doc Put an object with an expiry time
 %%
 %% Put an item in the store but with a Time To Live - the time when the object
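A hedged sketch of calling the newly spec'd function; the TTL argument is the time the object should expire, and the unit/epoch convention assumed below may differ from what the store expects:

    %% Assumed convention: TTL as an absolute time one hour from now.
    %% Per the spec, the reply is ok, or pause to signal back-pressure.
    TTL = os:system_time(seconds) + 3600,
    Response = leveled_bookie:book_tempput(Bookie, <<"Bucket">>, <<"Key">>,
                                            <<"Value">>, [], ?STD_TAG, TTL).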
@@ -258,12 +270,18 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
 book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
     book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, infinity).
 
+-spec book_put(pid(), any(), any(), any(), list(), atom(), infinity|integer())
+                    -> ok|pause.
+
 book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
     gen_server:call(Pid,
                     {put, Bucket, Key, Object, IndexSpecs, Tag, TTL},
                     infinity).
 
-%% @doc - Standard PUT
+-spec book_delete(pid(), any(), any(), list()) -> ok|pause.
+
+%% @doc
 %%
 %% A thin wrap around the put of a special tombstone object. There is no
 %% immediate reclaim of space, simply the addition of a more recent tombstone.
@@ -271,7 +289,11 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
 book_delete(Pid, Bucket, Key, IndexSpecs) ->
     book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).
 
-%% @doc - GET and HAD requests
+-spec book_get(pid(), any(), any(), atom()) -> {ok, any()}|not_found.
+-spec book_head(pid(), any(), any(), atom()) -> {ok, any()}|not_found.
+
+%% @doc - GET and HEAD requests
 %%
 %% The Bookie supports both GET and HEAD requests, with the HEAD request
 %% returning only the metadata and not the actual object value. The HEAD
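A sketch of the two request types against the specs above (Bookie, bucket and key are assumed bindings):

    %% HEAD returns only Ledger metadata; GET goes on to fetch the Journal value
    case leveled_bookie:book_head(Bookie, <<"Bucket">>, <<"Key">>, ?STD_TAG) of
        {ok, _Metadata} ->
            leveled_bookie:book_get(Bookie, <<"Bucket">>, <<"Key">>, ?STD_TAG);
        not_found ->
            not_found
    end.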
@@ -280,11 +302,6 @@ book_delete(Pid, Bucket, Key, IndexSpecs) ->
 %% GET requests first follow the path of a HEAD request, and if an object is
 %% found, then fetch the value from the Journal via the Inker.
 
-book_get(Pid, Bucket, Key) ->
-    book_get(Pid, Bucket, Key, ?STD_TAG).
-
-book_head(Pid, Bucket, Key) ->
-    book_head(Pid, Bucket, Key, ?STD_TAG).
-
 book_get(Pid, Bucket, Key, Tag) ->
     gen_server:call(Pid, {get, Bucket, Key, Tag}, infinity).
@@ -292,6 +309,15 @@ book_get(Pid, Bucket, Key, Tag) ->
 book_head(Pid, Bucket, Key, Tag) ->
     gen_server:call(Pid, {head, Bucket, Key, Tag}, infinity).
 
+book_get(Pid, Bucket, Key) ->
+    book_get(Pid, Bucket, Key, ?STD_TAG).
+
+book_head(Pid, Bucket, Key) ->
+    book_head(Pid, Bucket, Key, ?STD_TAG).
+
+
+-spec book_returnfolder(pid(), tuple()) -> {async, fun()}.
+
 %% @doc Snapshots/Clones
 %%
 %% If there is a snapshot request (e.g. to iterate over the keys) the Bookie
@@ -343,6 +369,12 @@ book_head(Pid, Bucket, Key, Tag) ->
 book_returnfolder(Pid, RunnerType) ->
     gen_server:call(Pid, {return_runner, RunnerType}, infinity).
 
+
+-spec book_snapshot(pid(),
+                    store|ledger,
+                    tuple()|undefined,
+                    boolean()|undefined) -> {ok, pid(), pid()|null}.
+
 %% @doc create a snapshot of the store
 %%
 %% Snapshot can be based on a pre-defined query (which will be used to filter
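Per the book_returnfolder spec above, a runner comes back as a fun for the caller to execute; a minimal sketch (the RunnerType tuple varies by query type and is assumed here):

    {async, Runner} = leveled_bookie:book_returnfolder(Bookie, RunnerType),
    Results = Runner().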
@@ -353,6 +385,10 @@ book_returnfolder(Pid, RunnerType) ->
 book_snapshot(Pid, SnapType, Query, LongRunning) ->
     gen_server:call(Pid, {snapshot, SnapType, Query, LongRunning}, infinity).
 
+
+-spec book_compactjournal(pid(), integer()) -> ok.
+-spec book_islastcompactionpending(pid()) -> boolean().
+
 %% @doc Call for compaction of the Journal
 %%
 %% the scheduling of Journal compaction is called externally, so it is assumed
@@ -366,6 +402,10 @@ book_compactjournal(Pid, Timeout) ->
 book_islastcompactionpending(Pid) ->
     gen_server:call(Pid, confirm_compact, infinity).
 
+
+-spec book_close(pid()) -> ok.
+-spec book_destroy(pid()) -> ok.
+
 %% @doc Clean shutdown
 %%
 %% A clean shutdown will persist all the information in the Penciller memory
@@ -567,11 +607,15 @@ code_change(_OldVsn, State, _Extra) ->
 %%% External functions
 %%%============================================================================
 
-%% @doc Empty the ledger cache table following a push
+-spec empty_ledgercache() -> ledger_cache().
+%% @doc
+%% Empty the ledger cache table following a push
 empty_ledgercache() ->
     #ledger_cache{mem = ets:new(empty, [ordered_set])}.
 
-%% @doc push the ledgercache to the Penciller - which should respond ok or
+-spec push_ledgercache(pid(), ledger_cache()) -> ok|returned.
+%% @doc
+%% Push the ledgercache to the Penciller - which should respond ok or
 %% returned. If the response is ok the cache can be flushed, but if the
 %% response is returned the cache should continue to build and it should try
 %% to flush at a later date
@@ -582,8 +626,10 @@ push_ledgercache(Penciller, Cache) ->
                     Cache#ledger_cache.max_sqn},
     leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
 
-%% @doc the ledger cache can be built from a queue, for example when
-%% loading the ledger from the head of the journal on startup
+-spec loadqueue_ledgercache(ledger_cache()) -> ledger_cache().
+%% @doc
+%% The ledger cache can be built from a queue, for example when loading the
+%% ledger from the head of the journal on startup
 %%
 %% The queue should be built using [NewKey|Acc] so that the most recent
 %% key is kept in the sort
@@ -592,7 +638,12 @@ loadqueue_ledgercache(Cache) ->
     T = leveled_tree:from_orderedlist(SL, ?CACHE_TYPE),
     Cache#ledger_cache{load_queue = [], loader = T}.
 
-%% @doc Allow all a snapshot to be created from part of the store, preferably
+-spec snapshot_store(ledger_cache(),
+                        pid(), null|pid(), store|ledger,
+                        undefined|tuple(), undefined|boolean()) ->
+                            {ok, pid(), pid()|null}.
+%% @doc
+%% Allow a snapshot to be created from part of the store, preferably
 %% passing in a query filter so that all of the LoopState does not need to
 %% be copied from the real actor to the clone
 %%
@@ -633,6 +684,9 @@ snapshot_store(State, SnapType, Query, LongRunning) ->
                     Query,
                     LongRunning).
 
+-spec fetch_value(pid(), {any(), integer()}) -> not_present|any().
+%% @doc
+%% Fetch a value from the Journal
 fetch_value(Inker, {Key, SQN}) ->
     SW = os:timestamp(),
     case leveled_inker:ink_fetch(Inker, Key, SQN) of
src/leveled_cdb.erl

@@ -66,7 +66,7 @@
             cdb_open_writer/2,
             cdb_open_reader/1,
             cdb_open_reader/2,
-            cdb_reopen_reader/2,
+            cdb_reopen_reader/3,
             cdb_get/2,
             cdb_put/3,
             cdb_mput/2,
@@ -138,7 +138,7 @@ cdb_open_writer(Filename, Opts) ->
     ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity),
     {ok, Pid}.
 
--spec cdb_reopen_reader(string(), binary()) -> {ok, pid()}.
+-spec cdb_reopen_reader(string(), binary(), cdb_options()) -> {ok, pid()}.
 %% @doc
 %% Open an existing file that has already been moved into read-only mode. The
 %% LastKey should be known, as it has been stored in the manifest. Knowing the
@@ -147,8 +147,9 @@ cdb_open_writer(Filename, Opts) ->
 %%
 %% The LastKey is the Key of the last object added to the file - and is used to
 %% determine when scans over a file have completed.
-cdb_reopen_reader(Filename, LastKey) ->
-    {ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{binary_mode=true}], []),
+cdb_reopen_reader(Filename, LastKey, CDBopts) ->
+    {ok, Pid} =
+        gen_fsm:start(?MODULE, [CDBopts#cdb_options{binary_mode=true}], []),
     ok = gen_fsm:sync_send_event(Pid,
                                     {open_reader, Filename, LastKey},
                                     infinity),
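A sketch of the new arity-3 reopen, which lets the caller's options (such as waste_path) survive the move to read-only mode; binary_mode is still forced to true internally:

    {ok, Pid} = leveled_cdb:cdb_reopen_reader(Filename, LastKey,
                                                #cdb_options{waste_path = WasteFP}).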
@@ -692,17 +693,19 @@ handle_info(_Msg, StateName, State) ->
     {next_state, StateName, State}.
 
 terminate(Reason, StateName, State) ->
-    leveled_log:log("CDB05", [State#state.filename, Reason]),
+    leveled_log:log("CDB05", [State#state.filename, StateName, Reason]),
     case {State#state.handle, StateName, State#state.waste_path} of
         {undefined, _, _} ->
             ok;
         {Handle, delete_pending, undefined} ->
             ok = file:close(Handle),
-            ok = file:delete(State#state.filename);
+            ok = file:delete(State#state.filename),
+            leveled_log:log("CDB20", [State#state.filename]);
         {Handle, delete_pending, WasteFP} ->
             file:close(Handle),
             Components = filename:split(State#state.filename),
             NewName = WasteFP ++ lists:last(Components),
+            leveled_log:log("CDB19", [State#state.filename, NewName]),
             file:rename(State#state.filename, NewName);
         {Handle, _, _} ->
             file:close(Handle)
@@ -750,25 +753,8 @@ set_writeops(SyncStrategy) ->
 
 -endif.
 
-%% from_dict(FileName,ListOfKeyValueTuples)
-%% Given a filename and a dictionary, create a cdb
-%% using the key value pairs from the dict.
-from_dict(FileName,Dict) ->
-    KeyValueList = dict:to_list(Dict),
-    create(FileName, KeyValueList).
-
-%%
-%% create(FileName,ListOfKeyValueTuples) -> ok
-%% Given a filename and a list of {key,value} tuples,
-%% this function creates a CDB
-%%
-create(FileName,KeyValueList) ->
-    {ok, Handle} = file:open(FileName, ?WRITE_OPS),
-    {ok, _} = file:position(Handle, {bof, ?BASE_POSITION}),
-    {BasePos, HashTree} = write_key_value_pairs(Handle, KeyValueList),
-    close_file(Handle, HashTree, BasePos).
-
-
+-spec open_active_file(list()) -> {integer(), ets:tid(), any()}.
+%% @doc
 %% Open an active file - one for which it is assumed the hash tables have not
 %% yet been written
 %%
@@ -794,6 +780,11 @@ open_active_file(FileName) when is_list(FileName) ->
     end,
     {LastPosition, HashTree, LastKey}.
 
+-spec put(list()|file:io_device(),
+            any(), any(),
+            {integer(), ets:tid()}, boolean(), integer())
+                -> roll|{file:io_device(), integer(), ets:tid()}.
+%% @doc
 %% put(Handle, Key, Value, {LastPosition, HashDict}) -> {NewPosition, KeyDict}
 %% Append to an active file a new key/value pair returning an updated
 %% dictionary of Keys and positions. Returns an updated Position
@@ -819,6 +810,14 @@ put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize) ->
                 put_hashtree(Key, LastPosition, HashTree)}
     end.
 
+
+-spec mput(file:io_device(),
+            list(tuple()),
+            {integer(), ets:tid()}, boolean(), integer())
+                -> roll|{file:io_device(), integer(), ets:tid(), any()}.
+%% @doc
+%% Multiple puts - either all will succeed or it will return roll with none
+%% succeeding.
 mput(Handle, KVList, {LastPosition, HashTree0}, BinaryMode, MaxSize) ->
     {KPList, Bin, LastKey} = multi_key_value_to_record(KVList,
                                                         BinaryMode,
@@ -837,18 +836,11 @@ mput(Handle, KVList, {LastPosition, HashTree0}, BinaryMode, MaxSize) ->
             {Handle, PotentialNewSize, HashTree1, LastKey}
     end.
 
-%% Should not be used for non-test PUTs by the inker - as the Max File Size
-%% should be taken from the startup options not the default
-put(FileName, Key, Value, {LastPosition, HashTree}) ->
-    put(FileName, Key, Value, {LastPosition, HashTree},
-            ?BINARY_MODE, ?MAX_FILE_SIZE).
-
-%%
-%% get(FileName,Key) -> {key,value}
-%% Given a filename and a key, returns a key and value tuple.
-%%
-
-
+-spec get_withcache(file:io_device(), any(), tuple(), boolean()) -> tuple().
+%% @doc
+%% Using a cache of the Index array - get a K/V pair from the file using the
+%% Key
 get_withcache(Handle, Key, Cache, BinaryMode) ->
     get(Handle, Key, Cache, true, BinaryMode).
 
@@ -858,6 +850,16 @@ get_withcache(Handle, Key, Cache, QuickCheck, BinaryMode) ->
 get(FileNameOrHandle, Key, BinaryMode) ->
     get(FileNameOrHandle, Key, no_cache, true, BinaryMode).
 
+
+-spec get(list()|file:io_device(),
+            any(), no_cache|tuple(),
+            loose_presence|any(), boolean())
+                -> tuple()|probably|missing.
+%% @doc
+%% Get a K/V pair from the file using the Key. QuickCheck can be set to
+%% loose_presence if all that is required is a loose check of presence (that
+%% the Key is probably present as there is a hash in the hash table which
+%% matches that Key)
 get(FileName, Key, Cache, QuickCheck, BinaryMode) when is_list(FileName) ->
     {ok, Handle} = file:open(FileName,[binary, raw, read]),
     get(Handle, Key, Cache, QuickCheck, BinaryMode);
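A sketch of the loose_presence mode described above; a hash-table hit answers probably without the value ever being read (atoms on the right are illustrative):

    case get(Handle, Key, no_cache, loose_presence, true) of
        probably -> maybe_present;   %% hash matched, value not read
        missing  -> not_present
    end.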
@@ -893,11 +895,12 @@ get_index(Handle, Index, no_cache) ->
     % Get location of hashtable and number of entries in the hash
     read_next_2_integers(Handle);
 get_index(_Handle, Index, Cache) ->
     element(Index + 1, Cache).
 
+-spec get_mem(any(), list()|file:io_device(), ets:tid(), boolean()) ->
+                                                    tuple()|probably|missing.
+%% @doc
 %% Get a Key/Value pair from an active CDB file (with no hash table written)
-%% This requires a key dictionary to be passed in (mapping keys to positions)
-%% Will return {Key, Value} or missing
 get_mem(Key, FNOrHandle, HashTree, BinaryMode) ->
     get_mem(Key, FNOrHandle, HashTree, BinaryMode, true).
 
@@ -912,11 +915,18 @@ get_mem(Key, Handle, HashTree, BinaryMode, QuickCheck) ->
         {loose_presence, _L} ->
             probably;
         _ ->
             extract_kvpair(Handle, ListToCheck, Key, BinaryMode)
     end.
 
+-spec get_nextkey(list()|file:io_device()) ->
+                        nomorekeys|
+                        {any(), nomorekeys}|
+                        {any(), file:io_device(), {integer(), integer()}}.
+%% @doc
 %% Get the next key at a position in the file (or the first key if no position
-%% is passed). Will return both a key and the next position
+%% is passed). Will return both a key and the next position, or nomorekeys if
+%% the end has been reached (either in place of the result if there are no
+%% more keys, or in place of the position if the returned key is the last key)
 get_nextkey(Filename) when is_list(Filename) ->
     {ok, Handle} = file:open(Filename, [binary, raw, read]),
     get_nextkey(Handle);
@@ -941,6 +951,10 @@ get_nextkey(Handle, {Position, FirstHashPosition}) ->
             nomorekeys
     end.
 
+-spec hashtable_calc(ets:tid(), integer()) -> {list(), binary()}.
+%% @doc
+%% Create a binary representation of the hash table to be written to the end
+%% of the file
 hashtable_calc(HashTree, StartPos) ->
     Seq = lists:seq(0, 255),
     SWC = os:timestamp(),
@@ -1596,6 +1610,34 @@ write_hash_tables([Index|Rest], HashTree, CurrPos, BasePos,
 %% of {key,value} tuples from the CDB.
 %%
 
+
+%% from_dict(FileName,ListOfKeyValueTuples)
+%% Given a filename and a dictionary, create a cdb
+%% using the key value pairs from the dict.
+from_dict(FileName,Dict) ->
+    KeyValueList = dict:to_list(Dict),
+    create(FileName, KeyValueList).
+
+
+%%
+%% create(FileName,ListOfKeyValueTuples) -> ok
+%% Given a filename and a list of {key,value} tuples,
+%% this function creates a CDB
+%%
+create(FileName,KeyValueList) ->
+    {ok, Handle} = file:open(FileName, ?WRITE_OPS),
+    {ok, _} = file:position(Handle, {bof, ?BASE_POSITION}),
+    {BasePos, HashTree} = write_key_value_pairs(Handle, KeyValueList),
+    close_file(Handle, HashTree, BasePos).
+
+
+%% Should not be used for non-test PUTs by the inker - as the Max File Size
+%% should be taken from the startup options not the default
+put(FileName, Key, Value, {LastPosition, HashTree}) ->
+    put(FileName, Key, Value, {LastPosition, HashTree},
+            ?BINARY_MODE, ?MAX_FILE_SIZE).
+
+
 dump(FileName) ->
     {ok, Handle} = file:open(FileName, [binary, raw, read]),
     Fn = fun(Index, Acc) ->
src/leveled_iclerk.erl

@@ -101,7 +101,6 @@
 -define(MAXRUN_COMPACTION_TARGET, 70.0).
 -define(CRC_SIZE, 4).
 -define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])).
--define(DEFAULT_WASTE_RETENTION_PERIOD, 86400).
 -define(INTERVALS_PER_HOUR, 4).
 
 -record(state, {inker :: pid() | undefined,
@@ -150,18 +149,15 @@ init([IClerkOpts]) ->
     ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy,
     CDBopts = IClerkOpts#iclerk_options.cdb_options,
     WP = CDBopts#cdb_options.waste_path,
-    WRP = case IClerkOpts#iclerk_options.waste_retention_period of
-                undefined ->
-                    ?DEFAULT_WASTE_RETENTION_PERIOD;
-                WRP0 ->
-                    WRP0
-            end,
-    MRL = case IClerkOpts#iclerk_options.max_run_length of
-                undefined ->
-                    ?MAX_COMPACTION_RUN;
-                MRL0 ->
-                    MRL0
-            end,
+    WRP = IClerkOpts#iclerk_options.waste_retention_period,
+    MRL =
+        case IClerkOpts#iclerk_options.max_run_length of
+            undefined ->
+                ?MAX_COMPACTION_RUN;
+            MRL0 ->
+                MRL0
+        end,
 
     {ok, #state{max_run_length = MRL,
                 inker = IClerkOpts#iclerk_options.inker,
@@ -616,23 +612,27 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
     end.
 
 clear_waste(State) ->
-    WP = State#state.waste_path,
-    WRP = State#state.waste_retention_period,
-    {ok, ClearedJournals} = file:list_dir(WP),
-    N = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
-    lists:foreach(fun(DelJ) ->
-                        LMD = filelib:last_modified(WP ++ DelJ),
-                        case N - calendar:datetime_to_gregorian_seconds(LMD) of
-                            LMD_Delta when LMD_Delta >= WRP ->
-                                ok = file:delete(WP ++ DelJ),
-                                leveled_log:log("IC010", [WP ++ DelJ]);
-                            LMD_Delta ->
-                                leveled_log:log("IC011", [WP ++ DelJ,
-                                                            LMD_Delta]),
-                                ok
-                        end
-                    end,
-                    ClearedJournals).
+    case State#state.waste_path of
+        undefined ->
+            ok;
+        WP ->
+            WRP = State#state.waste_retention_period,
+            {ok, ClearedJournals} = file:list_dir(WP),
+            N = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
+            DeleteJournalFun =
+                fun(DelJ) ->
+                    LMD = filelib:last_modified(WP ++ DelJ),
+                    case N - calendar:datetime_to_gregorian_seconds(LMD) of
+                        LMD_Delta when LMD_Delta >= WRP ->
+                            ok = file:delete(WP ++ DelJ),
+                            leveled_log:log("IC010", [WP ++ DelJ]);
+                        LMD_Delta ->
+                            leveled_log:log("IC011", [WP ++ DelJ, LMD_Delta]),
+                            ok
+                    end
+                end,
+            lists:foreach(DeleteJournalFun, ClearedJournals)
+    end.
 
 
 %%%============================================================================
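With the reworked clause, a clerk with no waste path simply has nothing to clear; a sketch of the assumed behaviour (other record fields left at their defaults):

    ok = clear_waste(#state{waste_path = undefined}).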
@@ -923,7 +923,9 @@ compact_empty_file_test() ->
                   {3, {o, "Bucket", "Key3", null}}],
     LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> false end,
     Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4),
-    ?assertMatch(100.0, Score1).
+    ?assertMatch(100.0, Score1),
+    ok = leveled_cdb:cdb_deletepending(CDB2),
+    ok = leveled_cdb:cdb_destroy(CDB2).
 
 compare_candidate_test() ->
     Candidate1 = #candidate{low_sqn=1},
src/leveled_inker.erl

@@ -299,6 +299,7 @@ ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) ->
                                 FilterFun,
                                 Timeout},
                     infinity).
+
 -spec ink_compactioncomplete(pid()) -> ok.
 %% @doc
 %% Used by a clerk to state that a compaction process is over, only change
@@ -489,25 +490,26 @@ code_change(_OldVsn, State, _Extra) ->
 %%%============================================================================
 
 start_from_file(InkOpts) ->
-    RootPath = InkOpts#inker_options.root_path,
-    CDBopts = InkOpts#inker_options.cdb_options,
+    % Setting the correct CDB options is important when starting the inker, in
+    % particular for waste retention which is determined by the CDB options
+    % with which the file was last opened
+    CDBopts = get_cdbopts(InkOpts),
+
+    % Determine filepaths
+    RootPath = InkOpts#inker_options.root_path,
     JournalFP = filepath(RootPath, journal_dir),
     filelib:ensure_dir(JournalFP),
     CompactFP = filepath(RootPath, journal_compact_dir),
     filelib:ensure_dir(CompactFP),
-    WasteFP = filepath(RootPath, journal_waste_dir),
-    filelib:ensure_dir(WasteFP),
     ManifestFP = filepath(RootPath, manifest_dir),
     ok = filelib:ensure_dir(ManifestFP),
+    % The IClerk must start files with the compaction file path so that they
+    % will be stored correctly in this folder
+    IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP},
 
-    {ok, ManifestFilenames} = file:list_dir(ManifestFP),
-
-    IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP,
-                                            waste_path = WasteFP},
+    WRP = InkOpts#inker_options.waste_retention_period,
     ReloadStrategy = InkOpts#inker_options.reload_strategy,
     MRL = InkOpts#inker_options.max_run_length,
-    WRP = InkOpts#inker_options.waste_retention_period,
     PressMethod = InkOpts#inker_options.compression_method,
     PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
     IClerkOpts = #iclerk_options{inker = self(),
@@ -516,8 +518,12 @@ start_from_file(InkOpts) ->
                                     reload_strategy = ReloadStrategy,
                                     compression_method = PressMethod,
                                     max_run_length = MRL},
 
     {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
 
+    % The building of the manifest will load all the CDB files, starting a
+    % new leveled_cdb process for each file
+    {ok, ManifestFilenames} = file:list_dir(ManifestFP),
+
     {Manifest,
         ManifestSQN,
         JournalSQN,
@@ -529,11 +535,28 @@ start_from_file(InkOpts) ->
                 journal_sqn = JournalSQN,
                 active_journaldb = ActiveJournal,
                 root_path = RootPath,
-                cdb_options = CDBopts#cdb_options{waste_path=WasteFP},
+                cdb_options = CDBopts,
                 compression_method = PressMethod,
                 compress_on_receipt = PressOnReceipt,
                 clerk = Clerk}}.
 
+get_cdbopts(InkOpts)->
+    CDBopts = InkOpts#inker_options.cdb_options,
+    WasteFP =
+        case InkOpts#inker_options.waste_retention_period of
+            undefined ->
+                % If the waste retention period is undefined, there will
+                % be no retention of waste. This is triggered by making
+                % the waste path undefined
+                undefined;
+            _WRP ->
+                WFP = filepath(InkOpts#inker_options.root_path,
+                                journal_waste_dir),
+                filelib:ensure_dir(WFP),
+                WFP
+        end,
+    CDBopts#cdb_options{waste_path = WasteFP}.
+
+
 put_object(LedgerKey, Object, KeyChanges, State) ->
     NewSQN = State#state.journal_sqn + 1,
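A sketch of what get_cdbopts/1 yields when no retention period is set (record defaults assumed); the absent waste_path is what later tells the CDB files to delete rather than park their waste:

    CDBopts = get_cdbopts(#inker_options{cdb_options = #cdb_options{},
                                            waste_retention_period = undefined}),
    undefined = CDBopts#cdb_options.waste_path.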
@@ -673,8 +696,8 @@ open_all_manifest(Man0, RootPath, CDBOpts) ->
                 PFN = FN ++ "." ++ ?PENDING_FILEX,
                 case filelib:is_file(CFN) of
                     true ->
-                        {ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN,
-                                                                    LK_RO),
+                        {ok, Pid} =
+                            leveled_cdb:cdb_reopen_reader(CFN, LK_RO, CDBOpts),
                         {LowSQN, FN, Pid, LK_RO};
                     false ->
                         W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
@@ -916,6 +939,7 @@ build_dummy_journal(KeyConvertF) ->
 clean_testdir(RootPath) ->
     clean_subdir(filepath(RootPath, journal_dir)),
     clean_subdir(filepath(RootPath, journal_compact_dir)),
+    clean_subdir(filepath(RootPath, journal_waste_dir)),
     clean_subdir(filepath(RootPath, manifest_dir)).
 
 clean_subdir(DirPath) ->
@@ -930,7 +954,6 @@ clean_subdir(DirPath) ->
                   end,
                   Files).
 
-
 simple_inker_test() ->
     RootPath = "../test/journal",
     build_dummy_journal(),
@@ -973,16 +996,26 @@ simple_inker_completeactivejournal_test() ->
 test_ledgerkey(Key) ->
     {o, "Bucket", Key, null}.
 
-compact_journal_test() ->
+compact_journal_wasteretained_test_() ->
+    {timeout, 60, fun() -> compact_journal_testto(300, true) end}.
+
+compact_journal_wastediscarded_test_() ->
+    {timeout, 60, fun() -> compact_journal_testto(undefined, false) end}.
+
+compact_journal_testto(WRP, ExpectedFiles) ->
     RootPath = "../test/journal",
-    build_dummy_journal(fun test_ledgerkey/1),
     CDBopts = #cdb_options{max_size=300000},
     RStrategy = [{?STD_TAG, recovr}],
-    {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
-                                            cdb_options=CDBopts,
-                                            reload_strategy=RStrategy,
-                                            compression_method=native,
-                                            compress_on_receipt=false}),
+    InkOpts = #inker_options{root_path=RootPath,
+                                cdb_options=CDBopts,
+                                reload_strategy=RStrategy,
+                                waste_retention_period=WRP,
+                                compression_method=native,
+                                compress_on_receipt=false},
+
+    build_dummy_journal(fun test_ledgerkey/1),
+    {ok, Ink1} = ink_start(InkOpts),
+
     {ok, NewSQN1, _ObjSize} = ink_put(Ink1,
                                         test_ledgerkey("KeyAA"),
                                         "TestValueAA",
@@ -1030,11 +1063,16 @@ compact_journal_test() ->
     timer:sleep(1000),
     CompactedManifest2 = ink_getmanifest(Ink1),
     lists:foreach(fun({_SQN, FN, _P, _LK}) ->
-                        ?assertMatch(0, string:str(FN, "post_compact"))
+                        ?assertMatch(0, string:str(FN, ?COMPACT_FP))
                     end,
                     CompactedManifest2),
     ?assertMatch(2, length(CompactedManifest2)),
     ink_close(Ink1),
+    % Need to wait for delete_pending files to timeout
+    timer:sleep(12000),
+    % Are there files in the waste folder after compaction
+    {ok, WasteFNs} = file:list_dir(filepath(RootPath, journal_waste_dir)),
+    ?assertMatch(ExpectedFiles, length(WasteFNs) > 0),
     clean_testdir(RootPath).
 
 empty_manifest_test() ->
src/leveled_log.erl

@@ -297,7 +297,7 @@
     {"CDB04",
         {info, "Deletion confirmed for file ~s at ManifestSQN ~w"}},
     {"CDB05",
-        {info, "Closing of filename ~s for Reason ~w"}},
+        {info, "Closing of filename ~s from state ~w for reason ~w"}},
     {"CDB06",
         {info, "File to be truncated at last position of ~w with end of "
                 ++ "file at ~w"}},
@@ -327,7 +327,11 @@
         {info, "After ~w PUTs total write time is ~w total sync time is ~w "
                 ++ "and max write time is ~w and max sync time is ~w"}},
     {"CDB18",
-        {info, "Handled return and write of hashtable"}}
+        {info, "Handled return and write of hashtable"}},
+    {"CDB19",
+        {info, "Transferring filename ~s to waste ~s"}},
+    {"CDB20",
+        {info, "Deleting filename ~s as no waste retention period defined"}}
     ]).
test/end_to_end/basic_SUITE.erl

@@ -112,21 +112,27 @@ many_put_fetch_head(_Config) ->
     ok = leveled_bookie:book_destroy(Bookie3).
 
 journal_compaction(_Config) ->
+    journal_compaction_tester(false, 3600),
+    journal_compaction_tester(false, undefined),
+    journal_compaction_tester(true, 3600).
+
+journal_compaction_tester(Restart, WRP) ->
     RootPath = testutil:reset_filestructure(),
     StartOpts1 = [{root_path, RootPath},
                   {max_journalsize, 10000000},
                   {max_run_length, 1},
-                  {sync_strategy, testutil:sync_strategy()}],
-    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
-    ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
+                  {sync_strategy, testutil:sync_strategy()},
+                  {waste_retention_period, WRP}],
+    {ok, Bookie0} = leveled_bookie:book_start(StartOpts1),
+    ok = leveled_bookie:book_compactjournal(Bookie0, 30000),
     {TestObject, TestSpec} = testutil:generate_testobject(),
-    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
-    testutil:check_forobject(Bookie1, TestObject),
+    ok = testutil:book_riakput(Bookie0, TestObject, TestSpec),
+    testutil:check_forobject(Bookie0, TestObject),
     ObjList1 = testutil:generate_objects(20000, 2),
-    testutil:riakload(Bookie1, ObjList1),
+    testutil:riakload(Bookie0, ObjList1),
     ChkList1 = lists:sublist(lists:sort(ObjList1), 10000),
-    testutil:check_forlist(Bookie1, ChkList1),
-    testutil:check_forobject(Bookie1, TestObject),
+    testutil:check_forlist(Bookie0, ChkList1),
+    testutil:check_forobject(Bookie0, TestObject),
     {B2, K2, V2, Spec2, MD} = {"Bucket2",
                                "Key2",
                                "Value2",
@@ -134,18 +140,18 @@ journal_compaction(_Config) ->
                                [{"MDK2", "MDV2"}]},
     {TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2,
                                                             V2, Spec2, MD),
-    ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2),
-    ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
-    testutil:check_forlist(Bookie1, ChkList1),
-    testutil:check_forobject(Bookie1, TestObject),
-    testutil:check_forobject(Bookie1, TestObject2),
-    testutil:check_forlist(Bookie1, ChkList1),
-    testutil:check_forobject(Bookie1, TestObject),
-    testutil:check_forobject(Bookie1, TestObject2),
+    ok = testutil:book_riakput(Bookie0, TestObject2, TestSpec2),
+    ok = leveled_bookie:book_compactjournal(Bookie0, 30000),
+    testutil:check_forlist(Bookie0, ChkList1),
+    testutil:check_forobject(Bookie0, TestObject),
+    testutil:check_forobject(Bookie0, TestObject2),
+    testutil:check_forlist(Bookie0, ChkList1),
+    testutil:check_forobject(Bookie0, TestObject),
+    testutil:check_forobject(Bookie0, TestObject2),
     %% Delete some of the objects
     ObjListD = testutil:generate_objects(10000, 2),
     lists:foreach(fun({_R, O, _S}) ->
-                        testutil:book_riakdelete(Bookie1,
+                        testutil:book_riakdelete(Bookie0,
                                                  O#r_object.bucket,
                                                  O#r_object.key,
                                                  [])
@@ -154,7 +160,17 @@ journal_compaction(_Config) ->
 
     %% Now replace all the other objects
     ObjList2 = testutil:generate_objects(40000, 10002),
-    testutil:riakload(Bookie1, ObjList2),
+    testutil:riakload(Bookie0, ObjList2),
+
+    Bookie1 =
+        case Restart of
+            true ->
+                ok = leveled_bookie:book_close(Bookie0),
+                {ok, RestartedB} = leveled_bookie:book_start(StartOpts1),
+                RestartedB;
+            false ->
+                Bookie0
+        end,
 
     ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
 
@@ -184,7 +200,12 @@ journal_compaction(_Config) ->
                   [2000,2000,2000,2000,2000,2000]),
     {ok, ClearedJournals} = file:list_dir(WasteFP),
     io:format("~w ClearedJournals found~n", [length(ClearedJournals)]),
-    true = length(ClearedJournals) > 0,
+    case is_integer(WRP) of
+        true ->
+            true = length(ClearedJournals) > 0;
+        false ->
+            true = length(ClearedJournals) == 0
+    end,
 
     ChkList3 = lists:sublist(lists:sort(ObjList2), 500),
     testutil:check_forlist(Bookie1, ChkList3),
@@ -212,7 +233,12 @@ journal_compaction(_Config) ->
 
     {ok, ClearedJournalsPC} = file:list_dir(WasteFP),
     io:format("~w ClearedJournals found~n", [length(ClearedJournalsPC)]),
-    true = length(ClearedJournalsPC) == 0,
+    case is_integer(WRP) of
+        true ->
+            true = length(ClearedJournals) > 0;
+        false ->
+            true = length(ClearedJournals) == 0
+    end,
 
     testutil:reset_filestructure(10000).