diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 66b7c74..5055a01 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -183,16 +183,12 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) -> ME end, SinkManifestList = lists:map(RevertPointerFun, SinkList), - Man0 = leveled_pmanifest:remove_manifest_entry(Manifest, - NewSQN, - SinkLevel, - SinkManifestList), - Man1 = leveled_pmanifest:insert_manifest_entry(Man0, + Man0 = leveled_pmanifest:replace_manifest_entry(Manifest, NewSQN, SinkLevel, + SinkManifestList, Additions), - - Man2 = leveled_pmanifest:remove_manifest_entry(Man1, + Man2 = leveled_pmanifest:remove_manifest_entry(Man0, NewSQN, SrcLevel, Src), diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl index d65ec9c..d8e2ba4 100644 --- a/src/leveled_pmanifest.erl +++ b/src/leveled_pmanifest.erl @@ -32,6 +32,7 @@ merge_lookup/4, insert_manifest_entry/4, remove_manifest_entry/4, + replace_manifest_entry/5, switch_manifest_entry/4, mergefile_selector/2, add_snapshot/3, @@ -149,6 +150,32 @@ save_manifest(Manifest, RootPath) -> CRC = erlang:crc32(ManBin), ok = file:write_file(FP, <>). + +replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, []) -> + remove_manifest_entry(Manifest, ManSQN, LevelIdx, Removals); +replace_manifest_entry(Manifest, ManSQN, LevelIdx, Removals, Additions) -> + Levels = Manifest#manifest.levels, + Level = array:get(LevelIdx, Levels), + UpdLevel = replace_entry(LevelIdx, Level, Removals, Additions), + leveled_log:log("PC019", ["insert", LevelIdx, UpdLevel]), + PendingDeletes = update_pendingdeletes(ManSQN, + Removals, + Manifest#manifest.pending_deletes), + UpdLevels = array:set(LevelIdx, UpdLevel, Levels), + case is_empty(LevelIdx, UpdLevel) of + true -> + Manifest#manifest{levels = UpdLevels, + basement = get_basement(UpdLevels), + manifest_sqn = ManSQN, + pending_deletes = PendingDeletes}; + false -> + Basement = max(LevelIdx, Manifest#manifest.basement), + Manifest#manifest{levels = UpdLevels, + basement = Basement, + manifest_sqn = ManSQN, + pending_deletes = PendingDeletes} + end. + insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> Levels = Manifest#manifest.levels, Level = array:get(LevelIdx, Levels), @@ -164,22 +191,9 @@ remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) -> Level = array:get(LevelIdx, Levels), UpdLevel = remove_entry(LevelIdx, Level, Entry), leveled_log:log("PC019", ["remove", LevelIdx, UpdLevel]), - DelFun = - fun(E, Acc) -> - dict:store(E#manifest_entry.filename, - {ManSQN, E}, - Acc) - end, - Entries = - case is_list(Entry) of - true -> - Entry; - false -> - [Entry] - end, - PendingDeletes = lists:foldl(DelFun, - Manifest#manifest.pending_deletes, - Entries), + PendingDeletes = update_pendingdeletes(ManSQN, + Entry, + Manifest#manifest.pending_deletes), UpdLevels = array:set(LevelIdx, UpdLevel, Levels), case is_empty(LevelIdx, UpdLevel) of true -> @@ -414,13 +428,18 @@ add_entry(LevelIdx, Level, Entries) when is_list(Entries) -> add_entry(LevelIdx, Level, Entry) -> add_entry(LevelIdx, Level, [Entry]). -remove_entry(LevelIdx, Level, Entries) when is_list(Entries) -> +remove_entry(LevelIdx, Level, Entries) -> % We're assuming we're removing a sorted sublist - RemLength = length(Entries), - [RemStart|_Tail] = Entries, - remove_section(LevelIdx, Level, RemStart, RemLength); -remove_entry(LevelIdx, Level, Entry) -> - remove_section(LevelIdx, Level, Entry, 1). + {RemLength, FirstRemoval} = measure_removals(Entries), + remove_section(LevelIdx, Level, FirstRemoval, RemLength). + +measure_removals(Removals) -> + case is_list(Removals) of + true -> + {length(Removals), lists:nth(1, Removals)}; + false -> + {1, Removals} + end. remove_section(LevelIdx, Level, FirstEntry, SectionLength) -> PredFun = pred_fun(LevelIdx, @@ -439,6 +458,59 @@ remove_section(LevelIdx, Level, FirstEntry, SectionLength) -> ?TREE_WIDTH) end. +replace_entry(LevelIdx, Level, Removals, Additions) when LevelIdx =< 1 -> + {SectionLength, FirstEntry} = measure_removals(Removals), + PredFun = pred_fun(LevelIdx, + FirstEntry#manifest_entry.start_key, + FirstEntry#manifest_entry.end_key), + {LHS, RHS} = lists:splitwith(PredFun, Level), + Post = lists:nthtail(SectionLength, RHS), + case is_list(Additions) of + true -> + lists:append([LHS, Additions, Post]); + false -> + lists:append([LHS, [Additions], Post]) + end; +replace_entry(LevelIdx, Level, Removals, Additions) -> + {SectionLength, FirstEntry} = measure_removals(Removals), + PredFun = pred_fun(LevelIdx, + FirstEntry#manifest_entry.start_key, + FirstEntry#manifest_entry.end_key), + {LHS, RHS} = lists:splitwith(PredFun, leveled_tree:to_list(Level)), + Post = lists:nthtail(SectionLength, RHS), + UpdList = + case is_list(Additions) of + true -> + MapFun = + fun(ME) -> + {ME#manifest_entry.end_key, ME} + end, + Additions0 = lists:map(MapFun, Additions), + lists:append([LHS, Additions0, Post]); + false -> + lists:append([LHS, + [{Additions#manifest_entry.end_key, + Additions}], + Post]) + end, + leveled_tree:from_orderedlist(UpdList, ?TREE_TYPE, ?TREE_WIDTH). + + +update_pendingdeletes(ManSQN, Removals, PendingDeletes) -> + DelFun = + fun(E, Acc) -> + dict:store(E#manifest_entry.filename, + {ManSQN, E}, + Acc) + end, + Entries = + case is_list(Removals) of + true -> + Removals; + false -> + [Removals] + end, + lists:foldl(DelFun, PendingDeletes, Entries). key_lookup_level(LevelIdx, [], _Key) when LevelIdx =< 1 -> false;