Resolve test issues
This commit is contained in:
parent
3ca928629c
commit
1f406c76dd
2 changed files with 22 additions and 9 deletions
|
@ -11,6 +11,7 @@
|
||||||
-export([
|
-export([
|
||||||
generate_entry/1,
|
generate_entry/1,
|
||||||
add_entry/2,
|
add_entry/2,
|
||||||
|
append_lastkey/3,
|
||||||
remove_entry/2,
|
remove_entry/2,
|
||||||
find_entry/2
|
find_entry/2
|
||||||
|
|
||||||
|
@ -38,6 +39,15 @@ add_entry(Manifest, Entry) ->
|
||||||
StrippedName = filename:rootname(FN),
|
StrippedName = filename:rootname(FN),
|
||||||
lists:reverse(lists:sort([{SQN, StrippedName, PidR, LastKey}|Manifest])).
|
lists:reverse(lists:sort([{SQN, StrippedName, PidR, LastKey}|Manifest])).
|
||||||
|
|
||||||
|
append_lastkey(Manifest, Pid, LastKey) ->
|
||||||
|
[{SQN, Filename, PidR, LK}|ManifestTail] = Manifest,
|
||||||
|
case {PidR, LK} of
|
||||||
|
{Pid, empty} ->
|
||||||
|
[{SQN, Filename, PidR, LastKey}|ManifestTail];
|
||||||
|
_ ->
|
||||||
|
Manifest
|
||||||
|
end.
|
||||||
|
|
||||||
remove_entry(Manifest, Entry) ->
|
remove_entry(Manifest, Entry) ->
|
||||||
{SQN, FN, _PidR, _LastKey} = Entry,
|
{SQN, FN, _PidR, _LastKey} = Entry,
|
||||||
leveled_log:log("I0013", [FN]),
|
leveled_log:log("I0013", [FN]),
|
||||||
|
|
|
@ -243,10 +243,7 @@ init([InkerOpts]) ->
|
||||||
|
|
||||||
handle_call({put, Key, Object, KeyChanges}, _From, State) ->
|
handle_call({put, Key, Object, KeyChanges}, _From, State) ->
|
||||||
case put_object(Key, Object, KeyChanges, State) of
|
case put_object(Key, Object, KeyChanges, State) of
|
||||||
{ok, UpdState, ObjSize} ->
|
{_, UpdState, ObjSize} ->
|
||||||
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState};
|
|
||||||
{rolling, UpdState, ObjSize} ->
|
|
||||||
ok = leveled_cdb:cdb_roll(State#state.active_journaldb),
|
|
||||||
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
|
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
|
||||||
end;
|
end;
|
||||||
handle_call({fetch, Key, SQN}, _From, State) ->
|
handle_call({fetch, Key, SQN}, _From, State) ->
|
||||||
|
@ -412,13 +409,14 @@ start_from_file(InkOpts) ->
|
||||||
|
|
||||||
put_object(LedgerKey, Object, KeyChanges, State) ->
|
put_object(LedgerKey, Object, KeyChanges, State) ->
|
||||||
NewSQN = State#state.journal_sqn + 1,
|
NewSQN = State#state.journal_sqn + 1,
|
||||||
|
ActiveJournal = State#state.active_journaldb,
|
||||||
SW= os:timestamp(),
|
SW= os:timestamp(),
|
||||||
{JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey,
|
{JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey,
|
||||||
NewSQN,
|
NewSQN,
|
||||||
Object,
|
Object,
|
||||||
KeyChanges),
|
KeyChanges),
|
||||||
T0 = timer:now_diff(os:timestamp(), SW),
|
T0 = timer:now_diff(os:timestamp(), SW),
|
||||||
case leveled_cdb:cdb_put(State#state.active_journaldb,
|
case leveled_cdb:cdb_put(ActiveJournal,
|
||||||
JournalKey,
|
JournalKey,
|
||||||
JournalBin) of
|
JournalBin) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -431,13 +429,18 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
|
||||||
byte_size(JournalBin)};
|
byte_size(JournalBin)};
|
||||||
roll ->
|
roll ->
|
||||||
SWroll = os:timestamp(),
|
SWroll = os:timestamp(),
|
||||||
|
LastKey = leveled_cdb:cdb_lastkey(ActiveJournal),
|
||||||
|
ok = leveled_cdb:cdb_roll(ActiveJournal),
|
||||||
|
Manifest0 = leveled_imanifest:append_lastkey(State#state.manifest,
|
||||||
|
ActiveJournal,
|
||||||
|
LastKey),
|
||||||
CDBopts = State#state.cdb_options,
|
CDBopts = State#state.cdb_options,
|
||||||
ManEntry = start_new_activejournal(NewSQN,
|
ManEntry = start_new_activejournal(NewSQN,
|
||||||
State#state.root_path,
|
State#state.root_path,
|
||||||
CDBopts),
|
CDBopts),
|
||||||
{_, _, NewJournalP, _} = ManEntry,
|
{_, _, NewJournalP, _} = ManEntry,
|
||||||
NewManifest = leveled_imanifest:add_entry(State#state.manifest, ManEntry),
|
Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry),
|
||||||
ok = simple_manifest_writer(NewManifest,
|
ok = simple_manifest_writer(Manifest1,
|
||||||
State#state.manifest_sqn + 1,
|
State#state.manifest_sqn + 1,
|
||||||
State#state.root_path),
|
State#state.root_path),
|
||||||
ok = leveled_cdb:cdb_put(NewJournalP,
|
ok = leveled_cdb:cdb_put(NewJournalP,
|
||||||
|
@ -446,7 +449,7 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
|
||||||
leveled_log:log_timer("I0008", [], SWroll),
|
leveled_log:log_timer("I0008", [], SWroll),
|
||||||
{rolling,
|
{rolling,
|
||||||
State#state{journal_sqn=NewSQN,
|
State#state{journal_sqn=NewSQN,
|
||||||
manifest=NewManifest,
|
manifest=Manifest1,
|
||||||
manifest_sqn = State#state.manifest_sqn + 1,
|
manifest_sqn = State#state.manifest_sqn + 1,
|
||||||
active_journaldb=NewJournalP},
|
active_journaldb=NewJournalP},
|
||||||
byte_size(JournalBin)}
|
byte_size(JournalBin)}
|
||||||
|
@ -615,7 +618,7 @@ load_from_sequence(MinSQN, FilterFun, PCL, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
|
||||||
undefined,
|
undefined,
|
||||||
FN,
|
FN,
|
||||||
Rest);
|
Rest);
|
||||||
[{NextSQN, _NxtFN, _NxtPid}|_Rest] when NextSQN > MinSQN ->
|
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
|
||||||
load_between_sequence(MinSQN,
|
load_between_sequence(MinSQN,
|
||||||
MinSQN + ?LOADING_BATCH,
|
MinSQN + ?LOADING_BATCH,
|
||||||
FilterFun,
|
FilterFun,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue