Add ZSTD compression (#430)

* Add support for zstd and split compression

Add support for using zstd as an alternative to native or lz4.

Upgrade lz4 to v1.9.4 (with ARM enhancements).

Allow for split compression algorithms - i.e. use native on journal, but lz4 on ledger.

* Switch to AdRoll zstd

Development appears to be active and ongoing.  No issues running on different linux flavours.

* Use realistic bucket name

* Update README.md

* Switch branch

* Add comment following review
This commit is contained in:
Martin Sumner 2024-01-23 16:25:03 +00:00 committed by GitHub
parent c294570bce
commit 999ce8ba5b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 156 additions and 73 deletions

View file

@ -78,8 +78,8 @@ In order to contribute to leveled, fork the repository, make a branch for your c
To have rebar3 execute the full set of tests, run:
```rebar3 as test do xref, dialyzer, cover --reset, eunit --cover, ct --cover, cover --verbose```
```./rebar3 do xref, dialyzer, cover --reset, eunit --cover, ct --cover, cover --verbose```
For those with a Quickcheck license, property-based tests can also be run using:
```rebar3 as eqc do eunit --module=leveled_simpleeqc, eunit --module=leveled_statemeqc```
```./rebar3 as eqc do eunit --module=leveled_simpleeqc, eunit --module=leveled_statemeqc```

View file

@ -43,13 +43,23 @@
]}.
%% @doc Compression method
%% Can be lz4 or native (which will use the Erlang native zlib compression)
%% within term_to_binary
%% Can be lz4, zstd or native (which will use the Erlang native zlib
%% compression) within term_to_binary
{mapping, "leveled.compression_method", "leveled.compression_method", [
{datatype, {enum, [native, lz4, none]}},
{datatype, {enum, [native, lz4, zstd, none]}},
{default, native}
]}.
%% @doc Ledger compression
%% If an alternative compression method is preferred specifically for the
%% ledger, it can be specified here. Default is as_store - use whatever method
%% has been defined in leveled.compression_method. Alternatives are native,
%% lz4, zstd and none
{mapping, "leveled.ledger_compression", "leveled.ledger_compression", [
{datatype, {enum, [as_store, native, lz4, zstd, none]}},
{default, as_store}
]}.
%% @doc Compression point
%% The point at which compression is applied to the Journal (the Ledger is
%% always compressed). Use on_receipt or on_compact. on_compact is suitable

View file

@ -39,11 +39,21 @@
%% Can be lz4, zstd or native (which will use the Erlang native zlib
%% compression) within term_to_binary
{mapping, "multi_backend.$name.leveled.compression_method", "riak_kv.multi_backend", [
{datatype, {enum, [native, lz4, none]}},
{datatype, {enum, [native, lz4, zstd, none]}},
{default, native},
hidden
]}.
%% @doc Ledger compression
%% If an alternative compression method is preferred specifically for the
%% ledger, it can be specified here. Default is as_store - use whatever method
%% has been defined in leveled.compression_method. Alternatives are native,
%% lz4, zstd and none
{mapping, "multi_backend.$name.ledger_compression", "riak_kv.multi_backend", [
{datatype, {enum, [as_store, native, lz4, zstd, none]}},
{default, as_store}
]}.
%% @doc Compression point
%% The point at which compression is applied to the Journal (the Ledger is
%% always compressed). Use on_receipt or on_compact. on_compact is suitable

View file

@ -25,7 +25,8 @@
]}.
{deps, [
{lz4, ".*", {git, "https://github.com/martinsumner/erlang-lz4", {tag, "0.2.5"}}}
{lz4, ".*", {git, "https://github.com/martinsumner/erlang-lz4", {branch, "develop-3.1"}}},
{zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}}
]}.
{ct_opts, [{dir, ["test/end_to_end"]}]}.

View file

@ -5,6 +5,7 @@
{registered, []},
{applications, [
lz4,
zstd,
kernel,
stdlib
]},

View file

@ -125,6 +125,7 @@
{max_pencillercachesize, ?MAX_PCL_CACHE_SIZE},
{ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP},
{compression_method, ?COMPRESSION_METHOD},
{ledger_compression, as_store},
{compression_point, ?COMPRESSION_POINT},
{compression_level, ?COMPRESSION_LEVEL},
{log_level, ?LOG_LEVEL},
@ -292,13 +293,15 @@
% To which level of the ledger should the ledger contents be
% pre-loaded into the pagecache (using fadvise on creation and
% startup)
{compression_method, native|lz4|none} |
{compression_method, native|lz4|zstd|none} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4). To disable compression use none. This will disable in
% the ledger as well as the journal (both on_receipt and
% on_compact).
% (lz4 or zstd).
% Defaults to ?COMPRESSION_METHOD
{ledger_compression, as_store|native|lz4|zstd|none} |
% Define an alternative to the compression method to be used by the
% ledger only. Default is as_store - use the method defined as
% compression_method for the whole store
{compression_point, on_compact|on_receipt} |
% The compression point can be changed between on_receipt (all
% values are compressed as they are received), to on_compact where
@ -1812,6 +1815,14 @@ set_options(Opts, Monitor) ->
true = SFL_CompPerc >= 0.0,
CompressionMethod = proplists:get_value(compression_method, Opts),
JournalCompression = CompressionMethod,
LedgerCompression =
case proplists:get_value(ledger_compression, Opts) of
as_store ->
CompressionMethod;
AltMethod ->
AltMethod
end,
CompressOnReceipt =
case proplists:get_value(compression_point, Opts) of
on_receipt ->
@ -1835,7 +1846,7 @@ set_options(Opts, Monitor) ->
maxrunlength_compactionperc = MRL_CompPerc,
waste_retention_period = WRP,
snaptimeout_long = SnapTimeoutLong,
compression_method = CompressionMethod,
compression_method = JournalCompression,
compress_on_receipt = CompressOnReceipt,
score_onein = ScoreOneIn,
cdb_options =
@ -1854,7 +1865,7 @@ set_options(Opts, Monitor) ->
snaptimeout_long = SnapTimeoutLong,
sst_options =
#sst_options{
press_method = CompressionMethod,
press_method = LedgerCompression,
press_level = CompressionLevel,
log_options = leveled_log:get_opts(),
max_sstslots = MaxSSTSlots,

View file

@ -52,9 +52,6 @@
accumulate_index/2,
count_tombs/2]).
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-define(NRT_IDX, "$aae.").
-type tag() ::
leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG|atom().
-type key() ::
@ -108,7 +105,7 @@
-type object_spec() ::
object_spec_v0()|object_spec_v1().
-type compression_method() ::
lz4|native|none.
lz4|native|zstd|none.
-type index_specs() ::
list({add|remove, any(), any()}).
-type journal_keychanges() ::
@ -489,7 +486,6 @@ get_tagstrategy(Tag, Strategy) ->
to_inkerkey(LedgerKey, SQN) ->
{SQN, ?INKT_STND, LedgerKey}.
-spec to_inkerkv(ledger_key(), non_neg_integer(), any(), journal_keychanges(),
compression_method(), boolean()) -> {journal_key(), any()}.
%% @doc
@ -524,7 +520,6 @@ from_inkerkv(Object, ToIgnoreKeyChanges) ->
Object
end.
-spec create_value_for_journal({any(), journal_keychanges()|binary()},
boolean(), compression_method()) -> binary().
%% @doc
@ -549,14 +544,14 @@ maybe_compress(JournalBin, PressMethod) ->
<<JBin0:Length0/binary,
KeyChangeLength:32/integer,
Type:8/integer>> = JournalBin,
{IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type),
{IsBinary, IsCompressed, CompMethod} = decode_valuetype(Type),
case IsCompressed of
true ->
JournalBin;
false ->
Length1 = Length0 - KeyChangeLength,
<<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, CompMethod),
binary_to_term(KCBin2)},
create_value_for_journal(V0, true, PressMethod)
end.
@ -568,6 +563,8 @@ serialise_object(Object, true, Method) when is_binary(Object) ->
lz4 ->
{ok, Bin} = lz4:pack(Object),
Bin;
zstd ->
zstd:compress(Object);
native ->
zlib:compress(Object);
none ->
@ -590,35 +587,42 @@ revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) ->
<<JBin0:Length0/binary,
KeyChangeLength:32/integer,
Type:8/integer>> = JournalBin,
{IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type),
{IsBinary, IsCompressed, CompMethod} = decode_valuetype(Type),
Length1 = Length0 - KeyChangeLength,
case ToIgnoreKeyChanges of
true ->
<<OBin2:Length1/binary, _KCBin2:KeyChangeLength/binary>> = JBin0,
{deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
{deserialise_object(OBin2, IsBinary, IsCompressed, CompMethod),
{[], infinity}};
false ->
<<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
{deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
{deserialise_object(OBin2, IsBinary, IsCompressed, CompMethod),
binary_to_term(KCBin2)}
end.
deserialise_object(Binary, true, true, true) ->
deserialise_object(Binary, true, true, lz4) ->
{ok, Deflated} = lz4:unpack(Binary),
Deflated;
deserialise_object(Binary, true, true, false) ->
deserialise_object(Binary, true, true, zstd) ->
zstd:decompress(Binary);
deserialise_object(Binary, true, true, native) ->
zlib:uncompress(Binary);
deserialise_object(Binary, true, false, _IsLz4) ->
deserialise_object(Binary, true, false, _) ->
Binary;
deserialise_object(Binary, false, _, _IsLz4) ->
deserialise_object(Binary, false, _, _) ->
binary_to_term(Binary).
-spec encode_valuetype(boolean(), boolean(), native|lz4|zstd|none) -> 0..15.
%% @doc Note that IsCompressed will be based on the compression_point
%% configuration option when the object is first stored (i.e. only `true` if
%% this is set to `on_receipt`). On compaction this will be set to true.
encode_valuetype(IsBinary, IsCompressed, Method) ->
Bit3 =
{Bit3, Bit4} =
case Method of
lz4 -> 4;
native -> 0;
none -> 0
lz4 -> {4, 0};
zstd -> {4, 8};
native -> {0, 0};
none -> {0, 0}
end,
Bit2 =
case IsBinary of
@ -630,17 +634,26 @@ encode_valuetype(IsBinary, IsCompressed, Method) ->
true -> 1;
false -> 0
end,
Bit1 + Bit2 + Bit3.
Bit1 + Bit2 + Bit3 + Bit4.
-spec decode_valuetype(integer()) -> {boolean(), boolean(), boolean()}.
-spec decode_valuetype(integer())
-> {boolean(), boolean(), compression_method()}.
%% @doc
%% Check bit flags to confirm how the object has been serialised
decode_valuetype(TypeInt) ->
IsCompressed = TypeInt band 1 == 1,
IsBinary = TypeInt band 2 == 2,
IsLz4 = TypeInt band 4 == 4,
{IsBinary, IsCompressed, IsLz4}.
CompressionMethod =
case TypeInt band 12 of
0 ->
native;
4 ->
lz4;
12 ->
zstd
end,
{IsBinary, IsCompressed, CompressionMethod}.
-spec from_journalkey(journal_key()) -> {integer(), ledger_key()}.
%% @doc

View file

@ -151,7 +151,7 @@
-type slot_index_value()
:: #slot_index_value{}.
-type press_method()
:: lz4|native|none.
:: lz4|native|zstd|none.
-type range_endpoint()
:: all|leveled_codec:ledger_key().
-type slot_pointer()
@ -1489,14 +1489,15 @@ read_file(Filename, State, LoadPageCache) ->
Bloom}.
gen_fileversion(PressMethod, IdxModDate, CountOfTombs) ->
% Native or none can be treated the same once written, as reader
% does not need to know as compression info will be in header of the
% Native or none can be treated the same once written, as reader
% does not need to know as compression info will be in header of the
% block
Bit1 =
case PressMethod of
Bit1 =
case PressMethod of
lz4 -> 1;
native -> 0;
none -> 0
none -> 0;
zstd -> 0
end,
Bit2 =
case IdxModDate of
@ -1505,18 +1506,25 @@ gen_fileversion(PressMethod, IdxModDate, CountOfTombs) ->
false ->
0
end,
Bit3 =
Bit3 =
case CountOfTombs of
not_counted ->
0;
_ ->
4
end,
Bit1 + Bit2 + Bit3.
Bit4 =
case PressMethod of
zstd ->
8;
_ ->
0
end,
Bit1 + Bit2 + Bit3 + Bit4.
imp_fileversion(VersionInt, State) ->
UpdState0 =
case VersionInt band 1 of
UpdState0 =
case VersionInt band 1 of
0 ->
State#state{compression_method = native};
1 ->
@ -1529,11 +1537,18 @@ imp_fileversion(VersionInt, State) ->
2 ->
UpdState0#state{index_moddate = true}
end,
case VersionInt band 4 of
0 ->
UpdState1;
4 ->
UpdState1#state{tomb_count = 0}
UpdState2 =
case VersionInt band 4 of
0 ->
UpdState1;
4 ->
UpdState1#state{tomb_count = 0}
end,
case VersionInt band 8 of
0 ->
UpdState2;
8 ->
UpdState2#state{compression_method = zstd}
end.
open_reader(Filename, LoadPageCache) ->
@ -1658,12 +1673,15 @@ serialise_block(Term, native) ->
Bin = term_to_binary(Term, ?BINARY_SETTINGS),
CRC32 = hmac(Bin),
<<Bin/binary, CRC32:32/integer>>;
serialise_block(Term, zstd) ->
Bin = zstd:compress(term_to_binary(Term)),
CRC32 = hmac(Bin),
<<Bin/binary, CRC32:32/integer>>;
serialise_block(Term, none) ->
Bin = term_to_binary(Term),
CRC32 = hmac(Bin),
<<Bin/binary, CRC32:32/integer>>.
-spec deserialise_block(binary(), press_method()) -> any().
%% @doc
%% Convert binary to term
@ -1686,6 +1704,8 @@ deserialise_block(_Bin, _PM) ->
deserialise_checkedblock(Bin, lz4) ->
{ok, Bin0} = lz4:unpack(Bin),
binary_to_term(Bin0);
deserialise_checkedblock(Bin, zstd) ->
binary_to_term(zstd:decompress(Bin));
deserialise_checkedblock(Bin, _Other) ->
% native or none can be treated the same
binary_to_term(Bin).
@ -4207,6 +4227,7 @@ stop_whenstarter_stopped_testto() ->
corrupted_block_range_test() ->
corrupted_block_rangetester(native, 100),
corrupted_block_rangetester(lz4, 100),
corrupted_block_rangetester(zstd, 100),
corrupted_block_rangetester(none, 100).
corrupted_block_rangetester(PressMethod, TestCount) ->
@ -4251,6 +4272,7 @@ corrupted_block_rangetester(PressMethod, TestCount) ->
corrupted_block_fetch_test() ->
corrupted_block_fetch_tester(native),
corrupted_block_fetch_tester(lz4),
corrupted_block_fetch_tester(zstd),
corrupted_block_fetch_tester(none).
corrupted_block_fetch_tester(PressMethod) ->

View file

@ -1036,25 +1036,36 @@ remove_journal_test(_Config) ->
ok = leveled_bookie:book_destroy(Bookie3).
many_put_fetch_switchcompression(_Config) ->
{T0, ok} =
timer:tc(fun many_put_fetch_switchcompression_tester/1, [native]),
{T1, ok} =
timer:tc(fun many_put_fetch_switchcompression_tester/1, [lz4]),
{T2, ok} =
timer:tc(fun many_put_fetch_switchcompression_tester/1, [zstd]),
io:format("Test timings native=~w lz4=~w, zstd=~w", [T0, T1, T2]).
many_put_fetch_switchcompression_tester(CompressionMethod) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{max_pencillercachesize, 16000},
{max_journalobjectcount, 30000},
{compression_level, 3},
{sync_strategy, testutil:sync_strategy()},
{compression_method, native}],
{compression_method, native},
{ledger_compression, none}],
StartOpts2 = [{root_path, RootPath},
{max_pencillercachesize, 24000},
{max_journalobjectcount, 30000},
{sync_strategy, testutil:sync_strategy()},
{compression_method, lz4}],
{compression_method, CompressionMethod},
{ledger_compression, as_store}],
StartOpts3 = [{root_path, RootPath},
{max_pencillercachesize, 16000},
{max_journalobjectcount, 30000},
{sync_strategy, testutil:sync_strategy()},
{compression_method, none}],
{compression_method, none},
{ledger_compression, as_store}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
@ -1173,7 +1184,6 @@ many_put_fetch_switchcompression(_Config) ->
ok = leveled_bookie:book_destroy(Bookie6).
safereaderror_startup(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},

View file

@ -8,41 +8,45 @@
all() -> [riak_ctperf].
suite() -> [{timetrap, {hours, 16}}].
% For full performance test
riak_fullperf(_Config) ->
R2A = riak_load_tester(<<"B0">>, 2000000, 2048, [], native),
riak_fullperf(2048, zstd, as_store).
riak_fullperf(ObjSize, PM, LC) ->
Bucket = {<<"SensibleBucketTypeName">>, <<"SensibleBucketName0">>},
R2A = riak_load_tester(Bucket, 2000000, ObjSize, [], PM, LC),
output_result(R2A),
R2B = riak_load_tester(<<"B0">>, 2000000, 2048, [], native),
R2B = riak_load_tester(Bucket, 2000000, ObjSize, [], PM, LC),
output_result(R2B),
R2C = riak_load_tester(<<"B0">>, 2000000, 2048, [], native),
R2C = riak_load_tester(Bucket, 2000000, ObjSize, [], PM, LC),
output_result(R2C),
R5A = riak_load_tester(<<"B0">>, 5000000, 2048, [], native),
R5A = riak_load_tester(Bucket, 5000000, ObjSize, [], PM, LC),
output_result(R5A),
R5B = riak_load_tester(<<"B0">>, 5000000, 2048, [], native),
R5B = riak_load_tester(Bucket, 5000000, ObjSize, [], PM, LC),
output_result(R5B),
R10 = riak_load_tester(<<"B0">>, 10000000, 2048, [], native),
R10 = riak_load_tester(Bucket, 10000000, ObjSize, [], PM, LC),
output_result(R10)
.
riak_profileperf(_Config) ->
riak_load_tester(
<<"B0">>,
{<<"SensibleBucketTypeName">>, <<"SensibleBucketName0">>},
2000000,
2048,
[load, head, get, query, mini_query, full, guess, estimate, update],
native).
zstd,
as_store
).
% For standard ct test runs
riak_ctperf(_Config) ->
riak_load_tester(<<"B0">>, 400000, 1024, [], native).
riak_load_tester(<<"B0">>, 400000, 1024, [], native, as_store).
riak_load_tester(Bucket, KeyCount, ObjSize, ProfileList, PressMethod) ->
riak_load_tester(Bucket, KeyCount, ObjSize, ProfileList, PM, LC) ->
ct:log(
?INFO,
"Basic riak test with KeyCount ~w ObjSize ~w",
[KeyCount, ObjSize]
"Basic riak test with KeyCount ~w ObjSize ~w PressMethod ~w Ledger ~w",
[KeyCount, ObjSize, PM, LC]
),
IndexCount = 100000,
@ -55,7 +59,8 @@ riak_load_tester(Bucket, KeyCount, ObjSize, ProfileList, PressMethod) ->
[{root_path, RootPath},
{sync_strategy, testutil:sync_strategy()},
{log_level, warn},
{compression_method, PressMethod},
{compression_method, PM},
{ledger_compression, LC},
{forced_logs,
[b0015, b0016, b0017, b0018, p0032, sst12]}
],
@ -175,7 +180,7 @@ riak_load_tester(Bucket, KeyCount, ObjSize, ProfileList, PressMethod) ->
{_Inker, _Pcl, SSTPids, _PClerk, CDBPids, _IClerk} = get_pids(Bookie1),
leveled_bookie:book_destroy(Bookie1),
{KeyCount, ObjSize, PressMethod,
{KeyCount, ObjSize, {PM, LC},
TotalLoadTime,
TotalHeadTime, TotalGetTime,
TotalQueryTime, TotalMiniQueryTime, FullFoldTime, SegFoldTime,