Manifest changes - BROKEN

Going to abandond this branch for now.  The change is beoming
excessively time consuming, and it is not clear that a smaller change
might not achieve more of the objectives.

All this is broken - but perhaps could get picke dup another day.
This commit is contained in:
martinsumner 2017-01-12 13:48:43 +00:00
parent 439ddfa4eb
commit 08641e05cf
5 changed files with 216 additions and 558 deletions

View file

@ -176,7 +176,7 @@
pcl_fetchnextkey/5,
pcl_checksequencenumber/3,
pcl_workforclerk/1,
pcl_promptmanifestchange/2,
pcl_confirmmanifestchange/2,
pcl_confirml0complete/4,
pcl_confirmdelete/2,
pcl_close/1,
@ -206,13 +206,17 @@
-define(COIN_SIDECOUNT, 5).
-define(SLOW_FETCH, 20000).
-define(ITERATOR_SCANWIDTH, 4).
-define(SNAPSHOT_TIMEOUT, 3600).
-record(state, {manifest = [] :: list(),
-record(state, {manifest, % an ETS table reference
manifest_sqn = 0 :: integer(),
ledger_sqn = 0 :: integer(), % The highest SQN added to L0
persisted_sqn = 0 :: integer(), % The highest SQN persisted
registered_snapshots = [] :: list(),
unreferenced_files = [] :: list(),
pidmap = dict:new() :: dict(),
level_counts :: dict(),
deletions_pending = dict:new() ::dict(),
ledger_sqn = 0 :: integer(), % The highest SQN added to L0
root_path = "../test" :: string(),
clerk :: pid(),
@ -286,7 +290,7 @@ pcl_checksequencenumber(Pid, Key, SQN) ->
pcl_workforclerk(Pid) ->
gen_server:call(Pid, work_for_clerk, infinity).
pcl_promptmanifestchange(Pid, WI) ->
pcl_confirmmanifestchange(Pid, WI) ->
gen_server:cast(Pid, {manifest_change, WI}).
pcl_confirml0complete(Pid, FN, StartKey, EndKey) ->
@ -371,18 +375,24 @@ handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}},
State)}
end;
handle_call({fetch, Key, Hash}, _From, State) ->
Structure = {State#state.manifest,
State#state.pid_map,
State#state.manifest_sqn},
{R, HeadTimer} = timed_fetch_mem(Key,
Hash,
State#state.manifest,
Structure,
State#state.levelzero_cache,
State#state.levelzero_index,
State#state.head_timing),
{reply, R, State#state{head_timing=HeadTimer}};
handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
Structure = {State#state.manifest,
State#state.pid_map,
State#state.manifest_sqn},
{reply,
compare_to_sqn(plain_fetch_mem(Key,
Hash,
State#state.manifest,
Structure,
State#state.levelzero_cache,
State#state.levelzero_index),
SQN),
@ -401,23 +411,53 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
List ->
List
end,
SSTiter = initiate_rangequery_frommanifest(StartKey,
ConvertToPointerFun =
fun(FN) -> {next, dict:fetch(FN, State#state.pid_map), StartKey} end,
SetupFoldFun =
fun(Level, Acc) ->
FNs = leveled_manifest:range_lookup(State#state.manifest,
Level,
StartKey,
EndKey,
State#state.manifest),
State#state.manifest_sqn),
Pointers = lists:map(ConvertToPointerFun, FNs),
case Pointers of
[] -> Acc;
PL -> Acc ++ [{L, PL}]
end
end,
SSTiter = lists:foldl(SetupFoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)),
Acc = keyfolder({L0AsList, SSTiter},
{StartKey, EndKey},
{AccFun, InitAcc},
MaxKeys),
{reply, Acc, State#state{levelzero_astree = L0AsList}};
handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From),
{reply, Work, UpdState};
DelayForPendingL0 = State#state.levelzero_pending,
{WL, WC} = check_for_work(State#state.level_counts),
case WC of
0 ->
{reply, none, State#state{work_backlog=false}};
N when N > ?WORKQUEUE_BACKLOG_TOLERANCE ->
leveled_log:log("P0024", [N, true]),
[TL|_Tail] = WL,
{reply, TL, State#state{work_backlog=true}};
N ->
leveled_log:log("P0024", [N, false]),
[TL|_Tail] = WL,
{reply, TL, State#state{work_backlog=false}}
end;
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State};
handle_call({register_snapshot, Snapshot}, _From, State) ->
Rs = [{Snapshot,
State#state.manifest_sqn}|State#state.registered_snapshots],
{reply, {ok, State}, State#state{registered_snapshots = Rs}};
RegisteredSnaps = add_snapshot(State#state.registered_snapshots,
Snapshot,
State#state.manifest_sqn,
?SNAPSHOT_TIMEOUT),
{reply, {ok, State}, State#state{registered_snapshots = RegisteredSnaps}};
handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
_From, State) ->
L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
@ -444,28 +484,36 @@ handle_call(doom, _From, State) ->
{stop, normal, {ok, [ManifestFP, FilesFP]}, State}.
handle_cast({manifest_change, WI}, State) ->
{ok, UpdState} = commit_manifest_change(WI, State),
ok = leveled_pclerk:clerk_manifestchange(State#state.clerk,
confirm,
false),
{noreply, UpdState};
NewManifestSQN = WI#next_sqn,
UnreferenceFun =
fun(FN, Acc) ->
dict:store(FN, NewManifestSQN, Acc)
end,
DelPending = lists:foldl(UnreferenceFun,
State#state.deletions_pending,
WI#unreferenced_files),
{noreply, State{deletions_pending = DelPending,
manifest_sqn = NewManifestSQN}};
handle_cast({release_snapshot, Snapshot}, State) ->
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
Rs = leveled_manifest:release_snapshot(State#state.registered_snapshots,
Snapshot),
leveled_log:log("P0003", [Snapshot]),
leveled_log:log("P0004", [Rs]),
{noreply, State#state{registered_snapshots=Rs}};
handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap})
handle_cast({confirm_delete, Filename}, State=#state{is_snapshot=Snap})
when Snap == false ->
Reply = confirm_delete(FileName,
State#state.unreferenced_files,
State#state.registered_snapshots),
case Reply of
{true, Pid} ->
UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
DeleteSQN = dict:fetch(Filename, State#state.deletions_pending),
R2D = leveled_manifest:ready_to_delete(State#state.registered_snapshots,
DeleteSQN),
case R2D of
true ->
PidToDelete = dict:fetch(Filename, State#state.pidmap),
leveled_log:log("P0005", [FileName]),
DP0 = dict:erase(Filename, State#state.deletions_pending),
PM0 = dict:erase(Filename, State#state.pidmap),
ok = leveled_sst:sst_deleteconfirmed(Pid),
{noreply, State#state{unreferenced_files=UF1}};
_ ->
{noreply, State#state{deletions_pending = DP0, pidmap = PM0}};
false ->
{noreply, State}
end;
handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
@ -478,11 +526,13 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
% Prompt clerk to ask about work - do this for every L0 roll
UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index),
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
UpdLevelCounts = dict:store(0, 1, State#state.level_counts),
{noreply, State#state{levelzero_cache=[],
levelzero_index=UpdIndex,
levelzero_pending=false,
levelzero_constructor=undefined,
levelzero_size=0,
level_counts=UpdLevelCounts,
manifest=UpdMan,
persisted_sqn=State#state.ledger_sqn}}.
@ -495,10 +545,6 @@ terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true ->
leveled_log:log("P0007", [Reason]),
ok;
terminate(Reason, State) ->
%% When a Penciller shuts down it isn't safe to try an manage the safe
%% finishing of any outstanding work. The last commmitted manifest will
%% be used.
%%
%% Level 0 files lie outside of the manifest, and so if there is no L0
%% file present it is safe to write the current contents of memory. If
%% there is a L0 file present - then the memory can be dropped (it is
@ -506,44 +552,27 @@ terminate(Reason, State) ->
%% as presumably the ETS file has been recently flushed, hence the presence
%% of a L0 file).
%%
%% The penciller should close each file in the unreferenced files, and
%% then each file in the manifest, and cast a close on the clerk.
%% The cast may not succeed as the clerk could be synchronously calling
%% the penciller looking for a manifest commit
%%
%% The penciller should close each file in the manifest, and cast a close
%% on the clerk.
ok = leveled_pclerk:clerk_close(State#state.clerk),
leveled_log:log("P0008", [Reason]),
MC = leveled_pclerk:clerk_manifestchange(State#state.clerk,
return,
true),
UpdState = case MC of
{ok, WI} ->
{ok, NewState} = commit_manifest_change(WI, State),
Clerk = State#state.clerk,
ok = leveled_pclerk:clerk_manifestchange(Clerk,
confirm,
true),
NewState;
no_change ->
State
end,
case {UpdState#state.levelzero_pending,
get_item(0, UpdState#state.manifest, []),
UpdState#state.levelzero_size} of
{false, [], 0} ->
leveled_log:log("P0009", []);
{false, [], _N} ->
L0Pid = roll_memory(UpdState, true),
L0 = key_lookup(State#state.manifest, 0, all, State#state.manifest_sqn),
case {UpdState#state.levelzero_pending, L0} of
{false, false} ->
L0Pid = roll_memory(UpdState, true),
ok = leveled_sst:sst_close(L0Pid);
StatusTuple ->
leveled_log:log("P0010", [StatusTuple])
end,
% Tidy shutdown of individual files
ok = close_files(0, UpdState#state.manifest),
lists:foreach(fun({_FN, Pid, _SN}) ->
ok = leveled_sst:sst_close(Pid) end,
UpdState#state.unreferenced_files),
lists:foreach(fun({_FN, Pid}) ->
ok = leveled_sst:sst_close(Pid)
end,
dict:to_list(State#state.pidmap)),
leveled_log:log("P0011", []),
ok.
@ -579,47 +608,21 @@ start_from_file(PCLopts) ->
levelzero_index=leveled_pmem:new_index()},
%% Open manifest
ManifestPath = filepath(InitState#state.root_path, manifest) ++ "/",
SSTPath = filepath(InitState#state.root_path, files) ++ "/",
ok = filelib:ensure_dir(ManifestPath),
ok = filelib:ensure_dir(SSTPath),
{ok, Filenames} = file:list_dir(ManifestPath),
CurrRegex = "nonzero_(?<MSN>[0-9]+)\\." ++ ?CURRENT_FILEX,
ValidManSQNs = lists:foldl(fun(FN, Acc) ->
case re:run(FN,
CurrRegex,
[{capture, ['MSN'], list}]) of
nomatch ->
Acc;
{match, [Int]} when is_list(Int) ->
Acc ++ [list_to_integer(Int)]
end
end,
[],
Filenames),
TopManSQN = lists:foldl(fun(X, MaxSQN) -> max(X, MaxSQN) end,
0,
ValidManSQNs),
leveled_log:log("P0012", [TopManSQN]),
ManUpdate = case TopManSQN of
0 ->
leveled_log:log("P0013", []),
{[], 0};
_ ->
CurrManFile = filepath(InitState#state.root_path,
TopManSQN,
current_manifest),
{ok, Bin} = file:read_file(CurrManFile),
Manifest = binary_to_term(Bin),
open_all_filesinmanifest(Manifest)
end,
{UpdManifest, MaxSQN} = ManUpdate,
Manifest = leveled_manifest:open_manifest(RootPath),
{FNList,
ManSQN,
LevelCounts) = leveled_manifest:initiate_from_manifest(Manifest),
InitiateFun =
fun(FN, {AccMaxSQN, AccPidMap}) ->
{ok, P, {_FK, _LK}} = leveled_sst:sst_open(FN),
FileMaxSQN = leveled_sst:sst_getmaxsequencenumber(P),
{max(AccMaxSQN, FileMaxSQN), dict:store(FN, P, AccPidMap)}
end,
{MaxSQN, PidMap} = lists:foldl(InitiateFun, {0, dict:new()}, FNList),
leveled_log:log("P0014", [MaxSQN]),
%% Find any L0 files
L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sst",
L0FN = filepath(RootPath, ManSQN, new_merge_files) ++ "_0_0.sst",
case filelib:is_file(L0FN) of
true ->
leveled_log:log("P0015", [L0FN]),
@ -627,30 +630,42 @@ start_from_file(PCLopts) ->
L0Pid,
{L0StartKey, L0EndKey}} = leveled_sst:sst_open(L0FN),
L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid),
ManifestEntry = #manifest_entry{start_key=L0StartKey,
end_key=L0EndKey,
owner=L0Pid,
filename=L0FN},
UpdManifest2 = lists:keystore(0,
1,
UpdManifest,
{0, [ManifestEntry]}),
L0Entry = #manifest_entry{start_key = L0StartKey,
end_key = L0EndKey,
filename = L0FN},
PidMap0 = dict:store(L0FN, L0Pid, PidMap),
insert_manifest_entry(Manifest, ManSQN, 0, L0Entry)
leveled_log:log("P0016", [L0SQN]),
LedgerSQN = max(MaxSQN, L0SQN),
{ok,
InitState#state{manifest=UpdManifest2,
manifest_sqn=TopManSQN,
ledger_sqn=LedgerSQN,
persisted_sqn=LedgerSQN}};
InitState#state{manifest = Manifest,
manifest_sqn = ManSQN,
ledger_sqn = LedgerSQN,
persisted_sqn = LedgerSQN,
level_counts = LevelCounts,
pid_map = PidMap0}};
false ->
leveled_log:log("P0017", []),
{ok,
InitState#state{manifest=UpdManifest,
manifest_sqn=TopManSQN,
ledger_sqn=MaxSQN,
persisted_sqn=MaxSQN}}
InitState#state{manifest = Manifest,
manifest_sqn = ManSQN,
ledger_sqn = MaxSQN,
persisted_sqn = MaxSQN,
level_counts = LevelCounts,
pid_map = PidMap}}
end.
check_for_work(LevelCounts) ->
CheckLevelFun =
fun({Level, MaxCount}, {AccL, AccC}) ->
case dict:fetch(Level, LevelCounts) of
LC when LC > MaxCount ->
{[Level|AccL], AccC + LC - MaxCount};
_ ->
{AccL, AccC}
end
end,
lists:foldl(CheckLevelFun, {[], 0}, ?LEVEL_SCALEFACTOR).
update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
@ -738,9 +753,9 @@ levelzero_filename(State) ->
++ integer_to_list(MSN) ++ "_0_0",
FileName.
timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) ->
timed_fetch_mem(Key, Hash, Structure, L0Cache, L0Index, HeadTimer) ->
SW = os:timestamp(),
{R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
{R, Level} = fetch_mem(Key, Hash, Structure, L0Cache, L0Index),
UpdHeadTimer =
case R of
not_present ->
@ -750,41 +765,32 @@ timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) ->
end,
{R, UpdHeadTimer}.
plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
plain_fetch_mem(Key, Hash, Structure, L0Cache, L0Index) ->
R = fetch_mem(Key, Hash, Structure, L0Cache, L0Index),
element(1, R).
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
fetch_mem(Key, Hash, Structure, L0Cache, L0Index) ->
PosList = leveled_pmem:check_index(Hash, L0Index),
L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
case L0Check of
{false, not_found} ->
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3);
fetch(Key, Hash, Structure, 0, fun timed_sst_get/3);
{true, KV} ->
{KV, 0}
end.
fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
fetch(_Key, _Hash, _Structure, ?MAX_LEVELS + 1, _FetchFun) ->
{not_present, basement};
fetch(Key, Hash, Manifest, Level, FetchFun) ->
LevelManifest = get_item(Level, Manifest, []),
case lists:foldl(fun(File, Acc) ->
case Acc of
not_present when
Key >= File#manifest_entry.start_key,
File#manifest_entry.end_key >= Key ->
File#manifest_entry.owner;
FoundDetails ->
FoundDetails
end end,
not_present,
LevelManifest) of
not_present ->
fetch(Key, Hash, Manifest, Level + 1, FetchFun);
FileToCheck ->
case FetchFun(FileToCheck, Key, Hash) of
fetch(Key, Hash, Structure, Level, FetchFun) ->
{Manifest, PidMap, ManSQN} = Structure,
case leveled_manifest:key_lookup(Manifest, Level, Key, ManSQN) of
false ->
fetch(Key, Hash, Structure, Level + 1, FetchFun);
FN ->
FP = dict:fetch(FN, PidMap),
case FetchFun(FP, Key, Hash) of
not_present ->
fetch(Key, Hash, Manifest, Level + 1, FetchFun);
fetch(Key, Hash, Structure, Level + 1, FetchFun);
ObjectFound ->
{ObjectFound, Level}
end
@ -821,134 +827,6 @@ compare_to_sqn(Obj, SQN) ->
end.
%% Work out what the current work queue should be
%%
%% The work queue should have a lower level work at the front, and no work
%% should be added to the queue if a compaction worker has already been asked
%% to look at work at that level
%%
%% The full queue is calculated for logging purposes only
return_work(State, From) ->
{WorkQ, BasementL} = assess_workqueue([], 0, State#state.manifest, 0),
case length(WorkQ) of
L when L > 0 ->
Excess = lists:foldl(fun({_, _, OH}, Acc) -> Acc+OH end, 0, WorkQ),
[{SrcLevel, Manifest, _Overhead}|_OtherWork] = WorkQ,
leveled_log:log("P0020", [SrcLevel, From, Excess]),
IsBasement = if
SrcLevel + 1 == BasementL ->
true;
true ->
false
end,
Backlog = Excess >= ?WORKQUEUE_BACKLOG_TOLERANCE,
case State#state.levelzero_pending of
true ->
% Once the L0 file is completed there will be more work
% - so don't be busy doing other work now
leveled_log:log("P0021", []),
{State#state{work_backlog=Backlog}, none};
false ->
%% No work currently outstanding
%% Can allocate work
NextSQN = State#state.manifest_sqn + 1,
FP = filepath(State#state.root_path,
NextSQN,
new_merge_files),
ManFile = filepath(State#state.root_path,
NextSQN,
pending_manifest),
WI = #penciller_work{next_sqn=NextSQN,
clerk=From,
src_level=SrcLevel,
manifest=Manifest,
start_time = os:timestamp(),
ledger_filepath = FP,
manifest_file = ManFile,
target_is_basement = IsBasement},
{State#state{ongoing_work=[WI], work_backlog=Backlog}, WI}
end;
_ ->
{State#state{work_backlog=false}, none}
end.
close_files(?MAX_LEVELS - 1, _Manifest) ->
ok;
close_files(Level, Manifest) ->
LevelList = get_item(Level, Manifest, []),
lists:foreach(fun(F) ->
ok = leveled_sst:sst_close(F#manifest_entry.owner) end,
LevelList),
close_files(Level + 1, Manifest).
open_all_filesinmanifest(Manifest) ->
open_all_filesinmanifest({Manifest, 0}, 0).
open_all_filesinmanifest(Result, ?MAX_LEVELS - 1) ->
Result;
open_all_filesinmanifest({Manifest, TopSQN}, Level) ->
LevelList = get_item(Level, Manifest, []),
%% The Pids in the saved manifest related to now closed references
%% Need to roll over the manifest at this level starting new processes to
%5 replace them
LvlR = lists:foldl(fun(F, {FL, FL_SQN}) ->
FN = F#manifest_entry.filename,
{ok, P, _Keys} = leveled_sst:sst_open(FN),
F_SQN = leveled_sst:sst_getmaxsequencenumber(P),
{lists:append(FL,
[F#manifest_entry{owner = P}]),
max(FL_SQN, F_SQN)}
end,
{[], 0},
LevelList),
%% Result is tuple of revised file list for this level in manifest, and
%% the maximum sequence number seen at this level
{LvlFL, LvlSQN} = LvlR,
UpdManifest = lists:keystore(Level, 1, Manifest, {Level, LvlFL}),
open_all_filesinmanifest({UpdManifest, max(TopSQN, LvlSQN)}, Level + 1).
print_manifest(Manifest) ->
lists:foreach(fun(L) ->
leveled_log:log("P0022", [L]),
Level = get_item(L, Manifest, []),
lists:foreach(fun print_manifest_entry/1, Level)
end,
lists:seq(0, ?MAX_LEVELS - 1)),
ok.
print_manifest_entry(Entry) ->
{S1, S2, S3} = leveled_codec:print_key(Entry#manifest_entry.start_key),
{E1, E2, E3} = leveled_codec:print_key(Entry#manifest_entry.end_key),
leveled_log:log("P0023",
[S1, S2, S3, E1, E2, E3, Entry#manifest_entry.filename]).
initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) ->
CompareFun = fun(M) ->
C1 = StartKey > M#manifest_entry.end_key,
C2 = leveled_codec:endkey_passed(EndKey,
M#manifest_entry.start_key),
not (C1 or C2) end,
FoldFun =
fun(L, AccL) ->
Level = get_item(L, Manifest, []),
FL = lists:foldl(fun(M, Acc) ->
case CompareFun(M) of
true ->
Acc ++ [{next, M, StartKey}];
false ->
Acc
end end,
[],
Level),
case FL of
[] -> AccL;
FL -> AccL ++ [{L, FL}]
end
end,
lists:foldl(FoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)).
%% Looks to find the best choice for the next key across the levels (other
%% than in-memory table)
@ -1153,143 +1031,12 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange,
end.
assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Man, BasementLevel) ->
{WorkQ, BasementLevel};
assess_workqueue(WorkQ, LevelToAssess, Man, BasementLevel) ->
MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0),
case length(get_item(LevelToAssess, Man, [])) of
FileCount when FileCount > 0 ->
NewWQ = maybe_append_work(WorkQ,
LevelToAssess,
Man,
MaxFiles,
FileCount),
assess_workqueue(NewWQ, LevelToAssess + 1, Man, LevelToAssess);
0 ->
assess_workqueue(WorkQ, LevelToAssess + 1, Man, BasementLevel)
end.
maybe_append_work(WorkQ, Level, Manifest,
MaxFiles, FileCount)
when FileCount > MaxFiles ->
Overhead = FileCount - MaxFiles,
leveled_log:log("P0024", [Overhead, Level]),
lists:append(WorkQ, [{Level, Manifest, Overhead}]);
maybe_append_work(WorkQ, _Level, _Manifest,
_MaxFiles, _FileCount) ->
WorkQ.
get_item(Index, List, Default) ->
case lists:keysearch(Index, 1, List) of
{value, {Index, Value}} ->
Value;
false ->
Default
end.
%% Request a manifest change
%% The clerk should have completed the work, and created a new manifest
%% and persisted the new view of the manifest
%%
%% To complete the change of manifest:
%% - the state of the manifest file needs to be changed from pending to current
%% - the list of unreferenced files needs to be updated on State
%% - the current manifest needs to be update don State
%% - the list of ongoing work needs to be cleared of this item
commit_manifest_change(ReturnedWorkItem, State) ->
NewMSN = State#state.manifest_sqn + 1,
[SentWorkItem] = State#state.ongoing_work,
RootPath = State#state.root_path,
UnreferencedFiles = State#state.unreferenced_files,
if
NewMSN == SentWorkItem#penciller_work.next_sqn ->
WISrcLevel = SentWorkItem#penciller_work.src_level,
leveled_log:log_timer("P0025",
[SentWorkItem#penciller_work.next_sqn,
WISrcLevel],
SentWorkItem#penciller_work.start_time),
ok = rename_manifest_files(RootPath, NewMSN),
FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files,
UnreferencedFilesUpd = update_deletions(FilesToDelete,
NewMSN,
UnreferencedFiles),
leveled_log:log("P0026", [NewMSN]),
NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
CurrL0 = get_item(0, State#state.manifest, []),
% If the work isn't L0 work, then we may have an uncommitted
% manifest change at L0 - so add this back into the Manifest loop
% state
RevisedManifest = case {WISrcLevel, CurrL0} of
{0, _} ->
NewManifest;
{_, []} ->
NewManifest;
{_, [L0ManEntry]} ->
lists:keystore(0,
1,
NewManifest,
{0, [L0ManEntry]})
end,
{ok, State#state{ongoing_work=[],
manifest_sqn=NewMSN,
manifest=RevisedManifest,
unreferenced_files=UnreferencedFilesUpd}}
end.
rename_manifest_files(RootPath, NewMSN) ->
OldFN = filepath(RootPath, NewMSN, pending_manifest),
NewFN = filepath(RootPath, NewMSN, current_manifest),
leveled_log:log("P0027", [OldFN, filelib:is_file(OldFN),
NewFN, filelib:is_file(NewFN)]),
ok = file:rename(OldFN,NewFN).
filepath(RootPath, manifest) ->
RootPath ++ "/" ++ ?MANIFEST_FP;
filepath(RootPath, files) ->
RootPath ++ "/" ++ ?FILES_FP.
filepath(RootPath, NewMSN, pending_manifest) ->
filepath(RootPath, manifest) ++ "/" ++ "nonzero_"
++ integer_to_list(NewMSN) ++ "." ++ ?PENDING_FILEX;
filepath(RootPath, NewMSN, current_manifest) ->
filepath(RootPath, manifest) ++ "/" ++ "nonzero_"
++ integer_to_list(NewMSN) ++ "." ++ ?CURRENT_FILEX;
filepath(RootPath, NewMSN, new_merge_files) ->
filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN).
update_deletions([], _NewMSN, UnreferencedFiles) ->
UnreferencedFiles;
update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) ->
leveled_log:log("P0028", [ClearedFile#manifest_entry.filename]),
update_deletions(Tail,
MSN,
lists:append(UnreferencedFiles,
[{ClearedFile#manifest_entry.filename,
ClearedFile#manifest_entry.owner,
MSN}])).
confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) ->
case lists:keyfind(Filename, 1, UnreferencedFiles) of
{Filename, Pid, MSN} ->
LowSQN = lists:foldl(fun({_, SQN}, MinSQN) -> min(SQN, MinSQN) end,
infinity,
RegisteredSnapshots),
if
MSN >= LowSQN ->
false;
true ->
{true, Pid}
end
end.
%%%============================================================================
@ -1338,47 +1085,6 @@ clean_subdir(DirPath) ->
end.
compaction_work_assessment_test() ->
L0 = [{{o, "B1", "K1", null}, {o, "B3", "K3", null}, dummy_pid}],
L1 = [{{o, "B1", "K1", null}, {o, "B2", "K2", null}, dummy_pid},
{{o, "B2", "K3", null}, {o, "B4", "K4", null}, dummy_pid}],
Manifest = [{0, L0}, {1, L1}],
{WorkQ1, 1} = assess_workqueue([], 0, Manifest, 0),
?assertMatch([{0, Manifest, 1}], WorkQ1),
L1Alt = lists:append(L1,
[{{o, "B5", "K0001", null}, {o, "B5", "K9999", null},
dummy_pid},
{{o, "B6", "K0001", null}, {o, "B6", "K9999", null},
dummy_pid},
{{o, "B7", "K0001", null}, {o, "B7", "K9999", null},
dummy_pid},
{{o, "B8", "K0001", null}, {o, "B8", "K9999", null},
dummy_pid},
{{o, "B9", "K0001", null}, {o, "B9", "K9999", null},
dummy_pid},
{{o, "BA", "K0001", null}, {o, "BA", "K9999", null},
dummy_pid},
{{o, "BB", "K0001", null}, {o, "BB", "K9999", null},
dummy_pid}]),
Manifest3 = [{0, []}, {1, L1Alt}],
{WorkQ3, 1} = assess_workqueue([], 0, Manifest3, 0),
?assertMatch([{1, Manifest3, 1}], WorkQ3).
confirm_delete_test() ->
Filename = 'test.sst',
UnreferencedFiles = [{'other.sst', dummy_owner, 15},
{Filename, dummy_owner, 10}],
RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}],
R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1),
?assertMatch(R1, {true, dummy_owner}),
RegisteredIterators2 = [{dummy_pid, 10}, {dummy_pid, 12}],
R2 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators2),
?assertMatch(R2, false),
RegisteredIterators3 = [{dummy_pid, 9}, {dummy_pid, 12}],
R3 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators3),
?assertMatch(R3, false).
maybe_pause_push(PCL, KL) ->
T0 = leveled_skiplist:empty(true),
I0 = leveled_pmem:new_index(),