From 4e46c9735da34f0a7a268a9630567f541506a06c Mon Sep 17 00:00:00 2001 From: martinsumner Date: Thu, 3 Nov 2016 16:05:43 +0000 Subject: [PATCH] Log improvements Continuation of log review and conversion to using central log function. Fixup of convoluted shutdown process between Bookie, Inker and Inker's Clerk --- src/leveled_bookie.erl | 21 ++---------- src/leveled_iclerk.erl | 60 +++++++++++++++++++--------------- src/leveled_inker.erl | 57 ++++++++++++-------------------- src/leveled_log.erl | 69 +++++++++++++++++++++++++++++++++++++-- src/leveled_penciller.erl | 6 +++- src/leveled_pmem.erl | 6 ++-- 6 files changed, 128 insertions(+), 91 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index b388cae..3c64b69 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -159,8 +159,6 @@ -define(CACHE_SIZE, 2000). -define(JOURNAL_FP, "journal"). -define(LEDGER_FP, "ledger"). --define(SHUTDOWN_WAITS, 60). --define(SHUTDOWN_PAUSE, 10000). -define(SNAPSHOT_TIMEOUT, 300000). -define(CHECKJOURNAL_PROB, 0.2). @@ -394,8 +392,7 @@ handle_info(_Info, State) -> terminate(Reason, State) -> leveled_log:log("B0003", [Reason]), - WaitList = lists:duplicate(?SHUTDOWN_WAITS, ?SHUTDOWN_PAUSE), - ok = shutdown_wait(WaitList, State#state.inker), + ok = leveled_inker:ink_close(State#state.inker), ok = leveled_penciller:pcl_close(State#state.penciller). code_change(_OldVsn, State, _Extra) -> @@ -552,21 +549,7 @@ snapshot_store(State, SnapType) -> ledger -> {ok, {LedgerSnapshot, State#state.ledger_cache}, null} - end. - -shutdown_wait([], _Inker) -> - false; -shutdown_wait([TopPause|Rest], Inker) -> - case leveled_inker:ink_close(Inker) of - ok -> - ok; - pause -> - io:format("Inker shutdown stil waiting for process to complete" ++ - " with further wait of ~w~n", [lists:sum(Rest)]), - ok = timer:sleep(TopPause), - shutdown_wait(Rest, Inker) - end. - + end. set_options(Opts) -> MaxJournalSize = get_opt(max_journalsize, Opts, 10000000000), diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 8fc284c..ecd05ab 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -175,27 +175,20 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, C#candidate.journal} end, BestRun1), - io:format("Clerk updating Inker as compaction complete of " ++ - "~w files~n", [length(FilesToDelete)]), - {ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker, - ManifestSlice, - FilesToDelete), - ok = leveled_inker:ink_compactioncomplete(Inker), - io:format("Clerk has completed compaction process~n"), - case PromptDelete of + leveled_log:log("IC002", [length(FilesToDelete)]), + case is_process_alive(Inker) of true -> - lists:foreach(fun({_SQN, _FN, J2D}) -> - leveled_cdb:cdb_deletepending(J2D, - ManSQN, - Inker) - end, - FilesToDelete), + update_inker(Inker, + ManifestSlice, + FilesToDelete, + PromptDelete), {noreply, State}; false -> - {noreply, State} + leveled_log:log("IC001", []), + {stop, normal, State} end; Score -> - io:format("No compaction run as highest score=~w~n", [Score]), + leveled_log:log("IC003", [Score]), ok = leveled_inker:ink_compactioncomplete(Inker), {noreply, State} end; @@ -245,7 +238,7 @@ check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) - _ -> 100 * ActiveSize / (ActiveSize + ReplacedSize) end, - io:format("Score for filename ~s is ~w~n", [FN, Score]), + leveled_log:log("IC004", [FN, Score]), Score. scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) -> @@ -352,12 +345,10 @@ score_run(Run, MaxRunLength) -> print_compaction_run(BestRun, MaxRunLength) -> - io:format("Compaction to be performed on ~w files with score of ~w~n", - [length(BestRun), score_run(BestRun, MaxRunLength)]), + leveled_log:log("IC005", [length(BestRun), + score_run(BestRun, MaxRunLength)]), lists:foreach(fun(File) -> - io:format("Filename ~s is part of compaction run~n", - [File#candidate.filename]) - + leveled_log:log("IC006", [File#candidate.filename]) end, BestRun). @@ -366,6 +357,24 @@ sort_run(RunOfFiles) -> Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end, lists:sort(CompareFun, RunOfFiles). +update_inker(Inker, ManifestSlice, FilesToDelete, PromptDelete) -> + {ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker, + ManifestSlice, + FilesToDelete), + ok = leveled_inker:ink_compactioncomplete(Inker), + leveled_log:log("IC007", []), + case PromptDelete of + true -> + lists:foreach(fun({_SQN, _FN, J2D}) -> + leveled_cdb:cdb_deletepending(J2D, + ManSQN, + Inker) + end, + FilesToDelete), + ok; + false -> + ok + end. compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) -> BatchesOfPositions = get_all_positions(BestRun, []), @@ -418,8 +427,7 @@ get_all_positions([], PositionBatches) -> get_all_positions([HeadRef|RestOfBest], PositionBatches) -> SrcJournal = HeadRef#candidate.journal, Positions = leveled_cdb:cdb_getpositions(SrcJournal, all), - io:format("Compaction source ~s has yielded ~w positions~n", - [HeadRef#candidate.filename, length(Positions)]), + leveled_log:log("IC008", [HeadRef#candidate.filename, length(Positions)]), Batches = split_positions_into_batches(lists:sort(Positions), SrcJournal, []), @@ -480,9 +488,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) -> FN = leveled_inker:filepath(FP, SQN, compact_journal), - io:format("Generate journal for compaction" - ++ " with filename ~s~n", - [FN]), + leveled_log:log("IC009", [FN]), leveled_cdb:cdb_open_writer(FN, CDBopts); _ -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 6cbb67d..8e2fa11 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -248,8 +248,7 @@ handle_call({fetch, Key, SQN}, _From, State) -> {{SQN, Key}, {Value, _IndexSpecs}} -> {reply, {ok, Value}, State}; Other -> - io:format("Unexpected failure to fetch value for" ++ - "Key=~w SQN=~w with reason ~w~n", [Key, SQN, Other]), + leveled_log:log("I0001", [Key, SQN, Other]), {reply, not_present, State} end; handle_call({get, Key, SQN}, _From, State) -> @@ -263,15 +262,14 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> handle_call({register_snapshot, Requestor}, _From , State) -> Rs = [{Requestor, State#state.manifest_sqn}|State#state.registered_snapshots], - io:format("Journal snapshot ~w registered at SQN ~w~n", - [Requestor, State#state.manifest_sqn]), + leveled_log:log("I0002", [Requestor, State#state.manifest_sqn]), {reply, {State#state.manifest, State#state.active_journaldb}, State#state{registered_snapshots=Rs}}; handle_call({release_snapshot, Snapshot}, _From , State) -> Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), - io:format("Journal snapshot ~w released~n", [Snapshot]), - io:format("Remaining journal snapshots are ~w~n", [Rs]), + leveled_log:log("I0003", [Snapshot]), + leveled_log:log("I0004", [length(Rs)]), {reply, ok, State#state{registered_snapshots=Rs}}; handle_call({confirm_delete, ManSQN}, _From, State) -> Reply = lists:foldl(fun({_R, SnapSQN}, Bool) -> @@ -300,7 +298,7 @@ handle_call({update_manifest, ManifestSnippet), NewManifestSQN = State#state.manifest_sqn + 1, manifest_printer(Man1), - ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path), + simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path), {reply, {ok, NewManifestSQN}, State#state{manifest=Man1, @@ -327,12 +325,7 @@ handle_call(compaction_complete, _From, State) -> handle_call(compaction_pending, _From, State) -> {reply, State#state.compaction_pending, State}; handle_call(close, _From, State) -> - case State#state.compaction_pending of - true -> - {reply, pause, State}; - false -> - {stop, normal, ok, State} - end. + {stop, normal, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. @@ -345,13 +338,13 @@ terminate(Reason, State) -> true -> ok = ink_releasesnapshot(State#state.source_inker, self()); false -> - io:format("Inker closing journal for reason ~w~n", [Reason]), - io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n", - [State#state.journal_sqn, State#state.manifest_sqn]), - io:format("Manifest when closing is: ~n"), + leveled_log:log("I0005", [Reason]), + leveled_log:log("I0006", [State#state.journal_sqn, + State#state.manifest_sqn]), leveled_iclerk:clerk_stop(State#state.clerk), lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, State#state.registered_snapshots), + leveled_log:log("I0007", []), manifest_printer(State#state.manifest), ok = close_allmanifest(State#state.manifest) end. @@ -425,9 +418,7 @@ put_object(LedgerKey, Object, KeyChanges, State) -> ok = leveled_cdb:cdb_put(NewJournalP, JournalKey, JournalBin), - io:format("Put to new active journal " ++ - "with manifest write took ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW)]), + leveled_log:log_timer("I0008", [], SW), {rolling, State#state{journal_sqn=NewSQN, manifest=NewManifest, @@ -489,14 +480,14 @@ build_manifest(ManifestFilenames, % the manifest (must also increment the manifest SQN). UpdManifestSQN = if length(OpenManifest) > length(Manifest) -> - io:format("Updated manifest on startup: ~n"), + leveled_log:log("I0009", []), manifest_printer(OpenManifest), simple_manifest_writer(OpenManifest, ManifestSQN + 1, RootPath), ManifestSQN + 1; true -> - io:format("Unchanged manifest on startup: ~n"), + leveled_log:log("I0010", []), manifest_printer(OpenManifest), ManifestSQN end, @@ -512,7 +503,7 @@ close_allmanifest([H|ManifestT]) -> open_all_manifest([], RootPath, CDBOpts) -> - io:format("Manifest is empty, starting from manifest SQN 1~n"), + leveled_log:log("I0011", []), add_to_manifest([], start_new_activejournal(1, RootPath, CDBOpts)); open_all_manifest(Man0, RootPath, CDBOpts) -> Man1 = lists:reverse(lists:sort(Man0)), @@ -521,8 +512,7 @@ open_all_manifest(Man0, RootPath, CDBOpts) -> PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX, Man2 = case filelib:is_file(CompleteHeadFN) of true -> - io:format("Head manifest entry ~s is complete~n", - [HeadFN]), + leveled_log:log("I0012", [HeadFN]), {ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN), {LastSQN, _Type, _PK} = leveled_cdb:cdb_lastkey(HeadR), add_to_manifest(add_to_manifest(ManifestTail, @@ -569,7 +559,7 @@ add_to_manifest(Manifest, Entry) -> remove_from_manifest(Manifest, Entry) -> {SQN, FN, _PidR} = Entry, - io:format("File ~s to be removed from manifest~n", [FN]), + leveled_log:log("I0013", [FN]), lists:keydelete(SQN, 1, Manifest). find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN -> @@ -623,7 +613,7 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) -> load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos, FN, Rest) -> - io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]), + leveled_log:log("I0014", [FN, MinSQN]), InitAcc = {MinSQN, MaxSQN, gb_trees:empty()}, Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of {eof, {AccMinSQN, _AccMaxSQN, AccKL}} -> @@ -698,8 +688,7 @@ filepath(CompactFilePath, NewSQN, compact_journal) -> simple_manifest_reader(SQN, RootPath) -> ManifestPath = filepath(RootPath, manifest_dir), - io:format("Opening manifest file at ~s with SQN ~w~n", - [ManifestPath, SQN]), + leveled_log:log("I0015", [ManifestPath, SQN]), {ok, MBin} = file:read_file(filename:join(ManifestPath, integer_to_list(SQN) ++ ".man")), @@ -715,13 +704,8 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) -> MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end, Manifest), [compressed]), case filelib:is_file(NewFN) of - true -> - io:format("Error - trying to write manifest for" - ++ " ManifestSQN=~w which already exists~n", [ManSQN]), - error; false -> - io:format("Writing new version of manifest for " - ++ " manifestSQN=~w~n", [ManSQN]), + leveled_log:log("I0016", [ManSQN]), ok = file:write_file(TmpFN, MBin), ok = file:rename(TmpFN, NewFN), ok @@ -729,8 +713,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) -> manifest_printer(Manifest) -> lists:foreach(fun({SQN, FN, _PID}) -> - io:format("At SQN=~w journal has filename ~s~n", - [SQN, FN]) end, + leveled_log:log("I0017", [SQN, FN]) end, Manifest). initiate_penciller_snapshot(Bookie) -> diff --git a/src/leveled_log.erl b/src/leveled_log.erl index d4d91ad..d1e63b5 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -123,7 +123,69 @@ {"PC014", {info, "Empty file ~s to be cleared"}}, {"PC015", - {info, "File created"}} + {info, "File created"}}, + + {"I0001", + {info, "Unexpected failure to fetch value for Key=~w SQN=~w " + ++ "with reason ~w"}}, + {"I0002", + {info, "Journal snapshot ~w registered at SQN ~w"}}, + {"I0003", + {info, "Journal snapshot ~w released"}}, + {"I0004", + {info, "Remaining number of journal snapshots is ~w"}}, + {"I0005", + {info, "Inker closing journal for reason ~w"}}, + {"I0006", + {info, "Close triggered with journal_sqn=~w and manifest_sqn=~w"}}, + {"I0007", + {info, "Inker manifest when closing is:"}}, + {"I0008", + {info, "Put to new active journal required roll and manifest write"}}, + {"I0009", + {info, "Updated manifest on startup:"}}, + {"I0010", + {info, "Unchanged manifest on startup:"}}, + {"I0011", + {info, "Manifest is empty, starting from manifest SQN 1"}}, + {"I0012", + {info, "Head manifest entry ~s is complete so new active journal " + ++ "required"}}, + {"I0013", + {info, "File ~s to be removed from manifest"}}, + {"I0014", + {info, "On startup oading from filename ~s from SQN ~w"}}, + {"I0015", + {info, "Opening manifest file at ~s with SQN ~w"}}, + {"I0016", + {info, "Writing new version of manifest for manifestSQN=~w"}}, + {"I0017", + {info, "At SQN=~w journal has filename ~s"}}, + + {"IC001", + {info, "Inker no longer alive so Clerk to abandon work " + ++ "leaving garbage"}}, + {"IC002", + {info, "Clerk updating Inker as compaction complete of ~w files"}}, + {"IC003", + {info, "No compaction run as highest score=~w"}}, + {"IC004", + {info, "Score for filename ~s is ~w"}}, + {"IC005", + {info, "Compaction to be performed on ~w files with score of ~w"}}, + {"IC006", + {info, "Filename ~s is part of compaction run"}}, + {"IC007", + {info, "Clerk has completed compaction process"}}, + {"IC008", + {info, "Compaction source ~s has yielded ~w positions"}}, + {"IC009", + {info, "Generate journal for compaction with filename ~s"}}, + + {"PM001", + {info, "Indexed new cache entry with total L0 cache size now ~w"}}, + {"PM002", + {info, "Completed dump of L0 cache to list of size ~w"}} ])). @@ -132,7 +194,7 @@ log(LogReference, Subs) -> {ok, {LogLevel, LogText}} = dict:find(LogReference, ?LOGBASE), case lists:member(LogLevel, ?LOG_LEVEL) of true -> - io:format(LogText ++ "~n", Subs); + io:format(LogReference ++ " " ++ LogText ++ "~n", Subs); false -> ok end. @@ -148,7 +210,8 @@ log_timer(LogReference, Subs, StartTime) -> MicroS -> {"ms", MicroS div 1000} end, - io:format(LogText ++ " with time taken ~w " ++ Unit ++ "~n", + io:format(LogReference ++ " " ++ LogText ++ " with time taken ~w " + ++ Unit ++ "~n", Subs ++ [Time]); false -> ok diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 7336cd6..baeec07 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -715,7 +715,7 @@ checkready(Pid) -> %% to an immediate return as expected. With 32K keys in the TreeList it could %% take around 35-40ms. %% -%% To avoid blocking this gen_server, the SFT file cna request each item of the +%% To avoid blocking this gen_server, the SFT file can request each item of the %% cache one at a time. %% %% The Wait is set to false to use a cast when calling this in normal operation @@ -1417,6 +1417,10 @@ simple_server_test() -> 1)), ok = pcl_close(PclSnap), + % Ignore a fake pending mnaifest on startup + ok = file:write_file(RootPath ++ "/" ++ ?MANIFEST_FP ++ "nonzero_99.pnd", + term_to_binary("Hello")), + {ok, PclSnap2} = pcl_start(SnapOpts), ok = pcl_loadsnapshot(PclSnap2, gb_trees:empty()), ?assertMatch(false, pcl_checksequencenumber(PclSnap2, diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index e7b218a..fa30ab9 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -69,8 +69,7 @@ add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) -> {infinity, 0, L0Index}, LM1List), NewL0Size = length(LM1List) + L0Size, - io:format("Rolled L0 cache to size ~w in ~w microseconds~n", - [NewL0Size, timer:now_diff(os:timestamp(), SW)]), + leveled_log:log_timer("PM001", [NewL0Size], SW), if MinSQN > LedgerSQN -> {MaxSQN, @@ -90,8 +89,7 @@ to_list(Slots, FetchFun) -> end, [], SlotList), - io:format("L0 cache converted to list of size ~w in ~w microseconds~n", - [length(FullList), timer:now_diff(os:timestamp(), SW)]), + leveled_log:log_timer("PM002", [length(FullList)], SW), FullList.