Split out inker manifest - load last key from manifest

The process of re-opening file included an expensive search for the
LastKey - now the LastKey can be provided out of the manifest.
This commit is contained in:
martinsumner 2017-01-18 15:23:06 +00:00
parent 4e53128a2d
commit 3ca928629c
3 changed files with 54 additions and 37 deletions

View file

@ -65,6 +65,7 @@
-export([cdb_open_writer/1, -export([cdb_open_writer/1,
cdb_open_writer/2, cdb_open_writer/2,
cdb_open_reader/1, cdb_open_reader/1,
cdb_reopen_reader/2,
cdb_get/2, cdb_get/2,
cdb_put/3, cdb_put/3,
cdb_mput/2, cdb_mput/2,
@ -125,6 +126,13 @@ cdb_open_writer(Filename, Opts) ->
ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity), ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity),
{ok, Pid}. {ok, Pid}.
cdb_reopen_reader(Filename, LastKey) ->
{ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{binary_mode=true}], []),
ok = gen_fsm:sync_send_event(Pid,
{open_reader, Filename, LastKey},
infinity),
{ok, Pid}.
cdb_open_reader(Filename) -> cdb_open_reader(Filename) ->
cdb_open_reader(Filename, #cdb_options{binary_mode=true}). cdb_open_reader(Filename, #cdb_options{binary_mode=true}).
@ -241,6 +249,13 @@ starting({open_writer, Filename}, _From, State) ->
starting({open_reader, Filename}, _From, State) -> starting({open_reader, Filename}, _From, State) ->
leveled_log:log("CDB02", [Filename]), leveled_log:log("CDB02", [Filename]),
{Handle, Index, LastKey} = open_for_readonly(Filename, false), {Handle, Index, LastKey} = open_for_readonly(Filename, false),
{reply, ok, reader, State#state{handle=Handle,
last_key=LastKey,
filename=Filename,
hash_index=Index}};
starting({open_reader, Filename, LastKey}, _From, State) ->
leveled_log:log("CDB02", [Filename]),
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
{reply, ok, reader, State#state{handle=Handle, {reply, ok, reader, State#state{handle=Handle,
last_key=LastKey, last_key=LastKey,
filename=Filename, filename=Filename,

View file

@ -194,7 +194,8 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
FilesToDelete = lists:map(fun(C) -> FilesToDelete = lists:map(fun(C) ->
{C#candidate.low_sqn, {C#candidate.low_sqn,
C#candidate.filename, C#candidate.filename,
C#candidate.journal} C#candidate.journal,
undefined}
end, end,
BestRun1), BestRun1),
leveled_log:log("IC002", [length(FilesToDelete)]), leveled_log:log("IC002", [length(FilesToDelete)]),
@ -274,7 +275,7 @@ scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) ->
scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) -> scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) ->
CandidateList; CandidateList;
scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) -> scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) ->
{LowSQN, FN, JournalP} = Entry, {LowSQN, FN, JournalP, _LK} = Entry,
CpctPerc = check_single_file(JournalP, CpctPerc = check_single_file(JournalP,
FilterFun, FilterFun,
FilterServer, FilterServer,
@ -390,7 +391,7 @@ update_inker(Inker, ManifestSlice, FilesToDelete) ->
FilesToDelete), FilesToDelete),
ok = leveled_inker:ink_compactioncomplete(Inker), ok = leveled_inker:ink_compactioncomplete(Inker),
leveled_log:log("IC007", []), leveled_log:log("IC007", []),
lists:foreach(fun({_SQN, _FN, J2D}) -> lists:foreach(fun({_SQN, _FN, J2D, _LK}) ->
leveled_cdb:cdb_deletepending(J2D, leveled_cdb:cdb_deletepending(J2D,
ManSQN, ManSQN,
Inker) Inker)

View file

@ -435,7 +435,7 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
ManEntry = start_new_activejournal(NewSQN, ManEntry = start_new_activejournal(NewSQN,
State#state.root_path, State#state.root_path,
CDBopts), CDBopts),
{_, _, NewJournalP} = ManEntry, {_, _, NewJournalP, _} = ManEntry,
NewManifest = leveled_imanifest:add_entry(State#state.manifest, ManEntry), NewManifest = leveled_imanifest:add_entry(State#state.manifest, ManEntry),
ok = simple_manifest_writer(NewManifest, ok = simple_manifest_writer(NewManifest,
State#state.manifest_sqn + 1, State#state.manifest_sqn + 1,
@ -534,29 +534,6 @@ open_all_manifest([], RootPath, CDBOpts) ->
open_all_manifest(Man0, RootPath, CDBOpts) -> open_all_manifest(Man0, RootPath, CDBOpts) ->
Man1 = lists:reverse(lists:sort(Man0)), Man1 = lists:reverse(lists:sort(Man0)),
[{HeadSQN, HeadFN, _IgnorePid, HeadLK}|ManifestTail] = Man1, [{HeadSQN, HeadFN, _IgnorePid, HeadLK}|ManifestTail] = Man1,
CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX,
PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX,
Man2 = case filelib:is_file(CompleteHeadFN) of
true ->
leveled_log:log("I0012", [HeadFN]),
{ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN),
LastKey = leveled_cdb:cdb_lastkey(HeadR),
LastSQN = element(1, LastKey),
ManToHead = leveled_imanifest:add_entry(ManifestTail,
{HeadSQN,
HeadFN,
HeadR,
LastKey}),
NewManEntry = start_new_activejournal(LastSQN + 1,
RootPath,
CDBOpts),
leveled_imanifest:add_entry(ManToHead, NewManEntry);
false ->
{ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN,
CDBOpts),
leveled_imanifest:add_entry(ManifestTail,
{HeadSQN, HeadFN, HeadW, HeadLK})
end,
OpenJournalFun = OpenJournalFun =
fun(ManEntry) -> fun(ManEntry) ->
case ManEntry of case ManEntry of
@ -565,7 +542,8 @@ open_all_manifest(Man0, RootPath, CDBOpts) ->
PFN = FN ++ "." ++ ?PENDING_FILEX, PFN = FN ++ "." ++ ?PENDING_FILEX,
case filelib:is_file(CFN) of case filelib:is_file(CFN) of
true -> true ->
{ok, Pid} = leveled_cdb:cdb_open_reader(CFN), {ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN,
LK_RO),
{LowSQN, FN, Pid, LK_RO}; {LowSQN, FN, Pid, LK_RO};
false -> false ->
W = leveled_cdb:cdb_open_writer(PFN, CDBOpts), W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
@ -578,7 +556,30 @@ open_all_manifest(Man0, RootPath, CDBOpts) ->
ManEntry ManEntry
end end
end, end,
lists:map(OpenJournalFun, Man2). OpenedTail = lists:map(OpenJournalFun, ManifestTail),
CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX,
PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX,
case filelib:is_file(CompleteHeadFN) of
true ->
leveled_log:log("I0012", [HeadFN]),
{ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN),
LastKey = leveled_cdb:cdb_lastkey(HeadR),
LastSQN = element(1, LastKey),
ManToHead = leveled_imanifest:add_entry(OpenedTail,
{HeadSQN,
HeadFN,
HeadR,
LastKey}),
NewManEntry = start_new_activejournal(LastSQN + 1,
RootPath,
CDBOpts),
leveled_imanifest:add_entry(ManToHead, NewManEntry);
false ->
{ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN,
CDBOpts),
leveled_imanifest:add_entry(OpenedTail,
{HeadSQN, HeadFN, HeadW, HeadLK})
end.
start_new_activejournal(SQN, RootPath, CDBOpts) -> start_new_activejournal(SQN, RootPath, CDBOpts) ->
@ -591,25 +592,25 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
%% FilterFun{K, V, Acc} -> Penciller Key List %% FilterFun{K, V, Acc} -> Penciller Key List
%% Load the output for the CDB file into the Penciller. %% Load the output for the CDB file into the Penciller.
load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) -> load_from_sequence(_MinSQN, _FilterFun, _PCL, []) ->
ok; ok;
load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|Rest]) load_from_sequence(MinSQN, FilterFun, PCL, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN -> when LowSQN >= MinSQN ->
load_between_sequence(MinSQN, load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH, MinSQN + ?LOADING_BATCH,
FilterFun, FilterFun,
Penciller, PCL,
Pid, Pid,
undefined, undefined,
FN, FN,
Rest); Rest);
load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) -> load_from_sequence(MinSQN, FilterFun, PCL, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
case Rest of case Rest of
[] -> [] ->
load_between_sequence(MinSQN, load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH, MinSQN + ?LOADING_BATCH,
FilterFun, FilterFun,
Penciller, PCL,
Pid, Pid,
undefined, undefined,
FN, FN,
@ -618,13 +619,13 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) ->
load_between_sequence(MinSQN, load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH, MinSQN + ?LOADING_BATCH,
FilterFun, FilterFun,
Penciller, PCL,
Pid, Pid,
undefined, undefined,
FN, FN,
Rest); Rest);
_ -> _ ->
load_from_sequence(MinSQN, FilterFun, Penciller, Rest) load_from_sequence(MinSQN, FilterFun, PCL, Rest)
end. end.
@ -898,7 +899,7 @@ compact_journal_test() ->
5000), 5000),
timer:sleep(1000), timer:sleep(1000),
CompactedManifest2 = ink_getmanifest(Ink1), CompactedManifest2 = ink_getmanifest(Ink1),
R = lists:foldl(fun({_SQN, FN, _P}, Acc) -> R = lists:foldl(fun({_SQN, FN, _P, _LK}, Acc) ->
case string:str(FN, "post_compact") of case string:str(FN, "post_compact") of
N when N > 0 -> N when N > 0 ->
true; true;