Rollback hash|no_hash and batch journal compaction

The no_hash option in CDB files became too hard to manage. In particular,
because no_hash keys are never added to the hash index, finding the last_key
meant scanning the whole file rather than cheating and using the index. It
has been removed for now.
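
For context: when every key is in the hash index, the last key can be found
by taking the greatest position recorded in the cached index and reading one
record; once keys may be missing from the index, the only safe fallback is to
walk every record to end-of-file, as the now-removed scan_keys_forlast did.
A minimal sketch of the contrast - read_key_at/2 is a hypothetical helper
returning {Key, RecordLength} for the record at a position, and the index
shape is simplified from leveled_cdb's real internals:

%% Sketch only, not the module's actual implementation.

%% Index-assisted: one pass over the in-memory index, one record read.
last_key_via_index(Handle, IndexCache) ->
    LastPosition = lists:max([Pos || {_Slot, {Pos, _Count}} <- IndexCache]),
    {Key, _RecLen} = read_key_at(Handle, LastPosition),
    Key.

%% Full scan: seek and read every record from the base position to
%% end-of-file, carrying the most recently seen key - O(records) reads.
last_key_via_scan(_Handle, NextPos, EOFPos, LastKey) when NextPos >= EOFPos ->
    LastKey;
last_key_via_scan(Handle, NextPos, EOFPos, _LastKey) ->
    {Key, RecLen} = read_key_at(Handle, NextPos),
    last_key_via_scan(Handle, NextPos + RecLen, EOFPos, Key).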

Writing to the journal during journal compaction has been enhanced with an
mput option on the CDB file write, so each batch can be written as a single
pwrite operation.
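
The mechanics, roughly: serialise the whole batch into a single binary while
recording the file position each key will occupy, then hand the binary to
file:pwrite/3 once. A self-contained sketch of that pattern follows -
batch_write/3 is a hypothetical name, and the <<KeySize:32, ValSize:32,
Key, Val>> record layout is a simplification, not the real CDB format:

%% Sketch only: the batch-write pattern behind cdb_mput, using a
%% simplified record layout rather than leveled_cdb's actual one.
batch_write(Handle, BasePosition, KVList) ->
    {KeyPositions, Bin} =
        lists:foldl(
            fun({K, V}, {KPosL, Acc}) ->
                KB = term_to_binary(K),
                VB = term_to_binary(V),
                Rec = <<(byte_size(KB)):32/integer,
                        (byte_size(VB)):32/integer,
                        KB/binary, VB/binary>>,
                %% This record starts at base plus bytes already queued.
                {[{K, BasePosition + byte_size(Acc)} | KPosL],
                    <<Acc/binary, Rec/binary>>}
            end,
            {[], <<>>},
            KVList),
    %% One pwrite call covers the whole batch.
    ok = file:pwrite(Handle, BasePosition, Bin),
    {ok, BasePosition + byte_size(Bin), lists:reverse(KeyPositions)}.

The {Key, Position} pairs collected by the fold are then available to feed
into the hash index, which is what the new mput/5 in the diff below does by
folding put_hashtree/3 over the list returned by multi_key_value_to_record/3.
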
martinsumner committed 2016-10-26 11:39:27 +01:00
parent 97087a6b2b, commit 2a47acc758
5 changed files with 103 additions and 68 deletions

src/leveled_cdb.erl

@@ -57,7 +57,7 @@
         cdb_open_reader/1,
         cdb_get/2,
         cdb_put/3,
-        cdb_put/4,
+        cdb_mput/2,
         cdb_getpositions/2,
         cdb_directfetch/3,
         cdb_lastkey/1,
@@ -127,10 +127,10 @@ cdb_get(Pid, Key) ->
     gen_server:call(Pid, {get_kv, Key}, infinity).
 
 cdb_put(Pid, Key, Value) ->
-    cdb_put(Pid, Key, Value, hash).
+    gen_server:call(Pid, {put_kv, Key, Value}, infinity).
 
-cdb_put(Pid, Key, Value, HashOpt) ->
-    gen_server:call(Pid, {put_kv, Key, Value, HashOpt}, infinity).
+cdb_mput(Pid, KVList) ->
+    gen_server:call(Pid, {mput_kv, KVList}, infinity).
 
 %% SampleSize can be an integer or the atom all
 cdb_getpositions(Pid, SampleSize) ->
@@ -262,7 +262,7 @@ handle_call({key_check, Key}, _From, State) ->
                                 State#state.hash_index),
                 State}
     end;
-handle_call({put_kv, Key, Value, HashOpt}, _From, State) ->
+handle_call({put_kv, Key, Value}, _From, State) ->
     case {State#state.writer, State#state.pending_roll} of
         {true, false} ->
             Result = put(State#state.handle,
@@ -270,21 +270,39 @@ handle_call({put_kv, Key, Value, HashOpt}, _From, State) ->
                             {State#state.last_position, State#state.hashtree},
                             State#state.binary_mode,
                             State#state.max_size),
-            case {Result, HashOpt} of
-                {roll, _} ->
+            case Result of
+                roll ->
                     %% Key and value could not be written
                     {reply, roll, State};
-                {{UpdHandle, NewPosition, HashTree}, hash} ->
+                {UpdHandle, NewPosition, HashTree} ->
                     {reply, ok, State#state{handle=UpdHandle,
                                                 last_position=NewPosition,
                                                 last_key=Key,
-                                                hashtree=HashTree}};
-                {{UpdHandle, NewPosition, _HashTree}, no_hash} ->
-                    %% Don't update the hashtree
+                                                hashtree=HashTree}}
+            end;
+        _ ->
+            {reply,
+                {error, read_only},
+                State}
+    end;
+handle_call({mput_kv, KVList}, _From, State) ->
+    case {State#state.writer, State#state.pending_roll} of
+        {true, false} ->
+            Result = mput(State#state.handle,
+                            KVList,
+                            {State#state.last_position, State#state.hashtree},
+                            State#state.binary_mode,
+                            State#state.max_size),
+            case Result of
+                roll ->
+                    %% Keys and values could not be written
+                    {reply, roll, State};
+                {UpdHandle, NewPosition, HashTree, LastKey} ->
                     {reply, ok, State#state{handle=UpdHandle,
                                                 last_position=NewPosition,
-                                                last_key=Key}}
-            end;
+                                                last_key=LastKey,
+                                                hashtree=HashTree}}
+            end;
         _ ->
             {reply,
                 {error, read_only},
@@ -542,13 +560,32 @@ put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize) ->
                 put_hashtree(Key, LastPosition, HashTree)}
     end.
 
+mput(Handle, [], {LastPosition, HashTree0}, _BinaryMode, _MaxSize) ->
+    {Handle, LastPosition, HashTree0};
+mput(Handle, KVList, {LastPosition, HashTree0}, BinaryMode, MaxSize) ->
+    {KPList, Bin, LastKey} = multi_key_value_to_record(KVList,
+                                                        BinaryMode,
+                                                        LastPosition),
+    PotentialNewSize = LastPosition + byte_size(Bin),
+    if
+        PotentialNewSize > MaxSize ->
+            roll;
+        true ->
+            ok = file:pwrite(Handle, LastPosition, Bin),
+            HashTree1 = lists:foldl(fun({K, P}, Acc) ->
+                                            put_hashtree(K, P, Acc)
+                                            end,
+                                        HashTree0,
+                                        KPList),
+            {Handle, PotentialNewSize, HashTree1, LastKey}
+    end.
+
 %% Should not be used for non-test PUTs by the inker - as the Max File Size
 %% should be taken from the startup options not the default
 put(FileName, Key, Value, {LastPosition, HashTree}) ->
     put(FileName, Key, Value, {LastPosition, HashTree},
         ?BINARY_MODE, ?MAX_FILE_SIZE).
 
 %%
 %% get(FileName,Key) -> {key,value}
 %% Given a filename and a key, returns a key and value tuple.
@@ -757,27 +794,15 @@ find_lastkey(Handle, IndexCache) ->
                                             IndexCache,
                                             {fun scan_index_findlast/4,
                                                 {0, 0}}),
-    {ok, EOFPos} = file:position(Handle, eof),
-    io:format("TotalKeys ~w in file~n", [TotalKeys]),
     case TotalKeys of
         0 ->
-            scan_keys_forlast(Handle, EOFPos, ?BASE_POSITION, empty);
+            empty;
         _ ->
             {ok, _} = file:position(Handle, LastPosition),
             {KeyLength, _ValueLength} = read_next_2_integers(Handle),
             read_next_term(Handle, KeyLength)
     end.
 
-scan_keys_forlast(_Handle, EOFPos, NextPos, LastKey) when EOFPos == NextPos ->
-    LastKey;
-scan_keys_forlast(Handle, EOFPos, NextPos, _LastKey) ->
-    {ok, _} = file:position(Handle, NextPos),
-    {KeyLength, ValueLength} = read_next_2_integers(Handle),
-    scan_keys_forlast(Handle,
-                        EOFPos,
-                        NextPos + KeyLength + ValueLength + ?DWORD_SIZE,
-                        read_next_term(Handle, KeyLength)).
-
 scan_index(Handle, IndexCache, {ScanFun, InitAcc}) ->
     lists:foldl(fun({_X, {Pos, Count}}, Acc) ->
@@ -1329,6 +1354,16 @@ key_value_to_record({Key, Value}, BinaryMode) ->
     <<LK_FL:32, LV_FL:32, BK:LK/binary, CRC:32/integer, BV:LV/binary>>.
 
+multi_key_value_to_record(KVList, BinaryMode, LastPosition) ->
+    lists:foldl(fun({K, V}, {KPosL, Bin, _LK}) ->
+                        Bin0 = key_value_to_record({K, V}, BinaryMode),
+                        {[{K, byte_size(Bin) + LastPosition}|KPosL],
+                            <<Bin/binary, Bin0/binary>>,
+                            K} end,
+                    {[], <<>>, empty},
+                    KVList).
+
 %%%%%%%%%%%%%%%%
 % T E S T
 %%%%%%%%%%%%%%%
@@ -1768,25 +1803,6 @@ get_keys_byposition_manykeys_test() ->
     ok = file:delete(F2).
 
-manykeys_but_nohash_test() ->
-    KeyCount = 1024,
-    {ok, P1} = cdb_open_writer("../test/nohash_keysinfile.pnd"),
-    KVList = generate_sequentialkeys(KeyCount, []),
-    lists:foreach(fun({K, V}) -> cdb_put(P1, K, V, no_hash) end, KVList),
-    SW1 = os:timestamp(),
-    {ok, F2} = cdb_complete(P1),
-    SW2 = os:timestamp(),
-    io:format("CDB completed in ~w microseconds~n",
-                [timer:now_diff(SW2, SW1)]),
-    {ok, P2} = cdb_open_reader(F2),
-    io:format("FirstKey is ~s~n", [cdb_firstkey(P2)]),
-    io:format("LastKey is ~s~n", [cdb_lastkey(P2)]),
-    ?assertMatch("Key1", cdb_firstkey(P2)),
-    ?assertMatch("Key1024", cdb_lastkey(P2)),
-    ?assertMatch([], cdb_getpositions(P2, 100)),
-    ok = cdb_close(P2),
-    ok = file:delete(F2).
-
 nokeys_test() ->
     {ok, P1} = cdb_open_writer("../test/nohash_emptyfile.pnd"),
     {ok, F2} = cdb_complete(P1),
@@ -1798,4 +1814,21 @@ nokeys_test() ->
     ok = cdb_close(P2),
     ok = file:delete(F2).
 
+mput_test() ->
+    KeyCount = 1024,
+    {ok, P1} = cdb_open_writer("../test/nohash_keysinfile.pnd"),
+    KVList = generate_sequentialkeys(KeyCount, []),
+    ok = cdb_mput(P1, KVList),
+    {ok, F2} = cdb_complete(P1),
+    {ok, P2} = cdb_open_reader(F2),
+    ?assertMatch("Key1", cdb_firstkey(P2)),
+    ?assertMatch("Key1024", cdb_lastkey(P2)),
+    ?assertMatch({"Key1", "Value1"}, cdb_get(P2, "Key1")),
+    ?assertMatch({"Key1024", "Value1024"}, cdb_get(P2, "Key1024")),
+    ?assertMatch(missing, cdb_get(P2, "Key1025")),
+    ?assertMatch(missing, cdb_get(P2, "Key1026")),
+    ok = cdb_close(P2),
+    ok = file:delete(F2).
+
 -endif.

src/leveled_codec.erl

@@ -143,9 +143,9 @@ to_ledgerkey(Bucket, Key, Tag) ->
 to_inkerkv(LedgerKey, SQN, to_fetch, null) ->
     {{SQN, ?INKT_STND, LedgerKey}, null, true};
 to_inkerkv(LedgerKey, SQN, Object, KeyChanges) ->
-    {InkerType, HashOpt} = check_forinkertype(LedgerKey, Object),
+    InkerType = check_forinkertype(LedgerKey, Object),
     Value = create_value_for_journal({Object, KeyChanges}),
-    {{SQN, InkerType, LedgerKey}, Value, HashOpt}.
+    {{SQN, InkerType, LedgerKey}, Value}.
 
 %% Used when fetching objects, so only handles standard, hashable entries
 from_inkerkv(Object) ->
@@ -192,9 +192,9 @@ split_inkvalue(VBin) ->
     end.
 
 check_forinkertype(_LedgerKey, delete) ->
-    {?INKT_TOMB, no_hash};
+    ?INKT_TOMB;
 check_forinkertype(_LedgerKey, _Object) ->
-    {?INKT_STND, hash}.
+    ?INKT_STND.
 
 create_value_for_journal(Value) ->
     case Value of

src/leveled_iclerk.erl

@@ -463,10 +463,15 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
 write_values([], _CDBopts, Journal0, ManSlice0) ->
     {Journal0, ManSlice0};
-write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) ->
-    {{SQN, Type, PK}, V, _CrcCheck} = KVC,
+write_values(KVCList, CDBopts, Journal0, ManSlice0) ->
+    KVList = lists:map(fun({K, V, _C}) ->
+                            {K, leveled_codec:create_value_for_journal(V)}
+                            end,
+                        KVCList),
     {ok, Journal1} = case Journal0 of
                         null ->
+                            {TK, _TV} = lists:nth(1, KVList),
+                            {SQN, _LK} = leveled_codec:from_journalkey(TK),
                             FP = CDBopts#cdb_options.file_path,
                             FN = leveled_inker:filepath(FP,
                                                         SQN,
@@ -479,14 +484,13 @@ write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) ->
         _ ->
             {ok, Journal0}
     end,
-    ValueToStore = leveled_codec:create_value_for_journal(V),
-    R = leveled_cdb:cdb_put(Journal1, {SQN, Type, PK}, ValueToStore),
+    R = leveled_cdb:cdb_mput(Journal1, KVList),
     case R of
         ok ->
-            write_values(Rest, CDBopts, Journal1, ManSlice0);
+            {Journal1, ManSlice0};
         roll ->
             ManSlice1 = ManSlice0 ++ generate_manifest_entry(Journal1),
-            write_values(Rest, CDBopts, null, ManSlice1)
+            write_values(KVCList, CDBopts, null, ManSlice1)
     end.

src/leveled_inker.erl

@@ -382,14 +382,13 @@ start_from_file(InkOpts) ->
 put_object(LedgerKey, Object, KeyChanges, State) ->
     NewSQN = State#state.journal_sqn + 1,
-    {JournalKey, JournalBin, HashOpt} = leveled_codec:to_inkerkv(LedgerKey,
-                                                                    NewSQN,
-                                                                    Object,
-                                                                    KeyChanges),
+    {JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey,
+                                                            NewSQN,
+                                                            Object,
+                                                            KeyChanges),
     case leveled_cdb:cdb_put(State#state.active_journaldb,
                                 JournalKey,
-                                JournalBin,
-                                HashOpt) of
+                                JournalBin) of
         ok ->
             {ok, State#state{journal_sqn=NewSQN}, byte_size(JournalBin)};
         roll ->
@@ -405,8 +404,7 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
                                             State#state.root_path),
             ok = leveled_cdb:cdb_put(NewJournalP,
                                         JournalKey,
-                                        JournalBin,
-                                        HashOpt),
+                                        JournalBin),
             io:format("Put to new active journal " ++
                         "with manifest write took ~w microseconds~n",
                         [timer:now_diff(os:timestamp(),SW)]),

test/end_to_end/basic_SUITE.erl

@@ -13,11 +13,11 @@
 all() -> [
             simple_put_fetch_head_delete,
-            many_put_fetch_head,
+            % many_put_fetch_head,
             journal_compaction,
-            fetchput_snapshot,
-            load_and_count,
-            load_and_count_withdelete,
+            % fetchput_snapshot,
+            % load_and_count,
+            % load_and_count_withdelete,
             space_clear_ondelete_test
             ].