Manifest now back to a simple list

This has refactored code with the implementation of the manifest
isolated in to a seperate module, and the pure async relationship
between penciller and their clerk.  However, the manifest is just a
simple list at each level.
This commit is contained in:
martinsumner 2017-01-17 10:12:15 +00:00
parent 72d16af2b1
commit 9832ecc369
4 changed files with 108 additions and 72 deletions

View file

@ -14,10 +14,12 @@
open_manifest/1,
copy_manifest/1,
load_manifest/3,
close_manifest/2,
save_manifest/2,
get_manifest_sqn/1,
key_lookup/3,
range_lookup/4,
merge_lookup/4,
insert_manifest_entry/4,
remove_manifest_entry/4,
switch_manifest_entry/4,
@ -87,8 +89,7 @@ open_manifest(RootPath) ->
ValidManSQNs = lists:reverse(lists:sort(lists:foldl(ExtractSQNFun,
[],
Filenames))),
Manifest = open_manifestfile(RootPath, ValidManSQNs),
Manifest.
open_manifestfile(RootPath, ValidManSQNs).
copy_manifest(Manifest) ->
% Copy the manifest ensuring anything only the master process should care
@ -100,12 +101,20 @@ load_manifest(Manifest, PidFun, SQNFun) ->
fun(LevelIdx, {AccMaxSQN, AccMan}) ->
L0 = array:get(LevelIdx, AccMan#manifest.levels),
{L1, SQN1} = load_level(LevelIdx, L0, PidFun, SQNFun),
{max(AccMaxSQN, SQN1),
AccMan#manifest{levels = array:set(LevelIdx, L1, AccMan)}}
UpdLevels = array:set(LevelIdx, L1, AccMan#manifest.levels),
{max(AccMaxSQN, SQN1), AccMan#manifest{levels = UpdLevels}}
end,
lists:foldl(UpdateLevelFun, {0, Manifest},
lists:seq(0, Manifest#manifest.basement)).
close_manifest(Manifest, CloseEntryFun) ->
CloseLevelFun =
fun(LevelIdx) ->
Level = array:get(LevelIdx, Manifest#manifest.levels),
close_level(LevelIdx, Level, CloseEntryFun)
end,
lists:foreach(CloseLevelFun, lists:seq(0, Manifest#manifest.basement)).
save_manifest(Manifest, RootPath) ->
FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest),
ManBin = term_to_binary(Manifest),
@ -176,24 +185,21 @@ key_lookup(Manifest, LevelIdx, Key) ->
array:get(LevelIdx, Manifest#manifest.levels),
Key)
end.
range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
Range =
case LevelIdx > Manifest#manifest.basement of
true ->
[];
false ->
range_lookup_level(LevelIdx,
array:get(LevelIdx,
Manifest#manifest.levels),
StartKey,
EndKey)
end,
MakePointerFun =
fun(M) ->
{next, M, StartKey}
end,
lists:map(MakePointerFun, Range).
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
MakePointerFun =
fun(M) ->
{next, M, all}
end,
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
%% An algorithm for discovering which files to merge ....
@ -208,7 +214,7 @@ range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
%% Hence, the initial implementation is to select files to merge at random
mergefile_selector(Manifest, LevelIdx) ->
Level = array:get(LevelIdx, Manifest#manifest.levels),
lists:nth(random:uniform(length(Level), Level)).
lists:nth(random:uniform(length(Level)), Level).
add_snapshot(Manifest, Pid, Timeout) ->
{MegaNow, SecNow, _} = os:timestamp(),
@ -299,12 +305,14 @@ load_level(_LevelIdx, Level, PidFun, SQNFun) ->
LevelLoadFun =
fun(ME, {L_Out, L_MaxSQN}) ->
FN = ME#manifest_entry.filename,
{ok, P, _Keys} = PidFun(FN),
P = PidFun(FN),
SQN = SQNFun(P),
{[ME#manifest_entry{owner=P}|L_Out], max(SQN, L_MaxSQN)}
end,
lists:foldr(LevelLoadFun, {[], 0}, Level).
close_level(_LevelIdx, Level, CloseEntryFun) ->
lists:foreach(CloseEntryFun, Level).
is_empty(_LevelIdx, []) ->
true;
@ -337,7 +345,6 @@ remove_section(Level, StartKey, Length) ->
Pre ++ Post.
key_lookup_level(_LevelIdx, [], _Key) ->
false;
key_lookup_level(LevelIdx, [Entry|Rest], Key) ->
@ -353,6 +360,20 @@ key_lookup_level(LevelIdx, [Entry|Rest], Key) ->
key_lookup_level(LevelIdx, Rest, Key)
end.
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun) ->
Range =
case LevelIdx > Manifest#manifest.basement of
true ->
[];
false ->
range_lookup_level(LevelIdx,
array:get(LevelIdx,
Manifest#manifest.levels),
StartKey,
EndKey)
end,
lists:map(MakePointerFun, Range).
range_lookup_level(_LevelIdx, Level, QStartKey, QEndKey) ->
BeforeFun =
fun(M) ->
@ -392,13 +413,13 @@ filepath(RootPath, NewMSN, current_manifest) ->
open_manifestfile(_RootPath, []) ->
leveled_log:log("P0013", []),
{0, new_manifest()};
new_manifest();
open_manifestfile(_RootPath, [0]) ->
leveled_log:log("P0013", []),
{0, new_manifest()};
new_manifest();
open_manifestfile(RootPath, [TopManSQN|Rest]) ->
CurrManFile = filepath(RootPath, TopManSQN, current_manifest),
FileBin = file:read_file(CurrManFile),
{ok, FileBin} = file:read_file(CurrManFile),
<<CRC:32/integer, BinaryOfTerm/binary>> = FileBin,
case erlang:crc32(BinaryOfTerm) of
CRC ->

View file

@ -38,7 +38,8 @@
clerk_new/2,
clerk_prompt/1,
clerk_push/2,
clerk_close/1
clerk_close/1,
clerk_promptdeletions/2
]).
-include_lib("eunit/include/eunit.hrl").
@ -47,7 +48,8 @@
-define(MIN_TIMEOUT, 200).
-record(state, {owner :: pid(),
root_path :: string()}).
root_path :: string(),
pending_deletions = dict:new() :: dict:dict()}).
%%%============================================================================
%%% API
@ -62,6 +64,9 @@ clerk_new(Owner, Manifest) ->
clerk_prompt(Pid) ->
gen_server:cast(Pid, prompt).
clerk_promptdeletions(Pid, ManifestSQN) ->
gen_server:cast(Pid, {prompt_deletions, ManifestSQN}).
clerk_push(Pid, Work) ->
gen_server:cast(Pid, {push_work, Work}).
@ -83,8 +88,14 @@ handle_call(close, _From, State) ->
handle_cast(prompt, State) ->
handle_info(timeout, State);
handle_cast({push_work, Work}, State) ->
handle_work(Work, State),
{noreply, State, ?MIN_TIMEOUT}.
{ManifestSQN, Deletions} = handle_work(Work, State),
PDs = dict:store(ManifestSQN, Deletions, State#state.pending_deletions),
{noreply, State#state{pending_deletions = PDs}, ?MAX_TIMEOUT};
handle_cast({prompt_deletions, ManifestSQN}, State) ->
Deletions = dict:fetch(ManifestSQN, State#state.pending_deletions),
ok = notify_deletions(Deletions, State#state.owner),
UpdDeletions = dict:erase(ManifestSQN, State#state.pending_deletions),
{noreply, State#state{pending_deletions = UpdDeletions}, ?MIN_TIMEOUT}.
handle_info(timeout, State) ->
request_work(State),
@ -117,7 +128,7 @@ handle_work({SrcLevel, Manifest}, State) ->
ok = leveled_manifest:save_manifest(UpdManifest,
State#state.root_path),
leveled_log:log_timer("PC018", [], SWSM),
ok = notify_deletions(EntriesToDelete, State#state.owner).
{leveled_manifest:get_manifest_sqn(UpdManifest), EntriesToDelete}.
merge(SrcLevel, Manifest, RootPath) ->
Src = leveled_manifest:mergefile_selector(Manifest, SrcLevel),
@ -130,21 +141,13 @@ merge(SrcLevel, Manifest, RootPath) ->
leveled_log:log("PC008", [SrcLevel, Candidates]),
case Candidates of
0 ->
%% If no overlapping candiates, manifest change only required
%%
%% TODO: need to think still about simply renaming when at
%% lower level
leveled_log:log("PC009",
[Src#manifest_entry.filename, SrcLevel + 1]),
Man0 = leveled_manifest:remove_manifest_entry(Manifest,
Man0 = leveled_manifest:switch_manifest_entry(Manifest,
NewSQN,
SrcLevel,
Src),
Man1 = leveled_manifest:insert_manifest_entry(Man0,
NewSQN,
SrcLevel + 1,
Src),
{Man1, []};
{Man0, []};
_ ->
FilePath = leveled_penciller:filepath(RootPath,
NewSQN,
@ -166,53 +169,59 @@ notify_deletions([Head|Tail], Penciller) ->
perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) ->
leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]),
SrcList = [{next, Src#manifest_entry.owner, all}],
SinkPointerList = leveled_manifest:pointer_convert(Manifest, SinkList),
SrcList = [{next, Src, all}],
MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
SinkLevel = SrcLevel + 1,
SinkBasement = leveled_manifest:is_basement(Manifest, SinkLevel),
Man0 = do_merge(SrcList, SinkPointerList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
0, Manifest),
RemoveFun =
fun(Entry, AccMan) ->
leveled_manifest:remove_manifest_entry(AccMan,
Additions = do_merge(SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
[]),
RevertPointerFun =
fun({next, ME, _SK}) ->
ME
end,
Man0 = leveled_manifest:insert_manifest_entry(Manifest,
NewSQN,
SinkLevel,
Entry)
end,
Man1 = lists:foldl(RemoveFun, Man0, SinkList),
Man2 = leveled_manifest:remove_manifest_entry(Man1, NewSQN, SrcLevel, Src),
Additions),
Man1 = leveled_manifest:remove_manifest_entry(Man0,
NewSQN,
SinkLevel,
lists:map(RevertPointerFun,
SinkList)),
Man2 = leveled_manifest:remove_manifest_entry(Man1,
NewSQN,
SrcLevel,
Src),
{Man2, [Src|SinkList]}.
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Counter, Man0) ->
leveled_log:log("PC011", [NewSQN, SinkLevel, Counter]),
Man0;
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Counter, Man0) ->
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) ->
leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]),
Additions;
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) ->
FileName = lists:flatten(io_lib:format(RP ++ "_~w_~w.sst",
[SinkLevel, Counter])),
[SinkLevel, length(Additions)])),
leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
TS1 = os:timestamp(),
case leveled_sst:sst_new(FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN) of
empty ->
leveled_log:log("PC013", [FileName]),
Man0;
do_merge([], [],
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
Additions);
{ok, Pid, Reply} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
Entry = #manifest_entry{start_key=SmallestKey,
end_key=HighestKey,
owner=Pid,
filename=FileName},
Man1 = leveled_manifest:insert_manifest_entry(Man0,
NewSQN,
SinkLevel,
Entry),
leveled_log:log_timer("PC015", [], TS1),
do_merge(KL1Rem, KL2Rem,
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
Counter + 1, Man1)
Additions ++ [Entry])
end.
@ -294,7 +303,9 @@ merge_file_test() ->
Man4 = leveled_manifest:insert_manifest_entry(Man3, 1, 2, E5),
Man5 = leveled_manifest:insert_manifest_entry(Man4, 2, 1, E1),
{Man6, _Dels} = perform_merge(Man5, E1, [E2, E3, E4, E5], 1, "../test", 3),
PointerList = lists:map(fun(ME) -> {next, ME, all} end,
[E2, E3, E4, E5]),
{Man6, _Dels} = perform_merge(Man5, E1, PointerList, 1, "../test", 3),
?assertMatch(3, leveled_manifest:get_manifest_sqn(Man6)).

View file

@ -461,6 +461,8 @@ handle_call(doom, _From, State) ->
{stop, normal, {ok, [ManifestFP, FilesFP]}, State}.
handle_cast({manifest_change, NewManifest}, State) ->
NewManSQN = leveled_manifest:get_manifest_sqn(NewManifest),
ok = leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN),
{noreply, State#state{manifest = NewManifest, work_ongoing=false}};
handle_cast({release_snapshot, Snapshot}, State) ->
Manifest0 = leveled_manifest:release_snapshot(State#state.manifest,
@ -569,10 +571,11 @@ terminate(Reason, State) ->
end,
% Tidy shutdown of individual files
lists:foreach(fun({_FN, {Pid, _DSQN}}) ->
ok = leveled_sst:sst_close(Pid)
end,
leveled_manifest:dump_pidmap(State#state.manifest)),
EntryCloseFun =
fun(ME) ->
ok = leveled_sst:sst_close(ME#manifest_entry.owner)
end,
leveled_manifest:close_manifest(State#state.manifest, EntryCloseFun),
leveled_log:log("P0011", []),
ok.

View file

@ -1175,8 +1175,8 @@ maybe_expand_pointer([{pointer, SSTPid, Slot, StartKey, all}|Tail]) ->
expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, all},
Tail,
?MERGE_SCANWIDTH);
maybe_expand_pointer([{next, SSTPid, StartKey}|Tail]) ->
expand_list_by_pointer({next, SSTPid, StartKey, all},
maybe_expand_pointer([{next, ManEntry, StartKey}|Tail]) ->
expand_list_by_pointer({next, ManEntry, StartKey, all},
Tail,
?MERGE_SCANWIDTH);
maybe_expand_pointer(List) ->
@ -1202,8 +1202,9 @@ expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) -
{AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail),
ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers),
lists:append(ExpPointers, AccTail);
expand_list_by_pointer({next, SSTPid, StartKey, EndKey}, Tail, Width) ->
leveled_log:log("SST10", [SSTPid, is_pid(SSTPid)]),
expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, Tail, Width) ->
SSTPid = ManEntry#manifest_entry.owner,
leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]),
ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width),
ExpPointer ++ Tail.