Merge branch 'master' into mas-aae-segementfoldplus

This commit is contained in:
Martin Sumner 2017-11-07 11:22:56 +00:00
commit 8f27b3b628
12 changed files with 605 additions and 309 deletions

View file

@ -48,6 +48,8 @@
source_inker :: pid() | undefined, source_inker :: pid() | undefined,
reload_strategy = [] :: list(), reload_strategy = [] :: list(),
waste_retention_period :: integer() | undefined, waste_retention_period :: integer() | undefined,
compression_method :: lz4|native,
compress_on_receipt :: boolean(),
max_run_length}). max_run_length}).
-record(penciller_options, -record(penciller_options,
@ -58,6 +60,7 @@
bookies_mem :: tuple() | undefined, bookies_mem :: tuple() | undefined,
source_penciller :: pid() | undefined, source_penciller :: pid() | undefined,
snapshot_longrunning = true :: boolean(), snapshot_longrunning = true :: boolean(),
compression_method :: lz4|native,
levelzero_cointoss = false :: boolean()}). levelzero_cointoss = false :: boolean()}).
-record(iclerk_options, -record(iclerk_options,
@ -65,6 +68,7 @@
max_run_length :: integer() | undefined, max_run_length :: integer() | undefined,
cdb_options = #cdb_options{} :: #cdb_options{}, cdb_options = #cdb_options{} :: #cdb_options{},
waste_retention_period :: integer() | undefined, waste_retention_period :: integer() | undefined,
compression_method :: lz4|native,
reload_strategy = [] :: list()}). reload_strategy = [] :: list()}).
-record(recent_aae, {filter :: whitelist|blacklist, -record(recent_aae, {filter :: whitelist|blacklist,

View file

@ -9,3 +9,7 @@
{erl_opts, [debug_info, {parse_transform, lager_transform}, {parse_transform, eqc_cover}]}, {erl_opts, [debug_info, {parse_transform, lager_transform}, {parse_transform, eqc_cover}]},
{plugins, [rebar_eqc]}]} {plugins, [rebar_eqc]}]}
]}. ]}.
{deps, [
{lz4, ".*", {git, "https://github.com/martinsumner/erlang-lz4", {branch, "mas-leveled"}}}
]}.

View file

@ -1 +1,4 @@
[]. [{<<"lz4">>,
{git,"https://github.com/martinsumner/erlang-lz4",
{ref,"09d539685e616b614e851926384439384601ee5a"}},
0}].

View file

@ -81,6 +81,8 @@
-define(JOURNAL_SIZE_JITTER, 20). -define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 80000). -define(LONG_RUNNING, 80000).
-define(RECENT_AAE, false). -define(RECENT_AAE, false).
-define(COMPRESSION_METHOD, lz4).
-define(COMPRESSION_POINT, on_receipt).
-record(ledger_cache, {mem :: ets:tab(), -record(ledger_cache, {mem :: ets:tab(),
loader = leveled_tree:empty(?CACHE_TYPE) loader = leveled_tree:empty(?CACHE_TYPE)
@ -143,19 +145,34 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
%% @doc Start a Leveled Key/Value store - full options support. %% @doc Start a Leveled Key/Value store - full options support.
%% %%
%% Allows an options proplists to be passed for setting options. There are %% Allows an options proplists to be passed for setting options. There are
%% two primary additional options this allows over book_start/4: %% four primary additional options this allows over book_start/4:
%% - retain_strategy %% - retain_strategy
%% - waste_retention_period %% - waste_retention_period
%% - compression_method
%% - compression_point
%% %%
%% Both of these relate to compaction in the Journal. The retain_strategy %% Both of the first two options relate to compaction in the Journal. The
%% determines if a skinny record of the object should be retained following %% retain_strategydetermines if a skinny record of the object should be
%% compaction, and how thta should be used when recovering lost state in the %% retained following compaction, and how that should be used when recovering
%% Ledger. %% lost state in the Ledger.
%%
%% This is relevant to when Riak uses Leveled in that KeyChanges are presented
%% by the vnode to the backend as deltas. This means that if those key
%% changes do not remain recorded in the journal once the value has been
%% compacted - rebuilding the ledger from the Journal would lead to incorrect
%% index entries being present.
%% %%
%% Currently compacted records no longer in use are not removed but moved to %% Currently compacted records no longer in use are not removed but moved to
%% a journal_waste folder, and the waste_retention_period determines how long %% a journal_waste folder, and the waste_retention_period determines how long
%% this history should be kept for (for example to allow for it to be backed %% this history should be kept for (for example to allow for it to be backed
%% up before deletion) %% up before deletion).
%%
%% Compression method and point allow Leveled to be switched from using bif
%% based compression (zlib) to suing nif based compression (lz4). The
%% compression point can be changed between on_receipt (all values are
%% compressed as they are received), to on_compact where values are originally
%% stored uncompressed (speeding PUT times), and are only compressed when
%% they are first subject to compaction
%% %%
%% TODO: %% TODO:
%% The reload_strategy is exposed as currently no firm decision has been made %% The reload_strategy is exposed as currently no firm decision has been made
@ -387,7 +404,8 @@ init([Opts]) ->
unit_minutes = UnitMinutes} unit_minutes = UnitMinutes}
end, end,
{Inker, Penciller} = startup(InkerOpts, PencillerOpts, RecentAAE), {Inker, Penciller} =
startup(InkerOpts, PencillerOpts, RecentAAE),
NewETS = ets:new(mem, [ordered_set]), NewETS = ets:new(mem, [ordered_set]),
leveled_log:log("B0001", [Inker, Penciller]), leveled_log:log("B0001", [Inker, Penciller]),
@ -899,16 +917,41 @@ set_options(Opts) ->
ok = filelib:ensure_dir(JournalFP), ok = filelib:ensure_dir(JournalFP),
ok = filelib:ensure_dir(LedgerFP), ok = filelib:ensure_dir(LedgerFP),
CompressionMethod =
case get_opt(compression_method, Opts, ?COMPRESSION_METHOD) of
native ->
% Note native compression will have reduced performance
% https://github.com/martinsumner/leveled/issues/95
native;
lz4 ->
% Must include lz4 library in rebar.config
lz4
end,
CompressOnReceipt =
case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of
on_receipt ->
% Note this will add measurable delay to PUT time
% https://github.com/martinsumner/leveled/issues/95
true;
on_compact ->
% If using lz4 this is not recommended
false
end,
{#inker_options{root_path = JournalFP, {#inker_options{root_path = JournalFP,
reload_strategy = ReloadStrategy, reload_strategy = ReloadStrategy,
max_run_length = get_opt(max_run_length, Opts), max_run_length = get_opt(max_run_length, Opts),
waste_retention_period = WRP, waste_retention_period = WRP,
cdb_options = #cdb_options{max_size=MaxJournalSize, compression_method = CompressionMethod,
binary_mode=true, compress_on_receipt = CompressOnReceipt,
sync_strategy=SyncStrat}}, cdb_options =
#cdb_options{max_size=MaxJournalSize,
binary_mode=true,
sync_strategy=SyncStrat}},
#penciller_options{root_path = LedgerFP, #penciller_options{root_path = LedgerFP,
max_inmemory_tablesize = PCLL0CacheSize, max_inmemory_tablesize = PCLL0CacheSize,
levelzero_cointoss = true}}. levelzero_cointoss = true,
compression_method = CompressionMethod}}.
startup(InkerOpts, PencillerOpts, RecentAAE) -> startup(InkerOpts, PencillerOpts, RecentAAE) ->
{ok, Inker} = leveled_inker:ink_start(InkerOpts), {ok, Inker} = leveled_inker:ink_start(InkerOpts),

View file

@ -46,15 +46,16 @@
to_ledgerkey/3, to_ledgerkey/3,
to_ledgerkey/5, to_ledgerkey/5,
from_ledgerkey/1, from_ledgerkey/1,
to_inkerkv/4, to_inkerkv/3,
to_inkerkv/6,
from_inkerkv/1, from_inkerkv/1,
from_inkerkv/2, from_inkerkv/2,
from_journalkey/1, from_journalkey/1,
compact_inkerkvc/2, compact_inkerkvc/2,
split_inkvalue/1, split_inkvalue/1,
check_forinkertype/2, check_forinkertype/2,
maybe_compress/1, maybe_compress/2,
create_value_for_journal/2, create_value_for_journal/3,
build_metadata_object/2, build_metadata_object/2,
generate_ledgerkv/5, generate_ledgerkv/5,
get_size/2, get_size/2,
@ -219,11 +220,13 @@ to_ledgerkey(Bucket, Key, Tag) ->
%% Return the Key, Value and Hash Option for this object. The hash option %% Return the Key, Value and Hash Option for this object. The hash option
%% indicates whether the key would ever be looked up directly, and so if it %% indicates whether the key would ever be looked up directly, and so if it
%% requires an entry in the hash table %% requires an entry in the hash table
to_inkerkv(LedgerKey, SQN, to_fetch, null) -> to_inkerkv(LedgerKey, SQN, to_fetch) ->
{{SQN, ?INKT_STND, LedgerKey}, null, true}; {{SQN, ?INKT_STND, LedgerKey}, null, true}.
to_inkerkv(LedgerKey, SQN, Object, KeyChanges) ->
to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
InkerType = check_forinkertype(LedgerKey, Object), InkerType = check_forinkertype(LedgerKey, Object),
Value = create_value_for_journal({Object, KeyChanges}, false), Value =
create_value_for_journal({Object, KeyChanges}, Compress, PressMethod),
{{SQN, InkerType, LedgerKey}, Value}. {{SQN, InkerType, LedgerKey}, Value}.
%% Used when fetching objects, so only handles standard, hashable entries %% Used when fetching objects, so only handles standard, hashable entries
@ -240,45 +243,51 @@ from_inkerkv(Object, ToIgnoreKeyChanges) ->
Object Object
end. end.
create_value_for_journal({Object, KeyChanges}, Compress) create_value_for_journal({Object, KeyChanges}, Compress, Method)
when not is_binary(KeyChanges) -> when not is_binary(KeyChanges) ->
KeyChangeBin = term_to_binary(KeyChanges, [compressed]), KeyChangeBin = term_to_binary(KeyChanges, [compressed]),
create_value_for_journal({Object, KeyChangeBin}, Compress); create_value_for_journal({Object, KeyChangeBin}, Compress, Method);
create_value_for_journal({Object, KeyChangeBin}, Compress) -> create_value_for_journal({Object, KeyChangeBin}, Compress, Method) ->
KeyChangeBinLen = byte_size(KeyChangeBin), KeyChangeBinLen = byte_size(KeyChangeBin),
ObjectBin = serialise_object(Object, Compress), ObjectBin = serialise_object(Object, Compress, Method),
TypeCode = encode_valuetype(is_binary(Object), Compress), TypeCode = encode_valuetype(is_binary(Object), Compress, Method),
<<ObjectBin/binary, <<ObjectBin/binary,
KeyChangeBin/binary, KeyChangeBin/binary,
KeyChangeBinLen:32/integer, KeyChangeBinLen:32/integer,
TypeCode:8/integer>>. TypeCode:8/integer>>.
maybe_compress({null, KeyChanges}) -> maybe_compress({null, KeyChanges}, _PressMethod) ->
create_value_for_journal({null, KeyChanges}, false); create_value_for_journal({null, KeyChanges}, false, native);
maybe_compress(JournalBin) -> maybe_compress(JournalBin, PressMethod) ->
Length0 = byte_size(JournalBin) - 5, Length0 = byte_size(JournalBin) - 5,
<<JBin0:Length0/binary, <<JBin0:Length0/binary,
KeyChangeLength:32/integer, KeyChangeLength:32/integer,
Type:8/integer>> = JournalBin, Type:8/integer>> = JournalBin,
{IsBinary, IsCompressed} = decode_valuetype(Type), {IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type),
case IsCompressed of case IsCompressed of
true -> true ->
JournalBin; JournalBin;
false -> false ->
Length1 = Length0 - KeyChangeLength, Length1 = Length0 - KeyChangeLength,
<<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0, <<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
V0 = {deserialise_object(OBin2, IsBinary, IsCompressed), V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
binary_to_term(KCBin2)}, binary_to_term(KCBin2)},
create_value_for_journal(V0, true) create_value_for_journal(V0, true, PressMethod)
end. end.
serialise_object(Object, false) when is_binary(Object) -> serialise_object(Object, false, _Method) when is_binary(Object) ->
Object; Object;
serialise_object(Object, true) when is_binary(Object) -> serialise_object(Object, true, Method) when is_binary(Object) ->
zlib:compress(Object); case Method of
serialise_object(Object, false) -> lz4 ->
{ok, Bin} = lz4:pack(Object),
Bin;
native ->
zlib:compress(Object)
end;
serialise_object(Object, false, _Method) ->
term_to_binary(Object); term_to_binary(Object);
serialise_object(Object, true) -> serialise_object(Object, true, _Method) ->
term_to_binary(Object, [compressed]). term_to_binary(Object, [compressed]).
revert_value_from_journal(JournalBin) -> revert_value_from_journal(JournalBin) ->
@ -289,26 +298,34 @@ revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) ->
<<JBin0:Length0/binary, <<JBin0:Length0/binary,
KeyChangeLength:32/integer, KeyChangeLength:32/integer,
Type:8/integer>> = JournalBin, Type:8/integer>> = JournalBin,
{IsBinary, IsCompressed} = decode_valuetype(Type), {IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type),
Length1 = Length0 - KeyChangeLength, Length1 = Length0 - KeyChangeLength,
case ToIgnoreKeyChanges of case ToIgnoreKeyChanges of
true -> true ->
<<OBin2:Length1/binary, _KCBin2:KeyChangeLength/binary>> = JBin0, <<OBin2:Length1/binary, _KCBin2:KeyChangeLength/binary>> = JBin0,
{deserialise_object(OBin2, IsBinary, IsCompressed), []}; {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), []};
false -> false ->
<<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0, <<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
{deserialise_object(OBin2, IsBinary, IsCompressed), {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
binary_to_term(KCBin2)} binary_to_term(KCBin2)}
end. end.
deserialise_object(Binary, true, true) -> deserialise_object(Binary, true, true, true) ->
{ok, Deflated} = lz4:unpack(Binary),
Deflated;
deserialise_object(Binary, true, true, false) ->
zlib:uncompress(Binary); zlib:uncompress(Binary);
deserialise_object(Binary, true, false) -> deserialise_object(Binary, true, false, _IsLz4) ->
Binary; Binary;
deserialise_object(Binary, false, _) -> deserialise_object(Binary, false, _, _IsLz4) ->
binary_to_term(Binary). binary_to_term(Binary).
encode_valuetype(IsBinary, IsCompressed) -> encode_valuetype(IsBinary, IsCompressed, Method) ->
Bit3 =
case Method of
lz4 -> 4;
native -> 0
end,
Bit2 = Bit2 =
case IsBinary of case IsBinary of
true -> 2; true -> 2;
@ -319,12 +336,13 @@ encode_valuetype(IsBinary, IsCompressed) ->
true -> 1; true -> 1;
false -> 0 false -> 0
end, end,
Bit1 + Bit2. Bit1 + Bit2 + Bit3.
decode_valuetype(TypeInt) -> decode_valuetype(TypeInt) ->
IsCompressed = TypeInt band 1 == 1, IsCompressed = TypeInt band 1 == 1,
IsBinary = TypeInt band 2 == 2, IsBinary = TypeInt band 2 == 2,
{IsBinary, IsCompressed}. IsLz4 = TypeInt band 4 == 4,
{IsBinary, IsCompressed, IsLz4}.
from_journalkey({SQN, _Type, LedgerKey}) -> from_journalkey({SQN, _Type, LedgerKey}) ->
{SQN, LedgerKey}. {SQN, LedgerKey}.

View file

@ -109,7 +109,8 @@
cdb_options, cdb_options,
waste_retention_period :: integer() | undefined, waste_retention_period :: integer() | undefined,
waste_path :: string() | undefined, waste_path :: string() | undefined,
reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list()}). reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
compression_method :: lz4|native}).
-record(candidate, {low_sqn :: integer() | undefined, -record(candidate, {low_sqn :: integer() | undefined,
filename :: string() | undefined, filename :: string() | undefined,
@ -167,7 +168,9 @@ init([IClerkOpts]) ->
cdb_options = CDBopts, cdb_options = CDBopts,
reload_strategy = ReloadStrategy, reload_strategy = ReloadStrategy,
waste_path = WP, waste_path = WP,
waste_retention_period = WRP}}. waste_retention_period = WRP,
compression_method =
IClerkOpts#iclerk_options.compression_method}}.
handle_call(_Msg, _From, State) -> handle_call(_Msg, _From, State) ->
{reply, not_supported, State}. {reply, not_supported, State}.
@ -194,7 +197,8 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
FilterFun, FilterFun,
FilterServer, FilterServer,
MaxSQN, MaxSQN,
State#state.reload_strategy), State#state.reload_strategy,
State#state.compression_method),
FilesToDelete = lists:map(fun(C) -> FilesToDelete = lists:map(fun(C) ->
{C#candidate.low_sqn, {C#candidate.low_sqn,
C#candidate.filename, C#candidate.filename,
@ -490,7 +494,8 @@ update_inker(Inker, ManifestSlice, FilesToDelete) ->
FilesToDelete), FilesToDelete),
ok. ok.
compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) -> compact_files(BestRun, CDBopts, FilterFun, FilterServer,
MaxSQN, RStrategy, PressMethod) ->
BatchesOfPositions = get_all_positions(BestRun, []), BatchesOfPositions = get_all_positions(BestRun, []),
compact_files(BatchesOfPositions, compact_files(BatchesOfPositions,
CDBopts, CDBopts,
@ -499,19 +504,20 @@ compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) ->
FilterServer, FilterServer,
MaxSQN, MaxSQN,
RStrategy, RStrategy,
PressMethod,
[]). []).
compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN, compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN,
_RStrategy, ManSlice0) -> _RStrategy, _PressMethod, ManSlice0) ->
ManSlice0; ManSlice0;
compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN, compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN,
_RStrategy, ManSlice0) -> _RStrategy, _PressMethod, ManSlice0) ->
ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(ActiveJournal0), ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(ActiveJournal0),
ManSlice1; ManSlice1;
compact_files([Batch|T], CDBopts, ActiveJournal0, compact_files([Batch|T], CDBopts, ActiveJournal0,
FilterFun, FilterServer, MaxSQN, FilterFun, FilterServer, MaxSQN,
RStrategy, ManSlice0) -> RStrategy, PressMethod, ManSlice0) ->
{SrcJournal, PositionList} = Batch, {SrcJournal, PositionList} = Batch,
KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal, KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal,
PositionList, PositionList,
@ -524,9 +530,10 @@ compact_files([Batch|T], CDBopts, ActiveJournal0,
{ActiveJournal1, ManSlice1} = write_values(KVCs1, {ActiveJournal1, ManSlice1} = write_values(KVCs1,
CDBopts, CDBopts,
ActiveJournal0, ActiveJournal0,
ManSlice0), ManSlice0,
PressMethod),
compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN, compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN,
RStrategy, ManSlice1). RStrategy, PressMethod, ManSlice1).
get_all_positions([], PositionBatches) -> get_all_positions([], PositionBatches) ->
PositionBatches; PositionBatches;
@ -577,12 +584,12 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
KVCs). KVCs).
write_values([], _CDBopts, Journal0, ManSlice0) -> write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) ->
{Journal0, ManSlice0}; {Journal0, ManSlice0};
write_values(KVCList, CDBopts, Journal0, ManSlice0) -> write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
KVList = lists:map(fun({K, V, _C}) -> KVList = lists:map(fun({K, V, _C}) ->
% Compress the value as part of compaction % Compress the value as part of compaction
{K, leveled_codec:maybe_compress(V)} {K, leveled_codec:maybe_compress(V, PressMethod)}
end, end,
KVCList), KVCList),
{ok, Journal1} = case Journal0 of {ok, Journal1} = case Journal0 of
@ -605,7 +612,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) ->
{Journal1, ManSlice0}; {Journal1, ManSlice0};
roll -> roll ->
ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(Journal1), ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(Journal1),
write_values(KVCList, CDBopts, null, ManSlice1) write_values(KVCList, CDBopts, null, ManSlice1, PressMethod)
end. end.
clear_waste(State) -> clear_waste(State) ->
@ -754,7 +761,8 @@ test_ledgerkey(Key) ->
{o, "Bucket", Key, null}. {o, "Bucket", Key, null}.
test_inkerkv(SQN, Key, V, IdxSpecs) -> test_inkerkv(SQN, Key, V, IdxSpecs) ->
leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs). leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs,
native, false).
fetch_testcdb(RP) -> fetch_testcdb(RP) ->
FN1 = leveled_inker:filepath(RP, 1, new_journal), FN1 = leveled_inker:filepath(RP, 1, new_journal),
@ -839,7 +847,8 @@ compact_single_file_recovr_test() ->
LedgerFun1, LedgerFun1,
LedgerSrv1, LedgerSrv1,
9, 9,
[{?STD_TAG, recovr}]), [{?STD_TAG, recovr}],
native),
io:format("FN of ~s~n", [FN]), io:format("FN of ~s~n", [FN]),
?assertMatch(2, LowSQN), ?assertMatch(2, LowSQN),
?assertMatch(probably, ?assertMatch(probably,
@ -876,7 +885,8 @@ compact_single_file_retain_test() ->
LedgerFun1, LedgerFun1,
LedgerSrv1, LedgerSrv1,
9, 9,
[{?STD_TAG, retain}]), [{?STD_TAG, retain}],
native),
io:format("FN of ~s~n", [FN]), io:format("FN of ~s~n", [FN]),
?assertMatch(1, LowSQN), ?assertMatch(1, LowSQN),
?assertMatch(probably, ?assertMatch(probably,
@ -936,13 +946,16 @@ compact_singlefile_totwosmallfiles_testto() ->
lists:foreach(fun(X) -> lists:foreach(fun(X) ->
LK = test_ledgerkey("Key" ++ integer_to_list(X)), LK = test_ledgerkey("Key" ++ integer_to_list(X)),
Value = leveled_rand:rand_bytes(1024), Value = leveled_rand:rand_bytes(1024),
{IK, IV} = leveled_codec:to_inkerkv(LK, X, Value, []), {IK, IV} =
leveled_codec:to_inkerkv(LK, X, Value, [],
native, true),
ok = leveled_cdb:cdb_put(CDB1, IK, IV) ok = leveled_cdb:cdb_put(CDB1, IK, IV)
end, end,
lists:seq(1, 1000)), lists:seq(1, 1000)),
{ok, NewName} = leveled_cdb:cdb_complete(CDB1), {ok, NewName} = leveled_cdb:cdb_complete(CDB1),
{ok, CDBr} = leveled_cdb:cdb_open_reader(NewName), {ok, CDBr} = leveled_cdb:cdb_open_reader(NewName),
CDBoptsSmall = #cdb_options{binary_mode=true, max_size=400000, file_path=CP}, CDBoptsSmall =
#cdb_options{binary_mode=true, max_size=400000, file_path=CP},
BestRun1 = [#candidate{low_sqn=1, BestRun1 = [#candidate{low_sqn=1,
filename=leveled_cdb:cdb_filename(CDBr), filename=leveled_cdb:cdb_filename(CDBr),
journal=CDBr, journal=CDBr,
@ -954,7 +967,8 @@ compact_singlefile_totwosmallfiles_testto() ->
FakeFilterFun, FakeFilterFun,
null, null,
900, 900,
[{?STD_TAG, recovr}]), [{?STD_TAG, recovr}],
native),
?assertMatch(2, length(ManifestSlice)), ?assertMatch(2, length(ManifestSlice)),
lists:foreach(fun({_SQN, _FN, CDB, _LK}) -> lists:foreach(fun({_SQN, _FN, CDB, _LK}) ->
ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_deletepending(CDB),

View file

@ -136,6 +136,8 @@
clerk :: pid() | undefined, clerk :: pid() | undefined,
compaction_pending = false :: boolean(), compaction_pending = false :: boolean(),
is_snapshot = false :: boolean(), is_snapshot = false :: boolean(),
compression_method :: lz4|native,
compress_on_receipt :: boolean(),
source_inker :: pid() | undefined}). source_inker :: pid() | undefined}).
@ -506,10 +508,13 @@ start_from_file(InkOpts) ->
ReloadStrategy = InkOpts#inker_options.reload_strategy, ReloadStrategy = InkOpts#inker_options.reload_strategy,
MRL = InkOpts#inker_options.max_run_length, MRL = InkOpts#inker_options.max_run_length,
WRP = InkOpts#inker_options.waste_retention_period, WRP = InkOpts#inker_options.waste_retention_period,
PressMethod = InkOpts#inker_options.compression_method,
PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
IClerkOpts = #iclerk_options{inker = self(), IClerkOpts = #iclerk_options{inker = self(),
cdb_options=IClerkCDBOpts, cdb_options=IClerkCDBOpts,
waste_retention_period = WRP, waste_retention_period = WRP,
reload_strategy = ReloadStrategy, reload_strategy = ReloadStrategy,
compression_method = PressMethod,
max_run_length = MRL}, max_run_length = MRL},
{ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
@ -525,16 +530,21 @@ start_from_file(InkOpts) ->
active_journaldb = ActiveJournal, active_journaldb = ActiveJournal,
root_path = RootPath, root_path = RootPath,
cdb_options = CDBopts#cdb_options{waste_path=WasteFP}, cdb_options = CDBopts#cdb_options{waste_path=WasteFP},
compression_method = PressMethod,
compress_on_receipt = PressOnReceipt,
clerk = Clerk}}. clerk = Clerk}}.
put_object(LedgerKey, Object, KeyChanges, State) -> put_object(LedgerKey, Object, KeyChanges, State) ->
NewSQN = State#state.journal_sqn + 1, NewSQN = State#state.journal_sqn + 1,
ActiveJournal = State#state.active_journaldb, ActiveJournal = State#state.active_journaldb,
{JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey, {JournalKey, JournalBin} =
NewSQN, leveled_codec:to_inkerkv(LedgerKey,
Object, NewSQN,
KeyChanges), Object,
KeyChanges,
State#state.compression_method,
State#state.compress_on_receipt),
case leveled_cdb:cdb_put(ActiveJournal, case leveled_cdb:cdb_put(ActiveJournal,
JournalKey, JournalKey,
JournalBin) of JournalBin) of
@ -576,19 +586,15 @@ get_object(LedgerKey, SQN, Manifest) ->
get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) -> get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) ->
JournalP = leveled_imanifest:find_entry(SQN, Manifest), JournalP = leveled_imanifest:find_entry(SQN, Manifest),
{InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, {InkerKey, _V, true} =
SQN, leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch),
to_fetch,
null),
Obj = leveled_cdb:cdb_get(JournalP, InkerKey), Obj = leveled_cdb:cdb_get(JournalP, InkerKey),
leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges). leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges).
key_check(LedgerKey, SQN, Manifest) -> key_check(LedgerKey, SQN, Manifest) ->
JournalP = leveled_imanifest:find_entry(SQN, Manifest), JournalP = leveled_imanifest:find_entry(SQN, Manifest),
{InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, {InkerKey, _V, true} =
SQN, leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch),
to_fetch,
null),
leveled_cdb:cdb_keycheck(JournalP, InkerKey). leveled_cdb:cdb_keycheck(JournalP, InkerKey).
build_manifest(ManifestFilenames, build_manifest(ManifestFilenames,
@ -848,7 +854,7 @@ initiate_penciller_snapshot(Bookie) ->
-ifdef(TEST). -ifdef(TEST).
create_value_for_journal(Obj, Comp) -> create_value_for_journal(Obj, Comp) ->
leveled_codec:create_value_for_journal(Obj, Comp). leveled_codec:create_value_for_journal(Obj, Comp, native).
build_dummy_journal() -> build_dummy_journal() ->
F = fun(X) -> X end, F = fun(X) -> X end,
@ -930,7 +936,9 @@ simple_inker_test() ->
build_dummy_journal(), build_dummy_journal(),
CDBopts = #cdb_options{max_size=300000, binary_mode=true}, CDBopts = #cdb_options{max_size=300000, binary_mode=true},
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
cdb_options=CDBopts}), cdb_options=CDBopts,
compression_method=native,
compress_on_receipt=true}),
Obj1 = ink_get(Ink1, "Key1", 1), Obj1 = ink_get(Ink1, "Key1", 1),
?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
Obj3 = ink_get(Ink1, "Key1", 3), Obj3 = ink_get(Ink1, "Key1", 3),
@ -952,7 +960,9 @@ simple_inker_completeactivejournal_test() ->
F1r = filename:join(JournalFP, "nursery_1.pnd"), F1r = filename:join(JournalFP, "nursery_1.pnd"),
ok = file:rename(F1, F1r), ok = file:rename(F1, F1r),
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
cdb_options=CDBopts}), cdb_options=CDBopts,
compression_method=native,
compress_on_receipt=true}),
Obj1 = ink_get(Ink1, "Key1", 1), Obj1 = ink_get(Ink1, "Key1", 1),
?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
Obj2 = ink_get(Ink1, "Key4", 4), Obj2 = ink_get(Ink1, "Key4", 4),
@ -970,7 +980,9 @@ compact_journal_test() ->
RStrategy = [{?STD_TAG, recovr}], RStrategy = [{?STD_TAG, recovr}],
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
cdb_options=CDBopts, cdb_options=CDBopts,
reload_strategy=RStrategy}), reload_strategy=RStrategy,
compression_method=native,
compress_on_receipt=false}),
{ok, NewSQN1, _ObjSize} = ink_put(Ink1, {ok, NewSQN1, _ObjSize} = ink_put(Ink1,
test_ledgerkey("KeyAA"), test_ledgerkey("KeyAA"),
"TestValueAA", "TestValueAA",
@ -1030,7 +1042,9 @@ empty_manifest_test() ->
clean_testdir(RootPath), clean_testdir(RootPath),
CDBopts = #cdb_options{max_size=300000}, CDBopts = #cdb_options{max_size=300000},
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
cdb_options=CDBopts}), cdb_options=CDBopts,
compression_method=native,
compress_on_receipt=true}),
?assertMatch(not_present, ink_fetch(Ink1, "Key1", 1)), ?assertMatch(not_present, ink_fetch(Ink1, "Key1", 1)),
CheckFun = fun(L, K, SQN) -> lists:member({SQN, K}, L) end, CheckFun = fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
@ -1050,7 +1064,9 @@ empty_manifest_test() ->
ok = file:write_file(FN, term_to_binary("Hello")), ok = file:write_file(FN, term_to_binary("Hello")),
{ok, Ink2} = ink_start(#inker_options{root_path=RootPath, {ok, Ink2} = ink_start(#inker_options{root_path=RootPath,
cdb_options=CDBopts}), cdb_options=CDBopts,
compression_method=native,
compress_on_receipt=false}),
?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)), ?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)),
{ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}), {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}),
?assertMatch(2, SQN), ?assertMatch(2, SQN),

View file

@ -35,7 +35,7 @@
]). ]).
-export([ -export([
clerk_new/2, clerk_new/3,
clerk_prompt/1, clerk_prompt/1,
clerk_push/2, clerk_push/2,
clerk_close/1, clerk_close/1,
@ -49,15 +49,19 @@
-record(state, {owner :: pid() | undefined, -record(state, {owner :: pid() | undefined,
root_path :: string() | undefined, root_path :: string() | undefined,
pending_deletions = dict:new() % OTP 16 does not like type pending_deletions = dict:new(), % OTP 16 does not like type
compression_method :: lz4|native
}). }).
%%%============================================================================ %%%============================================================================
%%% API %%% API
%%%============================================================================ %%%============================================================================
clerk_new(Owner, Manifest) -> clerk_new(Owner, Manifest, CompressionMethod) ->
{ok, Pid} = gen_server:start(?MODULE, [], []), {ok, Pid} =
gen_server:start(?MODULE,
[{compression_method, CompressionMethod}],
[]),
ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity), ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity),
leveled_log:log("PC001", [Pid, Owner]), leveled_log:log("PC001", [Pid, Owner]),
{ok, Pid}. {ok, Pid}.
@ -78,8 +82,8 @@ clerk_close(Pid) ->
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================ %%%============================================================================
init([]) -> init([{compression_method, CompressionMethod}]) ->
{ok, #state{}}. {ok, #state{compression_method = CompressionMethod}}.
handle_call({load, Owner, RootPath}, _From, State) -> handle_call({load, Owner, RootPath}, _From, State) ->
{reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT}; {reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT};
@ -120,7 +124,8 @@ request_work(State) ->
handle_work({SrcLevel, Manifest}, State) -> handle_work({SrcLevel, Manifest}, State) ->
{UpdManifest, EntriesToDelete} = merge(SrcLevel, {UpdManifest, EntriesToDelete} = merge(SrcLevel,
Manifest, Manifest,
State#state.root_path), State#state.root_path,
State#state.compression_method),
leveled_log:log("PC007", []), leveled_log:log("PC007", []),
SWMC = os:timestamp(), SWMC = os:timestamp(),
ok = leveled_penciller:pcl_manifestchange(State#state.owner, ok = leveled_penciller:pcl_manifestchange(State#state.owner,
@ -132,7 +137,7 @@ handle_work({SrcLevel, Manifest}, State) ->
leveled_log:log_timer("PC018", [], SWSM), leveled_log:log_timer("PC018", [], SWSM),
{leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}. {leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}.
merge(SrcLevel, Manifest, RootPath) -> merge(SrcLevel, Manifest, RootPath, CompressionMethod) ->
Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel), Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel),
NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1, NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1,
SinkList = leveled_pmanifest:merge_lookup(Manifest, SinkList = leveled_pmanifest:merge_lookup(Manifest,
@ -152,7 +157,9 @@ merge(SrcLevel, Manifest, RootPath) ->
{Man0, []}; {Man0, []};
_ -> _ ->
SST_RP = leveled_penciller:sst_rootpath(RootPath), SST_RP = leveled_penciller:sst_rootpath(RootPath),
perform_merge(Manifest, Src, SinkList, SrcLevel, SST_RP, NewSQN) perform_merge(Manifest,
Src, SinkList, SrcLevel,
SST_RP, NewSQN, CompressionMethod)
end. end.
notify_deletions([], _Penciller) -> notify_deletions([], _Penciller) ->
@ -167,16 +174,21 @@ notify_deletions([Head|Tail], Penciller) ->
%% %%
%% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1 %% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1
perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) -> perform_merge(Manifest,
Src, SinkList, SrcLevel,
RootPath, NewSQN,
CompressionMethod) ->
leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]), leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]),
SrcList = [{next, Src, all}], SrcList = [{next, Src, all}],
MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner), MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
SinkLevel = SrcLevel + 1, SinkLevel = SrcLevel + 1,
SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel), SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
Additions = do_merge(SrcList, SinkList, Additions =
SinkLevel, SinkBasement, do_merge(SrcList, SinkList,
RootPath, NewSQN, MaxSQN, SinkLevel, SinkBasement,
[]), RootPath, NewSQN, MaxSQN,
CompressionMethod,
[]),
RevertPointerFun = RevertPointerFun =
fun({next, ME, _SK}) -> fun({next, ME, _SK}) ->
ME ME
@ -193,22 +205,23 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) ->
Src), Src),
{Man2, [Src|SinkManifestList]}. {Man2, [Src|SinkManifestList]}.
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) -> do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _CM, Additions) ->
leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]), leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]),
Additions; Additions;
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) -> do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) ->
FileName = leveled_penciller:sst_filename(NewSQN, FileName = leveled_penciller:sst_filename(NewSQN,
SinkLevel, SinkLevel,
length(Additions)), length(Additions)),
leveled_log:log("PC012", [NewSQN, FileName, SinkB]), leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
TS1 = os:timestamp(), TS1 = os:timestamp(),
case leveled_sst:sst_new(RP, FileName, case leveled_sst:sst_new(RP, FileName,
KL1, KL2, SinkB, SinkLevel, MaxSQN) of KL1, KL2, SinkB, SinkLevel, MaxSQN, CM) of
empty -> empty ->
leveled_log:log("PC013", [FileName]), leveled_log:log("PC013", [FileName]),
do_merge([], [], do_merge([], [],
SinkLevel, SinkB, SinkLevel, SinkB,
RP, NewSQN, MaxSQN, RP, NewSQN, MaxSQN,
CM,
Additions); Additions);
{ok, Pid, Reply} -> {ok, Pid, Reply} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
@ -220,6 +233,7 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) ->
do_merge(KL1Rem, KL2Rem, do_merge(KL1Rem, KL2Rem,
SinkLevel, SinkB, SinkLevel, SinkB,
RP, NewSQN, MaxSQN, RP, NewSQN, MaxSQN,
CM,
Additions ++ [Entry]) Additions ++ [Entry])
end. end.
@ -265,31 +279,36 @@ merge_file_test() ->
"KL1_L1.sst", "KL1_L1.sst",
1, 1,
KL1_L1, KL1_L1,
999999), 999999,
native),
KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)), KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
{ok, PidL2_1, _} = leveled_sst:sst_new("../test/", {ok, PidL2_1, _} = leveled_sst:sst_new("../test/",
"KL1_L2.sst", "KL1_L2.sst",
2, 2,
KL1_L2, KL1_L2,
999999), 999999,
native),
KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)), KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
{ok, PidL2_2, _} = leveled_sst:sst_new("../test/", {ok, PidL2_2, _} = leveled_sst:sst_new("../test/",
"KL2_L2.sst", "KL2_L2.sst",
2, 2,
KL2_L2, KL2_L2,
999999), 999999,
lz4),
KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)), KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
{ok, PidL2_3, _} = leveled_sst:sst_new("../test/", {ok, PidL2_3, _} = leveled_sst:sst_new("../test/",
"KL3_L2.sst", "KL3_L2.sst",
2, 2,
KL3_L2, KL3_L2,
999999), 999999,
lz4),
KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)), KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
{ok, PidL2_4, _} = leveled_sst:sst_new("../test/", {ok, PidL2_4, _} = leveled_sst:sst_new("../test/",
"KL4_L2.sst", "KL4_L2.sst",
2, 2,
KL4_L2, KL4_L2,
999999), 999999,
lz4),
E1 = #manifest_entry{owner = PidL1_1, E1 = #manifest_entry{owner = PidL1_1,
filename = "./KL1_L1.sst", filename = "./KL1_L1.sst",
@ -321,7 +340,8 @@ merge_file_test() ->
PointerList = lists:map(fun(ME) -> {next, ME, all} end, PointerList = lists:map(fun(ME) -> {next, ME, all} end,
[E2, E3, E4, E5]), [E2, E3, E4, E5]),
{Man6, _Dels} = perform_merge(Man5, E1, PointerList, 1, "../test", 3), {Man6, _Dels} =
perform_merge(Man5, E1, PointerList, 1, "../test", 3, native),
?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)). ?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)).

View file

@ -244,7 +244,9 @@
work_ongoing = false :: boolean(), % i.e. compaction work work_ongoing = false :: boolean(), % i.e. compaction work
work_backlog = false :: boolean(), % i.e. compaction work work_backlog = false :: boolean(), % i.e. compaction work
head_timing :: tuple() | undefined}). head_timing :: tuple() | undefined,
compression_method :: lz4|native}).
-type penciller_options() :: #penciller_options{}. -type penciller_options() :: #penciller_options{}.
-type bookies_memory() :: {tuple()|empty_cache, -type bookies_memory() :: {tuple()|empty_cache,
@ -861,25 +863,28 @@ sst_filename(ManSQN, Level, Count) ->
start_from_file(PCLopts) -> start_from_file(PCLopts) ->
RootPath = PCLopts#penciller_options.root_path, RootPath = PCLopts#penciller_options.root_path,
MaxTableSize = case PCLopts#penciller_options.max_inmemory_tablesize of MaxTableSize =
undefined -> case PCLopts#penciller_options.max_inmemory_tablesize of
?MAX_TABLESIZE; undefined ->
M -> ?MAX_TABLESIZE;
M M ->
end, M
end,
PressMethod = PCLopts#penciller_options.compression_method,
{ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath), {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath, PressMethod),
CoinToss = PCLopts#penciller_options.levelzero_cointoss, CoinToss = PCLopts#penciller_options.levelzero_cointoss,
% Used to randomly defer the writing of L0 file. Intended to help with % Used to randomly defer the writing of L0 file. Intended to help with
% vnode syncronisation issues (e.g. stop them all by default merging to % vnode syncronisation issues (e.g. stop them all by default merging to
% level zero concurrently) % level zero concurrently)
InitState = #state{clerk=MergeClerk, InitState = #state{clerk = MergeClerk,
root_path=RootPath, root_path = RootPath,
levelzero_maxcachesize=MaxTableSize, levelzero_maxcachesize = MaxTableSize,
levelzero_cointoss=CoinToss, levelzero_cointoss = CoinToss,
levelzero_index=leveled_pmem:new_index()}, levelzero_index = leveled_pmem:new_index(),
compression_method = PressMethod},
%% Open manifest %% Open manifest
Manifest0 = leveled_pmanifest:open_manifest(RootPath), Manifest0 = leveled_pmanifest:open_manifest(RootPath),
@ -887,7 +892,8 @@ start_from_file(PCLopts) ->
fun(FN) -> fun(FN) ->
{ok, {ok,
Pid, Pid,
{_FK, _LK}} = leveled_sst:sst_open(sst_rootpath(RootPath), FN), {_FK, _LK}} =
leveled_sst:sst_open(sst_rootpath(RootPath), FN),
Pid Pid
end, end,
SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1, SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1,
@ -1033,7 +1039,8 @@ roll_memory(State, false) ->
length(State#state.levelzero_cache), length(State#state.levelzero_cache),
FetchFun, FetchFun,
PCL, PCL,
State#state.ledger_sqn), State#state.ledger_sqn,
State#state.compression_method),
{ok, Constructor, _} = R, {ok, Constructor, _} = R,
Constructor; Constructor;
roll_memory(State, true) -> roll_memory(State, true) ->
@ -1047,7 +1054,8 @@ roll_memory(State, true) ->
FileName, FileName,
0, 0,
KVList, KVList,
State#state.ledger_sqn), State#state.ledger_sqn,
State#state.compression_method),
{ok, Constructor, _} = R, {ok, Constructor, _} = R,
Constructor. Constructor.
@ -1498,7 +1506,8 @@ simple_server_test() ->
RootPath = "../test/ledger", RootPath = "../test/ledger",
clean_testdir(RootPath), clean_testdir(RootPath),
{ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000}), max_inmemory_tablesize=1000,
compression_method=native}),
Key1_Pre = {{o,"Bucket0001", "Key0001", null}, Key1_Pre = {{o,"Bucket0001", "Key0001", null},
{1, {active, infinity}, null}}, {1, {active, infinity}, null}},
Key1 = add_missing_hash(Key1_Pre), Key1 = add_missing_hash(Key1_Pre),
@ -1538,7 +1547,8 @@ simple_server_test() ->
ok = pcl_close(PCL), ok = pcl_close(PCL),
{ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000}), max_inmemory_tablesize=1000,
compression_method=native}),
?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)), ?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)),
% ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]), % ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]),
@ -1790,7 +1800,8 @@ create_file_test() ->
1, 1,
FetchFun, FetchFun,
undefined, undefined,
10000), 10000,
native),
lists:foreach(fun(X) -> lists:foreach(fun(X) ->
case checkready(SP) of case checkready(SP) of
timeout -> timeout ->

File diff suppressed because it is too large Load diff

View file

@ -9,7 +9,8 @@
load_and_count/1, load_and_count/1,
load_and_count_withdelete/1, load_and_count_withdelete/1,
space_clear_ondelete/1, space_clear_ondelete/1,
is_empty_test/1 is_empty_test/1,
many_put_fetch_switchcompression/1
]). ]).
all() -> [ all() -> [
@ -20,7 +21,8 @@ all() -> [
load_and_count, load_and_count,
load_and_count_withdelete, load_and_count_withdelete,
space_clear_ondelete, space_clear_ondelete,
is_empty_test is_empty_test,
many_put_fetch_switchcompression
]. ].
@ -72,7 +74,8 @@ many_put_fetch_head(_Config) ->
RootPath = testutil:reset_filestructure(), RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}, StartOpts1 = [{root_path, RootPath},
{max_pencillercachesize, 16000}, {max_pencillercachesize, 16000},
{sync_strategy, riak_sync}], {sync_strategy, riak_sync},
{compression_point, on_compact}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(), {TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@ -81,7 +84,8 @@ many_put_fetch_head(_Config) ->
StartOpts2 = [{root_path, RootPath}, StartOpts2 = [{root_path, RootPath},
{max_journalsize, 500000000}, {max_journalsize, 500000000},
{max_pencillercachesize, 32000}, {max_pencillercachesize, 32000},
{sync_strategy, testutil:sync_strategy()}], {sync_strategy, testutil:sync_strategy()},
{compression_point, on_receipt}],
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2), {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
testutil:check_forobject(Bookie2, TestObject), testutil:check_forobject(Bookie2, TestObject),
GenList = [2, 20002, 40002, 60002, 80002, GenList = [2, 20002, 40002, 60002, 80002,
@ -683,3 +687,49 @@ is_empty_test(_Config) ->
true = sets:size(BLpd3()) == 0, true = sets:size(BLpd3()) == 0,
ok = leveled_bookie:book_close(Bookie1). ok = leveled_bookie:book_close(Bookie1).
many_put_fetch_switchcompression(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{max_pencillercachesize, 16000},
{sync_strategy, riak_sync},
{compression_method, native}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
testutil:check_forobject(Bookie1, TestObject),
ok = leveled_bookie:book_close(Bookie1),
StartOpts2 = [{root_path, RootPath},
{max_journalsize, 500000000},
{max_pencillercachesize, 32000},
{sync_strategy, testutil:sync_strategy()},
{compression_method, lz4}],
%% Change compression method
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
testutil:check_forobject(Bookie2, TestObject),
GenList = [2, 40002, 80002, 120002],
CLs = testutil:load_objects(40000, GenList, Bookie2, TestObject,
fun testutil:generate_smallobjects/2),
CL1A = lists:nth(1, CLs),
ChkListFixed = lists:nth(length(CLs), CLs),
testutil:check_forlist(Bookie2, CL1A),
ObjList2A = testutil:generate_objects(5000, 2),
testutil:riakload(Bookie2, ObjList2A),
ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000),
testutil:check_forlist(Bookie2, ChkList2A),
testutil:check_forlist(Bookie2, ChkListFixed),
testutil:check_forobject(Bookie2, TestObject),
testutil:check_forlist(Bookie2, ChkList2A),
testutil:check_forlist(Bookie2, ChkListFixed),
testutil:check_forobject(Bookie2, TestObject),
ok = leveled_bookie:book_close(Bookie2),
%% Change method back again
{ok, Bookie3} = leveled_bookie:book_start(StartOpts1),
testutil:check_forlist(Bookie3, ChkList2A),
testutil:check_forlist(Bookie3, ChkListFixed),
testutil:check_forobject(Bookie3, TestObject),
testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"),
ok = leveled_bookie:book_destroy(Bookie3).

View file

@ -305,17 +305,21 @@ aae_bustedjournal(_Config) ->
journal_compaction_bustedjournal(_Config) -> journal_compaction_bustedjournal(_Config) ->
% Different circumstances will be created in different runs % Different circumstances will be created in different runs
busted_journal_test(10000000), busted_journal_test(10000000, native, on_receipt, true),
busted_journal_test(7777777). busted_journal_test(7777777, lz4, on_compact, true),
busted_journal_test(8888888, lz4, on_receipt, true),
busted_journal_test(7777777, lz4, on_compact, false).
busted_journal_test(MaxJournalSize) -> busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) ->
% Simply confirms that none of this causes a crash % Simply confirms that none of this causes a crash
RootPath = testutil:reset_filestructure(), RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}, StartOpts1 = [{root_path, RootPath},
{max_journalsize, MaxJournalSize}, {max_journalsize, MaxJournalSize},
{max_run_length, 10}, {max_run_length, 10},
{sync_strategy, testutil:sync_strategy()}], {sync_strategy, testutil:sync_strategy()},
{compression_method, PressMethod},
{compression_point, PressPoint}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(), {TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@ -331,11 +335,18 @@ busted_journal_test(MaxJournalSize) ->
ObjList2), ObjList2),
ok = leveled_bookie:book_close(Bookie1), ok = leveled_bookie:book_close(Bookie1),
CDBFiles = testutil:find_journals(RootPath), case Bust of
lists:foreach(fun(FN) -> true ->
testutil:corrupt_journal(RootPath, FN, 100, 2048, 1000) CDBFiles = testutil:find_journals(RootPath),
end, lists:foreach(fun(FN) ->
CDBFiles), testutil:corrupt_journal(RootPath,
FN,
100, 2048, 1000)
end,
CDBFiles);
false ->
ok
end,
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1), {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
@ -358,6 +369,7 @@ busted_journal_test(MaxJournalSize) ->
testutil:reset_filestructure(10000). testutil:reset_filestructure(10000).
rotating_object_check(BookOpts, B, NumberOfObjects) -> rotating_object_check(BookOpts, B, NumberOfObjects) ->
{ok, Book1} = leveled_bookie:book_start(BookOpts), {ok, Book1} = leveled_bookie:book_start(BookOpts),
{KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects), {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),