Push log update through to cdb/sst

Using the cdb_options and sst_options records
This commit is contained in:
Martin Sumner 2018-12-11 20:42:00 +00:00
parent 9ca6b499e1
commit 6677f2e5c6
12 changed files with 242 additions and 214 deletions

View file

@ -50,18 +50,18 @@
-record(state, {owner :: pid() | undefined,
root_path :: string() | undefined,
pending_deletions = dict:new(), % OTP 16 does not like type
compression_method = native :: lz4|native
sst_options :: #sst_options{}
}).
%%%============================================================================
%%% API
%%%============================================================================
clerk_new(Owner, Manifest, CompressionMethod) ->
clerk_new(Owner, Manifest, OptsSST) ->
{ok, Pid} =
gen_server:start_link(?MODULE,
[leveled_log:get_opts(),
{compression_method, CompressionMethod}],
{sst_options, OptsSST}],
[]),
ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity),
leveled_log:log("PC001", [Pid, Owner]),
@ -83,9 +83,9 @@ clerk_close(Pid) ->
%%% gen_server callbacks
%%%============================================================================
init([LogOpts, {compression_method, CompressionMethod}]) ->
init([LogOpts, {sst_options, OptsSST}]) ->
leveled_log:save(LogOpts),
{ok, #state{compression_method = CompressionMethod}}.
{ok, #state{sst_options = OptsSST}}.
handle_call({load, Owner, RootPath}, _From, State) ->
{reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT};
@ -127,7 +127,7 @@ handle_work({SrcLevel, Manifest}, State) ->
{UpdManifest, EntriesToDelete} = merge(SrcLevel,
Manifest,
State#state.root_path,
State#state.compression_method),
State#state.sst_options),
leveled_log:log("PC007", []),
SWMC = os:timestamp(),
ok = leveled_penciller:pcl_manifestchange(State#state.owner,
@ -139,7 +139,7 @@ handle_work({SrcLevel, Manifest}, State) ->
leveled_log:log_timer("PC018", [], SWSM),
{leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}.
merge(SrcLevel, Manifest, RootPath, CompressionMethod) ->
merge(SrcLevel, Manifest, RootPath, OptsSST) ->
Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel),
NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1,
SinkList = leveled_pmanifest:merge_lookup(Manifest,
@ -161,7 +161,7 @@ merge(SrcLevel, Manifest, RootPath, CompressionMethod) ->
SST_RP = leveled_penciller:sst_rootpath(RootPath),
perform_merge(Manifest,
Src, SinkList, SrcLevel,
SST_RP, NewSQN, CompressionMethod)
SST_RP, NewSQN, OptsSST)
end.
notify_deletions([], _Penciller) ->
@ -179,7 +179,7 @@ notify_deletions([Head|Tail], Penciller) ->
perform_merge(Manifest,
Src, SinkList, SrcLevel,
RootPath, NewSQN,
CompressionMethod) ->
OptsSST) ->
leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]),
SrcList = [{next, Src, all}],
MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
@ -189,7 +189,7 @@ perform_merge(Manifest,
do_merge(SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
CompressionMethod,
OptsSST,
[]),
RevertPointerFun =
fun({next, ME, _SK}) ->
@ -207,23 +207,24 @@ perform_merge(Manifest,
Src),
{Man2, [Src|SinkManifestList]}.
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _CM, Additions) ->
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) ->
leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]),
Additions;
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) ->
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
FileName = leveled_penciller:sst_filename(NewSQN,
SinkLevel,
length(Additions)),
leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
TS1 = os:timestamp(),
case leveled_sst:sst_new(RP, FileName,
KL1, KL2, SinkB, SinkLevel, MaxSQN, CM) of
KL1, KL2, SinkB, SinkLevel, MaxSQN,
OptsSST) of
empty ->
leveled_log:log("PC013", [FileName]),
do_merge([], [],
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
CM,
OptsSST,
Additions);
{ok, Pid, Reply, Bloom} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
@ -236,7 +237,7 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) ->
do_merge(KL1Rem, KL2Rem,
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
CM,
OptsSST,
Additions ++ [Entry])
end.
@ -288,7 +289,7 @@ merge_file_test() ->
1,
KL1_L1,
999999,
native),
#sst_options{}),
KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
{ok, PidL2_1, _, _} =
leveled_sst:sst_new("../test/",
@ -296,7 +297,7 @@ merge_file_test() ->
2,
KL1_L2,
999999,
native),
#sst_options{}),
KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
{ok, PidL2_2, _, _} =
leveled_sst:sst_new("../test/",
@ -304,7 +305,7 @@ merge_file_test() ->
2,
KL2_L2,
999999,
lz4),
#sst_options{press_method = lz4}),
KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
{ok, PidL2_3, _, _} =
leveled_sst:sst_new("../test/",
@ -312,7 +313,7 @@ merge_file_test() ->
2,
KL3_L2,
999999,
lz4),
#sst_options{press_method = lz4}),
KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
{ok, PidL2_4, _, _} =
leveled_sst:sst_new("../test/",
@ -320,7 +321,7 @@ merge_file_test() ->
2,
KL4_L2,
999999,
lz4),
#sst_options{press_method = lz4}),
E1 = #manifest_entry{owner = PidL1_1,
filename = "./KL1_L1.sst",
@ -353,11 +354,12 @@ merge_file_test() ->
PointerList = lists:map(fun(ME) -> {next, ME, all} end,
[E2, E3, E4, E5]),
{Man6, _Dels} =
perform_merge(Man5, E1, PointerList, 1, "../test", 3, native),
perform_merge(Man5, E1, PointerList, 1, "../test", 3, #sst_options{}),
?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)).
coverage_cheat_test() ->
{ok, _State1} = code_change(null, #state{}, null).
{ok, _State1} =
code_change(null, #state{sst_options=#sst_options{}}, null).
-endif.