Make compression algorithm an option

Compression can be switched between LZ4 and zlib (native).

The setting to determine if compression should happen on receipt is now a macro definition in leveled_codec.
This commit is contained in:
Martin Sumner 2017-11-06 15:54:58 +00:00
parent 4c44e86eab
commit 61b7be5039
9 changed files with 474 additions and 264 deletions

View file

@ -35,7 +35,7 @@
]).
-export([
clerk_new/2,
clerk_new/3,
clerk_prompt/1,
clerk_push/2,
clerk_close/1,
@ -49,15 +49,19 @@
-record(state, {owner :: pid() | undefined,
root_path :: string() | undefined,
pending_deletions = dict:new() % OTP 16 does not like type
pending_deletions = dict:new(), % OTP 16 does not like type
compression_method :: lz4|native
}).
%%%============================================================================
%%% API
%%%============================================================================
clerk_new(Owner, Manifest) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
clerk_new(Owner, Manifest, CompressionMethod) ->
{ok, Pid} =
gen_server:start(?MODULE,
[{compression_method, CompressionMethod}],
[]),
ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity),
leveled_log:log("PC001", [Pid, Owner]),
{ok, Pid}.
@ -78,8 +82,8 @@ clerk_close(Pid) ->
%%% gen_server callbacks
%%%============================================================================
init([]) ->
{ok, #state{}}.
init([{compression_method, CompressionMethod}]) ->
{ok, #state{compression_method = CompressionMethod}}.
handle_call({load, Owner, RootPath}, _From, State) ->
{reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT};
@ -120,7 +124,8 @@ request_work(State) ->
handle_work({SrcLevel, Manifest}, State) ->
{UpdManifest, EntriesToDelete} = merge(SrcLevel,
Manifest,
State#state.root_path),
State#state.root_path,
State#state.compression_method),
leveled_log:log("PC007", []),
SWMC = os:timestamp(),
ok = leveled_penciller:pcl_manifestchange(State#state.owner,
@ -132,7 +137,7 @@ handle_work({SrcLevel, Manifest}, State) ->
leveled_log:log_timer("PC018", [], SWSM),
{leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}.
merge(SrcLevel, Manifest, RootPath) ->
merge(SrcLevel, Manifest, RootPath, CompressionMethod) ->
Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel),
NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1,
SinkList = leveled_pmanifest:merge_lookup(Manifest,
@ -152,7 +157,9 @@ merge(SrcLevel, Manifest, RootPath) ->
{Man0, []};
_ ->
SST_RP = leveled_penciller:sst_rootpath(RootPath),
perform_merge(Manifest, Src, SinkList, SrcLevel, SST_RP, NewSQN)
perform_merge(Manifest,
Src, SinkList, SrcLevel,
SST_RP, NewSQN, CompressionMethod)
end.
notify_deletions([], _Penciller) ->
@ -167,16 +174,21 @@ notify_deletions([Head|Tail], Penciller) ->
%%
%% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1
perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) ->
perform_merge(Manifest,
Src, SinkList, SrcLevel,
RootPath, NewSQN,
CompressionMethod) ->
leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]),
SrcList = [{next, Src, all}],
MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
SinkLevel = SrcLevel + 1,
SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
Additions = do_merge(SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
[]),
Additions =
do_merge(SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
CompressionMethod,
[]),
RevertPointerFun =
fun({next, ME, _SK}) ->
ME
@ -193,22 +205,23 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) ->
Src),
{Man2, [Src|SinkManifestList]}.
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) ->
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _CM, Additions) ->
leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]),
Additions;
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) ->
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) ->
FileName = leveled_penciller:sst_filename(NewSQN,
SinkLevel,
length(Additions)),
leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
TS1 = os:timestamp(),
case leveled_sst:sst_new(RP, FileName,
KL1, KL2, SinkB, SinkLevel, MaxSQN) of
KL1, KL2, SinkB, SinkLevel, MaxSQN, CM) of
empty ->
leveled_log:log("PC013", [FileName]),
do_merge([], [],
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
CM,
Additions);
{ok, Pid, Reply} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
@ -220,6 +233,7 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) ->
do_merge(KL1Rem, KL2Rem,
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
CM,
Additions ++ [Entry])
end.
@ -265,31 +279,36 @@ merge_file_test() ->
"KL1_L1.sst",
1,
KL1_L1,
999999),
999999,
native),
KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
{ok, PidL2_1, _} = leveled_sst:sst_new("../test/",
"KL1_L2.sst",
2,
KL1_L2,
999999),
999999,
native),
KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
{ok, PidL2_2, _} = leveled_sst:sst_new("../test/",
"KL2_L2.sst",
2,
KL2_L2,
999999),
999999,
lz4),
KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
{ok, PidL2_3, _} = leveled_sst:sst_new("../test/",
"KL3_L2.sst",
2,
KL3_L2,
999999),
999999,
lz4),
KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
{ok, PidL2_4, _} = leveled_sst:sst_new("../test/",
"KL4_L2.sst",
2,
KL4_L2,
999999),
999999,
lz4),
E1 = #manifest_entry{owner = PidL1_1,
filename = "./KL1_L1.sst",
@ -321,7 +340,8 @@ merge_file_test() ->
PointerList = lists:map(fun(ME) -> {next, ME, all} end,
[E2, E3, E4, E5]),
{Man6, _Dels} = perform_merge(Man5, E1, PointerList, 1, "../test", 3),
{Man6, _Dels} =
perform_merge(Man5, E1, PointerList, 1, "../test", 3, native),
?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)).