Log improvements
Continuation of log review and conversion to using central log function. Fixup of convoluted shutdown process between Bookie, Inker and Inker's Clerk
This commit is contained in:
parent
c6fc8d1768
commit
4e46c9735d
6 changed files with 128 additions and 91 deletions
|
@ -159,8 +159,6 @@
|
||||||
-define(CACHE_SIZE, 2000).
|
-define(CACHE_SIZE, 2000).
|
||||||
-define(JOURNAL_FP, "journal").
|
-define(JOURNAL_FP, "journal").
|
||||||
-define(LEDGER_FP, "ledger").
|
-define(LEDGER_FP, "ledger").
|
||||||
-define(SHUTDOWN_WAITS, 60).
|
|
||||||
-define(SHUTDOWN_PAUSE, 10000).
|
|
||||||
-define(SNAPSHOT_TIMEOUT, 300000).
|
-define(SNAPSHOT_TIMEOUT, 300000).
|
||||||
-define(CHECKJOURNAL_PROB, 0.2).
|
-define(CHECKJOURNAL_PROB, 0.2).
|
||||||
|
|
||||||
|
@ -394,8 +392,7 @@ handle_info(_Info, State) ->
|
||||||
|
|
||||||
terminate(Reason, State) ->
|
terminate(Reason, State) ->
|
||||||
leveled_log:log("B0003", [Reason]),
|
leveled_log:log("B0003", [Reason]),
|
||||||
WaitList = lists:duplicate(?SHUTDOWN_WAITS, ?SHUTDOWN_PAUSE),
|
ok = leveled_inker:ink_close(State#state.inker),
|
||||||
ok = shutdown_wait(WaitList, State#state.inker),
|
|
||||||
ok = leveled_penciller:pcl_close(State#state.penciller).
|
ok = leveled_penciller:pcl_close(State#state.penciller).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
@ -552,21 +549,7 @@ snapshot_store(State, SnapType) ->
|
||||||
ledger ->
|
ledger ->
|
||||||
{ok, {LedgerSnapshot, State#state.ledger_cache},
|
{ok, {LedgerSnapshot, State#state.ledger_cache},
|
||||||
null}
|
null}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
shutdown_wait([], _Inker) ->
|
|
||||||
false;
|
|
||||||
shutdown_wait([TopPause|Rest], Inker) ->
|
|
||||||
case leveled_inker:ink_close(Inker) of
|
|
||||||
ok ->
|
|
||||||
ok;
|
|
||||||
pause ->
|
|
||||||
io:format("Inker shutdown stil waiting for process to complete" ++
|
|
||||||
" with further wait of ~w~n", [lists:sum(Rest)]),
|
|
||||||
ok = timer:sleep(TopPause),
|
|
||||||
shutdown_wait(Rest, Inker)
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
set_options(Opts) ->
|
set_options(Opts) ->
|
||||||
MaxJournalSize = get_opt(max_journalsize, Opts, 10000000000),
|
MaxJournalSize = get_opt(max_journalsize, Opts, 10000000000),
|
||||||
|
|
|
@ -175,27 +175,20 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
|
||||||
C#candidate.journal}
|
C#candidate.journal}
|
||||||
end,
|
end,
|
||||||
BestRun1),
|
BestRun1),
|
||||||
io:format("Clerk updating Inker as compaction complete of " ++
|
leveled_log:log("IC002", [length(FilesToDelete)]),
|
||||||
"~w files~n", [length(FilesToDelete)]),
|
case is_process_alive(Inker) of
|
||||||
{ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker,
|
|
||||||
ManifestSlice,
|
|
||||||
FilesToDelete),
|
|
||||||
ok = leveled_inker:ink_compactioncomplete(Inker),
|
|
||||||
io:format("Clerk has completed compaction process~n"),
|
|
||||||
case PromptDelete of
|
|
||||||
true ->
|
true ->
|
||||||
lists:foreach(fun({_SQN, _FN, J2D}) ->
|
update_inker(Inker,
|
||||||
leveled_cdb:cdb_deletepending(J2D,
|
ManifestSlice,
|
||||||
ManSQN,
|
FilesToDelete,
|
||||||
Inker)
|
PromptDelete),
|
||||||
end,
|
|
||||||
FilesToDelete),
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
false ->
|
false ->
|
||||||
{noreply, State}
|
leveled_log:log("IC001", []),
|
||||||
|
{stop, normal, State}
|
||||||
end;
|
end;
|
||||||
Score ->
|
Score ->
|
||||||
io:format("No compaction run as highest score=~w~n", [Score]),
|
leveled_log:log("IC003", [Score]),
|
||||||
ok = leveled_inker:ink_compactioncomplete(Inker),
|
ok = leveled_inker:ink_compactioncomplete(Inker),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
@ -245,7 +238,7 @@ check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) -
|
||||||
_ ->
|
_ ->
|
||||||
100 * ActiveSize / (ActiveSize + ReplacedSize)
|
100 * ActiveSize / (ActiveSize + ReplacedSize)
|
||||||
end,
|
end,
|
||||||
io:format("Score for filename ~s is ~w~n", [FN, Score]),
|
leveled_log:log("IC004", [FN, Score]),
|
||||||
Score.
|
Score.
|
||||||
|
|
||||||
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) ->
|
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) ->
|
||||||
|
@ -352,12 +345,10 @@ score_run(Run, MaxRunLength) ->
|
||||||
|
|
||||||
|
|
||||||
print_compaction_run(BestRun, MaxRunLength) ->
|
print_compaction_run(BestRun, MaxRunLength) ->
|
||||||
io:format("Compaction to be performed on ~w files with score of ~w~n",
|
leveled_log:log("IC005", [length(BestRun),
|
||||||
[length(BestRun), score_run(BestRun, MaxRunLength)]),
|
score_run(BestRun, MaxRunLength)]),
|
||||||
lists:foreach(fun(File) ->
|
lists:foreach(fun(File) ->
|
||||||
io:format("Filename ~s is part of compaction run~n",
|
leveled_log:log("IC006", [File#candidate.filename])
|
||||||
[File#candidate.filename])
|
|
||||||
|
|
||||||
end,
|
end,
|
||||||
BestRun).
|
BestRun).
|
||||||
|
|
||||||
|
@ -366,6 +357,24 @@ sort_run(RunOfFiles) ->
|
||||||
Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end,
|
Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end,
|
||||||
lists:sort(CompareFun, RunOfFiles).
|
lists:sort(CompareFun, RunOfFiles).
|
||||||
|
|
||||||
|
update_inker(Inker, ManifestSlice, FilesToDelete, PromptDelete) ->
|
||||||
|
{ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker,
|
||||||
|
ManifestSlice,
|
||||||
|
FilesToDelete),
|
||||||
|
ok = leveled_inker:ink_compactioncomplete(Inker),
|
||||||
|
leveled_log:log("IC007", []),
|
||||||
|
case PromptDelete of
|
||||||
|
true ->
|
||||||
|
lists:foreach(fun({_SQN, _FN, J2D}) ->
|
||||||
|
leveled_cdb:cdb_deletepending(J2D,
|
||||||
|
ManSQN,
|
||||||
|
Inker)
|
||||||
|
end,
|
||||||
|
FilesToDelete),
|
||||||
|
ok;
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) ->
|
compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) ->
|
||||||
BatchesOfPositions = get_all_positions(BestRun, []),
|
BatchesOfPositions = get_all_positions(BestRun, []),
|
||||||
|
@ -418,8 +427,7 @@ get_all_positions([], PositionBatches) ->
|
||||||
get_all_positions([HeadRef|RestOfBest], PositionBatches) ->
|
get_all_positions([HeadRef|RestOfBest], PositionBatches) ->
|
||||||
SrcJournal = HeadRef#candidate.journal,
|
SrcJournal = HeadRef#candidate.journal,
|
||||||
Positions = leveled_cdb:cdb_getpositions(SrcJournal, all),
|
Positions = leveled_cdb:cdb_getpositions(SrcJournal, all),
|
||||||
io:format("Compaction source ~s has yielded ~w positions~n",
|
leveled_log:log("IC008", [HeadRef#candidate.filename, length(Positions)]),
|
||||||
[HeadRef#candidate.filename, length(Positions)]),
|
|
||||||
Batches = split_positions_into_batches(lists:sort(Positions),
|
Batches = split_positions_into_batches(lists:sort(Positions),
|
||||||
SrcJournal,
|
SrcJournal,
|
||||||
[]),
|
[]),
|
||||||
|
@ -480,9 +488,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) ->
|
||||||
FN = leveled_inker:filepath(FP,
|
FN = leveled_inker:filepath(FP,
|
||||||
SQN,
|
SQN,
|
||||||
compact_journal),
|
compact_journal),
|
||||||
io:format("Generate journal for compaction"
|
leveled_log:log("IC009", [FN]),
|
||||||
++ " with filename ~s~n",
|
|
||||||
[FN]),
|
|
||||||
leveled_cdb:cdb_open_writer(FN,
|
leveled_cdb:cdb_open_writer(FN,
|
||||||
CDBopts);
|
CDBopts);
|
||||||
_ ->
|
_ ->
|
||||||
|
|
|
@ -248,8 +248,7 @@ handle_call({fetch, Key, SQN}, _From, State) ->
|
||||||
{{SQN, Key}, {Value, _IndexSpecs}} ->
|
{{SQN, Key}, {Value, _IndexSpecs}} ->
|
||||||
{reply, {ok, Value}, State};
|
{reply, {ok, Value}, State};
|
||||||
Other ->
|
Other ->
|
||||||
io:format("Unexpected failure to fetch value for" ++
|
leveled_log:log("I0001", [Key, SQN, Other]),
|
||||||
"Key=~w SQN=~w with reason ~w~n", [Key, SQN, Other]),
|
|
||||||
{reply, not_present, State}
|
{reply, not_present, State}
|
||||||
end;
|
end;
|
||||||
handle_call({get, Key, SQN}, _From, State) ->
|
handle_call({get, Key, SQN}, _From, State) ->
|
||||||
|
@ -263,15 +262,14 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) ->
|
||||||
handle_call({register_snapshot, Requestor}, _From , State) ->
|
handle_call({register_snapshot, Requestor}, _From , State) ->
|
||||||
Rs = [{Requestor,
|
Rs = [{Requestor,
|
||||||
State#state.manifest_sqn}|State#state.registered_snapshots],
|
State#state.manifest_sqn}|State#state.registered_snapshots],
|
||||||
io:format("Journal snapshot ~w registered at SQN ~w~n",
|
leveled_log:log("I0002", [Requestor, State#state.manifest_sqn]),
|
||||||
[Requestor, State#state.manifest_sqn]),
|
|
||||||
{reply, {State#state.manifest,
|
{reply, {State#state.manifest,
|
||||||
State#state.active_journaldb},
|
State#state.active_journaldb},
|
||||||
State#state{registered_snapshots=Rs}};
|
State#state{registered_snapshots=Rs}};
|
||||||
handle_call({release_snapshot, Snapshot}, _From , State) ->
|
handle_call({release_snapshot, Snapshot}, _From , State) ->
|
||||||
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
|
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
|
||||||
io:format("Journal snapshot ~w released~n", [Snapshot]),
|
leveled_log:log("I0003", [Snapshot]),
|
||||||
io:format("Remaining journal snapshots are ~w~n", [Rs]),
|
leveled_log:log("I0004", [length(Rs)]),
|
||||||
{reply, ok, State#state{registered_snapshots=Rs}};
|
{reply, ok, State#state{registered_snapshots=Rs}};
|
||||||
handle_call({confirm_delete, ManSQN}, _From, State) ->
|
handle_call({confirm_delete, ManSQN}, _From, State) ->
|
||||||
Reply = lists:foldl(fun({_R, SnapSQN}, Bool) ->
|
Reply = lists:foldl(fun({_R, SnapSQN}, Bool) ->
|
||||||
|
@ -300,7 +298,7 @@ handle_call({update_manifest,
|
||||||
ManifestSnippet),
|
ManifestSnippet),
|
||||||
NewManifestSQN = State#state.manifest_sqn + 1,
|
NewManifestSQN = State#state.manifest_sqn + 1,
|
||||||
manifest_printer(Man1),
|
manifest_printer(Man1),
|
||||||
ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path),
|
simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path),
|
||||||
{reply,
|
{reply,
|
||||||
{ok, NewManifestSQN},
|
{ok, NewManifestSQN},
|
||||||
State#state{manifest=Man1,
|
State#state{manifest=Man1,
|
||||||
|
@ -327,12 +325,7 @@ handle_call(compaction_complete, _From, State) ->
|
||||||
handle_call(compaction_pending, _From, State) ->
|
handle_call(compaction_pending, _From, State) ->
|
||||||
{reply, State#state.compaction_pending, State};
|
{reply, State#state.compaction_pending, State};
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State) ->
|
||||||
case State#state.compaction_pending of
|
{stop, normal, ok, State}.
|
||||||
true ->
|
|
||||||
{reply, pause, State};
|
|
||||||
false ->
|
|
||||||
{stop, normal, ok, State}
|
|
||||||
end.
|
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -345,13 +338,13 @@ terminate(Reason, State) ->
|
||||||
true ->
|
true ->
|
||||||
ok = ink_releasesnapshot(State#state.source_inker, self());
|
ok = ink_releasesnapshot(State#state.source_inker, self());
|
||||||
false ->
|
false ->
|
||||||
io:format("Inker closing journal for reason ~w~n", [Reason]),
|
leveled_log:log("I0005", [Reason]),
|
||||||
io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n",
|
leveled_log:log("I0006", [State#state.journal_sqn,
|
||||||
[State#state.journal_sqn, State#state.manifest_sqn]),
|
State#state.manifest_sqn]),
|
||||||
io:format("Manifest when closing is: ~n"),
|
|
||||||
leveled_iclerk:clerk_stop(State#state.clerk),
|
leveled_iclerk:clerk_stop(State#state.clerk),
|
||||||
lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end,
|
lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end,
|
||||||
State#state.registered_snapshots),
|
State#state.registered_snapshots),
|
||||||
|
leveled_log:log("I0007", []),
|
||||||
manifest_printer(State#state.manifest),
|
manifest_printer(State#state.manifest),
|
||||||
ok = close_allmanifest(State#state.manifest)
|
ok = close_allmanifest(State#state.manifest)
|
||||||
end.
|
end.
|
||||||
|
@ -425,9 +418,7 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
|
||||||
ok = leveled_cdb:cdb_put(NewJournalP,
|
ok = leveled_cdb:cdb_put(NewJournalP,
|
||||||
JournalKey,
|
JournalKey,
|
||||||
JournalBin),
|
JournalBin),
|
||||||
io:format("Put to new active journal " ++
|
leveled_log:log_timer("I0008", [], SW),
|
||||||
"with manifest write took ~w microseconds~n",
|
|
||||||
[timer:now_diff(os:timestamp(),SW)]),
|
|
||||||
{rolling,
|
{rolling,
|
||||||
State#state{journal_sqn=NewSQN,
|
State#state{journal_sqn=NewSQN,
|
||||||
manifest=NewManifest,
|
manifest=NewManifest,
|
||||||
|
@ -489,14 +480,14 @@ build_manifest(ManifestFilenames,
|
||||||
% the manifest (must also increment the manifest SQN).
|
% the manifest (must also increment the manifest SQN).
|
||||||
UpdManifestSQN = if
|
UpdManifestSQN = if
|
||||||
length(OpenManifest) > length(Manifest) ->
|
length(OpenManifest) > length(Manifest) ->
|
||||||
io:format("Updated manifest on startup: ~n"),
|
leveled_log:log("I0009", []),
|
||||||
manifest_printer(OpenManifest),
|
manifest_printer(OpenManifest),
|
||||||
simple_manifest_writer(OpenManifest,
|
simple_manifest_writer(OpenManifest,
|
||||||
ManifestSQN + 1,
|
ManifestSQN + 1,
|
||||||
RootPath),
|
RootPath),
|
||||||
ManifestSQN + 1;
|
ManifestSQN + 1;
|
||||||
true ->
|
true ->
|
||||||
io:format("Unchanged manifest on startup: ~n"),
|
leveled_log:log("I0010", []),
|
||||||
manifest_printer(OpenManifest),
|
manifest_printer(OpenManifest),
|
||||||
ManifestSQN
|
ManifestSQN
|
||||||
end,
|
end,
|
||||||
|
@ -512,7 +503,7 @@ close_allmanifest([H|ManifestT]) ->
|
||||||
|
|
||||||
|
|
||||||
open_all_manifest([], RootPath, CDBOpts) ->
|
open_all_manifest([], RootPath, CDBOpts) ->
|
||||||
io:format("Manifest is empty, starting from manifest SQN 1~n"),
|
leveled_log:log("I0011", []),
|
||||||
add_to_manifest([], start_new_activejournal(1, RootPath, CDBOpts));
|
add_to_manifest([], start_new_activejournal(1, RootPath, CDBOpts));
|
||||||
open_all_manifest(Man0, RootPath, CDBOpts) ->
|
open_all_manifest(Man0, RootPath, CDBOpts) ->
|
||||||
Man1 = lists:reverse(lists:sort(Man0)),
|
Man1 = lists:reverse(lists:sort(Man0)),
|
||||||
|
@ -521,8 +512,7 @@ open_all_manifest(Man0, RootPath, CDBOpts) ->
|
||||||
PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX,
|
PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX,
|
||||||
Man2 = case filelib:is_file(CompleteHeadFN) of
|
Man2 = case filelib:is_file(CompleteHeadFN) of
|
||||||
true ->
|
true ->
|
||||||
io:format("Head manifest entry ~s is complete~n",
|
leveled_log:log("I0012", [HeadFN]),
|
||||||
[HeadFN]),
|
|
||||||
{ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN),
|
{ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN),
|
||||||
{LastSQN, _Type, _PK} = leveled_cdb:cdb_lastkey(HeadR),
|
{LastSQN, _Type, _PK} = leveled_cdb:cdb_lastkey(HeadR),
|
||||||
add_to_manifest(add_to_manifest(ManifestTail,
|
add_to_manifest(add_to_manifest(ManifestTail,
|
||||||
|
@ -569,7 +559,7 @@ add_to_manifest(Manifest, Entry) ->
|
||||||
|
|
||||||
remove_from_manifest(Manifest, Entry) ->
|
remove_from_manifest(Manifest, Entry) ->
|
||||||
{SQN, FN, _PidR} = Entry,
|
{SQN, FN, _PidR} = Entry,
|
||||||
io:format("File ~s to be removed from manifest~n", [FN]),
|
leveled_log:log("I0013", [FN]),
|
||||||
lists:keydelete(SQN, 1, Manifest).
|
lists:keydelete(SQN, 1, Manifest).
|
||||||
|
|
||||||
find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN ->
|
find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN ->
|
||||||
|
@ -623,7 +613,7 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) ->
|
||||||
|
|
||||||
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
|
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
|
||||||
CDBpid, StartPos, FN, Rest) ->
|
CDBpid, StartPos, FN, Rest) ->
|
||||||
io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]),
|
leveled_log:log("I0014", [FN, MinSQN]),
|
||||||
InitAcc = {MinSQN, MaxSQN, gb_trees:empty()},
|
InitAcc = {MinSQN, MaxSQN, gb_trees:empty()},
|
||||||
Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
|
Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
|
||||||
{eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
|
{eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
|
||||||
|
@ -698,8 +688,7 @@ filepath(CompactFilePath, NewSQN, compact_journal) ->
|
||||||
|
|
||||||
simple_manifest_reader(SQN, RootPath) ->
|
simple_manifest_reader(SQN, RootPath) ->
|
||||||
ManifestPath = filepath(RootPath, manifest_dir),
|
ManifestPath = filepath(RootPath, manifest_dir),
|
||||||
io:format("Opening manifest file at ~s with SQN ~w~n",
|
leveled_log:log("I0015", [ManifestPath, SQN]),
|
||||||
[ManifestPath, SQN]),
|
|
||||||
{ok, MBin} = file:read_file(filename:join(ManifestPath,
|
{ok, MBin} = file:read_file(filename:join(ManifestPath,
|
||||||
integer_to_list(SQN)
|
integer_to_list(SQN)
|
||||||
++ ".man")),
|
++ ".man")),
|
||||||
|
@ -715,13 +704,8 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) ->
|
||||||
MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end,
|
MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end,
|
||||||
Manifest), [compressed]),
|
Manifest), [compressed]),
|
||||||
case filelib:is_file(NewFN) of
|
case filelib:is_file(NewFN) of
|
||||||
true ->
|
|
||||||
io:format("Error - trying to write manifest for"
|
|
||||||
++ " ManifestSQN=~w which already exists~n", [ManSQN]),
|
|
||||||
error;
|
|
||||||
false ->
|
false ->
|
||||||
io:format("Writing new version of manifest for "
|
leveled_log:log("I0016", [ManSQN]),
|
||||||
++ " manifestSQN=~w~n", [ManSQN]),
|
|
||||||
ok = file:write_file(TmpFN, MBin),
|
ok = file:write_file(TmpFN, MBin),
|
||||||
ok = file:rename(TmpFN, NewFN),
|
ok = file:rename(TmpFN, NewFN),
|
||||||
ok
|
ok
|
||||||
|
@ -729,8 +713,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) ->
|
||||||
|
|
||||||
manifest_printer(Manifest) ->
|
manifest_printer(Manifest) ->
|
||||||
lists:foreach(fun({SQN, FN, _PID}) ->
|
lists:foreach(fun({SQN, FN, _PID}) ->
|
||||||
io:format("At SQN=~w journal has filename ~s~n",
|
leveled_log:log("I0017", [SQN, FN]) end,
|
||||||
[SQN, FN]) end,
|
|
||||||
Manifest).
|
Manifest).
|
||||||
|
|
||||||
initiate_penciller_snapshot(Bookie) ->
|
initiate_penciller_snapshot(Bookie) ->
|
||||||
|
|
|
@ -123,7 +123,69 @@
|
||||||
{"PC014",
|
{"PC014",
|
||||||
{info, "Empty file ~s to be cleared"}},
|
{info, "Empty file ~s to be cleared"}},
|
||||||
{"PC015",
|
{"PC015",
|
||||||
{info, "File created"}}
|
{info, "File created"}},
|
||||||
|
|
||||||
|
{"I0001",
|
||||||
|
{info, "Unexpected failure to fetch value for Key=~w SQN=~w "
|
||||||
|
++ "with reason ~w"}},
|
||||||
|
{"I0002",
|
||||||
|
{info, "Journal snapshot ~w registered at SQN ~w"}},
|
||||||
|
{"I0003",
|
||||||
|
{info, "Journal snapshot ~w released"}},
|
||||||
|
{"I0004",
|
||||||
|
{info, "Remaining number of journal snapshots is ~w"}},
|
||||||
|
{"I0005",
|
||||||
|
{info, "Inker closing journal for reason ~w"}},
|
||||||
|
{"I0006",
|
||||||
|
{info, "Close triggered with journal_sqn=~w and manifest_sqn=~w"}},
|
||||||
|
{"I0007",
|
||||||
|
{info, "Inker manifest when closing is:"}},
|
||||||
|
{"I0008",
|
||||||
|
{info, "Put to new active journal required roll and manifest write"}},
|
||||||
|
{"I0009",
|
||||||
|
{info, "Updated manifest on startup:"}},
|
||||||
|
{"I0010",
|
||||||
|
{info, "Unchanged manifest on startup:"}},
|
||||||
|
{"I0011",
|
||||||
|
{info, "Manifest is empty, starting from manifest SQN 1"}},
|
||||||
|
{"I0012",
|
||||||
|
{info, "Head manifest entry ~s is complete so new active journal "
|
||||||
|
++ "required"}},
|
||||||
|
{"I0013",
|
||||||
|
{info, "File ~s to be removed from manifest"}},
|
||||||
|
{"I0014",
|
||||||
|
{info, "On startup oading from filename ~s from SQN ~w"}},
|
||||||
|
{"I0015",
|
||||||
|
{info, "Opening manifest file at ~s with SQN ~w"}},
|
||||||
|
{"I0016",
|
||||||
|
{info, "Writing new version of manifest for manifestSQN=~w"}},
|
||||||
|
{"I0017",
|
||||||
|
{info, "At SQN=~w journal has filename ~s"}},
|
||||||
|
|
||||||
|
{"IC001",
|
||||||
|
{info, "Inker no longer alive so Clerk to abandon work "
|
||||||
|
++ "leaving garbage"}},
|
||||||
|
{"IC002",
|
||||||
|
{info, "Clerk updating Inker as compaction complete of ~w files"}},
|
||||||
|
{"IC003",
|
||||||
|
{info, "No compaction run as highest score=~w"}},
|
||||||
|
{"IC004",
|
||||||
|
{info, "Score for filename ~s is ~w"}},
|
||||||
|
{"IC005",
|
||||||
|
{info, "Compaction to be performed on ~w files with score of ~w"}},
|
||||||
|
{"IC006",
|
||||||
|
{info, "Filename ~s is part of compaction run"}},
|
||||||
|
{"IC007",
|
||||||
|
{info, "Clerk has completed compaction process"}},
|
||||||
|
{"IC008",
|
||||||
|
{info, "Compaction source ~s has yielded ~w positions"}},
|
||||||
|
{"IC009",
|
||||||
|
{info, "Generate journal for compaction with filename ~s"}},
|
||||||
|
|
||||||
|
{"PM001",
|
||||||
|
{info, "Indexed new cache entry with total L0 cache size now ~w"}},
|
||||||
|
{"PM002",
|
||||||
|
{info, "Completed dump of L0 cache to list of size ~w"}}
|
||||||
|
|
||||||
])).
|
])).
|
||||||
|
|
||||||
|
@ -132,7 +194,7 @@ log(LogReference, Subs) ->
|
||||||
{ok, {LogLevel, LogText}} = dict:find(LogReference, ?LOGBASE),
|
{ok, {LogLevel, LogText}} = dict:find(LogReference, ?LOGBASE),
|
||||||
case lists:member(LogLevel, ?LOG_LEVEL) of
|
case lists:member(LogLevel, ?LOG_LEVEL) of
|
||||||
true ->
|
true ->
|
||||||
io:format(LogText ++ "~n", Subs);
|
io:format(LogReference ++ " " ++ LogText ++ "~n", Subs);
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
@ -148,7 +210,8 @@ log_timer(LogReference, Subs, StartTime) ->
|
||||||
MicroS ->
|
MicroS ->
|
||||||
{"ms", MicroS div 1000}
|
{"ms", MicroS div 1000}
|
||||||
end,
|
end,
|
||||||
io:format(LogText ++ " with time taken ~w " ++ Unit ++ "~n",
|
io:format(LogReference ++ " " ++ LogText ++ " with time taken ~w "
|
||||||
|
++ Unit ++ "~n",
|
||||||
Subs ++ [Time]);
|
Subs ++ [Time]);
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -715,7 +715,7 @@ checkready(Pid) ->
|
||||||
%% to an immediate return as expected. With 32K keys in the TreeList it could
|
%% to an immediate return as expected. With 32K keys in the TreeList it could
|
||||||
%% take around 35-40ms.
|
%% take around 35-40ms.
|
||||||
%%
|
%%
|
||||||
%% To avoid blocking this gen_server, the SFT file cna request each item of the
|
%% To avoid blocking this gen_server, the SFT file can request each item of the
|
||||||
%% cache one at a time.
|
%% cache one at a time.
|
||||||
%%
|
%%
|
||||||
%% The Wait is set to false to use a cast when calling this in normal operation
|
%% The Wait is set to false to use a cast when calling this in normal operation
|
||||||
|
@ -1417,6 +1417,10 @@ simple_server_test() ->
|
||||||
1)),
|
1)),
|
||||||
ok = pcl_close(PclSnap),
|
ok = pcl_close(PclSnap),
|
||||||
|
|
||||||
|
% Ignore a fake pending mnaifest on startup
|
||||||
|
ok = file:write_file(RootPath ++ "/" ++ ?MANIFEST_FP ++ "nonzero_99.pnd",
|
||||||
|
term_to_binary("Hello")),
|
||||||
|
|
||||||
{ok, PclSnap2} = pcl_start(SnapOpts),
|
{ok, PclSnap2} = pcl_start(SnapOpts),
|
||||||
ok = pcl_loadsnapshot(PclSnap2, gb_trees:empty()),
|
ok = pcl_loadsnapshot(PclSnap2, gb_trees:empty()),
|
||||||
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
|
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
|
||||||
|
|
|
@ -69,8 +69,7 @@ add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) ->
|
||||||
{infinity, 0, L0Index},
|
{infinity, 0, L0Index},
|
||||||
LM1List),
|
LM1List),
|
||||||
NewL0Size = length(LM1List) + L0Size,
|
NewL0Size = length(LM1List) + L0Size,
|
||||||
io:format("Rolled L0 cache to size ~w in ~w microseconds~n",
|
leveled_log:log_timer("PM001", [NewL0Size], SW),
|
||||||
[NewL0Size, timer:now_diff(os:timestamp(), SW)]),
|
|
||||||
if
|
if
|
||||||
MinSQN > LedgerSQN ->
|
MinSQN > LedgerSQN ->
|
||||||
{MaxSQN,
|
{MaxSQN,
|
||||||
|
@ -90,8 +89,7 @@ to_list(Slots, FetchFun) ->
|
||||||
end,
|
end,
|
||||||
[],
|
[],
|
||||||
SlotList),
|
SlotList),
|
||||||
io:format("L0 cache converted to list of size ~w in ~w microseconds~n",
|
leveled_log:log_timer("PM002", [length(FullList)], SW),
|
||||||
[length(FullList), timer:now_diff(os:timestamp(), SW)]),
|
|
||||||
FullList.
|
FullList.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue