Switch out implementation of manifest
This gives a new manifest implementation that is an array of lists. There is just basic unit testing of lookup, insertion and removal in this module. The API was changed subtly, and so nothing broader will work at this stage.
This commit is contained in:
parent
38c7c9be9b
commit
72d16af2b1
2 changed files with 275 additions and 370 deletions
|
@ -1 +0,0 @@
|
||||||
[].
|
|
|
@ -3,27 +3,7 @@
|
||||||
%% The manifest is an ordered set of files for each level to be used to find
|
%% The manifest is an ordered set of files for each level to be used to find
|
||||||
%% which file is relevant for a given key or range lookup at a given level.
|
%% which file is relevant for a given key or range lookup at a given level.
|
||||||
%%
|
%%
|
||||||
%% It is implemented as an ETS table, primarily to optimise lookup speed, but
|
|
||||||
%% also for the convenience of the tab2file format to support the persisting
|
|
||||||
%% of the manifest. However, it needs to be a multi-version table, as the
|
|
||||||
%% table may be accessed by both the real actor but also cloned actors too,
|
|
||||||
%% and they need to see state at different points in time.
|
|
||||||
%%
|
|
||||||
%% The ets table is an ordered set. The Key is a tuple of:
|
|
||||||
%%
|
|
||||||
%% {Level, LastKey, Filename}
|
|
||||||
%%
|
|
||||||
%% for the file. The manifest entry will have a value of:
|
|
||||||
%%
|
|
||||||
%% {FirstKey, {active, aSQN}, {tomb, tSQN}}
|
|
||||||
%%
|
|
||||||
%% When an item is added to the manifest it is added with aSQN set to the
|
|
||||||
%% manifets SQN which is intended to make this change current, and a tSQN
|
|
||||||
%% of infinity. When an item is removed the element is first altered so
|
|
||||||
%% that the tSQN is set to the next ManifestSQN. When the active
|
|
||||||
%% (non-cloned) actor reads items in the manifest it should also reap any
|
|
||||||
%% tombstone elements that have passed the lowest manifest SQN of any of
|
|
||||||
%% the registered clones.
|
|
||||||
|
|
||||||
-module(leveled_manifest).
|
-module(leveled_manifest).
|
||||||
|
|
||||||
|
@ -38,19 +18,16 @@
|
||||||
get_manifest_sqn/1,
|
get_manifest_sqn/1,
|
||||||
key_lookup/3,
|
key_lookup/3,
|
||||||
range_lookup/4,
|
range_lookup/4,
|
||||||
merge_lookup/4,
|
|
||||||
insert_manifest_entry/4,
|
insert_manifest_entry/4,
|
||||||
remove_manifest_entry/4,
|
remove_manifest_entry/4,
|
||||||
|
switch_manifest_entry/4,
|
||||||
mergefile_selector/2,
|
mergefile_selector/2,
|
||||||
add_snapshot/3,
|
add_snapshot/3,
|
||||||
release_snapshot/2,
|
release_snapshot/2,
|
||||||
ready_to_delete/2,
|
ready_to_delete/2,
|
||||||
delete_confirmed/2,
|
|
||||||
check_for_work/2,
|
check_for_work/2,
|
||||||
is_basement/2,
|
is_basement/2,
|
||||||
dump_pidmap/1,
|
levelzero_present/1
|
||||||
levelzero_present/1,
|
|
||||||
pointer_convert/2
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -63,20 +40,18 @@
|
||||||
-define(MANIFEST_FP, "ledger_manifest").
|
-define(MANIFEST_FP, "ledger_manifest").
|
||||||
-define(MAX_LEVELS, 8).
|
-define(MAX_LEVELS, 8).
|
||||||
|
|
||||||
-record(manifest, {table,
|
-record(manifest, {levels,
|
||||||
% A Multi-Version ETS table for lookup
|
% an array of lists or trees representing the manifest
|
||||||
pidmap,
|
|
||||||
% A dictionary to map filenames to {Pid, DeleteSQN}
|
|
||||||
manifest_sqn = 0 :: integer(),
|
manifest_sqn = 0 :: integer(),
|
||||||
% The current manifest SQN
|
% The current manifest SQN
|
||||||
is_clone = false :: boolean(),
|
|
||||||
% Is this manifest held by a clone (i.e. snapshot)
|
|
||||||
level_counts,
|
|
||||||
% An array of level counts to speed up compation work assessment
|
|
||||||
snapshots :: list(),
|
snapshots :: list(),
|
||||||
% A list of snapshots (i.e. clones)
|
% A list of snapshots (i.e. clones)
|
||||||
delete_sqn :: integer()|infinity,
|
min_snapshot_sqn = 0 :: integer(),
|
||||||
% The lowest SQN of any clone
|
% The smallest snapshot manifest SQN in the snapshot
|
||||||
|
% list
|
||||||
|
pending_deletes :: dict:dict(),
|
||||||
|
% a dictionary mapping keys (filenames) to SQN when
|
||||||
|
% the deletion was made
|
||||||
basement :: integer()
|
basement :: integer()
|
||||||
% Currently the lowest level (the largest number)
|
% Currently the lowest level (the largest number)
|
||||||
}).
|
}).
|
||||||
|
@ -86,8 +61,13 @@
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
new_manifest() ->
|
new_manifest() ->
|
||||||
Table = ets:new(manifest, [ordered_set, public]),
|
#manifest{
|
||||||
new_manifest(Table).
|
levels = array:new([{size, ?MAX_LEVELS + 1}, {default, []}]),
|
||||||
|
manifest_sqn = 0,
|
||||||
|
snapshots = [],
|
||||||
|
pending_deletes = dict:new(),
|
||||||
|
basement = 0
|
||||||
|
}.
|
||||||
|
|
||||||
open_manifest(RootPath) ->
|
open_manifest(RootPath) ->
|
||||||
% Open the manifest in the file path which has the highest SQN, and will
|
% Open the manifest in the file path which has the highest SQN, and will
|
||||||
|
@ -107,149 +87,114 @@ open_manifest(RootPath) ->
|
||||||
ValidManSQNs = lists:reverse(lists:sort(lists:foldl(ExtractSQNFun,
|
ValidManSQNs = lists:reverse(lists:sort(lists:foldl(ExtractSQNFun,
|
||||||
[],
|
[],
|
||||||
Filenames))),
|
Filenames))),
|
||||||
{ManSQN, Manifest} = open_manifestfile(RootPath, ValidManSQNs),
|
Manifest = open_manifestfile(RootPath, ValidManSQNs),
|
||||||
Manifest#manifest{manifest_sqn = ManSQN, delete_sqn = ManSQN}.
|
Manifest.
|
||||||
|
|
||||||
copy_manifest(Manifest) ->
|
copy_manifest(Manifest) ->
|
||||||
% Copy the manifest ensuring anything only the master process should care
|
% Copy the manifest ensuring anything only the master process should care
|
||||||
% about is switched to undefined
|
% about is switched to undefined
|
||||||
#manifest{is_clone = true,
|
Manifest#manifest{snapshots = undefined, pending_deletes = undefined}.
|
||||||
table = Manifest#manifest.table,
|
|
||||||
manifest_sqn = Manifest#manifest.manifest_sqn,
|
|
||||||
pidmap = Manifest#manifest.pidmap}.
|
|
||||||
|
|
||||||
load_manifest(Manifest, PidFun, SQNFun) ->
|
load_manifest(Manifest, PidFun, SQNFun) ->
|
||||||
FlatManifest = ets:tab2list(Manifest#manifest.table),
|
UpdateLevelFun =
|
||||||
InitiateFun =
|
fun(LevelIdx, {AccMaxSQN, AccMan}) ->
|
||||||
fun({{L, _EK, FN}, {_SK, ActSt, DelSt}}, {MaxSQN, AccMan}) ->
|
L0 = array:get(LevelIdx, AccMan#manifest.levels),
|
||||||
case {ActSt, DelSt} of
|
{L1, SQN1} = load_level(LevelIdx, L0, PidFun, SQNFun),
|
||||||
{{active, _ActSQN}, {tomb, infinity}} ->
|
{max(AccMaxSQN, SQN1),
|
||||||
Pid = PidFun(FN),
|
AccMan#manifest{levels = array:set(LevelIdx, L1, AccMan)}}
|
||||||
PidMap0 = dict:store(FN,
|
|
||||||
{Pid, infinity},
|
|
||||||
AccMan#manifest.pidmap),
|
|
||||||
LC = array:get(L, AccMan#manifest.level_counts),
|
|
||||||
LC0 = array:set(L, LC + 1, AccMan#manifest.level_counts),
|
|
||||||
Basement = max(AccMan#manifest.basement, L),
|
|
||||||
AccMan0 = AccMan#manifest{pidmap = PidMap0,
|
|
||||||
level_counts = LC0,
|
|
||||||
basement = Basement},
|
|
||||||
SQN = SQNFun(Pid),
|
|
||||||
MaxSQN0 = max(MaxSQN, SQN),
|
|
||||||
{MaxSQN0, AccMan0};
|
|
||||||
{_, {tomb, _TombSQN}} ->
|
|
||||||
{MaxSQN, AccMan}
|
|
||||||
end
|
|
||||||
end,
|
end,
|
||||||
lists:foldl(InitiateFun, {1, Manifest}, FlatManifest).
|
lists:foldl(UpdateLevelFun, {0, Manifest},
|
||||||
|
lists:seq(0, Manifest#manifest.basement)).
|
||||||
|
|
||||||
save_manifest(Manifest, RootPath) ->
|
save_manifest(Manifest, RootPath) ->
|
||||||
FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest),
|
FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest),
|
||||||
ets:tab2file(Manifest#manifest.table,
|
ManBin = term_to_binary(Manifest),
|
||||||
FP,
|
CRC = erlang:crc32(ManBin),
|
||||||
[{extended_info, [md5sum]}]).
|
ok = file:write_file(FP, <<CRC:32/integer, ManBin/binary>>).
|
||||||
|
|
||||||
|
insert_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
|
||||||
|
Levels = Manifest#manifest.levels,
|
||||||
|
Level = array:get(LevelIdx, Levels),
|
||||||
|
UpdLevel = add_entry(LevelIdx, Level, Entry),
|
||||||
|
Basement = max(LevelIdx, Manifest#manifest.basement),
|
||||||
|
Manifest#manifest{levels = array:set(LevelIdx, UpdLevel, Levels),
|
||||||
|
basement = Basement,
|
||||||
|
manifest_sqn = ManSQN}.
|
||||||
|
|
||||||
insert_manifest_entry(Manifest, ManSQN, Level, Entry) ->
|
remove_manifest_entry(Manifest, ManSQN, LevelIdx, Entry) ->
|
||||||
Key = {Level, Entry#manifest_entry.end_key, Entry#manifest_entry.filename},
|
Levels = Manifest#manifest.levels,
|
||||||
Pid = Entry#manifest_entry.owner,
|
Level = array:get(LevelIdx, Levels),
|
||||||
Value = {Entry#manifest_entry.start_key,
|
UpdLevel = remove_entry(LevelIdx, Level, Entry),
|
||||||
{active, ManSQN},
|
DelFun =
|
||||||
{tomb, infinity}},
|
fun(E, Acc) ->
|
||||||
true = ets:insert_new(Manifest#manifest.table, {Key, Value}),
|
dict:store(E#manifest_entry.filename, ManSQN, Acc)
|
||||||
PidMap0 = dict:store(Entry#manifest_entry.filename,
|
end,
|
||||||
{Pid, infinity},
|
Entries =
|
||||||
Manifest#manifest.pidmap),
|
case is_list(Entry) of
|
||||||
LC = array:get(Level, Manifest#manifest.level_counts),
|
true ->
|
||||||
LCArray0 = array:set(Level, LC + 1, Manifest#manifest.level_counts),
|
Entry;
|
||||||
MaxManSQN = max(ManSQN, Manifest#manifest.manifest_sqn),
|
false ->
|
||||||
Basement = max(Level, Manifest#manifest.basement),
|
[Entry]
|
||||||
Manifest#manifest{pidmap = PidMap0,
|
end,
|
||||||
level_counts = LCArray0,
|
PendingDeletes = lists:foldl(DelFun,
|
||||||
manifest_sqn = MaxManSQN,
|
Manifest#manifest.pending_deletes,
|
||||||
basement = Basement}.
|
Entries),
|
||||||
|
UpdLevels = array:set(LevelIdx, UpdLevel, Levels),
|
||||||
|
case is_empty(LevelIdx, UpdLevel) of
|
||||||
|
true ->
|
||||||
|
Manifest#manifest{levels = UpdLevels,
|
||||||
|
basement = get_basement(UpdLevels),
|
||||||
|
manifest_sqn = ManSQN,
|
||||||
|
pending_deletes = PendingDeletes};
|
||||||
|
false ->
|
||||||
|
Manifest#manifest{levels = UpdLevels,
|
||||||
|
manifest_sqn = ManSQN,
|
||||||
|
pending_deletes = PendingDeletes}
|
||||||
|
end.
|
||||||
|
|
||||||
remove_manifest_entry(Manifest, ManSQN, Level, Entry) ->
|
switch_manifest_entry(Manifest, ManSQN, SrcLevel, Entry) ->
|
||||||
Key = {Level, Entry#manifest_entry.end_key, Entry#manifest_entry.filename},
|
% Move to level below - so needs to be removed but not marked as a
|
||||||
[{Key, Value0}] = ets:lookup(Manifest#manifest.table, Key),
|
% pending deletion
|
||||||
{StartKey, {active, ActiveSQN}, {tomb, infinity}} = Value0,
|
Levels = Manifest#manifest.levels,
|
||||||
Value1 = {StartKey, {active, ActiveSQN}, {tomb, ManSQN}},
|
Level = array:get(SrcLevel, Levels),
|
||||||
true = ets:insert(Manifest#manifest.table, {Key, Value1}),
|
UpdLevel = remove_entry(SrcLevel, Level, Entry),
|
||||||
{Pid, infinity} = dict:fetch(Entry#manifest_entry.filename,
|
UpdLevels = array:set(SrcLevel, UpdLevel, Levels),
|
||||||
Manifest#manifest.pidmap),
|
insert_manifest_entry(Manifest#manifest{levels = UpdLevels},
|
||||||
PidMap0 = dict:store(Entry#manifest_entry.filename,
|
ManSQN,
|
||||||
{Pid, ManSQN},
|
SrcLevel + 1,
|
||||||
Manifest#manifest.pidmap),
|
Entry).
|
||||||
LC = array:get(Level, Manifest#manifest.level_counts),
|
|
||||||
LCArray0 = array:set(Level, LC - 1, Manifest#manifest.level_counts),
|
|
||||||
MaxManSQN = max(ManSQN, Manifest#manifest.manifest_sqn),
|
|
||||||
Manifest#manifest{pidmap = PidMap0,
|
|
||||||
level_counts = LCArray0,
|
|
||||||
manifest_sqn = MaxManSQN}.
|
|
||||||
|
|
||||||
get_manifest_sqn(Manifest) ->
|
get_manifest_sqn(Manifest) ->
|
||||||
Manifest#manifest.manifest_sqn.
|
Manifest#manifest.manifest_sqn.
|
||||||
|
|
||||||
key_lookup(Manifest, Level, Key) ->
|
key_lookup(Manifest, LevelIdx, Key) ->
|
||||||
case Level > Manifest#manifest.basement of
|
case LevelIdx > Manifest#manifest.basement of
|
||||||
true ->
|
true ->
|
||||||
false;
|
false;
|
||||||
false ->
|
false ->
|
||||||
GC =
|
key_lookup_level(LevelIdx,
|
||||||
case Manifest#manifest.is_clone of
|
array:get(LevelIdx, Manifest#manifest.levels),
|
||||||
true ->
|
Key)
|
||||||
false;
|
|
||||||
false ->
|
|
||||||
{true,
|
|
||||||
min(Manifest#manifest.delete_sqn,
|
|
||||||
Manifest#manifest.manifest_sqn)}
|
|
||||||
end,
|
|
||||||
FN = key_lookup(Manifest#manifest.table,
|
|
||||||
Level,
|
|
||||||
Key,
|
|
||||||
Manifest#manifest.manifest_sqn,
|
|
||||||
GC),
|
|
||||||
case FN of
|
|
||||||
false ->
|
|
||||||
false;
|
|
||||||
_ ->
|
|
||||||
{Pid, _TombSQN} = dict:fetch(FN, Manifest#manifest.pidmap),
|
|
||||||
Pid
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
range_lookup(Manifest, Level, StartKey, EndKey) ->
|
range_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
|
||||||
MapFun =
|
Range =
|
||||||
fun({{_Level, _LastKey, FN}, FirstKey}) ->
|
case LevelIdx > Manifest#manifest.basement of
|
||||||
{Pid, _SQN} = dict:fetch(FN, Manifest#manifest.pidmap),
|
true ->
|
||||||
case FirstKey < StartKey of
|
[];
|
||||||
true ->
|
false ->
|
||||||
{next, Pid, StartKey};
|
range_lookup_level(LevelIdx,
|
||||||
false ->
|
array:get(LevelIdx,
|
||||||
{next, Pid, FirstKey}
|
Manifest#manifest.levels),
|
||||||
end
|
StartKey,
|
||||||
|
EndKey)
|
||||||
end,
|
end,
|
||||||
range_lookup(Manifest, Level, StartKey, EndKey, MapFun).
|
MakePointerFun =
|
||||||
|
fun(M) ->
|
||||||
|
{next, M, StartKey}
|
||||||
|
end,
|
||||||
|
lists:map(MakePointerFun, Range).
|
||||||
|
|
||||||
merge_lookup(Manifest, Level, StartKey, EndKey) ->
|
|
||||||
MapFun =
|
|
||||||
fun({{_Level, LastKey, FN}, FirstKey}) ->
|
|
||||||
{Owner, _DelSQN} = dict:fetch(FN, Manifest#manifest.pidmap),
|
|
||||||
#manifest_entry{filename = FN,
|
|
||||||
owner = Owner,
|
|
||||||
start_key = FirstKey,
|
|
||||||
end_key = LastKey}
|
|
||||||
end,
|
|
||||||
range_lookup(Manifest, Level, StartKey, EndKey, MapFun).
|
|
||||||
|
|
||||||
pointer_convert(Manifest, EntryList) ->
|
|
||||||
MapFun =
|
|
||||||
fun(Entry) ->
|
|
||||||
{Pid, _DelSQN} = dict:fetch(Entry#manifest_entry.filename,
|
|
||||||
Manifest#manifest.pidmap),
|
|
||||||
{next, Pid, all}
|
|
||||||
end,
|
|
||||||
lists:map(MapFun, EntryList).
|
|
||||||
|
|
||||||
%% An algorithm for discovering which files to merge ....
|
%% An algorithm for discovering which files to merge ....
|
||||||
%% We can find the most optimal file:
|
%% We can find the most optimal file:
|
||||||
|
@ -261,174 +206,179 @@ pointer_convert(Manifest, EntryList) ->
|
||||||
%% genuinely better - eventually every file has to be compacted.
|
%% genuinely better - eventually every file has to be compacted.
|
||||||
%%
|
%%
|
||||||
%% Hence, the initial implementation is to select files to merge at random
|
%% Hence, the initial implementation is to select files to merge at random
|
||||||
mergefile_selector(Manifest, Level) ->
|
mergefile_selector(Manifest, LevelIdx) ->
|
||||||
KL = range_lookup(Manifest#manifest.table,
|
Level = array:get(LevelIdx, Manifest#manifest.levels),
|
||||||
Level,
|
lists:nth(random:uniform(length(Level), Level)).
|
||||||
{all, 0},
|
|
||||||
all,
|
|
||||||
all,
|
|
||||||
[],
|
|
||||||
Manifest#manifest.manifest_sqn),
|
|
||||||
{{Level, LastKey, FN},
|
|
||||||
FirstKey} = lists:nth(random:uniform(length(KL)), KL),
|
|
||||||
{Owner, infinity} = dict:fetch(FN, Manifest#manifest.pidmap),
|
|
||||||
#manifest_entry{filename = FN,
|
|
||||||
owner = Owner,
|
|
||||||
start_key = FirstKey,
|
|
||||||
end_key = LastKey}.
|
|
||||||
|
|
||||||
add_snapshot(Manifest, Pid, Timeout) ->
|
add_snapshot(Manifest, Pid, Timeout) ->
|
||||||
{MegaNow, SecNow, _} = os:timestamp(),
|
{MegaNow, SecNow, _} = os:timestamp(),
|
||||||
TimeToTimeout = MegaNow * 1000000 + SecNow + Timeout,
|
TimeToTimeout = MegaNow * 1000000 + SecNow + Timeout,
|
||||||
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, TimeToTimeout},
|
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, TimeToTimeout},
|
||||||
SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
|
SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
|
||||||
MinDelSQN = min(Manifest#manifest.delete_sqn, Manifest#manifest.manifest_sqn),
|
ManSQN = Manifest#manifest.manifest_sqn,
|
||||||
Manifest#manifest{snapshots = SnapList0, delete_sqn = MinDelSQN}.
|
case Manifest#manifest.min_snapshot_sqn of
|
||||||
|
0 ->
|
||||||
|
|
||||||
|
Manifest#manifest{snapshots = SnapList0,
|
||||||
|
min_snapshot_sqn = ManSQN};
|
||||||
|
N ->
|
||||||
|
N0 = min(N, ManSQN),
|
||||||
|
Manifest#manifest{snapshots = SnapList0, min_snapshot_sqn = N0}
|
||||||
|
end.
|
||||||
|
|
||||||
release_snapshot(Manifest, Pid) ->
|
release_snapshot(Manifest, Pid) ->
|
||||||
FilterFun =
|
FilterFun =
|
||||||
fun({P, SQN, TS}, {Acc, MinSQN}) ->
|
fun({P, SQN, TS}, {Acc, MinSQN}) ->
|
||||||
case P of
|
case P of
|
||||||
Pid ->
|
Pid ->
|
||||||
{Acc, MinSQN};
|
{Acc, min(SQN, MinSQN)};
|
||||||
_ ->
|
_ ->
|
||||||
{[{P, SQN, TS}|Acc], min(SQN, MinSQN)}
|
{[{P, SQN, TS}|Acc], min(SQN, MinSQN)}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
{SnapList0,
|
{SnapList0, MinSnapSQN} = lists:foldl(FilterFun,
|
||||||
DeleteSQN} = lists:foldl(FilterFun,
|
{[], infinity},
|
||||||
{[], infinity},
|
Manifest#manifest.snapshots),
|
||||||
Manifest#manifest.snapshots),
|
|
||||||
leveled_log:log("P0004", [SnapList0]),
|
leveled_log:log("P0004", [SnapList0]),
|
||||||
Manifest#manifest{snapshots = SnapList0, delete_sqn = DeleteSQN}.
|
case SnapList0 of
|
||||||
|
[] ->
|
||||||
ready_to_delete(Manifest, Filename) ->
|
Manifest#manifest{snapshots = SnapList0,
|
||||||
case dict:fetch(Filename, Manifest#manifest.pidmap) of
|
min_snapshot_sqn = 0};
|
||||||
{P, infinity} ->
|
_ ->
|
||||||
{false, P};
|
Manifest#manifest{snapshots = SnapList0,
|
||||||
{P, DeleteSQN} ->
|
min_snapshot_sqn = MinSnapSQN}
|
||||||
{MegaNow, SecNow, _} = os:timestamp(),
|
|
||||||
{ready_to_delete(Manifest#manifest.snapshots,
|
|
||||||
DeleteSQN,
|
|
||||||
MegaNow * 1000000 + SecNow),
|
|
||||||
P}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
delete_confirmed(Manifest, Filename) ->
|
ready_to_delete(Manifest, Filename) ->
|
||||||
PidMap = dict:erase(Filename, Manifest#manifest.pidmap),
|
ChangeSQN = dict:fetch(Filename, Manifest#manifest.pending_deletes),
|
||||||
% Would be better to clear ETS at this point rather than on lookup?
|
case Manifest#manifest.min_snapshot_sqn >= ChangeSQN of
|
||||||
Manifest#manifest{pidmap = PidMap}.
|
true ->
|
||||||
|
% Every snapshot is looking at a version of history after this
|
||||||
|
% was removed
|
||||||
|
PDs = dict:erase(Filename, Manifest#manifest.pending_deletes),
|
||||||
|
{true, Manifest#manifest{pending_deletes = PDs}};
|
||||||
|
false ->
|
||||||
|
{false, Manifest}
|
||||||
|
end.
|
||||||
|
|
||||||
check_for_work(Manifest, Thresholds) ->
|
check_for_work(Manifest, Thresholds) ->
|
||||||
CheckLevelFun =
|
CheckLevelFun =
|
||||||
fun({Level, MaxCount}, {AccL, AccC}) ->
|
fun({LevelIdx, MaxCount}, {AccL, AccC}) ->
|
||||||
case array:get(Level, Manifest#manifest.level_counts) of
|
case LevelIdx > Manifest#manifest.basement of
|
||||||
LC when LC > MaxCount ->
|
true ->
|
||||||
{[Level|AccL], AccC + LC - MaxCount};
|
{AccL, AccC};
|
||||||
_ ->
|
false ->
|
||||||
{AccL, AccC}
|
Level = array:get(LevelIdx, Manifest#manifest.levels),
|
||||||
|
S = size(LevelIdx, Level),
|
||||||
|
case S > MaxCount of
|
||||||
|
true ->
|
||||||
|
{[LevelIdx|AccL], AccC + S - MaxCount};
|
||||||
|
false ->
|
||||||
|
{AccL, AccC}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
lists:foldl(CheckLevelFun, {[], 0}, Thresholds).
|
lists:foldr(CheckLevelFun, {[], 0}, Thresholds).
|
||||||
|
|
||||||
is_basement(Manifest, Level) ->
|
is_basement(Manifest, Level) ->
|
||||||
Level >= Manifest#manifest.basement.
|
Level >= Manifest#manifest.basement.
|
||||||
|
|
||||||
dump_pidmap(Manifest) ->
|
|
||||||
dict:to_list(Manifest#manifest.pidmap).
|
|
||||||
|
|
||||||
levelzero_present(Manifest) ->
|
levelzero_present(Manifest) ->
|
||||||
case key_lookup(Manifest, 0, all) of
|
not is_empty(0, array:get(0, Manifest#manifest.levels)).
|
||||||
false ->
|
|
||||||
false;
|
|
||||||
_ ->
|
|
||||||
true
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Internal Functions
|
%%% Internal Functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
|
%% All these internal functions that work on a level are also passed LevelIdx
|
||||||
|
%% even if this is not presently relevant. Currently levels are lists, but
|
||||||
|
%% future branches may make lower levels trees or skiplists to improve fetch
|
||||||
|
%% efficiency
|
||||||
|
|
||||||
new_manifest(Table) ->
|
load_level(_LevelIdx, Level, PidFun, SQNFun) ->
|
||||||
#manifest{
|
LevelLoadFun =
|
||||||
table = Table,
|
fun(ME, {L_Out, L_MaxSQN}) ->
|
||||||
pidmap = dict:new(),
|
FN = ME#manifest_entry.filename,
|
||||||
level_counts = array:new([{size, ?MAX_LEVELS + 1}, {default, 0}]),
|
{ok, P, _Keys} = PidFun(FN),
|
||||||
snapshots = [],
|
SQN = SQNFun(P),
|
||||||
delete_sqn = 0
|
{[ME#manifest_entry{owner=P}|L_Out], max(SQN, L_MaxSQN)}
|
||||||
}.
|
end,
|
||||||
|
lists:foldr(LevelLoadFun, {[], 0}, Level).
|
||||||
|
|
||||||
range_lookup(Manifest, Level, StartKey, EndKey, MapFun) ->
|
|
||||||
KL = range_lookup(Manifest#manifest.table,
|
|
||||||
Level,
|
|
||||||
{StartKey, 0},
|
|
||||||
StartKey,
|
|
||||||
EndKey,
|
|
||||||
[],
|
|
||||||
Manifest#manifest.manifest_sqn),
|
|
||||||
lists:map(MapFun, KL).
|
|
||||||
|
|
||||||
range_lookup(Manifest, Level, {LastKey, LastFN}, SK, EK, Acc, ManSQN) ->
|
is_empty(_LevelIdx, []) ->
|
||||||
case ets:next(Manifest, {Level, LastKey, LastFN}) of
|
true;
|
||||||
'$end_of_table' ->
|
is_empty(_LevelIdx, _Level) ->
|
||||||
Acc;
|
false.
|
||||||
{Level, NextKey, NextFN} ->
|
|
||||||
[{K, V}] = ets:lookup(Manifest, {Level, NextKey, NextFN}),
|
size(_LevelIdx, Level) ->
|
||||||
{FirstKey, {active, ActiveSQN}, {tomb, TombSQN}} = V,
|
length(Level).
|
||||||
Active = (ManSQN >= ActiveSQN) and (ManSQN < TombSQN),
|
|
||||||
case Active of
|
add_entry(_LevelIdx, Level, Entries) when is_list(Entries) ->
|
||||||
|
lists:sort(Level ++ Entries);
|
||||||
|
add_entry(_LevelIdx, Level, Entry) ->
|
||||||
|
lists:sort([Entry|Level]).
|
||||||
|
|
||||||
|
remove_entry(_LevelIdx, Level, Entries) when is_list(Entries) ->
|
||||||
|
% We're assuming we're removing a sorted sublist
|
||||||
|
RemLength = length(Entries),
|
||||||
|
RemStart = lists:nth(1, Entries),
|
||||||
|
remove_section(Level, RemStart#manifest_entry.start_key, RemLength - 1);
|
||||||
|
remove_entry(_LevelIdx, Level, Entry) ->
|
||||||
|
remove_section(Level, Entry#manifest_entry.start_key, 0).
|
||||||
|
|
||||||
|
remove_section(Level, StartKey, Length) ->
|
||||||
|
PredFun =
|
||||||
|
fun(E) ->
|
||||||
|
E#manifest_entry.start_key < StartKey
|
||||||
|
end,
|
||||||
|
{Pre, Rest} = lists:splitwith(PredFun, Level),
|
||||||
|
Post = lists:nthtail(length(Rest) - Length, Rest),
|
||||||
|
Pre ++ Post.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
key_lookup_level(_LevelIdx, [], _Key) ->
|
||||||
|
false;
|
||||||
|
key_lookup_level(LevelIdx, [Entry|Rest], Key) ->
|
||||||
|
case Entry#manifest_entry.end_key >= Key of
|
||||||
|
true ->
|
||||||
|
case Key >= Entry#manifest_entry.start_key of
|
||||||
true ->
|
true ->
|
||||||
PostEnd = leveled_codec:endkey_passed(EK, FirstKey),
|
Entry#manifest_entry.owner;
|
||||||
case PostEnd of
|
|
||||||
true ->
|
|
||||||
Acc;
|
|
||||||
false ->
|
|
||||||
range_lookup(Manifest,
|
|
||||||
Level,
|
|
||||||
{NextKey, NextFN},
|
|
||||||
SK,
|
|
||||||
EK,
|
|
||||||
Acc ++ [{K, FirstKey}],
|
|
||||||
ManSQN)
|
|
||||||
end;
|
|
||||||
false ->
|
false ->
|
||||||
range_lookup(Manifest,
|
false
|
||||||
Level,
|
|
||||||
{NextKey, NextFN},
|
|
||||||
SK,
|
|
||||||
EK,
|
|
||||||
Acc,
|
|
||||||
ManSQN)
|
|
||||||
end;
|
end;
|
||||||
{OtherLevel, _, _} when OtherLevel > Level ->
|
false ->
|
||||||
Acc
|
key_lookup_level(LevelIdx, Rest, Key)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ready_to_delete(SnapList, FileDeleteSQN, Now) ->
|
range_lookup_level(_LevelIdx, Level, QStartKey, QEndKey) ->
|
||||||
FilterFun =
|
BeforeFun =
|
||||||
fun({P, SnapSQN, ExpiryTS}, Acc) ->
|
fun(M) ->
|
||||||
case Acc of
|
QStartKey > M#manifest_entry.end_key
|
||||||
|
end,
|
||||||
|
NotAfterFun =
|
||||||
|
fun(M) ->
|
||||||
|
not leveled_codec:endkey_passed(QEndKey,
|
||||||
|
M#manifest_entry.start_key)
|
||||||
|
end,
|
||||||
|
{_Before, MaybeIn} = lists:splitwith(BeforeFun, Level),
|
||||||
|
{In, _After} = lists:splitwith(NotAfterFun, MaybeIn),
|
||||||
|
In.
|
||||||
|
|
||||||
|
get_basement(Levels) ->
|
||||||
|
GetBaseFun =
|
||||||
|
fun(L, Acc) ->
|
||||||
|
case is_empty(L, array:get(L, Levels)) of
|
||||||
false ->
|
false ->
|
||||||
false;
|
max(L, Acc);
|
||||||
true ->
|
true ->
|
||||||
case FileDeleteSQN < SnapSQN of
|
Acc
|
||||||
true ->
|
|
||||||
% Snapshot taken after the file deletion
|
|
||||||
true;
|
|
||||||
false ->
|
|
||||||
case Now > ExpiryTS of
|
|
||||||
true ->
|
|
||||||
leveled_log:log("P0034", [P]),
|
|
||||||
true;
|
|
||||||
false ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
lists:foldl(FilterFun, true, SnapList).
|
lists:foldl(GetBaseFun, 0, lists:seq(0, ?MAX_LEVELS)).
|
||||||
|
|
||||||
|
|
||||||
filepath(RootPath, manifest) ->
|
filepath(RootPath, manifest) ->
|
||||||
MFP = RootPath ++ "/" ++ ?MANIFEST_FP ++ "/",
|
MFP = RootPath ++ "/" ++ ?MANIFEST_FP ++ "/",
|
||||||
|
@ -448,60 +398,17 @@ open_manifestfile(_RootPath, [0]) ->
|
||||||
{0, new_manifest()};
|
{0, new_manifest()};
|
||||||
open_manifestfile(RootPath, [TopManSQN|Rest]) ->
|
open_manifestfile(RootPath, [TopManSQN|Rest]) ->
|
||||||
CurrManFile = filepath(RootPath, TopManSQN, current_manifest),
|
CurrManFile = filepath(RootPath, TopManSQN, current_manifest),
|
||||||
case ets:file2tab(CurrManFile, [{verify,true}]) of
|
FileBin = file:read_file(CurrManFile),
|
||||||
{error, Reason} ->
|
<<CRC:32/integer, BinaryOfTerm/binary>> = FileBin,
|
||||||
leveled_log:log("P0033", [CurrManFile, Reason]),
|
case erlang:crc32(BinaryOfTerm) of
|
||||||
open_manifestfile(RootPath, Rest);
|
CRC ->
|
||||||
{ok, Table} ->
|
|
||||||
leveled_log:log("P0012", [TopManSQN]),
|
leveled_log:log("P0012", [TopManSQN]),
|
||||||
{TopManSQN, new_manifest(Table)}
|
binary_to_term(BinaryOfTerm);
|
||||||
|
_ ->
|
||||||
|
leveled_log:log("P0033", [CurrManFile, "crc wonky"]),
|
||||||
|
open_manifestfile(RootPath, Rest)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
key_lookup(Manifest, Level, KeyToFind, ManSQN, GC) ->
|
|
||||||
key_lookup(Manifest, Level, {KeyToFind, any}, KeyToFind, ManSQN, GC).
|
|
||||||
|
|
||||||
key_lookup(Manifest, Level, {LastKey, LastFN}, KeyToFind, ManSQN, GC) ->
|
|
||||||
case ets:next(Manifest, {Level, LastKey, LastFN}) of
|
|
||||||
'$end_of_table' ->
|
|
||||||
false;
|
|
||||||
{Level, NextKey, NextFN} ->
|
|
||||||
[{K, V}] = ets:lookup(Manifest, {Level, NextKey, NextFN}),
|
|
||||||
{FirstKey, {active, ActiveSQN}, {tomb, TombSQN}} = V,
|
|
||||||
Active = (ManSQN >= ActiveSQN) and (ManSQN < TombSQN),
|
|
||||||
case Active of
|
|
||||||
true ->
|
|
||||||
InRange = (KeyToFind >= FirstKey) or (KeyToFind == all),
|
|
||||||
case InRange of
|
|
||||||
true ->
|
|
||||||
NextFN;
|
|
||||||
false ->
|
|
||||||
false
|
|
||||||
end;
|
|
||||||
false ->
|
|
||||||
case GC of
|
|
||||||
false ->
|
|
||||||
ok;
|
|
||||||
{true, GC_SQN} ->
|
|
||||||
case TombSQN < GC_SQN of
|
|
||||||
true ->
|
|
||||||
leveled_log:log("P0036", [element(3, K)]),
|
|
||||||
ets:delete(Manifest, K);
|
|
||||||
false ->
|
|
||||||
ok
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
key_lookup(Manifest,
|
|
||||||
Level,
|
|
||||||
{NextKey, NextFN},
|
|
||||||
KeyToFind,
|
|
||||||
ManSQN,
|
|
||||||
GC)
|
|
||||||
end;
|
|
||||||
{OtherLevel, _, _} when OtherLevel > Level ->
|
|
||||||
false
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Test
|
%%% Test
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -575,14 +482,16 @@ changeup_setup(Man6) ->
|
||||||
owner="pid_y4",
|
owner="pid_y4",
|
||||||
filename="Y4"},
|
filename="Y4"},
|
||||||
|
|
||||||
Man7 = insert_manifest_entry(Man6, 2, 1, E1_2),
|
Man7 = remove_manifest_entry(Man6, 2, 1, E1),
|
||||||
Man8 = insert_manifest_entry(Man7, 2, 1, E2_2),
|
Man8 = remove_manifest_entry(Man7, 2, 1, E2),
|
||||||
Man9 = insert_manifest_entry(Man8, 2, 1, E3_2),
|
Man9 = remove_manifest_entry(Man8, 2, 1, E3),
|
||||||
Man10 = insert_manifest_entry(Man9, 2, 1, E4_2),
|
|
||||||
|
Man10 = insert_manifest_entry(Man9, 2, 1, E1_2),
|
||||||
|
Man11 = insert_manifest_entry(Man10, 2, 1, E2_2),
|
||||||
|
Man12 = insert_manifest_entry(Man11, 2, 1, E3_2),
|
||||||
|
Man13 = insert_manifest_entry(Man12, 2, 1, E4_2),
|
||||||
% remove_manifest_entry(Manifest, ManSQN, Level, Entry)
|
% remove_manifest_entry(Manifest, ManSQN, Level, Entry)
|
||||||
Man11 = remove_manifest_entry(Man10, 2, 1, E1),
|
|
||||||
Man12 = remove_manifest_entry(Man11, 2, 1, E2),
|
|
||||||
Man13 = remove_manifest_entry(Man12, 2, 1, E3),
|
|
||||||
{Man7, Man8, Man9, Man10, Man11, Man12, Man13}.
|
{Man7, Man8, Man9, Man10, Man11, Man12, Man13}.
|
||||||
|
|
||||||
keylookup_manifest_test() ->
|
keylookup_manifest_test() ->
|
||||||
|
@ -620,7 +529,6 @@ keylookup_manifest_test() ->
|
||||||
|
|
||||||
?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_2)),
|
?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_2)),
|
||||||
?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_3)),
|
?assertMatch("pid_z2", key_lookup(Man6, 1, LK1_3)),
|
||||||
io:format("Commencing failing test:~n"),
|
|
||||||
?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_4)),
|
?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_4)),
|
||||||
?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_5)),
|
?assertMatch("pid_z3", key_lookup(Man6, 1, LK1_5)),
|
||||||
|
|
||||||
|
@ -636,50 +544,48 @@ keylookup_manifest_test() ->
|
||||||
rangequery_manifest_test() ->
|
rangequery_manifest_test() ->
|
||||||
{_Man0, _Man1, _Man2, _Man3, _Man4, _Man5, Man6} = initial_setup(),
|
{_Man0, _Man1, _Man2, _Man3, _Man4, _Man5, Man6} = initial_setup(),
|
||||||
|
|
||||||
|
PidMapFun =
|
||||||
|
fun(Pointer) ->
|
||||||
|
{next, ME, _SK} = Pointer,
|
||||||
|
ME#manifest_entry.owner
|
||||||
|
end,
|
||||||
|
|
||||||
SK1 = {o, "Bucket1", "K711", null},
|
SK1 = {o, "Bucket1", "K711", null},
|
||||||
EK1 = {o, "Bucket1", "K999", null},
|
EK1 = {o, "Bucket1", "K999", null},
|
||||||
RL1_1 = range_lookup(Man6, 1, SK1, EK1),
|
RL1_1 = lists:map(PidMapFun, range_lookup(Man6, 1, SK1, EK1)),
|
||||||
?assertMatch([{next, "pid_z3", {o, "Bucket1", "K75", null}}], RL1_1),
|
?assertMatch(["pid_z3"], RL1_1),
|
||||||
RL1_2 = range_lookup(Man6, 2, SK1, EK1),
|
RL1_2 = lists:map(PidMapFun, range_lookup(Man6, 2, SK1, EK1)),
|
||||||
?assertMatch([{next, "pid_z5", {o, "Bucket1", "K711", null}},
|
?assertMatch(["pid_z5", "pid_z6"], RL1_2),
|
||||||
{next, "pid_z6", {o, "Bucket1", "K81", null}}],
|
|
||||||
RL1_2),
|
|
||||||
SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
|
SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
|
||||||
EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
|
EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null},
|
||||||
RL2_1 = range_lookup(Man6, 1, SK2, EK2),
|
RL2_1 = lists:map(PidMapFun, range_lookup(Man6, 1, SK2, EK2)),
|
||||||
?assertMatch([{next, "pid_z1", {i, "Bucket1", {"Idx1", "Fld8"}, null}}],
|
?assertMatch(["pid_z1"], RL2_1),
|
||||||
RL2_1),
|
RL2_2 = lists:map(PidMapFun, range_lookup(Man6, 2, SK2, EK2)),
|
||||||
RL2_2 = range_lookup(Man6, 2, SK2, EK2),
|
?assertMatch(["pid_z5"], RL2_2),
|
||||||
?assertMatch([{next, "pid_z5", {i, "Bucket1", {"Idx1", "Fld8"}, null}}],
|
|
||||||
RL2_2),
|
|
||||||
|
|
||||||
SK3 = {o, "Bucket1", "K994", null},
|
SK3 = {o, "Bucket1", "K994", null},
|
||||||
EK3 = {o, "Bucket1", "K995", null},
|
EK3 = {o, "Bucket1", "K995", null},
|
||||||
RL3_1 = range_lookup(Man6, 1, SK3, EK3),
|
RL3_1 = lists:map(PidMapFun, range_lookup(Man6, 1, SK3, EK3)),
|
||||||
?assertMatch([], RL3_1),
|
?assertMatch([], RL3_1),
|
||||||
RL3_2 = range_lookup(Man6, 2, SK3, EK3),
|
RL3_2 = lists:map(PidMapFun, range_lookup(Man6, 2, SK3, EK3)),
|
||||||
?assertMatch([{next, "pid_z6", {o, "Bucket1", "K994", null}}], RL3_2),
|
?assertMatch(["pid_z6"], RL3_2),
|
||||||
|
|
||||||
{_Man7, _Man8, _Man9, _Man10, _Man11, _Man12,
|
{_Man7, _Man8, _Man9, _Man10, _Man11, _Man12,
|
||||||
Man13} = changeup_setup(Man6),
|
Man13} = changeup_setup(Man6),
|
||||||
|
|
||||||
% Results unchanged despiter ES table change if using old manifest
|
RL1_1A = lists:map(PidMapFun, range_lookup(Man6, 1, SK1, EK1)),
|
||||||
RL1_1A = range_lookup(Man6, 1, SK1, EK1),
|
?assertMatch(["pid_z3"], RL1_1A),
|
||||||
?assertMatch([{next, "pid_z3", {o, "Bucket1", "K75", null}}], RL1_1A),
|
RL2_1A = lists:map(PidMapFun, range_lookup(Man6, 1, SK2, EK2)),
|
||||||
RL2_1A = range_lookup(Man6, 1, SK2, EK2),
|
?assertMatch(["pid_z1"], RL2_1A),
|
||||||
?assertMatch([{next, "pid_z1", {i, "Bucket1", {"Idx1", "Fld8"}, null}}],
|
RL3_1A = lists:map(PidMapFun, range_lookup(Man6, 1, SK3, EK3)),
|
||||||
RL2_1A),
|
|
||||||
RL3_1A = range_lookup(Man6, 1, SK3, EK3),
|
|
||||||
?assertMatch([], RL3_1A),
|
?assertMatch([], RL3_1A),
|
||||||
|
|
||||||
RL1_1B = range_lookup(Man13, 1, SK1, EK1),
|
RL1_1B = lists:map(PidMapFun, range_lookup(Man13, 1, SK1, EK1)),
|
||||||
?assertMatch([{next, "pid_y3", {o, "Bucket1", "K711", null}},
|
?assertMatch(["pid_y3", "pid_y4"], RL1_1B),
|
||||||
{next, "pid_y4", {o, "Bucket1", "K815", null}}], RL1_1B),
|
RL2_1B = lists:map(PidMapFun, range_lookup(Man13, 1, SK2, EK2)),
|
||||||
RL2_1B = range_lookup(Man13, 1, SK2, EK2),
|
?assertMatch(["pid_y1"], RL2_1B),
|
||||||
?assertMatch([{next, "pid_y1", {i, "Bucket1", {"Idx1", "Fld8"}, null}}],
|
RL3_1B = lists:map(PidMapFun, range_lookup(Man13, 1, SK3, EK3)),
|
||||||
RL2_1B),
|
?assertMatch(["pid_y4"], RL3_1B).
|
||||||
RL3_1B = range_lookup(Man13, 1, SK3, EK3),
|
|
||||||
?assertMatch([{next, "pid_y4", {o, "Bucket1", "K994", null}}], RL3_1B).
|
|
||||||
|
|
||||||
levelzero_present_test() ->
|
levelzero_present_test() ->
|
||||||
E0 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
|
E0 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue