diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index fd8527e..c092d82 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -65,6 +65,7 @@ -export([cdb_open_writer/1, cdb_open_writer/2, cdb_open_reader/1, + cdb_reopen_reader/2, cdb_get/2, cdb_put/3, cdb_mput/2, @@ -125,6 +126,13 @@ cdb_open_writer(Filename, Opts) -> ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity), {ok, Pid}. +cdb_reopen_reader(Filename, LastKey) -> + {ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{binary_mode=true}], []), + ok = gen_fsm:sync_send_event(Pid, + {open_reader, Filename, LastKey}, + infinity), + {ok, Pid}. + cdb_open_reader(Filename) -> cdb_open_reader(Filename, #cdb_options{binary_mode=true}). @@ -241,6 +249,13 @@ starting({open_writer, Filename}, _From, State) -> starting({open_reader, Filename}, _From, State) -> leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, false), + {reply, ok, reader, State#state{handle=Handle, + last_key=LastKey, + filename=Filename, + hash_index=Index}}; +starting({open_reader, Filename, LastKey}, _From, State) -> + leveled_log:log("CDB02", [Filename]), + {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey), {reply, ok, reader, State#state{handle=Handle, last_key=LastKey, filename=Filename, diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index f66fcd0..9b47884 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -194,7 +194,8 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, FilesToDelete = lists:map(fun(C) -> {C#candidate.low_sqn, C#candidate.filename, - C#candidate.journal} + C#candidate.journal, + undefined} end, BestRun1), leveled_log:log("IC002", [length(FilesToDelete)]), @@ -274,7 +275,7 @@ scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) -> scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) -> CandidateList; scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) -> - {LowSQN, FN, JournalP} = Entry, + {LowSQN, FN, JournalP, _LK} = Entry, CpctPerc = check_single_file(JournalP, FilterFun, FilterServer, @@ -390,7 +391,7 @@ update_inker(Inker, ManifestSlice, FilesToDelete) -> FilesToDelete), ok = leveled_inker:ink_compactioncomplete(Inker), leveled_log:log("IC007", []), - lists:foreach(fun({_SQN, _FN, J2D}) -> + lists:foreach(fun({_SQN, _FN, J2D, _LK}) -> leveled_cdb:cdb_deletepending(J2D, ManSQN, Inker) diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index c9d1a74..51a4617 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -435,7 +435,7 @@ put_object(LedgerKey, Object, KeyChanges, State) -> ManEntry = start_new_activejournal(NewSQN, State#state.root_path, CDBopts), - {_, _, NewJournalP} = ManEntry, + {_, _, NewJournalP, _} = ManEntry, NewManifest = leveled_imanifest:add_entry(State#state.manifest, ManEntry), ok = simple_manifest_writer(NewManifest, State#state.manifest_sqn + 1, @@ -534,29 +534,6 @@ open_all_manifest([], RootPath, CDBOpts) -> open_all_manifest(Man0, RootPath, CDBOpts) -> Man1 = lists:reverse(lists:sort(Man0)), [{HeadSQN, HeadFN, _IgnorePid, HeadLK}|ManifestTail] = Man1, - CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX, - PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX, - Man2 = case filelib:is_file(CompleteHeadFN) of - true -> - leveled_log:log("I0012", [HeadFN]), - {ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN), - LastKey = leveled_cdb:cdb_lastkey(HeadR), - LastSQN = element(1, LastKey), - ManToHead = leveled_imanifest:add_entry(ManifestTail, - {HeadSQN, - HeadFN, - HeadR, - LastKey}), - NewManEntry = start_new_activejournal(LastSQN + 1, - RootPath, - CDBOpts), - leveled_imanifest:add_entry(ManToHead, NewManEntry); - false -> - {ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN, - CDBOpts), - leveled_imanifest:add_entry(ManifestTail, - {HeadSQN, HeadFN, HeadW, HeadLK}) - end, OpenJournalFun = fun(ManEntry) -> case ManEntry of @@ -565,7 +542,8 @@ open_all_manifest(Man0, RootPath, CDBOpts) -> PFN = FN ++ "." ++ ?PENDING_FILEX, case filelib:is_file(CFN) of true -> - {ok, Pid} = leveled_cdb:cdb_open_reader(CFN), + {ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN, + LK_RO), {LowSQN, FN, Pid, LK_RO}; false -> W = leveled_cdb:cdb_open_writer(PFN, CDBOpts), @@ -578,7 +556,30 @@ open_all_manifest(Man0, RootPath, CDBOpts) -> ManEntry end end, - lists:map(OpenJournalFun, Man2). + OpenedTail = lists:map(OpenJournalFun, ManifestTail), + CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX, + PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX, + case filelib:is_file(CompleteHeadFN) of + true -> + leveled_log:log("I0012", [HeadFN]), + {ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN), + LastKey = leveled_cdb:cdb_lastkey(HeadR), + LastSQN = element(1, LastKey), + ManToHead = leveled_imanifest:add_entry(OpenedTail, + {HeadSQN, + HeadFN, + HeadR, + LastKey}), + NewManEntry = start_new_activejournal(LastSQN + 1, + RootPath, + CDBOpts), + leveled_imanifest:add_entry(ManToHead, NewManEntry); + false -> + {ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN, + CDBOpts), + leveled_imanifest:add_entry(OpenedTail, + {HeadSQN, HeadFN, HeadW, HeadLK}) + end. start_new_activejournal(SQN, RootPath, CDBOpts) -> @@ -591,25 +592,25 @@ start_new_activejournal(SQN, RootPath, CDBOpts) -> %% FilterFun{K, V, Acc} -> Penciller Key List %% Load the output for the CDB file into the Penciller. -load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) -> +load_from_sequence(_MinSQN, _FilterFun, _PCL, []) -> ok; -load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|Rest]) +load_from_sequence(MinSQN, FilterFun, PCL, [{LowSQN, FN, Pid, _LK}|Rest]) when LowSQN >= MinSQN -> load_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, FilterFun, - Penciller, + PCL, Pid, undefined, FN, Rest); -load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) -> +load_from_sequence(MinSQN, FilterFun, PCL, [{_LowSQN, FN, Pid, _LK}|Rest]) -> case Rest of [] -> load_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, FilterFun, - Penciller, + PCL, Pid, undefined, FN, @@ -618,13 +619,13 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) -> load_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, FilterFun, - Penciller, + PCL, Pid, undefined, FN, Rest); _ -> - load_from_sequence(MinSQN, FilterFun, Penciller, Rest) + load_from_sequence(MinSQN, FilterFun, PCL, Rest) end. @@ -898,7 +899,7 @@ compact_journal_test() -> 5000), timer:sleep(1000), CompactedManifest2 = ink_getmanifest(Ink1), - R = lists:foldl(fun({_SQN, FN, _P}, Acc) -> + R = lists:foldl(fun({_SQN, FN, _P, _LK}, Acc) -> case string:str(FN, "post_compact") of N when N > 0 -> true;