Have inker reopen compacted files
The inker cler will now close compacted files before prompting the inker to update the manifest. The inker should reopen those files, so that the file processes are linked to it and not the clerk. This also stops a stopped clerk leading to orphaned cdb files.
This commit is contained in:
parent
e349774167
commit
5b54affbf0
3 changed files with 28 additions and 13 deletions
|
@ -227,6 +227,7 @@ clerk_scorefilelist(Pid, []) ->
|
||||||
clerk_scorefilelist(Pid, CandidateList) ->
|
clerk_scorefilelist(Pid, CandidateList) ->
|
||||||
gen_server:cast(Pid, {score_filelist, CandidateList}).
|
gen_server:cast(Pid, {score_filelist, CandidateList}).
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -357,14 +358,14 @@ handle_cast(scoring_complete, State) ->
|
||||||
end,
|
end,
|
||||||
BestRun1),
|
BestRun1),
|
||||||
leveled_log:log("IC002", [length(FilesToDelete)]),
|
leveled_log:log("IC002", [length(FilesToDelete)]),
|
||||||
|
ok = CloseFun(FilterServer),
|
||||||
ok = leveled_inker:ink_clerkcomplete(State#state.inker,
|
ok = leveled_inker:ink_clerkcomplete(State#state.inker,
|
||||||
ManifestSlice,
|
ManifestSlice,
|
||||||
FilesToDelete),
|
FilesToDelete),
|
||||||
ok = CloseFun(FilterServer),
|
|
||||||
{noreply, State#state{scoring_state = undefined}};
|
{noreply, State#state{scoring_state = undefined}};
|
||||||
false ->
|
false ->
|
||||||
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
|
|
||||||
ok = CloseFun(FilterServer),
|
ok = CloseFun(FilterServer),
|
||||||
|
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
|
||||||
{noreply, State#state{scoring_state = undefined}}
|
{noreply, State#state{scoring_state = undefined}}
|
||||||
end;
|
end;
|
||||||
handle_cast({trim, PersistedSQN, ManifestAsList}, State) ->
|
handle_cast({trim, PersistedSQN, ManifestAsList}, State) ->
|
||||||
|
@ -794,8 +795,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
|
||||||
SQN,
|
SQN,
|
||||||
compact_journal),
|
compact_journal),
|
||||||
leveled_log:log("IC009", [FN]),
|
leveled_log:log("IC009", [FN]),
|
||||||
leveled_cdb:cdb_open_writer(FN,
|
leveled_cdb:cdb_open_writer(FN, CDBopts);
|
||||||
CDBopts);
|
|
||||||
_ ->
|
_ ->
|
||||||
{ok, Journal0}
|
{ok, Journal0}
|
||||||
end,
|
end,
|
||||||
|
@ -1018,9 +1018,10 @@ compact_single_file_recovr_test() ->
|
||||||
LedgerFun1,
|
LedgerFun1,
|
||||||
CompactFP,
|
CompactFP,
|
||||||
CDB} = compact_single_file_setup(),
|
CDB} = compact_single_file_setup(),
|
||||||
[{LowSQN, FN, PidR, _LastKey}] =
|
CDBOpts = #cdb_options{binary_mode=true},
|
||||||
|
[{LowSQN, FN, _PidOldR, LastKey}] =
|
||||||
compact_files([Candidate],
|
compact_files([Candidate],
|
||||||
#cdb_options{file_path=CompactFP, binary_mode=true},
|
CDBOpts#cdb_options{file_path=CompactFP},
|
||||||
LedgerFun1,
|
LedgerFun1,
|
||||||
LedgerSrv1,
|
LedgerSrv1,
|
||||||
9,
|
9,
|
||||||
|
@ -1028,6 +1029,7 @@ compact_single_file_recovr_test() ->
|
||||||
native),
|
native),
|
||||||
io:format("FN of ~s~n", [FN]),
|
io:format("FN of ~s~n", [FN]),
|
||||||
?assertMatch(2, LowSQN),
|
?assertMatch(2, LowSQN),
|
||||||
|
{ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts),
|
||||||
?assertMatch(probably,
|
?assertMatch(probably,
|
||||||
leveled_cdb:cdb_keycheck(PidR,
|
leveled_cdb:cdb_keycheck(PidR,
|
||||||
{8,
|
{8,
|
||||||
|
@ -1047,6 +1049,7 @@ compact_single_file_recovr_test() ->
|
||||||
test_ledgerkey("Key2")}),
|
test_ledgerkey("Key2")}),
|
||||||
?assertMatch({{_, _}, {"Value2", {[], infinity}}},
|
?assertMatch({{_, _}, {"Value2", {[], infinity}}},
|
||||||
leveled_codec:from_inkerkv(RKV1)),
|
leveled_codec:from_inkerkv(RKV1)),
|
||||||
|
ok = leveled_cdb:cdb_close(PidR),
|
||||||
ok = leveled_cdb:cdb_deletepending(CDB),
|
ok = leveled_cdb:cdb_deletepending(CDB),
|
||||||
ok = leveled_cdb:cdb_destroy(CDB).
|
ok = leveled_cdb:cdb_destroy(CDB).
|
||||||
|
|
||||||
|
@ -1057,9 +1060,10 @@ compact_single_file_retain_test() ->
|
||||||
LedgerFun1,
|
LedgerFun1,
|
||||||
CompactFP,
|
CompactFP,
|
||||||
CDB} = compact_single_file_setup(),
|
CDB} = compact_single_file_setup(),
|
||||||
[{LowSQN, FN, PidR, _LK}] =
|
CDBOpts = #cdb_options{binary_mode=true},
|
||||||
|
[{LowSQN, FN, _PidOldR, LastKey}] =
|
||||||
compact_files([Candidate],
|
compact_files([Candidate],
|
||||||
#cdb_options{file_path=CompactFP, binary_mode=true},
|
CDBOpts#cdb_options{file_path=CompactFP},
|
||||||
LedgerFun1,
|
LedgerFun1,
|
||||||
LedgerSrv1,
|
LedgerSrv1,
|
||||||
9,
|
9,
|
||||||
|
@ -1067,6 +1071,7 @@ compact_single_file_retain_test() ->
|
||||||
native),
|
native),
|
||||||
io:format("FN of ~s~n", [FN]),
|
io:format("FN of ~s~n", [FN]),
|
||||||
?assertMatch(1, LowSQN),
|
?assertMatch(1, LowSQN),
|
||||||
|
{ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts),
|
||||||
?assertMatch(probably,
|
?assertMatch(probably,
|
||||||
leveled_cdb:cdb_keycheck(PidR,
|
leveled_cdb:cdb_keycheck(PidR,
|
||||||
{8,
|
{8,
|
||||||
|
@ -1081,11 +1086,12 @@ compact_single_file_retain_test() ->
|
||||||
stnd,
|
stnd,
|
||||||
test_ledgerkey("Key1")})),
|
test_ledgerkey("Key1")})),
|
||||||
RKV1 = leveled_cdb:cdb_get(PidR,
|
RKV1 = leveled_cdb:cdb_get(PidR,
|
||||||
{2,
|
{2,
|
||||||
stnd,
|
stnd,
|
||||||
test_ledgerkey("Key2")}),
|
test_ledgerkey("Key2")}),
|
||||||
?assertMatch({{_, _}, {"Value2", {[], infinity}}},
|
?assertMatch({{_, _}, {"Value2", {[], infinity}}},
|
||||||
leveled_codec:from_inkerkv(RKV1)),
|
leveled_codec:from_inkerkv(RKV1)),
|
||||||
|
ok = leveled_cdb:cdb_close(PidR),
|
||||||
ok = leveled_cdb:cdb_deletepending(CDB),
|
ok = leveled_cdb:cdb_deletepending(CDB),
|
||||||
ok = leveled_cdb:cdb_destroy(CDB).
|
ok = leveled_cdb:cdb_destroy(CDB).
|
||||||
|
|
||||||
|
|
|
@ -52,6 +52,9 @@ generate_entry(Journal) ->
|
||||||
case leveled_cdb:cdb_firstkey(PidR) of
|
case leveled_cdb:cdb_firstkey(PidR) of
|
||||||
{StartSQN, _Type, _PK} ->
|
{StartSQN, _Type, _PK} ->
|
||||||
LastKey = leveled_cdb:cdb_lastkey(PidR),
|
LastKey = leveled_cdb:cdb_lastkey(PidR),
|
||||||
|
% close the file here. This will then be re-opened by the inker
|
||||||
|
% and so will be correctly linked to the inker not to the iclerk
|
||||||
|
ok = leveled_cdb:cdb_close(PidR),
|
||||||
[{StartSQN, NewFN, PidR, LastKey}];
|
[{StartSQN, NewFN, PidR, LastKey}];
|
||||||
empty ->
|
empty ->
|
||||||
leveled_log:log("IC013", [NewFN]),
|
leveled_log:log("IC013", [NewFN]),
|
||||||
|
|
|
@ -698,14 +698,20 @@ handle_call(doom, _From, State) ->
|
||||||
|
|
||||||
|
|
||||||
handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
|
handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
|
||||||
|
CDBOpts = State#state.cdb_options,
|
||||||
DropFun =
|
DropFun =
|
||||||
fun(E, Acc) ->
|
fun(E, Acc) ->
|
||||||
leveled_imanifest:remove_entry(Acc, E)
|
leveled_imanifest:remove_entry(Acc, E)
|
||||||
end,
|
end,
|
||||||
Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete),
|
Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete),
|
||||||
AddFun =
|
AddFun =
|
||||||
fun(E, Acc) ->
|
fun(ManEntry, Acc) ->
|
||||||
leveled_imanifest:add_entry(Acc, E, false)
|
{LowSQN, FN, _, LK_RO} = ManEntry,
|
||||||
|
% At this stage the FN has a .cdb extension, which will be
|
||||||
|
% stripped during add_entry - so need to add the .cdb here
|
||||||
|
{ok, Pid} = leveled_cdb:cdb_reopen_reader(FN, LK_RO, CDBOpts),
|
||||||
|
UpdEntry = {LowSQN, FN, Pid, LK_RO},
|
||||||
|
leveled_imanifest:add_entry(Acc, UpdEntry, false)
|
||||||
end,
|
end,
|
||||||
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
|
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
|
||||||
NewManifestSQN = State#state.manifest_sqn + 1,
|
NewManifestSQN = State#state.manifest_sqn + 1,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue