diff --git a/include/leveled.hrl b/include/leveled.hrl index 7843739..13d862e 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -20,17 +20,6 @@ expire_tombstones = false :: boolean(), penciller :: pid()}). --record(penciller_work, - {next_sqn :: integer(), - clerk :: pid(), - src_level :: integer(), - start_time :: tuple(), - ledger_filepath :: string(), - unreferenced_files :: list(), - new_files :: list(), - level_counts :: dict(), - target_is_basement = false ::boolean()}). - -record(level, {level :: integer(), is_basement = false :: boolean(), diff --git a/src/leveled_log.erl b/src/leveled_log.erl index f4b59a2..f5829be 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -148,7 +148,7 @@ {"PC010", {info, "Merge to be commenced for FileToMerge=~s with MSN=~w"}}, {"PC011", - {info, "Merge completed with MSN=~w Level=~w and FileCounter=~w"}}, + {info, "Merge completed with MSN=~w to Level=~w and FileCounter=~w"}}, {"PC012", {info, "File to be created as part of MSN=~w Filename=~s"}}, {"PC013", diff --git a/src/leveled_manifest.erl b/src/leveled_manifest.erl index 82c323a..f7efed9 100644 --- a/src/leveled_manifest.erl +++ b/src/leveled_manifest.erl @@ -32,30 +32,56 @@ -export([ new_manifest/0, open_manifest/1, - save_manifest/3, - initiate_from_manifest/1, - key_lookup/4, - key_lookup/5, - range_lookup/5, + copy_manifest/1, + load_manifest/3, + save_manifest/2, + get_manifest_sqn/1, + key_lookup/3, + range_lookup/4, + merge_lookup/4, insert_manifest_entry/4, remove_manifest_entry/4, - add_snapshot/4, + mergefile_selector/2, + add_snapshot/3, release_snapshot/2, - ready_to_delete/2 + ready_to_delete/2, + check_for_work/2, + is_basement/2, + dump_pidmap/1, + levelzero_present/1, + pointer_convert/2 ]). -include_lib("eunit/include/eunit.hrl"). -define(MANIFEST_FILEX, "man"). -define(MANIFEST_FP, "ledger_manifest"). +-define(MAX_LEVELS, 8). +-define(END_KEY, {null, null, null, null}). 
+-record(manifest, {table, + % A Multi-Version ETS table for lookup + pidmap, + % A dictionary to map filenames to {Pid, DeleteSQN} + manifest_sqn = 0 :: integer(), + % The current manifest SQN + is_clone = false :: boolean(), + % Is this manifest held by a clone (i.e. snapshot) + level_counts, + % An array of level counts to speed up compaction work assessment + snapshots :: list(), + % A list of snapshots (i.e. clones) + delete_sqn :: integer()|infinity + % The lowest SQN of any clone + }). %%%============================================================================ %%% API %%%============================================================================ new_manifest() -> - ets:new(manifest, [ordered_set]). + Table = ets:new(manifest, [ordered_set]), + new_manifest(Table). open_manifest(RootPath) -> % Open the manifest in the file path which has the highest SQN, and will @@ -75,76 +101,289 @@ open_manifest(RootPath) -> ValidManSQNs = lists:reverse(lists:sort(lists:foldl(ExtractSQNFun, [], Filenames))), - open_manifestfile(RootPath, ValidManSQNs). + {ManSQN, Table} = open_manifestfile(RootPath, ValidManSQNs), + Manifest = new_manifest(Table), + Manifest#manifest{manifest_sqn = ManSQN}. + +copy_manifest(Manifest) -> + % Copy the manifest ensuring anything only the master process should care + % about is switched to undefined + #manifest{is_clone = true, + table = Manifest#manifest.table, + manifest_sqn = Manifest#manifest.manifest_sqn, + pidmap = Manifest#manifest.pidmap}. 
-save_manifest(Manifest, RootPath, ManSQN) -> - FP = filepath(RootPath, ManSQN, current_manifest), - ets:tab2file(Manifest, +load_manifest(Manifest, PidFun, SQNFun) -> + FlatManifest = ets:tab2list(Manifest#manifest.table), + InitiateFun = + fun({{L, _EK, FN}, {_SK, ActSt, DelSt}}, {MaxSQN, AccMan}) -> + case {ActSt, DelSt} of + {{active, _ActSQN}, {tomb, infinity}} -> + Pid = PidFun(FN), + PidMap0 = dict:store(FN, + {Pid, infinity}, + AccMan#manifest.pidmap), + LC = array:get(L, AccMan#manifest.level_counts), + LC0 = array:set(L, LC + 1, AccMan#manifest.level_counts), + AccMan0 = AccMan#manifest{pidmap = PidMap0, + level_counts = LC0}, + SQN = SQNFun(Pid), + MaxSQN0 = max(MaxSQN, SQN), + {MaxSQN0, AccMan0}; + {_, {tomb, _TombSQN}} -> + {MaxSQN, AccMan} + end + end, + lists:foldl(InitiateFun, {1, Manifest}, FlatManifest). + +save_manifest(Manifest, RootPath) -> + FP = filepath(RootPath, Manifest#manifest.manifest_sqn, current_manifest), + ets:tab2file(Manifest#manifest.table, FP, [{extended_info, [md5sum]}, {sync, true}]). -initiate_from_manifest(Manifest) -> - FlatManifest = ets:tab2list(Manifest), - InitiateFun = - fun({{L, _EK, FN}, {_SK, ActSt, DelSt}}, {FNList, MaxSQN, LCount}) -> - case {ActSt, DelSt} of - {{active, ActSQN}, {tomb, infinity}} -> - {[FN|FNList], - max(ActSQN, MaxSQN), - dict:update_counter(L, 1, LCount)}; - {_, {tomb, TombSQN}} -> - {FNList, max(TombSQN, MaxSQN), LCount} - end - end, - lists:foldl(InitiateFun, {[], 0, dict:new()}, FlatManifest). - - insert_manifest_entry(Manifest, ManSQN, Level, Entry) -> Key = {Level, Entry#manifest_entry.end_key, Entry#manifest_entry.filename}, + Pid = Entry#manifest_entry.owner, Value = {Entry#manifest_entry.start_key, {active, ManSQN}, {tomb, infinity}}, - true = ets:insert_new(Manifest, {Key, Value}). 
+ true = ets:insert_new(Manifest#manifest.table, {Key, Value}), + PidMap0 = dict:store(Entry#manifest_entry.filename, + {Pid, infinity}, + Manifest#manifest.pidmap), + LC = array:get(Level, Manifest#manifest.level_counts), + LCArray0 = array:set(Level, LC + 1, Manifest#manifest.level_counts), + MaxManSQN = max(ManSQN, Manifest#manifest.manifest_sqn), + Manifest#manifest{pidmap = PidMap0, + level_counts = LCArray0, + manifest_sqn = MaxManSQN}. remove_manifest_entry(Manifest, ManSQN, Level, Entry) -> Key = {Level, Entry#manifest_entry.end_key, Entry#manifest_entry.filename}, - [{Key, Value0}] = ets:lookup(Manifest, Key), + [{Key, Value0}] = ets:lookup(Manifest#manifest.table, Key), {StartKey, {active, ActiveSQN}, {tomb, infinity}} = Value0, Value1 = {StartKey, {active, ActiveSQN}, {tomb, ManSQN}}, - true = ets:insert(Manifest, {Key, Value1}). + true = ets:insert(Manifest#manifest.table, {Key, Value1}), + {Pid, infinity} = dict:fetch(Entry#manifest_entry.filename, + Manifest#manifest.pidmap), + PidMap0 = dict:store(Entry#manifest_entry.filename, + {Pid, ManSQN}, + Manifest#manifest.pidmap), + LC = array:get(Level, Manifest#manifest.level_counts), + LCArray0 = array:set(Level, LC - 1, Manifest#manifest.level_counts), + MaxManSQN = max(ManSQN, Manifest#manifest.manifest_sqn), + Manifest#manifest{pidmap = PidMap0, + level_counts = LCArray0, + manifest_sqn = MaxManSQN}. -key_lookup(Manifest, Level, Key, ManSQN) -> - key_lookup(Manifest, Level, Key, ManSQN, false). +get_manifest_sqn(Manifest) -> + Manifest#manifest.manifest_sqn. -key_lookup(Manifest, Level, Key, ManSQN, GC) -> - key_lookup(Manifest, Level, {Key, 0}, Key, ManSQN, GC). +key_lookup(Manifest, Level, Key) -> + GC = + case Manifest#manifest.is_clone of + true -> + false; + false -> + {true, Manifest#manifest.delete_sqn} + end, + FN = key_lookup(Manifest#manifest.table, + Level, + Key, + Manifest#manifest.manifest_sqn, + GC), + case FN of + false -> + false; + _ -> + {Pid, _TombSQN} = dict:fetch(FN, Manifest#manifest.pidmap), + Pid + end. 
+ +range_lookup(Manifest, Level, StartKey, EndKey) -> + MapFun = + fun({{_Level, _LastKey, FN}, FirstKey}) -> + {next, element(1, dict:fetch(FN, Manifest#manifest.pidmap)), FirstKey} + end, + range_lookup(Manifest, Level, StartKey, EndKey, MapFun). -range_lookup(Manifest, Level, StartKey, EndKey, ManSQN) -> - range_lookup(Manifest, Level, {StartKey, 0}, StartKey, EndKey, [], ManSQN). +merge_lookup(Manifest, Level, StartKey, EndKey) -> + MapFun = + fun({{_Level, LastKey, FN}, FirstKey}) -> + {Owner, _DelSQN} = dict:fetch(FN, Manifest#manifest.pidmap), + #manifest_entry{filename = FN, + owner = Owner, + start_key = FirstKey, + end_key = LastKey} + end, + range_lookup(Manifest, Level, StartKey, EndKey, MapFun). -add_snapshot(SnapList0, Pid, ManifestSQN, Timeout) -> - [{Pid, ManifestSQN, Timeout}|SnapList0]. +pointer_convert(Manifest, EntryList) -> + MapFun = + fun(Entry) -> + {next, + element(1, dict:fetch(Entry#manifest_entry.filename, + Manifest#manifest.pidmap)), + all} + end, + lists:map(MapFun, EntryList). -release_snapshot(SnapList0, Pid) -> +%% An algorithm for discovering which files to merge .... +%% We can find the most optimal file: +%% - The one with the most overlapping data below? +%% - The one that overlaps with the fewest files below? +%% - The smallest file? +%% We could try and be fair in some way (merge oldest first) +%% Ultimately, there is a lack of certainty that being fair or optimal is +%% genuinely better - eventually every file has to be compacted. +%% +%% Hence, the initial implementation is to select files to merge at random +mergefile_selector(Manifest, Level) -> + KL = range_lookup(Manifest#manifest.table, + Level, + {all, 0}, + all, + ?END_KEY, + [], + Manifest#manifest.manifest_sqn), + {{Level, LastKey, FN}, + FirstKey} = lists:nth(random:uniform(length(KL)), KL), + {Owner, infinity} = dict:fetch(FN, Manifest#manifest.pidmap), + #manifest_entry{filename = FN, + owner = Owner, + start_key = FirstKey, + end_key = LastKey}. 
+ +add_snapshot(Manifest, Pid, Timeout) -> + SnapEntry = {Pid, Manifest#manifest.manifest_sqn, Timeout}, + SnapList0 = [SnapEntry|Manifest#manifest.snapshots], + MinDelSQN = min(Manifest#manifest.delete_sqn, Manifest#manifest.manifest_sqn), + Manifest#manifest{snapshots = SnapList0, delete_sqn = MinDelSQN}. + +release_snapshot(Manifest, Pid) -> FilterFun = - fun({P, SQN, TS}, Acc) -> + fun({P, SQN, TS}, {Acc, MinSQN}) -> case P of Pid -> - Acc; + {Acc, MinSQN}; _ -> - [{P, SQN, TS}|Acc] + {[{P, SQN, TS}|Acc], min(SQN, MinSQN)} end end, - lists:foldl(FilterFun, [], SnapList0). + {SnapList0, DeleteSQN} = lists:foldl(FilterFun, + {[], infinity}, + Manifest#manifest.snapshots), + leveled_log:log("P0004", [SnapList0]), + Manifest#manifest{snapshots = SnapList0, delete_sqn = DeleteSQN}. -ready_to_delete(SnapList0, DeleteSQN) -> - ready_to_delete(SnapList0, DeleteSQN, os:timestamp()). +ready_to_delete(Manifest, Filename) -> + case dict:fetch(Filename, Manifest#manifest.pidmap) of + {P, infinity} -> + {false, P}; + {P, DeleteSQN} -> + {ready_to_delete(Manifest#manifest.snapshots, + DeleteSQN, + os:timestamp()), + P} + end. + +check_for_work(Manifest, Thresholds) -> + CheckLevelFun = + fun({Level, MaxCount}, {AccL, AccC}) -> + case array:get(Level, Manifest#manifest.level_counts) of + LC when LC > MaxCount -> + {[Level|AccL], AccC + LC - MaxCount}; + _ -> + {AccL, AccC} + end + end, + lists:foldl(CheckLevelFun, {[], 0}, Thresholds). + +is_basement(Manifest, Level) -> + CheckFun = + fun(L, Acc) -> + case array:get(L, Manifest#manifest.level_counts) of + 0 -> + Acc; + _N -> + false + end + end, + lists:foldl(CheckFun, true, lists:seq(Level + 1, ?MAX_LEVELS)). + +dump_pidmap(Manifest) -> + dict:to_list(Manifest#manifest.pidmap). + +levelzero_present(Manifest) -> + case key_lookup(Manifest, 0, all) of + false -> + false; + _ -> + true + end. 
%%%============================================================================ %%% Internal Functions %%%============================================================================ + +new_manifest(Table) -> + #manifest{ + table = Table, + pidmap = dict:new(), + level_counts = array:new([{size, ?MAX_LEVELS + 1}, {default, 0}]), + snapshots = [], + delete_sqn = infinity + }. + +range_lookup(Manifest, Level, StartKey, EndKey, MapFun) -> + KL = range_lookup(Manifest#manifest.table, + Level, + {StartKey, 0}, + StartKey, + EndKey, + [], + Manifest#manifest.manifest_sqn), + lists:map(MapFun, KL). + +range_lookup(Manifest, Level, {LastKey, LastFN}, SK, EK, Acc, ManSQN) -> + case ets:next(Manifest, {Level, LastKey, LastFN}) of + '$end_of_table' -> + Acc; + {Level, NextKey, NextFN} -> + [{K, V}] = ets:lookup(Manifest, {Level, NextKey, NextFN}), + {FirstKey, {active, ActiveSQN}, {tomb, TombSQN}} = V, + Active = (ManSQN >= ActiveSQN) and (ManSQN < TombSQN), + case Active of + true -> + PostEnd = leveled_codec:endkey_passed(EK, FirstKey), + case PostEnd of + true -> + Acc; + false -> + range_lookup(Manifest, + Level, + {NextKey, NextFN}, + SK, + EK, + Acc ++ [{K, FirstKey}], + ManSQN) + end; + false -> + range_lookup(Manifest, + Level, + {NextKey, NextFN}, + SK, + EK, + Acc, + ManSQN) + end; + {OtherLevel, _, _} when OtherLevel > Level -> + Acc + end. 
+ ready_to_delete(SnapList, FileDeleteSQN, Now) -> FilterFun = fun({P, SnapSQN, ExpiryTS}, Acc) -> @@ -179,10 +418,10 @@ filepath(RootPath, NewMSN, current_manifest) -> open_manifestfile(_RootPath, []) -> leveled_log:log("P0013", []), - new_manifest(); + {0, new_manifest()}; open_manifestfile(_RootPath, [0]) -> leveled_log:log("P0013", []), - new_manifest(); + {0, new_manifest()}; open_manifestfile(RootPath, [TopManSQN|Rest]) -> CurrManFile = filepath(RootPath, TopManSQN, current_manifest), case ets:file2tab(CurrManFile, [{verify,true}]) of @@ -191,9 +430,12 @@ open_manifestfile(RootPath, [TopManSQN|Rest]) -> open_manifestfile(RootPath, Rest); {ok, Table} -> leveled_log:log("P0012", [TopManSQN]), - Table + {TopManSQN, Table} end. +key_lookup(Manifest, Level, KeyToFind, ManSQN, GC) -> + key_lookup(Manifest, Level, {KeyToFind, any}, KeyToFind, ManSQN, GC). + key_lookup(Manifest, Level, {LastKey, LastFN}, KeyToFind, ManSQN, GC) -> case ets:next(Manifest, {Level, LastKey, LastFN}) of '$end_of_table' -> @@ -234,41 +476,6 @@ key_lookup(Manifest, Level, {LastKey, LastFN}, KeyToFind, ManSQN, GC) -> false end. -range_lookup(Manifest, Level, {LastKey, LastFN}, SK, EK, Acc, ManSQN) -> - case ets:next(Manifest, {Level, LastKey, LastFN}) of - '$end_of_table' -> - Acc; - {Level, NextKey, NextFN} -> - [{_K, V}] = ets:lookup(Manifest, {Level, NextKey, NextFN}), - {FirstKey, {active, ActiveSQN}, {tomb, TombSQN}} = V, - Active = (ManSQN >= ActiveSQN) and (ManSQN < TombSQN), - case Active of - true -> - PostEnd = leveled_codec:endkey_passed(EK, FirstKey), - case PostEnd of - true -> - Acc; - false -> - range_lookup(Manifest, - Level, - {NextKey, NextFN}, - SK, - EK, - Acc ++ [NextFN], - ManSQN) - end; - false -> - range_lookup(Manifest, - Level, - {NextKey, NextFN}, - SK, - EK, - Acc, - ManSQN) - end; - {OtherLevel, _, _} when OtherLevel > Level -> - Acc - end. 
%%%============================================================================ %%% Test @@ -276,8 +483,6 @@ range_lookup(Manifest, Level, {LastKey, LastFN}, SK, EK, Acc, ManSQN) -> -ifdef(TEST). - - rangequery_manifest_test() -> E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, @@ -298,7 +503,7 @@ rangequery_manifest_test() -> end_key={o, "Bucket1", "K996", null}, filename="Z6"}, - Manifest = open_manifestfile(dummy, []), + Manifest = new_manifest(), insert_manifest_entry(Manifest, 1, 1, E1), insert_manifest_entry(Manifest, 1, 1, E2), insert_manifest_entry(Manifest, 1, 1, E3), @@ -308,22 +513,22 @@ rangequery_manifest_test() -> SK1 = {o, "Bucket1", "K711", null}, EK1 = {o, "Bucket1", "K999", null}, - RL1_1 = range_lookup(Manifest, 1, SK1, EK1, 1), + RL1_1 = range_lookup(Manifest, 1, SK1, EK1), ?assertMatch(["Z3"], RL1_1), - RL1_2 = range_lookup(Manifest, 2, SK1, EK1, 1), + RL1_2 = range_lookup(Manifest, 2, SK1, EK1), ?assertMatch(["Z5", "Z6"], RL1_2), SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, - RL2_1 = range_lookup(Manifest, 1, SK2, EK2, 1), + RL2_1 = range_lookup(Manifest, 1, SK2, EK2), ?assertMatch(["Z1"], RL2_1), - RL2_2 = range_lookup(Manifest, 2, SK2, EK2, 1), + RL2_2 = range_lookup(Manifest, 2, SK2, EK2), ?assertMatch(["Z5"], RL2_2), SK3 = {o, "Bucket1", "K994", null}, EK3 = {o, "Bucket1", "K995", null}, - RL3_1 = range_lookup(Manifest, 1, SK3, EK3, 1), + RL3_1 = range_lookup(Manifest, 1, SK3, EK3), ?assertMatch([], RL3_1), - RL3_2 = range_lookup(Manifest, 2, SK3, EK3, 1), + RL3_2 = range_lookup(Manifest, 2, SK3, EK3), ?assertMatch(["Z6"], RL3_2), E1_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld4"}, "K8"}, @@ -347,172 +552,19 @@ rangequery_manifest_test() -> remove_manifest_entry(Manifest, 2, 1, E2), remove_manifest_entry(Manifest, 2, 1, E3), - RL1_1A = range_lookup(Manifest, 1, SK1, EK1, 1), + RL1_1A = range_lookup(Manifest, 
1, SK1, EK1), ?assertMatch(["Z3"], RL1_1A), - RL2_1A = range_lookup(Manifest, 1, SK2, EK2, 1), + RL2_1A = range_lookup(Manifest, 1, SK2, EK2), ?assertMatch(["Z1"], RL2_1A), - RL3_1A = range_lookup(Manifest, 1, SK3, EK3, 1), + RL3_1A = range_lookup(Manifest, 1, SK3, EK3), ?assertMatch([], RL3_1A), - RL1_1B = range_lookup(Manifest, 1, SK1, EK1, 2), + RL1_1B = range_lookup(Manifest, 1, SK1, EK1), ?assertMatch(["Y3", "Y4"], RL1_1B), - RL2_1B = range_lookup(Manifest, 1, SK2, EK2, 2), + RL2_1B = range_lookup(Manifest, 1, SK2, EK2), ?assertMatch(["Y1"], RL2_1B), - RL3_1B = range_lookup(Manifest, 1, SK3, EK3, 2), + RL3_1B = range_lookup(Manifest, 1, SK3, EK3), ?assertMatch(["Y4"], RL3_1B). -startup_manifest() - E1 = #manifest_entry{start_key={o, "Bucket1", "K0001", null}, - end_key={o, "Bucket1", "K0990", null}, - filename="Z1"}, - E2 = #manifest_entry{start_key={o, "Bucket1", "K1003", null}, - end_key={o, "Bucket1", "K3692", null}, - filename="Z2"}, - E3 = #manifest_entry{start_key={o, "Bucket1", "K3750", null}, - end_key={o, "Bucket1", "K9930", null}, - filename="Z3"}, - - Manifest0 = open_manifestfile(dummy, []), - insert_manifest_entry(Manifest0, 1, 1, E1), - insert_manifest_entry(Manifest0, 1, 1, E2), - insert_manifest_entry(Manifest0, 1, 1, E3), - Manifest0 - -keyquery_manifest_test() -> - Manifest0 = startup_manifest(), - - EToRemove = #manifest_entry{start_key={o, "Bucket99", "K3750", null}, - end_key={o, "Bucket99", "K9930", null}, - filename="ZR"}, - insert_manifest_entry(Manifest0, 1, 1, EToRemove), - remove_manifest_entry(Manifest0, 2, 1, EToRemove), - - RootPath = "../test", - ok = filelib:ensure_dir(filepath(RootPath, manifest)), - ok = save_manifest(Manifest0, RootPath, 2), - true = ets:delete(Manifest0), - ?assertMatch(true, filelib:is_file(filepath(RootPath, - 2, - current_manifest))), - - BadFP = filepath(RootPath, 3, current_manifest), - ok = file:write_file(BadFP, list_to_binary("nonsense")), - ?assertMatch(true, filelib:is_file(BadFP)), - - Manifest = 
open_manifest(RootPath), - {FNList, ManSQN, LCount} = initiate_from_manifest(Manifest), - ?assertMatch(["Z1", "Z2", "Z3"], lists:sort(FNList)), - ?assertMatch(2, ManSQN), - ?assertMatch(3, dict:fetch(1, LCount)), - - K1 = {o, "Bucket1", "K0000", null}, - K2 = {o, "Bucket1", "K0001", null}, - K3 = {o, "Bucket1", "K0002", null}, - K4 = {o, "Bucket1", "K0990", null}, - K5 = {o, "Bucket1", "K0991", null}, - K6 = {o, "Bucket1", "K1003", null}, - K7 = {o, "Bucket1", "K1004", null}, - K8 = {o, "Bucket1", "K3692", null}, - K9 = {o, "Bucket1", "K3693", null}, - K10 = {o, "Bucket1", "K3750", null}, - K11 = {o, "Bucket1", "K3751", null}, - K12 = {o, "Bucket1", "K9930", null}, - K13 = {o, "Bucket1", "K9931", null}, - - ?assertMatch(false, key_lookup(Manifest, 1, K1, 2)), - ?assertMatch("Z1", key_lookup(Manifest, 1, K2, 2)), - ?assertMatch("Z1", key_lookup(Manifest, 1, K3, 2)), - ?assertMatch("Z1", key_lookup(Manifest, 1, K4, 2)), - ?assertMatch(false, key_lookup(Manifest, 1, K5, 2)), - ?assertMatch("Z2", key_lookup(Manifest, 1, K6, 2)), - ?assertMatch("Z2", key_lookup(Manifest, 1, K7, 2)), - ?assertMatch("Z2", key_lookup(Manifest, 1, K8, 2)), - ?assertMatch(false, key_lookup(Manifest, 1, K9, 2)), - ?assertMatch("Z3", key_lookup(Manifest, 1, K10, 2)), - ?assertMatch("Z3", key_lookup(Manifest, 1, K11, 2)), - ?assertMatch("Z3", key_lookup(Manifest, 1, K12, 2)), - ?assertMatch(false, key_lookup(Manifest, 1, K13, 2)), - - E1_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld4"}, "K8"}, - end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K62"}, - filename="Y1"}, - E2_2 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K67"}, - end_key={o, "Bucket1", "K45", null}, - filename="Y2"}, - E3_2 = #manifest_entry{start_key={o, "Bucket1", "K47", null}, - end_key={o, "Bucket1", "K812", null}, - filename="Y3"}, - E4_2 = #manifest_entry{start_key={o, "Bucket1", "K815", null}, - end_key={o, "Bucket1", "K998", null}, - filename="Y4"}, - - insert_manifest_entry(Manifest, 3, 1, E1_2), - 
insert_manifest_entry(Manifest, 3, 1, E2_2), - insert_manifest_entry(Manifest, 3, 1, E3_2), - insert_manifest_entry(Manifest, 3, 1, E4_2), - - S1 = ets:info(Manifest, size), - - remove_manifest_entry(Manifest, 3, 1, E1), - remove_manifest_entry(Manifest, 3, 1, E2), - remove_manifest_entry(Manifest, 3, 1, E3), - - S2 = ets:info(Manifest, size), - ?assertMatch(true, S2 == S1), - - ?assertMatch("Y2", key_lookup(Manifest, 1, K1, 3)), - ?assertMatch("Y2", key_lookup(Manifest, 1, K10, 3)), - ?assertMatch("Y4", key_lookup(Manifest, 1, K12, 3)), - - S3 = ets:info(Manifest, size), - ?assertMatch(true, S3 == S1), - - ?assertMatch("Y2", key_lookup(Manifest, 1, K1, 3, {true, 3})), - ?assertMatch("Y2", key_lookup(Manifest, 1, K10, 3, {true, 3})), - ?assertMatch("Y4", key_lookup(Manifest, 1, K12, 3, {true, 3})), - - S4 = ets:info(Manifest, size), - ?assertMatch(true, S4 == S1), - - ?assertMatch("Y2", key_lookup(Manifest, 1, K1, 4, {true, 4})), - ?assertMatch("Y2", key_lookup(Manifest, 1, K10, 4, {true, 4})), - ?assertMatch("Y4", key_lookup(Manifest, 1, K12, 4, {true, 4})), - - S5 = ets:info(Manifest, size), - ?assertMatch(true, S5 < S1). 
- -snapshot_test() -> - Snap0 = [], - - ?assertMatch(true, ready_to_delete(Snap0, 1)), - - {MegaS0, S0, MicroS0} = os:timestamp(), - - Snap1 = add_snapshot(Snap0, pid_1, 3, {MegaS0, S0 + 100, MicroS0}), - Snap2 = add_snapshot(Snap1, pid_2, 4, {MegaS0, S0 + 200, MicroS0}), - Snap3 = add_snapshot(Snap2, pid_3, 4, {MegaS0, S0 + 150, MicroS0}), - Snap4 = add_snapshot(Snap3, pid_4, 5, {MegaS0, S0 + 300, MicroS0}), - - ?assertMatch(true, - ready_to_delete(Snap4, 2, {MegaS0, S0, MicroS0})), - ?assertMatch(false, - ready_to_delete(Snap4, 3, {MegaS0, S0, MicroS0})), - ?assertMatch(true, - ready_to_delete(Snap4, 3, {MegaS0, S0 + 150, MicroS0})), - ?assertMatch(false, - ready_to_delete(Snap4, 4, {MegaS0, S0 + 150, MicroS0})), - ?assertMatch(true, - ready_to_delete(Snap4, 4, {MegaS0, S0 + 250, MicroS0})), - - Snap5 = release_snapshot(Snap4, pid_1), - ?assertMatch(true, - ready_to_delete(Snap5, 3, {MegaS0, S0, MicroS0})). - - -allatlevel_test() -> - Manifest0 = startup_manifest(), - AllAtL1 = range_lookup(Manifest, 1, all, {null, null, null, null}, 1), - ?assertMatch(["Z1", "Z2", "Z3"], AllAtL1). - -endif. \ No newline at end of file diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 58ed347..a0ddbcd 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -25,26 +25,28 @@ -include("include/leveled.hrl"). --export([init/1, +-export([ + init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - clerk_new/1, + code_change/3 + ]). + +-export([ + clerk_new/2, clerk_prompt/1, - clerk_manifestchange/3, - code_change/3]). + clerk_close/1 + ]). -include_lib("eunit/include/eunit.hrl"). --define(MAX_TIMEOUT, 2000). --define(MIN_TIMEOUT, 50). --define(END_KEY, {null, null, null, null}). +-define(MAX_TIMEOUT, 1000). +-define(MIN_TIMEOUT, 200). -record(state, {owner :: pid(), - manifest, % ets table reference - change_pending=false :: boolean(), - work_item :: #penciller_work{}|null}). + root_path :: string()}). 
%%%============================================================================ %%% API @@ -69,26 +71,22 @@ clerk_close(Pid) -> init([]) -> {ok, #state{}}. -handle_call({load, Owner, Manifest}, _From, State) -> - {reply, - ok, - State#state{owner=Owner, manifest=Manifest}, - ?MIN_TIMEOUT}. +handle_call({load, Owner, RootPath}, _From, State) -> + {reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT}. handle_cast(prompt, State) -> - {noreply, State, ?MIN_TIMEOUT}; + handle_info(timeout, State); handle_cast(close, State) -> - (stop, normal, State). + {stop, normal, State}. -handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false -> +handle_info(timeout, State) -> case requestandhandle_work(State) of - {false, Timeout} -> - {noreply, State, Timeout}; - {true, WI} -> + false -> + {noreply, State, ?MAX_TIMEOUT}; + true -> % No timeout now as will wait for call to return manifest % change - {noreply, - State#state{change_pending=true, work_item=WI}} + {noreply, State, ?MIN_TIMEOUT} end. @@ -105,182 +103,116 @@ code_change(_OldVsn, State, _Extra) -> requestandhandle_work(State) -> case leveled_penciller:pcl_workforclerk(State#state.owner) of - false -> + none -> leveled_log:log("PC006", []), false; - {SrcLevel, ManifestSQN} -> - {Additions, Removals} = merge(Level, - State#state.manifest, - ManifestSQN), + {SrcLevel, Manifest} -> + {UpdManifest, EntriesToDelete} = merge(SrcLevel, + Manifest, + State#state.root_path), leveled_log:log("PC007", []), ok = leveled_penciller:pcl_commitmanifestchange(State#state.owner, - SrcLevel, - Additions, - Removals, - ManifestSQN), + UpdManifest), + ok = leveled_manifest:save_manifest(UpdManifest, + State#state.root_path), + ok = notify_deletions(EntriesToDelete, State#state.owner), true end. 
-merge(SrcLevel, Manifest, ManifestSQN) -> - SrcF = select_filetomerge(SrcLevel, Manifest), - - - Candidates = check_for_merge_candidates(SrcF, SinkFiles), - %% TODO: - %% Need to work out if this is the top level - %% And then tell merge process to create files at the top level - %% Which will include the reaping of expired tombstones - leveled_log:log("PC008", [SrcLevel, length(Candidates)]), - - MergedFiles = case length(Candidates) of +merge(SrcLevel, Manifest, RootPath) -> + Src = leveled_manifest:mergefile_selector(Manifest, SrcLevel), + NewSQN = leveled_manifest:get_manifest_sqn(Manifest) + 1, + SinkList = leveled_manifest:merge_lookup(Manifest, + SrcLevel + 1, + Src#manifest_entry.start_key, + Src#manifest_entry.end_key), + Candidates = length(SinkList), + leveled_log:log("PC008", [SrcLevel, Candidates]), + case Candidates of 0 -> %% If no overlapping candiates, manifest change only required %% %% TODO: need to think still about simply renaming when at %% lower level leveled_log:log("PC009", - [SrcF#manifest_entry.filename, SrcLevel + 1]), - [SrcF]; + [Src#manifest_entry.filename, SrcLevel + 1]), + Man0 = leveled_manifest:remove_manifest_entry(Manifest, + NewSQN, + SrcLevel, + Src), + Man1 = leveled_manifest:insert_manifest_entry(Man0, + NewSQN, + SrcLevel + 1, + Src), + {Man1, []}; _ -> - perform_merge({SrcF#manifest_entry.owner, - SrcF#manifest_entry.filename}, - Candidates, - {SrcLevel, WI#penciller_work.target_is_basement}, - {WI#penciller_work.ledger_filepath, - WI#penciller_work.next_sqn}) - end, - NewLevel = lists:sort(lists:append(MergedFiles, Others)), - UpdMFest2 = lists:keystore(SrcLevel + 1, - 1, - UpdMFest1, - {SrcLevel + 1, NewLevel}), - - ok = filelib:ensure_dir(WI#penciller_work.manifest_file), - {ok, Handle} = file:open(WI#penciller_work.manifest_file, - [binary, raw, write]), - ok = file:write(Handle, term_to_binary(UpdMFest2)), - ok = file:close(Handle), - case lists:member(SrcF, MergedFiles) of - true -> - {UpdMFest2, Candidates}; - 
false -> - %% Can rub out src file as it is not part of output - {UpdMFest2, Candidates ++ [SrcF]} + FilePath = leveled_penciller:filepath(RootPath, + NewSQN, + new_merge_files), + perform_merge(Manifest, Src, SinkList, SrcLevel, FilePath, NewSQN) end. - -mark_for_delete([], _Penciller) -> +notify_deletions([], _Penciller) -> ok; -mark_for_delete([Head|Tail], Penciller) -> +notify_deletions([Head|Tail], Penciller) -> ok = leveled_sst:sst_setfordelete(Head#manifest_entry.owner, Penciller), - mark_for_delete(Tail, Penciller). - - -check_for_merge_candidates(SrcF, SinkFiles) -> - lists:partition(fun(Ref) -> - case {Ref#manifest_entry.start_key, - Ref#manifest_entry.end_key} of - {_, EK} when SrcF#manifest_entry.start_key > EK -> - false; - {SK, _} when SrcF#manifest_entry.end_key < SK -> - false; - _ -> - true - end end, - SinkFiles). - - -%% An algorithm for discovering which files to merge .... -%% We can find the most optimal file: -%% - The one with the most overlapping data below? -%% - The one that overlaps with the fewest files below? -%% - The smallest file? -%% We could try and be fair in some way (merge oldest first) -%% Ultimately, there is a lack of certainty that being fair or optimal is -%% genuinely better - eventually every file has to be compacted. -%% -%% Hence, the initial implementation is to select files to merge at random - -select_filetomerge(SrcLevel, Manifest, ManifestSQN) -> - Level = leveled_manifest:range_lookup(Manifest, - 1, - all, - ?END_KEY, - ManifestSQN), - - FN = lists:nth(random:uniform(length(Level)), Level). - - + notify_deletions(Tail, Penciller). + %% Assumption is that there is a single SST from a higher level that needs -%% to be merged into multiple SSTs at a lower level. This should create an -%% entirely new set of SSTs, and the calling process can then update the -%% manifest. +%% to be merged into multiple SSTs at a lower level. 
%% -%% Once the FileToMerge has been emptied, the remainder of the candidate list -%% needs to be placed in a remainder SST that may be of a sub-optimal (small) -%% size. This stops the need to perpetually roll over the whole level if the -%% level consists of already full files. Some smartness may be required when -%% selecting the candidate list so that small files just outside the candidate -%% list be included to avoid a proliferation of small files. -%% -%% FileToMerge should be a tuple of {FileName, Pid} where the Pid is the Pid of -%% the gen_server leveled_sft process representing the file. -%% -%% CandidateList should be a list of {StartKey, EndKey, Pid} tuples -%% representing different gen_server leveled_sft processes, sorted by StartKey. -%% -%% The level is the level which the new files should be created at. +%% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1 -perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) -> - leveled_log:log("PC010", [SrcFN, MSN]), - PointerList = lists:map(fun(P) -> - {next, P#manifest_entry.owner, all} end, - CandidateList), - MaxSQN = leveled_sst:sst_getmaxsequencenumber(SrcPid), - do_merge([{next, SrcPid, all}], - PointerList, - LevelInfo, - {Filepath, MSN}, - MaxSQN, - 0, - []). 
+perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) -> + leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]), + SrcList = [{next, Src#manifest_entry.owner, all}], + SinkPointerList = leveled_manifest:pointer_convert(Manifest, SinkList), + MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner), + SinkLevel = SrcLevel + 1, + SinkBasement = leveled_manifest:is_basement(Manifest, SinkLevel), + Man0 = do_merge(SrcList, SinkPointerList, + SinkLevel, SinkBasement, + RootPath, NewSQN, MaxSQN, + 0, Manifest), + RemoveFun = + fun(Entry, AccMan) -> + leveled_manifest:remove_manifest_entry(AccMan, + NewSQN, + SinkLevel, + Entry) + end, + Man1 = lists:foldl(RemoveFun, Man0, SinkList), + {leveled_manifest:remove_manifest_entry(Man1, NewSQN, SrcLevel, Src), [Src|SinkList]}. -do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, _MaxSQN, - FileCounter, OutList) -> - leveled_log:log("PC011", [MSN, SrcLevel, FileCounter]), - OutList; -do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, MaxSQN, - FileCounter, OutList) -> - FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sst", - [SrcLevel + 1, FileCounter])), - leveled_log:log("PC012", [MSN, FileName]), +do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Counter, Man0) -> + leveled_log:log("PC011", [NewSQN, SinkLevel, Counter]), + Man0; +do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Counter, Man0) -> + FileName = lists:flatten(io_lib:format(RP ++ "_~w_~w.sst", + [SinkLevel, Counter])), + leveled_log:log("PC012", [NewSQN, FileName]), TS1 = os:timestamp(), - case leveled_sst:sst_new(FileName, KL1, KL2, IsB, SrcLevel + 1, MaxSQN) of + case leveled_sst:sst_new(FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN) of empty -> leveled_log:log("PC013", [FileName]), - OutList; + Man0; {ok, Pid, Reply} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, - ExtMan = lists:append(OutList, - [#manifest_entry{start_key=SmallestKey, - end_key=HighestKey, - owner=Pid, - filename=FileName}]), + Entry 
= #manifest_entry{start_key=SmallestKey, + end_key=HighestKey, + owner=Pid, + filename=FileName}, + Man1 = leveled_manifest:insert_manifest_entry(Man0, + NewSQN, + SinkLevel, + Entry), leveled_log:log_timer("PC015", [], TS1), do_merge(KL1Rem, KL2Rem, - {SrcLevel, IsB}, {Filepath, MSN}, MaxSQN, - FileCounter + 1, ExtMan) - end. - - -get_item(Index, List, Default) -> - case lists:keysearch(Index, 1, List) of - {value, {Index, Value}} -> - Value; - false -> - Default + SinkLevel, SinkB, + RP, NewSQN, MaxSQN, + Counter + 1, Man1) end. @@ -306,26 +238,6 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) -> null}}, generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange). -choose_pid_toquery([ManEntry|_T], Key) when - Key >= ManEntry#manifest_entry.start_key, - ManEntry#manifest_entry.end_key >= Key -> - ManEntry#manifest_entry.owner; -choose_pid_toquery([_H|T], Key) -> - choose_pid_toquery(T, Key). - - -find_randomkeys(_FList, 0, _Source) -> - ok; -find_randomkeys(FList, Count, Source) -> - KV1 = lists:nth(random:uniform(length(Source)), Source), - K1 = leveled_codec:strip_to_keyonly(KV1), - P1 = choose_pid_toquery(FList, K1), - FoundKV = leveled_sst:sst_get(P1, K1), - Found = leveled_codec:strip_to_keyonly(FoundKV), - io:format("success finding ~w in ~w~n", [K1, P1]), - ?assertMatch(K1, Found), - find_randomkeys(FList, Count - 1, Source). 
- merge_file_test() -> KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)), @@ -353,57 +265,22 @@ merge_file_test() -> 2, KL4_L2, undefined), - Result = perform_merge({PidL1_1, "../test/KL1_L1.sst"}, - [#manifest_entry{owner=PidL2_1}, - #manifest_entry{owner=PidL2_2}, - #manifest_entry{owner=PidL2_3}, - #manifest_entry{owner=PidL2_4}], - {2, false}, {"../test/", 99}), - lists:foreach(fun(ManEntry) -> - {o, B1, K1} = ManEntry#manifest_entry.start_key, - {o, B2, K2} = ManEntry#manifest_entry.end_key, - io:format("Result of ~s ~s and ~s ~s with Pid ~w~n", - [B1, K1, B2, K2, ManEntry#manifest_entry.owner]) end, - Result), - io:format("Finding keys in KL1_L1~n"), - ok = find_randomkeys(Result, 50, KL1_L1), - io:format("Finding keys in KL1_L2~n"), - ok = find_randomkeys(Result, 50, KL1_L2), - io:format("Finding keys in KL2_L2~n"), - ok = find_randomkeys(Result, 50, KL2_L2), - io:format("Finding keys in KL3_L2~n"), - ok = find_randomkeys(Result, 50, KL3_L2), - io:format("Finding keys in KL4_L2~n"), - ok = find_randomkeys(Result, 50, KL4_L2), - leveled_sst:sst_clear(PidL1_1), - leveled_sst:sst_clear(PidL2_1), - leveled_sst:sst_clear(PidL2_2), - leveled_sst:sst_clear(PidL2_3), - leveled_sst:sst_clear(PidL2_4), - lists:foreach(fun(ManEntry) -> - leveled_sst:sst_clear(ManEntry#manifest_entry.owner) end, - Result). - -select_merge_candidates_test() -> - Sink1 = #manifest_entry{start_key = {o, "Bucket", "Key1"}, - end_key = {o, "Bucket", "Key20000"}}, - Sink2 = #manifest_entry{start_key = {o, "Bucket", "Key20001"}, - end_key = {o, "Bucket1", "Key1"}}, - Src1 = #manifest_entry{start_key = {o, "Bucket", "Key40001"}, - end_key = {o, "Bucket", "Key60000"}}, - {Candidates, Others} = check_for_merge_candidates(Src1, [Sink1, Sink2]), - ?assertMatch([Sink2], Candidates), - ?assertMatch([Sink1], Others). 
- - -select_merge_file_test() -> - L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}], - L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid}, - {{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}], - Manifest = [{0, L0}, {1, L1}], - {FileRef, NewManifest} = select_filetomerge(0, Manifest), - ?assertMatch(FileRef, {{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}), - ?assertMatch(NewManifest, [{0, []}, {1, L1}]). + E1 = #manifest_entry{owner = PidL1_1, filename = "../test/KL1_L1.sst"}, + E2 = #manifest_entry{owner = PidL2_1, filename = "../test/KL1_L2.sst"}, + E3 = #manifest_entry{owner = PidL2_2, filename = "../test/KL2_L2.sst"}, + E4 = #manifest_entry{owner = PidL2_3, filename = "../test/KL3_L2.sst"}, + E5 = #manifest_entry{owner = PidL2_4, filename = "../test/KL4_L2.sst"}, + + Man0 = leveled_manifest:new_manifest(), + Man1 = leveled_manifest:insert_manifest_entry(Man0, 1, 2, E1), + Man2 = leveled_manifest:insert_manifest_entry(Man1, 1, 2, E1), + Man3 = leveled_manifest:insert_manifest_entry(Man2, 1, 2, E1), + Man4 = leveled_manifest:insert_manifest_entry(Man3, 1, 2, E1), + Man5 = leveled_manifest:insert_manifest_entry(Man4, 2, 1, E1), + + Man6 = perform_merge(Man5, E1, [E2, E3, E4, E5], 1, "../test", 3), + + ?assertMatch(3, leveled_manifest:get_manifest_sqn(Man6)). coverage_cheat_test() -> {ok, _State1} = code_change(null, #state{}, null). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 0f4fa2d..d4423cb 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -161,12 +161,15 @@ -include("include/leveled.hrl"). --export([init/1, +-export([ + init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, + code_change/3]). + +-export([ pcl_start/1, pcl_pushmem/2, pcl_fetchlevelzero/2, @@ -184,7 +187,10 @@ pcl_registersnapshot/2, pcl_releasesnapshot/2, pcl_loadsnapshot/2, - pcl_getstartupsequencenumber/1, + pcl_getstartupsequencenumber/1]). + +-export([ + filepath/3, clean_testdir/1]). 
-include_lib("eunit/include/eunit.hrl"). @@ -208,13 +214,8 @@ -define(ITERATOR_SCANWIDTH, 4). -define(SNAPSHOT_TIMEOUT, 3600). --record(state, {manifest, % an ETS table reference - manifest_sqn = 0 :: integer(), +-record(state, {manifest, % a manifest record from the leveled_manifest module persisted_sqn = 0 :: integer(), % The highest SQN persisted - registered_snapshots = [] :: list(), - pidmap = dict:new() :: dict(), - level_counts :: dict(), - deletions_pending = dict:new() ::dict(), ledger_sqn = 0 :: integer(), % The highest SQN added to L0 root_path = "../test" :: string(), @@ -290,8 +291,8 @@ pcl_checksequencenumber(Pid, Key, SQN) -> pcl_workforclerk(Pid) -> gen_server:call(Pid, work_for_clerk, infinity). -pcl_confirmmanifestchange(Pid, WI) -> - gen_server:cast(Pid, {manifest_change, WI}). +pcl_confirmmanifestchange(Pid, Manifest) -> + gen_server:cast(Pid, {manifest_change, Manifest}). pcl_confirml0complete(Pid, FN, StartKey, EndKey) -> gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey}). 
@@ -328,9 +329,11 @@ init([PCLopts]) -> {undefined, true} -> SrcPenciller = PCLopts#penciller_options.source_penciller, {ok, State} = pcl_registersnapshot(SrcPenciller, self()), + ManifestClone = leveled_manifest:copy_manifest(State#state.manifest), leveled_log:log("P0001", [self()]), - io:format("Snapshot ledger sqn at ~w~n", [State#state.ledger_sqn]), - {ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}}; + {ok, State#state{is_snapshot=true, + source_penciller=SrcPenciller, + manifest=ManifestClone}}; %% Need to do something about timeout {_RootPath, false} -> start_from_file(PCLopts) @@ -375,24 +378,18 @@ handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}}, State)} end; handle_call({fetch, Key, Hash}, _From, State) -> - Structure = {State#state.manifest, - State#state.pid_map, - State#state.manifest_sqn}, {R, HeadTimer} = timed_fetch_mem(Key, Hash, - Structure, + State#state.manifest, State#state.levelzero_cache, State#state.levelzero_index, State#state.head_timing), {reply, R, State#state{head_timing=HeadTimer}}; handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> - Structure = {State#state.manifest, - State#state.pid_map, - State#state.manifest_sqn}, {reply, compare_to_sqn(plain_fetch_mem(Key, Hash, - Structure, + State#state.manifest, State#state.levelzero_cache, State#state.levelzero_index), SQN), @@ -412,19 +409,15 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, List end, - ConvertToPointerFun = - fun(FN) -> {next, dict:fetch(FN, State#state.pid_map), StartKey} end, SetupFoldFun = fun(Level, Acc) -> - FNs = leveled_manifest:range_lookup(State#state.manifest, - Level, - StartKey, - EndKey, - State#state.manifest_sqn), - Pointers = lists:map(ConvertToPointerFun, FNs), + Pointers = leveled_manifest:range_lookup(State#state.manifest, + Level, + StartKey, + EndKey), case Pointers of [] -> Acc; - PL -> Acc ++ [{L, PL}] + PL -> Acc ++ [{Level, PL}] end end, SSTiter = lists:foldl(SetupFoldFun, [], 
lists:seq(0, ?MAX_LEVELS - 1)), @@ -435,29 +428,37 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, MaxKeys), {reply, Acc, State#state{levelzero_astree = L0AsList}}; -handle_call(work_for_clerk, From, State) -> - DelayForPendingL0 = State#state.levelzero_pending, - {WL, WC} = check_for_work(State#state.level_counts), - case WC of - 0 -> - {reply, none, State#state{work_backlog=false}}; - N when N > ?WORKQUEUE_BACKLOG_TOLERANCE -> - leveled_log:log("P0024", [N, true]), - [TL|_Tail] = WL, - {reply, TL, State#state{work_backlog=true}}; - N -> - leveled_log:log("P0024", [N, false]), - [TL|_Tail] = WL, - {reply, TL, State#state{work_backlog=false}} +handle_call(work_for_clerk, _From, State) -> + case State#state.levelzero_pending of + true -> + {reply, none, State}; + false -> + {WL, WC} = leveled_manifest:check_for_work(State#state.manifest, + ?LEVEL_SCALEFACTOR), + case WC of + 0 -> + {reply, none, State#state{work_backlog=false}}; + N when N > ?WORKQUEUE_BACKLOG_TOLERANCE -> + leveled_log:log("P0024", [N, true]), + [TL|_Tail] = WL, + {reply, + {TL, State#state.manifest}, + State#state{work_backlog=true}}; + N -> + leveled_log:log("P0024", [N, false]), + [TL|_Tail] = WL, + {reply, + {TL, State#state.manifest}, + State#state{work_backlog=false}} + end end; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; handle_call({register_snapshot, Snapshot}, _From, State) -> - RegisteredSnaps = add_snapshot(State#state.registered_snapshots, - Snapshot, - State#state.manifest_sqn, - ?SNAPSHOT_TIMEOUT), - {reply, {ok, State}, State#state{registered_snapshots = RegisteredSnaps}}; + Manifest0 = leveled_manifest:add_snapshot(State#state.manifest, + Snapshot, + ?SNAPSHOT_TIMEOUT), + {reply, {ok, State}, State#state{manifest = Manifest0}}; handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}}, _From, State) -> L0D = leveled_pmem:add_to_cache(State#state.levelzero_size, @@ -483,37 +484,22 @@ 
handle_call(doom, _From, State) -> FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/", {stop, normal, {ok, [ManifestFP, FilesFP]}, State}. -handle_cast({manifest_change, WI}, State) -> - NewManifestSQN = WI#next_sqn, - UnreferenceFun = - fun(FN, Acc) -> - dict:store(FN, NewManifestSQN, Acc) - end, - DelPending = lists:foldl(UnreferenceFun, - State#state.deletions_pending, - WI#unreferenced_files), - {noreply, State{deletions_pending = DelPending, - manifest_sqn = NewManifestSQN}}; +handle_cast({manifest_change, NewManifest}, State) -> + {noreply, State#state{manifest = NewManifest}}; handle_cast({release_snapshot, Snapshot}, State) -> - Rs = leveled_manifest:release_snapshot(State#state.registered_snapshots, - Snapshot), + Manifest0 = leveled_manifest:release_snapshot(State#state.manifest, + Snapshot), leveled_log:log("P0003", [Snapshot]), - leveled_log:log("P0004", [Rs]), - {noreply, State#state{registered_snapshots=Rs}}; + {noreply, State#state{manifest=Manifest0}}; handle_cast({confirm_delete, Filename}, State=#state{is_snapshot=Snap}) when Snap == false -> - DeleteSQN = dict:fetch(Filename, State#state.deletions_pending), - R2D = leveled_manifest:ready_to_delete(State#state.registered_snapshots, - DeleteSQN), + R2D = leveled_manifest:ready_to_delete(State#state.manifest, Filename), case R2D of - true -> - PidToDelete = dict:fetch(Filename, State#state.pidmap), - leveled_log:log("P0005", [FileName]), - DP0 = dict:erase(Filename, State#state.deletions_pending), - PM0 = dict:erase(Filename, State#state.pidmap), + {true, Pid} -> + leveled_log:log("P0005", [Filename]), ok = leveled_sst:sst_deleteconfirmed(Pid), - {noreply, State#state{deletions_pending = DP0, pidmap = PM0}}; - false -> + {noreply, State}; + {false, _Pid} -> {noreply, State} end; handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> @@ -522,17 +508,19 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> end_key=EndKey, owner=State#state.levelzero_constructor, 
filename=FN}, - UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}), + ManifestSQN = leveled_manifest:get_manifest_sqn(State#state.manifest) + 1, + UpdMan = leveled_manifest:insert_manifest_entry(State#state.manifest, + ManifestSQN, + 0, + ManEntry), % Prompt clerk to ask about work - do this for every L0 roll UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index), ok = leveled_pclerk:clerk_prompt(State#state.clerk), - UpdLevelCounts = dict:store(0, 1, State#state.level_counts), {noreply, State#state{levelzero_cache=[], levelzero_index=UpdIndex, levelzero_pending=false, levelzero_constructor=undefined, levelzero_size=0, - level_counts=UpdLevelCounts, manifest=UpdMan, persisted_sqn=State#state.ledger_sqn}}. @@ -557,22 +545,21 @@ terminate(Reason, State) -> ok = leveled_pclerk:clerk_close(State#state.clerk), leveled_log:log("P0008", [Reason]), - L0 = key_lookup(State#state.manifest, 0, all, State#state.manifest_sqn), - case {UpdState#state.levelzero_pending, L0} of + L0 = leveled_manifest:key_lookup(State#state.manifest, 0, all), + case {State#state.levelzero_pending, L0} of {false, false} -> - L0Pid = roll_memory(UpdState, true), + L0Pid = roll_memory(State, true), ok = leveled_sst:sst_close(L0Pid); StatusTuple -> leveled_log:log("P0010", [StatusTuple]) end, % Tidy shutdown of individual files - lists:foreach(fun({_FN, Pid}) -> + lists:foreach(fun({_FN, {Pid, _DSQN}}) -> ok = leveled_sst:sst_close(Pid) end, - dict:to_list(State#state.pidmap)), + leveled_manifest:dump_pidmap(State#state.manifest)), leveled_log:log("P0011", []), - ok. @@ -594,7 +581,7 @@ start_from_file(PCLopts) -> M end, - {ok, MergeClerk} = leveled_pclerk:clerk_new(self()), + {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath), CoinToss = PCLopts#penciller_options.levelzero_cointoss, % Used to randomly defer the writing of L0 file. 
Intended to help with @@ -608,19 +595,19 @@ start_from_file(PCLopts) -> levelzero_index=leveled_pmem:new_index()}, %% Open manifest - Manifest = leveled_manifest:open_manifest(RootPath), - {FNList, - ManSQN, - LevelCounts) = leveled_manifest:initiate_from_manifest(Manifest), - InitiateFun = - fun(FN, {AccMaxSQN, AccPidMap}) -> - {ok, P, {_FK, _LK}} = leveled_sst:sst_open(FN), - FileMaxSQN = leveled_sst:sst_getmaxsequencenumber(P), - {max(AccMaxSQN, FileMaxSQN), dict:store(FN, P, AccPidMap)} + Manifest0 = leveled_manifest:open_manifest(RootPath), + OpenFun = + fun(FN) -> + {ok, Pid, {_FK, _LK}} = leveled_sst:sst_open(FN), + Pid end, - {MaxSQN, PidMap} = lists:foldl(InitiateFun, {0, dict:new()}, FNList), + SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1, + {MaxSQN, Manifest1} = leveled_manifest:load_manifest(Manifest0, + OpenFun, + SQNFun), leveled_log:log("P0014", [MaxSQN]), - + ManSQN = leveled_manifest:get_manifest_sqn(Manifest1), + %% Find any L0 files L0FN = filepath(RootPath, ManSQN, new_merge_files) ++ "_0_0.sst", case filelib:is_file(L0FN) of @@ -632,41 +619,26 @@ start_from_file(PCLopts) -> L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid), L0Entry = #manifest_entry{start_key = L0StartKey, end_key = L0EndKey, - filename = L0FN}, - PidMap0 = dict:store(L0FN, L0Pid, PidMap), - insert_manifest_entry(Manifest, ManSQN, 0, L0Entry) + filename = L0FN, + owner = L0Pid}, + Manifest2 = leveled_manifest:insert_manifest_entry(Manifest1, + ManSQN + 1, + 0, + L0Entry), leveled_log:log("P0016", [L0SQN]), LedgerSQN = max(MaxSQN, L0SQN), {ok, - InitState#state{manifest = Manifest, - manifest_sqn = ManSQN, + InitState#state{manifest = Manifest2, ledger_sqn = LedgerSQN, - persisted_sqn = LedgerSQN, - level_counts = LevelCounts, - pid_map = PidMap0}}; + persisted_sqn = LedgerSQN}}; false -> leveled_log:log("P0017", []), {ok, - InitState#state{manifest = Manifest, - manifest_sqn = ManSQN, + InitState#state{manifest = Manifest1, ledger_sqn = MaxSQN, - persisted_sqn = 
MaxSQN, - level_counts = LevelCounts, - pid_map = PidMap}} + persisted_sqn = MaxSQN}} end. -check_for_work(LevelCounts) -> - CheckLevelFun = - fun({Level, MaxCount}, {AccL, AccC}) -> - case dict:fetch(Level, LevelCounts) of - LC when LC > MaxCount -> - {[Level|AccL], AccC + LC - MaxCount}; - _ -> - {AccL, AccC} - end - end, - lists:foldl(CheckLevelFun, {[], 0}, ?LEVEL_SCALEFACTOR). - update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, LedgerSQN, L0Cache, State) -> @@ -688,7 +660,7 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, ledger_sqn=UpdMaxSQN}, CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize, CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE, - Level0Free = length(get_item(0, State#state.manifest, [])) == 0, + L0Free = not leveled_manifest:levelzero_present(State#state.manifest), RandomFactor = case State#state.levelzero_cointoss of true -> @@ -702,7 +674,7 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, true end, JitterCheck = RandomFactor or CacheMuchTooBig, - case {CacheTooBig, Level0Free, JitterCheck} of + case {CacheTooBig, L0Free, JitterCheck} of {true, true, true} -> L0Constructor = roll_memory(UpdState, false), leveled_log:log_timer("P0031", [], SW), @@ -747,15 +719,15 @@ roll_memory(State, true) -> Constructor. levelzero_filename(State) -> - MSN = State#state.manifest_sqn, + ManSQN = leveled_manifest:get_manifest_sqn(State#state.manifest), FileName = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/" - ++ integer_to_list(MSN) ++ "_0_0", + ++ integer_to_list(ManSQN) ++ "_0_0", FileName. 
-timed_fetch_mem(Key, Hash, Structure, L0Cache, L0Index, HeadTimer) -> +timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) -> SW = os:timestamp(), - {R, Level} = fetch_mem(Key, Hash, Structure, L0Cache, L0Index), + {R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index), UpdHeadTimer = case R of not_present -> @@ -765,32 +737,30 @@ timed_fetch_mem(Key, Hash, Structure, L0Cache, L0Index, HeadTimer) -> end, {R, UpdHeadTimer}. -plain_fetch_mem(Key, Hash, Structure, L0Cache, L0Index) -> - R = fetch_mem(Key, Hash, Structure, L0Cache, L0Index), +plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> + R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index), element(1, R). -fetch_mem(Key, Hash, Structure, L0Cache, L0Index) -> +fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> PosList = leveled_pmem:check_index(Hash, L0Index), L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache), case L0Check of {false, not_found} -> - fetch(Key, Hash, Structure, 0, fun timed_sst_get/3); + fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3); {true, KV} -> {KV, 0} end. -fetch(_Key, _Hash, _Structure, ?MAX_LEVELS + 1, _FetchFun) -> +fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> {not_present, basement}; -fetch(Key, Hash, Structure, Level, FetchFun) -> - {Manifest, PidMap, ManSQN} = Structure, - case leveled_manifest:key_lookup(Manifest, Level, Key, ManSQN) of +fetch(Key, Hash, Manifest, Level, FetchFun) -> + case leveled_manifest:key_lookup(Manifest, Level, Key) of false -> - fetch(Key, Hash, Structure, Level + 1, FetchFun); - FN -> - FP = dict:fetch(FN, PidMap), + fetch(Key, Hash, Manifest, Level + 1, FetchFun); + FP -> case FetchFun(FP, Key, Hash) of not_present -> - fetch(Key, Hash, Structure, Level + 1, FetchFun); + fetch(Key, Hash, Manifest, Level + 1, FetchFun); ObjectFound -> {ObjectFound, Level} end @@ -827,7 +797,6 @@ compare_to_sqn(Obj, SQN) -> end. 
- %% Looks to find the best choice for the next key across the levels (other %% than in-memory table) %% In finding the best choice, the next key in a given level may be a next @@ -1246,57 +1215,6 @@ simple_server_test() -> clean_testdir(RootPath). -rangequery_manifest_test() -> - {E1, - E2, - E3} = {#manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, - end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"}, - filename="Z1"}, - #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld9"}, "K97"}, - end_key={o, "Bucket1", "K71", null}, - filename="Z2"}, - #manifest_entry{start_key={o, "Bucket1", "K75", null}, - end_key={o, "Bucket1", "K993", null}, - filename="Z3"}}, - {E4, - E5, - E6} = {#manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"}, - end_key={i, "Bucket1", {"Idx1", "Fld7"}, "K93"}, - filename="Z4"}, - #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld7"}, "K97"}, - end_key={o, "Bucket1", "K78", null}, - filename="Z5"}, - #manifest_entry{start_key={o, "Bucket1", "K81", null}, - end_key={o, "Bucket1", "K996", null}, - filename="Z6"}}, - Man = [{1, [E1, E2, E3]}, {2, [E4, E5, E6]}], - SK1 = {o, "Bucket1", "K711", null}, - EK1 = {o, "Bucket1", "K999", null}, - R1 = initiate_rangequery_frommanifest(SK1, EK1, Man), - ?assertMatch([{1, [{next, E3, SK1}]}, - {2, [{next, E5, SK1}, {next, E6, SK1}]}], - R1), - SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, - EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, - R2 = initiate_rangequery_frommanifest(SK2, EK2, Man), - ?assertMatch([{1, [{next, E1, SK2}]}, {2, [{next, E5, SK2}]}], R2), - R3 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx0", "Fld8"}, null}, - {i, "Bucket1", {"Idx0", "Fld9"}, null}, - Man), - ?assertMatch([], R3). 
- -print_manifest_test() -> - M1 = #manifest_entry{start_key={i, "Bucket1", {<<"Idx1">>, "Fld1"}, "K8"}, - end_key={i, 4565, {"Idx1", "Fld9"}, "K93"}, - filename="Z1"}, - M2 = #manifest_entry{start_key={i, self(), {null, "Fld1"}, "K8"}, - end_key={i, <<200:32/integer>>, {"Idx1", "Fld9"}, "K93"}, - filename="Z1"}, - M3 = #manifest_entry{start_key={?STD_TAG, self(), {null, "Fld1"}, "K8"}, - end_key={?RIAK_TAG, <<200:32/integer>>, {"Idx1", "Fld9"}, "K93"}, - filename="Z1"}, - print_manifest([{1, [M1, M2, M3]}]). - simple_findnextkey_test() -> QueryArray = [ {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, @@ -1463,81 +1381,6 @@ create_file_test() -> {ok, Bin} = file:read_file("../test/new_file.sst.discarded"), ?assertMatch("hello", binary_to_term(Bin)). -commit_manifest_test() -> - Sent_WI = #penciller_work{next_sqn=1, - src_level=0, - start_time=os:timestamp()}, - Resp_WI = #penciller_work{next_sqn=1, - src_level=0}, - State = #state{ongoing_work = [Sent_WI], - root_path = "test", - manifest_sqn = 0}, - ManifestFP = "test" ++ "/" ++ ?MANIFEST_FP ++ "/", - ok = filelib:ensure_dir(ManifestFP), - ok = file:write_file(ManifestFP ++ "nonzero_1.pnd", - term_to_binary("dummy data")), - - L1_0 = [{1, [#manifest_entry{filename="1.sst"}]}], - Resp_WI0 = Resp_WI#penciller_work{new_manifest=L1_0, - unreferenced_files=[]}, - {ok, State0} = commit_manifest_change(Resp_WI0, State), - ?assertMatch(1, State0#state.manifest_sqn), - ?assertMatch([], get_item(0, State0#state.manifest, [])), - - L0Entry = [#manifest_entry{filename="0.sst"}], - ManifestPlus = [{0, L0Entry}|State0#state.manifest], - - NxtSent_WI = #penciller_work{next_sqn=2, - src_level=1, - start_time=os:timestamp()}, - NxtResp_WI = #penciller_work{next_sqn=2, - src_level=1}, - State1 = State0#state{ongoing_work=[NxtSent_WI], - manifest = ManifestPlus}, - - ok = file:write_file(ManifestFP ++ "nonzero_2.pnd", - term_to_binary("dummy data")), - - L2_0 = [#manifest_entry{filename="2.sst"}], - NxtResp_WI0 = 
NxtResp_WI#penciller_work{new_manifest=[{2, L2_0}], - unreferenced_files=[]}, - {ok, State2} = commit_manifest_change(NxtResp_WI0, State1), - - ?assertMatch(1, State1#state.manifest_sqn), - ?assertMatch(2, State2#state.manifest_sqn), - ?assertMatch(L0Entry, get_item(0, State2#state.manifest, [])), - ?assertMatch(L2_0, get_item(2, State2#state.manifest, [])), - - clean_testdir(State#state.root_path). - - -badmanifest_test() -> - RootPath = "../test/ledger", - clean_testdir(RootPath), - {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000}), - Key1_pre = {{o,"Bucket0001", "Key0001", null}, - {1001, {active, infinity}, null}}, - Key1 = add_missing_hash(Key1_pre), - KL1 = generate_randomkeys({1000, 1}), - - ok = maybe_pause_push(PCL, KL1 ++ [Key1]), - %% Added together, as split apart there will be a race between the close - %% call to the penciller and the second fetch of the cache entry - ?assertMatch(Key1, pcl_fetch(PCL, {o, "Bucket0001", "Key0001", null})), - - timer:sleep(100), % Avoids confusion if L0 file not written before close - ok = pcl_close(PCL), - - ManifestFP = filepath(RootPath, manifest), - ok = file:write_file(filename:join(ManifestFP, "yeszero_123.man"), - term_to_binary("hello")), - {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000}), - ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001", null})), - ok = pcl_close(PCLr), - clean_testdir(RootPath). - checkready(Pid) -> try leveled_sst:sst_checkready(Pid)