2016-10-09 22:33:45 +01:00
|
|
|
%% -------- PENCILLER's CLERK ---------
|
|
|
|
%%
|
|
|
|
%% The Penciller's clerk is responsible for compaction work within the Ledger.
|
|
|
|
%%
|
2017-01-12 13:48:43 +00:00
|
|
|
%% The Clerk will periodically poll the Penciller to check there is no work
|
|
|
|
%% at level zero pending completion, and if not the Clerk will examine the
|
|
|
|
%% manifest to see if work is necessary.
|
2016-10-09 22:33:45 +01:00
|
|
|
%%
|
|
|
|
%% -------- COMMITTING MANIFEST CHANGES ---------
|
|
|
|
%%
|
2016-12-29 02:07:14 +00:00
|
|
|
%% Once the Penciller has taken a manifest change, the SST file owners which no
|
2016-10-09 22:33:45 +01:00
|
|
|
%% longer form part of the manifest will be marked for deletion. Once marked for
|
|
|
|
%% deletion, the owners will poll to confirm when it is safe for them to be
|
|
|
|
%% deleted.
|
|
|
|
%%
|
|
|
|
%% It is imperative that the file is not marked for deletion until it is
|
|
|
|
%% certain that the manifest change has been committed. Some uncollected
|
|
|
|
%% garbage is considered acceptable.
|
|
|
|
%%
|
2017-01-12 13:48:43 +00:00
|
|
|
|
2016-07-22 16:57:28 +01:00
|
|
|
|
2016-09-20 16:13:36 +01:00
|
|
|
-module(leveled_pclerk).
|
2016-07-22 16:57:28 +01:00
|
|
|
|
2016-08-09 16:09:29 +01:00
|
|
|
-behaviour(gen_server).
|
|
|
|
|
2016-10-18 01:59:03 +01:00
|
|
|
-include("include/leveled.hrl").
|
2016-08-09 16:09:29 +01:00
|
|
|
|
2017-01-13 18:23:57 +00:00
|
|
|
-export([
|
|
|
|
init/1,
|
2016-08-09 16:09:29 +01:00
|
|
|
handle_call/3,
|
|
|
|
handle_cast/2,
|
|
|
|
handle_info/2,
|
|
|
|
terminate/2,
|
2017-01-13 18:23:57 +00:00
|
|
|
code_change/3
|
|
|
|
]).
|
|
|
|
|
|
|
|
-export([
|
|
|
|
clerk_new/2,
|
2016-10-30 18:25:30 +00:00
|
|
|
clerk_prompt/1,
|
2017-01-15 00:52:43 +00:00
|
|
|
clerk_push/2,
|
2017-01-13 18:23:57 +00:00
|
|
|
clerk_close/1
|
|
|
|
]).
|
2016-07-22 16:57:28 +01:00
|
|
|
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
2017-01-15 00:52:43 +00:00
|
|
|
-define(MAX_TIMEOUT, 2000).
|
2017-01-13 18:23:57 +00:00
|
|
|
-define(MIN_TIMEOUT, 200).
|
2016-08-12 01:05:59 +01:00
|
|
|
|
2016-10-08 22:15:48 +01:00
|
|
|
%% Clerk process state.
%%   owner     - the Penciller pid this clerk works for: it is asked for work
%%               (pcl_workforclerk), told of manifest changes
%%               (pcl_manifestchange) and passed to SST files set for delete.
%%   root_path - set by the {load, ...} call; used to derive new merge file
%%               paths and as the location the manifest is saved under.
-record(state, {owner :: pid(),
                root_path :: string()}).
|
2016-08-09 16:09:29 +01:00
|
|
|
|
|
|
|
%%%============================================================================
|
|
|
|
%%% API
|
|
|
|
%%%============================================================================
|
|
|
|
|
2017-01-12 13:48:43 +00:00
|
|
|
%% @doc Start a clerk process working on behalf of the Penciller `Owner'.
%%
%% The second argument is stored by the `{load, Owner, RootPath}' call as the
%% clerk's `root_path', and is later used to build merge file paths and to
%% save the manifest. It was previously named `Manifest', which did not match
%% how the value is used.
%%
%% The clerk is started with gen_server:start/3, so it is not linked to the
%% caller. The load call waits indefinitely.
%%
%% Returns {ok, Pid}. Crashes with badmatch if the server fails to start or
%% does not acknowledge the load.
clerk_new(Owner, RootPath) ->
    {ok, Pid} = gen_server:start(?MODULE, [], []),
    ok = gen_server:call(Pid, {load, Owner, RootPath}, infinity),
    leveled_log:log("PC001", [Pid, Owner]),
    {ok, Pid}.
|
|
|
|
|
2016-10-30 18:25:30 +00:00
|
|
|
%% @doc Asynchronously nudge the clerk to ask its Penciller for work now,
%% rather than waiting for its next timeout. Always returns ok.
clerk_prompt(Pid) ->
    Msg = prompt,
    gen_server:cast(Pid, Msg).
|
2016-08-15 16:43:39 +01:00
|
|
|
|
2017-01-15 00:52:43 +00:00
|
|
|
%% @doc Asynchronously hand the clerk a unit of work. handle_work/2 expects
%% Work to be a {SrcLevel, Manifest} tuple. Always returns ok.
clerk_push(Pid, Work) ->
    Request = {push_work, Work},
    gen_server:cast(Pid, Request).
|
|
|
|
|
2017-01-12 13:48:43 +00:00
|
|
|
%% @doc Synchronously stop the clerk. Waits up to 20 seconds for the clerk
%% to acknowledge, and returns ok on success.
clerk_close(Pid) ->
    CloseTimeout = 20000,
    gen_server:call(Pid, close, CloseTimeout).
|
2016-10-27 20:56:18 +01:00
|
|
|
|
2016-08-09 16:09:29 +01:00
|
|
|
%%%============================================================================
|
|
|
|
%%% gen_server callbacks
|
|
|
|
%%%============================================================================
|
|
|
|
|
|
|
|
%% Start with an empty state. owner and root_path are filled in by the
%% {load, ...} call that clerk_new/2 makes immediately after start.
init([]) ->
    InitialState = #state{},
    {ok, InitialState}.
|
2016-07-22 16:57:28 +01:00
|
|
|
|
2017-01-13 18:23:57 +00:00
|
|
|
%% close: acknowledge with ok and stop normally.
%% {load, Owner, RootPath}: record the owning Penciller and root path, and
%% arm a short (?MIN_TIMEOUT) timeout so the first work request is made soon.
handle_call(close, _From, State) ->
    {stop, normal, ok, State};
handle_call({load, Owner, RootPath}, _From, State) ->
    LoadedState = State#state{owner = Owner, root_path = RootPath},
    {reply, ok, LoadedState, ?MIN_TIMEOUT}.
|
2016-10-08 22:15:48 +01:00
|
|
|
|
|
|
|
%% {push_work, Work}: carry out the work inside the clerk process, then wait
%% only ?MIN_TIMEOUT before asking for more.
%% prompt: behave exactly as if the timeout had fired.
handle_cast({push_work, Work}, State) ->
    %% The result of handle_work/2 is not used; it asserts success itself.
    handle_work(Work, State),
    {noreply, State, ?MIN_TIMEOUT};
handle_cast(prompt, State) ->
    handle_info(timeout, State).
|
2016-10-08 22:15:48 +01:00
|
|
|
|
2017-01-13 18:23:57 +00:00
|
|
|
%% On timeout, ask the Penciller for work, then re-arm a longer
%% (?MAX_TIMEOUT) timeout. A gen_server timeout only fires if no other
%% message arrives first.
handle_info(timeout, State) ->
    ok = request_work(State),
    {noreply, State, ?MAX_TIMEOUT}.
|
2016-08-09 16:09:29 +01:00
|
|
|
|
2016-10-21 21:26:28 +01:00
|
|
|
%% Log the clerk's pid and the termination reason.
terminate(Reason, _State) ->
    LogArgs = [self(), Reason],
    leveled_log:log("PC005", LogArgs).
|
2016-08-09 16:09:29 +01:00
|
|
|
|
|
|
|
%% No state migration is needed: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    Upgraded = {ok, State},
    Upgraded.
|
|
|
|
|
|
|
|
|
|
|
|
%%%============================================================================
|
|
|
|
%%% Internal functions
|
|
|
|
%%%============================================================================
|
|
|
|
|
2017-01-15 00:52:43 +00:00
|
|
|
%% Ask the owning Penciller whether there is work for this clerk. The reply
%% is asserted to be ok. NOTE(review): presumably any work is delivered back
%% via clerk_push/2; confirm in leveled_penciller.
request_work(#state{owner = Penciller}) ->
    ok = leveled_penciller:pcl_workforclerk(Penciller).
|
|
|
|
|
|
|
|
%% Perform one merge from SrcLevel and commit the result.
%%
%% The order matters. Per the module header, SST files must not be marked
%% for deletion until the manifest change is committed. So the Penciller is
%% told of the new manifest and the manifest is saved before any replaced
%% files are notified.
handle_work({SrcLevel, Manifest}, #state{owner = Owner, root_path = RootPath}) ->
    {NewManifest, Replaced} = merge(SrcLevel, Manifest, RootPath),
    leveled_log:log("PC007", []),
    ok = leveled_penciller:pcl_manifestchange(Owner, NewManifest),
    ok = leveled_manifest:save_manifest(NewManifest, RootPath),
    ok = notify_deletions(Replaced, Owner).
|
2016-08-12 01:05:59 +01:00
|
|
|
|
2017-01-13 18:23:57 +00:00
|
|
|
%% Merge one file from SrcLevel into the level below it.
%%
%% The source file is chosen by leveled_manifest:mergefile_selector/2. Two
%% outcomes are possible:
%%   - No file at SrcLevel + 1 overlaps its key range: the file is moved
%%     down a level in the manifest and nothing needs deleting.
%%   - Otherwise: a full merge is done by perform_merge/6, writing new files
%%     under a path derived from RootPath and the new manifest SQN.
%%
%% Returns {UpdatedManifest, EntriesToDelete}.
merge(SrcLevel, Manifest, RootPath) ->
    Src = leveled_manifest:mergefile_selector(Manifest, SrcLevel),
    NewSQN = leveled_manifest:get_manifest_sqn(Manifest) + 1,
    SinkLevel = SrcLevel + 1,
    SinkList = leveled_manifest:merge_lookup(Manifest,
                                             SinkLevel,
                                             Src#manifest_entry.start_key,
                                             Src#manifest_entry.end_key),
    leveled_log:log("PC008", [SrcLevel, length(SinkList)]),
    case SinkList of
        [] ->
            %% No overlapping candidates, so only a manifest change is
            %% required.
            %% TODO: still need to think about simply renaming when at a
            %% lower level
            leveled_log:log("PC009",
                            [Src#manifest_entry.filename, SinkLevel]),
            Removed = leveled_manifest:remove_manifest_entry(Manifest,
                                                             NewSQN,
                                                             SrcLevel,
                                                             Src),
            MovedDown = leveled_manifest:insert_manifest_entry(Removed,
                                                               NewSQN,
                                                               SinkLevel,
                                                               Src),
            {MovedDown, []};
        [_ | _] ->
            FilePath = leveled_penciller:filepath(RootPath,
                                                  NewSQN,
                                                  new_merge_files),
            perform_merge(Manifest, Src, SinkList, SrcLevel, FilePath, NewSQN)
    end.
|
|
|
|
|
2017-01-13 18:23:57 +00:00
|
|
|
%% Mark each replaced SST file for deletion by calling
%% leveled_sst:sst_setfordelete/2 on the file's owner pid. The Penciller is
%% passed along; per the module header, the file owners then poll to confirm
%% when deletion is safe. Each call must return ok. Returns ok.
notify_deletions(Entries, Penciller) ->
    lists:foreach(fun(Entry) ->
                          ok = leveled_sst:sst_setfordelete(
                                 Entry#manifest_entry.owner, Penciller)
                  end,
                  Entries).
|
|
|
|
|
2016-07-22 16:57:28 +01:00
|
|
|
|
2016-12-29 02:07:14 +00:00
|
|
|
%% Merge a single SST file (Src) from SrcLevel into the overlapping SST files
%% (SinkList) at the sink level, SrcLevel + 1.
%%
%% Parameters:
%%   FilePath - path prefix for the new files. do_merge/9 appends
%%              "_<Level>_<Counter>.sst" to it.
%%   NewSQN   - the manifest SQN used for every manifest change made here.
%%
%% The new files are added to the manifest first. Then every sink entry and
%% the source entry are removed.
%%
%% Returns {UpdatedManifest, EntriesToDelete}, where EntriesToDelete is Src
%% followed by the sink entries.
perform_merge(Manifest, Src, SinkList, SrcLevel, FilePath, NewSQN) ->
    leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]),
    SrcPid = Src#manifest_entry.owner,
    SinkLevel = SrcLevel + 1,
    SinkPointers = leveled_manifest:pointer_convert(Manifest, SinkList),
    MaxSQN = leveled_sst:sst_getmaxsequencenumber(SrcPid),
    IsBasement = leveled_manifest:is_basement(Manifest, SinkLevel),
    MergedMan = do_merge([{next, SrcPid, all}], SinkPointers,
                         SinkLevel, IsBasement,
                         FilePath, NewSQN, MaxSQN,
                         0, Manifest),
    DropSink =
        fun(SinkEntry, ManAcc) ->
                leveled_manifest:remove_manifest_entry(ManAcc,
                                                       NewSQN,
                                                       SinkLevel,
                                                       SinkEntry)
        end,
    WithoutSinks = lists:foldl(DropSink, MergedMan, SinkList),
    WithoutSrc = leveled_manifest:remove_manifest_entry(WithoutSinks,
                                                        NewSQN,
                                                        SrcLevel,
                                                        Src),
    {WithoutSrc, [Src | SinkList]}.
|
2017-01-13 18:23:57 +00:00
|
|
|
|
|
|
|
%% Write the merge of KL1 (source) and KL2 (sink) into a sequence of new SST
%% files at SinkLevel, adding a manifest entry (NewSQN) for each file.
%%
%% How one call works:
%%   - It asks leveled_sst:sst_new/6 to write one file, named
%%     "<RP>_<SinkLevel>_<Counter>.sst".
%%   - It then recurses on the remaining keys that sst_new/6 reports, with
%%     Counter incremented.
%%
%% Recursion stops when both lists are [] (logged as PC011), or when
%% sst_new/6 returns `empty', in which case no entry is added.
%% NOTE(review): `empty' presumably means nothing remained to be written;
%% confirm in leveled_sst.
%%
%% RP is substituted as a ~ts argument rather than being concatenated into
%% the io_lib:format/2 control string. Previously a `~' in the path was
%% parsed as a format directive, causing a badarg crash or a mangled
%% filename.
%%
%% Returns the updated manifest.
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Counter, Man0) ->
    leveled_log:log("PC011", [NewSQN, SinkLevel, Counter]),
    Man0;
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Counter, Man0) ->
    FileName = lists:flatten(io_lib:format("~ts_~w_~w.sst",
                                           [RP, SinkLevel, Counter])),
    leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
    TS1 = os:timestamp(),
    case leveled_sst:sst_new(FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN) of
        empty ->
            leveled_log:log("PC013", [FileName]),
            Man0;
        {ok, Pid, Reply} ->
            %% Reply carries the unconsumed remainder of each input list and
            %% the key range actually written to the new file.
            {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
            Entry = #manifest_entry{start_key = SmallestKey,
                                    end_key = HighestKey,
                                    owner = Pid,
                                    filename = FileName},
            Man1 = leveled_manifest:insert_manifest_entry(Man0,
                                                          NewSQN,
                                                          SinkLevel,
                                                          Entry),
            leveled_log:log_timer("PC015", [], TS1),
            do_merge(KL1Rem, KL2Rem,
                     SinkLevel, SinkB,
                     RP, NewSQN, MaxSQN,
                     Counter + 1, Man1)
    end.
|
2016-07-22 16:57:28 +01:00
|
|
|
|
|
|
|
|
|
|
|
%%%============================================================================
|
|
|
|
%%% Test
|
|
|
|
%%%============================================================================
|
|
|
|
|
2016-10-09 22:33:45 +01:00
|
|
|
-ifdef(TEST).
|
2016-07-22 16:57:28 +01:00
|
|
|
|
|
|
|
%% Test helper: build Count random {LedgerKey, Value} pairs.
%%
%% Key format:
%%   - Bucket numbers are drawn from BucketLow + 1 .. BucketLow + BucketRange.
%%   - Key numbers are drawn from 1..1000.
%%   - Both numbers are zero-padded to 4 characters.
%%
%% The SQN of each value is Count + 1 at the time it is generated.
generate_randomkeys(Count, BucketLow, BucketRange) ->
    generate_randomkeys(Count, [], BucketLow, BucketRange).

generate_randomkeys(0, Acc, _BucketLow, _BucketRange) ->
    Acc;
generate_randomkeys(Count, Acc, BucketLow, BucketRange) ->
    Pad4 = fun(N) -> string:right(integer_to_list(N), 4, $0) end,
    %% Bucket is drawn before key so the sequence of random draws is kept.
    Bucket = "Bucket" ++ Pad4(BucketLow + random:uniform(BucketRange)),
    Key = "Key" ++ Pad4(random:uniform(1000)),
    LedgerKey = {o, Bucket, Key},
    Value = {Count + 1,
             {active, infinity},
             leveled_codec:magic_hash(LedgerKey),
             null},
    generate_randomkeys(Count - 1,
                        [{LedgerKey, Value} | Acc],
                        BucketLow,
                        BucketRange).
|
|
|
|
|
|
|
|
|
|
|
|
%% Build one level-1 SST file and four level-2 SST files, register them in a
%% manifest, merge the level-1 file down, and check the manifest SQN.
%%
%% NOTE(review): start_key/end_key are set to full {Key, Value} tuples taken
%% from the sorted lists, not to the bare keys. Confirm this is intended.
%% NOTE(review): neither the input files nor the merged "../test_2_*.sst"
%% output files are cleaned up by this test.
merge_file_test() ->
    MakeEntry =
        fun(FileName, Level, KVList) ->
                {ok, Pid, _} = leveled_sst:sst_new(FileName,
                                                   Level,
                                                   KVList,
                                                   undefined),
                #manifest_entry{owner = Pid,
                                filename = FileName,
                                end_key = lists:last(KVList),
                                start_key = lists:nth(1, KVList)}
        end,
    %% Generate and write each file in turn, keeping the original order.
    KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)),
    E1 = MakeEntry("../test/KL1_L1.sst", 1, KL1_L1),
    KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
    E2 = MakeEntry("../test/KL1_L2.sst", 2, KL1_L2),
    KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
    E3 = MakeEntry("../test/KL2_L2.sst", 2, KL2_L2),
    KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
    E4 = MakeEntry("../test/KL3_L2.sst", 2, KL3_L2),
    KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
    E5 = MakeEntry("../test/KL4_L2.sst", 2, KL4_L2),

    SinkEntries = [E2, E3, E4, E5],
    Man0 = leveled_manifest:new_manifest(),
    Man4 = lists:foldl(fun(E, ManAcc) ->
                               leveled_manifest:insert_manifest_entry(ManAcc,
                                                                      1,
                                                                      2,
                                                                      E)
                       end,
                       Man0,
                       SinkEntries),
    Man5 = leveled_manifest:insert_manifest_entry(Man4, 2, 1, E1),

    {Man6, _Dels} = perform_merge(Man5, E1, SinkEntries, 1, "../test", 3),
    ?assertMatch(3, leveled_manifest:get_manifest_sqn(Man6)).
|
2016-10-09 22:33:45 +01:00
|
|
|
|
2016-11-14 20:43:38 +00:00
|
|
|
%% Call code_change/3 purely so that it is covered by the test run.
coverage_cheat_test() ->
    Result = code_change(null, #state{}, null),
    {ok, _State1} = Result.
|
|
|
|
|
2016-11-05 17:50:28 +00:00
|
|
|
-endif.
|