Add replace capability to manifest

This commit is contained in:
martinsumner 2017-01-23 11:02:54 +00:00
parent 2c4c5c9597
commit 5105df1cd6
2 changed files with 97 additions and 29 deletions

View file

@ -183,16 +183,12 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) ->
ME ME
end, end,
SinkManifestList = lists:map(RevertPointerFun, SinkList), SinkManifestList = lists:map(RevertPointerFun, SinkList),
Man0 = leveled_pmanifest:remove_manifest_entry(Manifest, Man0 = leveled_pmanifest:replace_manifest_entry(Manifest,
NewSQN,
SinkLevel,
SinkManifestList),
Man1 = leveled_pmanifest:insert_manifest_entry(Man0,
NewSQN, NewSQN,
SinkLevel, SinkLevel,
SinkManifestList,
Additions), Additions),
Man2 = leveled_pmanifest:remove_manifest_entry(Man0,
Man2 = leveled_pmanifest:remove_manifest_entry(Man1,
NewSQN, NewSQN,
SrcLevel, SrcLevel,
Src), Src),

View file

@ -32,6 +32,7 @@
merge_lookup/4, merge_lookup/4,
insert_manifest_entry/4, insert_manifest_entry/4,
remove_manifest_entry/4, remove_manifest_entry/4,
replace_manifest_entry/5,
switch_manifest_entry/4, switch_manifest_entry/4,
mergefile_selector/2, mergefile_selector/2,
add_snapshot/3, add_snapshot/3,
@ -149,6 +150,32 @@ save_manifest(Manifest, RootPath) ->
CRC = erlang:crc32(ManBin), CRC = erlang:crc32(ManBin),
ok = file:write_file(FP, <<CRC:32/integer, ManBin/binary>>). ok = file:write_file(FP, <<CRC:32/integer, ManBin/binary>>).
replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, []) ->
remove_manifest_entry(Manifest, ManSQN, LevelIdx, Removals);
replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) ->
Levels = Manifest#manifest.levels,
Level = array:get(LevelIdx, Levels),
UpdLevel = replace_entry(LevelIdx, Level, Removals, Additions),
leveled_log:log("PC019", ["insert", LevelIdx, UpdLevel]),
PendingDeletes = update_pendingdeletes(ManSQN,
Removals,
Manifest#manifest.pending_deletes),
UpdLevels = array:set(LevelIdx, UpdLevel, Levels),
case is_empty(LevelIdx, UpdLevel) of
true ->
Manifest#manifest{levels = UpdLevels,
basement = get_basement(UpdLevels),
manifest_sqn = ManSQN,
pending_deletes = PendingDeletes};
false ->
Basement = max(LevelIdx, Manifest#manifest.basement),
Manifest#manifest{levels = UpdLevels,
basement = Basement,
manifest_sqn = ManSQN,
pending_deletes = PendingDeletes}
end.
insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
Levels = Manifest#manifest.levels, Levels = Manifest#manifest.levels,
Level = array:get(LevelIdx, Levels), Level = array:get(LevelIdx, Levels),
@ -164,22 +191,9 @@ remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
Level = array:get(LevelIdx, Levels), Level = array:get(LevelIdx, Levels),
UpdLevel = remove_entry(LevelIdx, Level, Entry), UpdLevel = remove_entry(LevelIdx, Level, Entry),
leveled_log:log("PC019", ["remove", LevelIdx, UpdLevel]), leveled_log:log("PC019", ["remove", LevelIdx, UpdLevel]),
DelFun = PendingDeletes = update_pendingdeletes(ManSQN,
fun(E, Acc) -> Entry,
dict:store(E#manifest_entry.filename, Manifest#manifest.pending_deletes),
{ManSQN, E},
Acc)
end,
Entries =
case is_list(Entry) of
true ->
Entry;
false ->
[Entry]
end,
PendingDeletes = lists:foldl(DelFun,
Manifest#manifest.pending_deletes,
Entries),
UpdLevels = array:set(LevelIdx, UpdLevel, Levels), UpdLevels = array:set(LevelIdx, UpdLevel, Levels),
case is_empty(LevelIdx, UpdLevel) of case is_empty(LevelIdx, UpdLevel) of
true -> true ->
@ -414,13 +428,18 @@ add_entry(LevelIdx, Level, Entries) when is_list(Entries) ->
add_entry(LevelIdx, Level, Entry) -> add_entry(LevelIdx, Level, Entry) ->
add_entry(LevelIdx, Level, [Entry]). add_entry(LevelIdx, Level, [Entry]).
remove_entry(LevelIdx, Level, Entries) when is_list(Entries) -> remove_entry(LevelIdx, Level, Entries) ->
% We're assuming we're removing a sorted sublist % We're assuming we're removing a sorted sublist
RemLength = length(Entries), {RemLength, FirstRemoval} = measure_removals(Entries),
[RemStart|_Tail] = Entries, remove_section(LevelIdx, Level, FirstRemoval, RemLength).
remove_section(LevelIdx, Level, RemStart, RemLength);
remove_entry(LevelIdx, Level, Entry) -> measure_removals(Removals) ->
remove_section(LevelIdx, Level, Entry, 1). case is_list(Removals) of
true ->
{length(Removals), lists:nth(1, Removals)};
false ->
{1, Removals}
end.
remove_section(LevelIdx, Level, FirstEntry, SectionLength) -> remove_section(LevelIdx, Level, FirstEntry, SectionLength) ->
PredFun = pred_fun(LevelIdx, PredFun = pred_fun(LevelIdx,
@ -439,6 +458,59 @@ remove_section(LevelIdx, Level, FirstEntry, SectionLength) ->
?TREE_WIDTH) ?TREE_WIDTH)
end. end.
replace_entry(LevelIdx, Level, Removals, Additions) when LevelIdx =< 1 ->
{SectionLength, FirstEntry} = measure_removals(Removals),
PredFun = pred_fun(LevelIdx,
FirstEntry#manifest_entry.start_key,
FirstEntry#manifest_entry.end_key),
{LHS, RHS} = lists:splitwith(PredFun, Level),
Post = lists:nthtail(SectionLength, RHS),
case is_list(Additions) of
true ->
lists:append([LHS, Additions, Post]);
false ->
lists:append([LHS, [Additions], Post])
end;
replace_entry(LevelIdx, Level, Removals, Additions) ->
{SectionLength, FirstEntry} = measure_removals(Removals),
PredFun = pred_fun(LevelIdx,
FirstEntry#manifest_entry.start_key,
FirstEntry#manifest_entry.end_key),
{LHS, RHS} = lists:splitwith(PredFun, leveled_tree:to_list(Level)),
Post = lists:nthtail(SectionLength, RHS),
UpdList =
case is_list(Additions) of
true ->
MapFun =
fun(ME) ->
{ME#manifest_entry.end_key, ME}
end,
Additions0 = lists:map(MapFun, Additions),
lists:append([LHS, Additions0, Post]);
false ->
lists:append([LHS,
[{Additions#manifest_entry.end_key,
Additions}],
Post])
end,
leveled_tree:from_orderedlist(UpdList, ?TREE_TYPE, ?TREE_WIDTH).
update_pendingdeletes(ManSQN, Removals, PendingDeletes) ->
DelFun =
fun(E, Acc) ->
dict:store(E#manifest_entry.filename,
{ManSQN, E},
Acc)
end,
Entries =
case is_list(Removals) of
true ->
Removals;
false ->
[Removals]
end,
lists:foldl(DelFun, PendingDeletes, Entries).
key_lookup_level(LevelIdx, [], _Key) when LevelIdx =< 1 -> key_lookup_level(LevelIdx, [], _Key) when LevelIdx =< 1 ->
false; false;