diff --git a/src/leveled_imanifest.erl b/src/leveled_imanifest.erl index c115785..045be1a 100644 --- a/src/leveled_imanifest.erl +++ b/src/leveled_imanifest.erl @@ -11,6 +11,7 @@ -export([ generate_entry/1, add_entry/2, + append_lastkey/3, remove_entry/2, find_entry/2 @@ -38,6 +39,15 @@ add_entry(Manifest, Entry) -> StrippedName = filename:rootname(FN), lists:reverse(lists:sort([{SQN, StrippedName, PidR, LastKey}|Manifest])). +append_lastkey(Manifest, Pid, LastKey) -> + [{SQN, Filename, PidR, LK}|ManifestTail] = Manifest, + case {PidR, LK} of + {Pid, empty} -> + [{SQN, Filename, PidR, LastKey}|ManifestTail]; + _ -> + Manifest + end. + remove_entry(Manifest, Entry) -> {SQN, FN, _PidR, _LastKey} = Entry, leveled_log:log("I0013", [FN]), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 51a4617..c404745 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -243,10 +243,7 @@ init([InkerOpts]) -> handle_call({put, Key, Object, KeyChanges}, _From, State) -> case put_object(Key, Object, KeyChanges, State) of - {ok, UpdState, ObjSize} -> - {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}; - {rolling, UpdState, ObjSize} -> - ok = leveled_cdb:cdb_roll(State#state.active_journaldb), + {_, UpdState, ObjSize} -> {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState} end; handle_call({fetch, Key, SQN}, _From, State) -> @@ -412,13 +409,14 @@ start_from_file(InkOpts) -> put_object(LedgerKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, + ActiveJournal = State#state.active_journaldb, SW= os:timestamp(), {JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey, NewSQN, Object, KeyChanges), T0 = timer:now_diff(os:timestamp(), SW), - case leveled_cdb:cdb_put(State#state.active_journaldb, + case leveled_cdb:cdb_put(ActiveJournal, JournalKey, JournalBin) of ok -> @@ -431,13 +429,18 @@ put_object(LedgerKey, Object, KeyChanges, State) -> byte_size(JournalBin)}; roll -> SWroll = os:timestamp(), + LastKey = leveled_cdb:cdb_lastkey(ActiveJournal), + ok = leveled_cdb:cdb_roll(ActiveJournal), + Manifest0 = leveled_imanifest:append_lastkey(State#state.manifest, + ActiveJournal, + LastKey), CDBopts = State#state.cdb_options, ManEntry = start_new_activejournal(NewSQN, State#state.root_path, CDBopts), {_, _, NewJournalP, _} = ManEntry, - NewManifest = leveled_imanifest:add_entry(State#state.manifest, ManEntry), - ok = simple_manifest_writer(NewManifest, + Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry), + ok = simple_manifest_writer(Manifest1, State#state.manifest_sqn + 1, State#state.root_path), ok = leveled_cdb:cdb_put(NewJournalP, @@ -446,7 +449,7 @@ put_object(LedgerKey, Object, KeyChanges, State) -> leveled_log:log_timer("I0008", [], SWroll), {rolling, State#state{journal_sqn=NewSQN, - manifest=NewManifest, + manifest=Manifest1, manifest_sqn = State#state.manifest_sqn + 1, active_journaldb=NewJournalP}, byte_size(JournalBin)} @@ -615,7 +618,7 @@ load_from_sequence(MinSQN, FilterFun, PCL, [{_LowSQN, FN, Pid, _LK}|Rest]) -> undefined, FN, Rest); - [{NextSQN, _NxtFN, _NxtPid}|_Rest] when NextSQN > MinSQN -> + [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN -> load_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, FilterFun,