Trying to standardise binary manipulation of value

Looking into the theory that the use of term_to_binary is imperfect.  It may
also be better to compress values only when they are compacted.
This commit is contained in:
martinsumner 2017-03-20 15:43:54 +00:00
parent 0cdc0eb558
commit f3ffa920af
4 changed files with 230 additions and 112 deletions

View file

@ -65,6 +65,7 @@
-export([cdb_open_writer/1, -export([cdb_open_writer/1,
cdb_open_writer/2, cdb_open_writer/2,
cdb_open_reader/1, cdb_open_reader/1,
cdb_open_reader/2,
cdb_reopen_reader/2, cdb_reopen_reader/2,
cdb_get/2, cdb_get/2,
cdb_put/3, cdb_put/3,
@ -262,12 +263,19 @@ starting({open_reader, Filename, LastKey}, _From, State) ->
writer({get_kv, Key}, _From, State) -> writer({get_kv, Key}, _From, State) ->
{reply, {reply,
get_mem(Key, State#state.handle, State#state.hashtree), get_mem(Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode),
writer, writer,
State}; State};
writer({key_check, Key}, _From, State) -> writer({key_check, Key}, _From, State) ->
{reply, {reply,
get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), get_mem(Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode,
loose_presence),
writer, writer,
State}; State};
writer({put_kv, Key, Value}, _From, State) -> writer({put_kv, Key, Value}, _From, State) ->
@ -329,12 +337,19 @@ writer(cdb_roll, State) ->
rolling({get_kv, Key}, _From, State) -> rolling({get_kv, Key}, _From, State) ->
{reply, {reply,
get_mem(Key, State#state.handle, State#state.hashtree), get_mem(Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode),
rolling, rolling,
State}; State};
rolling({key_check, Key}, _From, State) -> rolling({key_check, Key}, _From, State) ->
{reply, {reply,
get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), get_mem(Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode,
loose_presence),
rolling, rolling,
State}; State};
rolling({get_positions, _SampleSize}, _From, State) -> rolling({get_positions, _SampleSize}, _From, State) ->
@ -374,7 +389,10 @@ rolling({delete_pending, ManSQN, Inker}, State) ->
reader({get_kv, Key}, _From, State) -> reader({get_kv, Key}, _From, State) ->
{reply, {reply,
get_withcache(State#state.handle, Key, State#state.hash_index), get_withcache(State#state.handle,
Key,
State#state.hash_index,
State#state.binary_mode),
reader, reader,
State}; State};
reader({key_check, Key}, _From, State) -> reader({key_check, Key}, _From, State) ->
@ -382,7 +400,8 @@ reader({key_check, Key}, _From, State) ->
get_withcache(State#state.handle, get_withcache(State#state.handle,
Key, Key,
State#state.hash_index, State#state.hash_index,
loose_presence), loose_presence,
State#state.binary_mode),
reader, reader,
State}; State};
reader({get_positions, SampleSize}, _From, State) -> reader({get_positions, SampleSize}, _From, State) ->
@ -430,9 +449,11 @@ reader({direct_fetch, PositionList, Info}, _From, State) ->
FilterFalseKey(extract_key_size(H, P)) end, FilterFalseKey(extract_key_size(H, P)) end,
PositionList); PositionList);
key_value_check -> key_value_check ->
BM = State#state.binary_mode,
lists:filtermap( lists:filtermap(
fun(P) -> fun(P) ->
FilterFalseKey(extract_key_value_check(H, P)) end, FilterFalseKey(extract_key_value_check(H, P, BM))
end,
PositionList) PositionList)
end, end,
{reply, Reply, reader, State}; {reply, Reply, reader, State};
@ -456,7 +477,10 @@ reader({delete_pending, ManSQN, Inker}, State) ->
delete_pending({get_kv, Key}, _From, State) -> delete_pending({get_kv, Key}, _From, State) ->
{reply, {reply,
get_withcache(State#state.handle, Key, State#state.hash_index), get_withcache(State#state.handle,
Key,
State#state.hash_index,
State#state.binary_mode),
delete_pending, delete_pending,
State, State,
?DELETE_TIMEOUT}; ?DELETE_TIMEOUT};
@ -465,7 +489,8 @@ delete_pending({key_check, Key}, _From, State) ->
get_withcache(State#state.handle, get_withcache(State#state.handle,
Key, Key,
State#state.hash_index, State#state.hash_index,
loose_presence), loose_presence,
State#state.binary_mode),
delete_pending, delete_pending,
State, State,
?DELETE_TIMEOUT}. ?DELETE_TIMEOUT}.
@ -687,19 +712,19 @@ put(FileName, Key, Value, {LastPosition, HashTree}) ->
%% %%
get_withcache(Handle, Key, Cache) -> get_withcache(Handle, Key, Cache, BinaryMode) ->
get(Handle, Key, Cache, true). get(Handle, Key, Cache, true, BinaryMode).
get_withcache(Handle, Key, Cache, QuickCheck) -> get_withcache(Handle, Key, Cache, QuickCheck, BinaryMode) ->
get(Handle, Key, Cache, QuickCheck). get(Handle, Key, Cache, QuickCheck, BinaryMode).
get(FileNameOrHandle, Key) -> get(FileNameOrHandle, Key, BinaryMode) ->
get(FileNameOrHandle, Key, no_cache, true). get(FileNameOrHandle, Key, no_cache, true, BinaryMode).
get(FileName, Key, Cache, QuickCheck) when is_list(FileName) -> get(FileName, Key, Cache, QuickCheck, BinaryMode) when is_list(FileName) ->
{ok, Handle} = file:open(FileName,[binary, raw, read]), {ok, Handle} = file:open(FileName,[binary, raw, read]),
get(Handle, Key, Cache, QuickCheck); get(Handle, Key, Cache, QuickCheck, BinaryMode);
get(Handle, Key, Cache, QuickCheck) when is_tuple(Handle) -> get(Handle, Key, Cache, QuickCheck, BinaryMode) when is_tuple(Handle) ->
Hash = hash(Key), Hash = hash(Key),
Index = hash_to_index(Hash), Index = hash_to_index(Hash),
{HashTable, Count} = get_index(Handle, Index, Cache), {HashTable, Count} = get_index(Handle, Index, Cache),
@ -722,7 +747,8 @@ get(Handle, Key, Cache, QuickCheck) when is_tuple(Handle) ->
lists:append(L2, L1), lists:append(L2, L1),
Hash, Hash,
Key, Key,
QuickCheck) QuickCheck,
BinaryMode)
end. end.
get_index(Handle, Index, no_cache) -> get_index(Handle, Index, no_cache) ->
@ -736,13 +762,13 @@ get_index(_Handle, Index, Cache) ->
%% Get a Key/Value pair from an active CDB file (with no hash table written) %% Get a Key/Value pair from an active CDB file (with no hash table written)
%% This requires a key dictionary to be passed in (mapping keys to positions) %% This requires a key dictionary to be passed in (mapping keys to positions)
%% Will return {Key, Value} or missing %% Will return {Key, Value} or missing
get_mem(Key, FNOrHandle, HashTree) -> get_mem(Key, FNOrHandle, HashTree, BinaryMode) ->
get_mem(Key, FNOrHandle, HashTree, true). get_mem(Key, FNOrHandle, HashTree, BinaryMode, true).
get_mem(Key, Filename, HashTree, QuickCheck) when is_list(Filename) -> get_mem(Key, Filename, HashTree, BinaryMode, QuickCheck) when is_list(Filename) ->
{ok, Handle} = file:open(Filename, [binary, raw, read]), {ok, Handle} = file:open(Filename, [binary, raw, read]),
get_mem(Key, Handle, HashTree, QuickCheck); get_mem(Key, Handle, HashTree, BinaryMode, QuickCheck);
get_mem(Key, Handle, HashTree, QuickCheck) -> get_mem(Key, Handle, HashTree, BinaryMode, QuickCheck) ->
ListToCheck = get_hashtree(Key, HashTree), ListToCheck = get_hashtree(Key, HashTree),
case {QuickCheck, ListToCheck} of case {QuickCheck, ListToCheck} of
{loose_presence, []} -> {loose_presence, []} ->
@ -750,7 +776,7 @@ get_mem(Key, Handle, HashTree, QuickCheck) ->
{loose_presence, _L} -> {loose_presence, _L} ->
probably; probably;
_ -> _ ->
extract_kvpair(Handle, ListToCheck, Key) extract_kvpair(Handle, ListToCheck, Key, BinaryMode)
end. end.
%% Get the next key at a position in the file (or the first key if no position %% Get the next key at a position in the file (or the first key if no position
@ -767,7 +793,7 @@ get_nextkey(Handle, {Position, FirstHashPosition}) ->
{ok, Position} = file:position(Handle, Position), {ok, Position} = file:position(Handle, Position),
case read_next_2_integers(Handle) of case read_next_2_integers(Handle) of
{KeyLength, ValueLength} -> {KeyLength, ValueLength} ->
NextKey = read_next_term(Handle, KeyLength), NextKey = read_next_key(Handle, KeyLength),
NextPosition = Position + KeyLength + ValueLength + ?DWORD_SIZE, NextPosition = Position + KeyLength + ValueLength + ?DWORD_SIZE,
case NextPosition of case NextPosition of
FirstHashPosition -> FirstHashPosition ->
@ -830,7 +856,7 @@ find_lastkey(Handle, IndexCache) ->
_ -> _ ->
{ok, _} = file:position(Handle, LastPosition), {ok, _} = file:position(Handle, LastPosition),
{KeyLength, _ValueLength} = read_next_2_integers(Handle), {KeyLength, _ValueLength} = read_next_2_integers(Handle),
read_next_term(Handle, KeyLength) read_next_key(Handle, KeyLength)
end. end.
@ -902,39 +928,49 @@ put_hashtree(Key, Position, HashTree) ->
%% Function to extract a Key-Value pair given a file handle and a position %% Function to extract a Key-Value pair given a file handle and a position
%% Will confirm that the key matches and do a CRC check %% Will confirm that the key matches and do a CRC check
extract_kvpair(_, [], _) -> extract_kvpair(_H, [], _K, _BinaryMode) ->
missing; missing;
extract_kvpair(Handle, [Position|Rest], Key) -> extract_kvpair(Handle, [Position|Rest], Key, BinaryMode) ->
{ok, _} = file:position(Handle, Position), {ok, _} = file:position(Handle, Position),
{KeyLength, ValueLength} = read_next_2_integers(Handle), {KeyLength, ValueLength} = read_next_2_integers(Handle),
case safe_read_next_term(Handle, KeyLength) of case safe_read_next_key(Handle, KeyLength) of
Key -> % If same key as passed in, then found! Key -> % If same key as passed in, then found!
case read_next_term(Handle, ValueLength, crc) of case read_next_value(Handle, ValueLength, crc) of
{false, _} -> {false, _} ->
crc_wonky; crc_wonky;
{_, Value} -> {_, Value} ->
{Key,Value} case BinaryMode of
true ->
{Key, Value};
false ->
{Key, binary_to_term(Value)}
end
end; end;
_ -> _ ->
extract_kvpair(Handle, Rest, Key) extract_kvpair(Handle, Rest, Key, BinaryMode)
end. end.
extract_key(Handle, Position) -> extract_key(Handle, Position) ->
{ok, _} = file:position(Handle, Position), {ok, _} = file:position(Handle, Position),
{KeyLength, _ValueLength} = read_next_2_integers(Handle), {KeyLength, _ValueLength} = read_next_2_integers(Handle),
{safe_read_next_term(Handle, KeyLength)}. {safe_read_next_key(Handle, KeyLength)}.
extract_key_size(Handle, Position) -> extract_key_size(Handle, Position) ->
{ok, _} = file:position(Handle, Position), {ok, _} = file:position(Handle, Position),
{KeyLength, ValueLength} = read_next_2_integers(Handle), {KeyLength, ValueLength} = read_next_2_integers(Handle),
{safe_read_next_term(Handle, KeyLength), ValueLength}. {safe_read_next_key(Handle, KeyLength), ValueLength}.
extract_key_value_check(Handle, Position) -> extract_key_value_check(Handle, Position, BinaryMode) ->
{ok, _} = file:position(Handle, Position), {ok, _} = file:position(Handle, Position),
{KeyLength, ValueLength} = read_next_2_integers(Handle), {KeyLength, ValueLength} = read_next_2_integers(Handle),
K = safe_read_next_term(Handle, KeyLength), K = safe_read_next_key(Handle, KeyLength),
{Check, V} = read_next_term(Handle, ValueLength, crc), {Check, V} = read_next_value(Handle, ValueLength, crc),
{K, V, Check}. case BinaryMode of
true ->
{K, V, Check};
false ->
{K, binary_to_term(V), Check}
end.
%% Scan through the file until there is a failure to crc check an input, and %% Scan through the file until there is a failure to crc check an input, and
%% at that point return the position and the key dictionary scanned so far %% at that point return the position and the key dictionary scanned so far
@ -1018,7 +1054,7 @@ saferead_keyvalue(Handle) ->
eof -> eof ->
false; false;
{KeyL, ValueL} -> {KeyL, ValueL} ->
case safe_read_next_term(Handle, KeyL) of case safe_read_next_key(Handle, KeyL) of
{error, _} -> {error, _} ->
false; false;
eof -> eof ->
@ -1041,8 +1077,8 @@ saferead_keyvalue(Handle) ->
end. end.
safe_read_next_term(Handle, Length) -> safe_read_next_key(Handle, Length) ->
try read_next_term(Handle, Length) of try read_next_key(Handle, Length) of
Term -> Term ->
Term Term
catch catch
@ -1074,7 +1110,7 @@ calc_crc(Value) ->
erlang:crc32(<<Value/bitstring,0:M>>) erlang:crc32(<<Value/bitstring,0:M>>)
end. end.
read_next_term(Handle, Length) -> read_next_key(Handle, Length) ->
case file:read(Handle, Length) of case file:read(Handle, Length) of
{ok, Bin} -> {ok, Bin} ->
binary_to_term(Bin); binary_to_term(Bin);
@ -1082,13 +1118,14 @@ read_next_term(Handle, Length) ->
ReadError ReadError
end. end.
%% Read next string where the string has a CRC prepended - stripping the crc %% Read next string where the string has a CRC prepended - stripping the crc
%% and checking if requested %% and checking if requested
read_next_term(Handle, Length, crc) -> read_next_value(Handle, Length, crc) ->
{ok, <<CRC:32/integer, Bin/binary>>} = file:read(Handle, Length), {ok, <<CRC:32/integer, Bin/binary>>} = file:read(Handle, Length),
case calc_crc(Bin) of case calc_crc(Bin) of
CRC -> CRC ->
{true, binary_to_term(Bin)}; {true, Bin};
_ -> _ ->
{false, crc_wonky} {false, crc_wonky}
end. end.
@ -1096,7 +1133,7 @@ read_next_term(Handle, Length, crc) ->
%% Extract value and size from binary containing CRC %% Extract value and size from binary containing CRC
extract_valueandsize(ValueAsBin) -> extract_valueandsize(ValueAsBin) ->
<<_CRC:32/integer, Bin/binary>> = ValueAsBin, <<_CRC:32/integer, Bin/binary>> = ValueAsBin,
{binary_to_term(Bin), byte_size(Bin)}. {Bin, byte_size(Bin)}.
%% Used for reading lengths %% Used for reading lengths
@ -1129,14 +1166,15 @@ read_integerpairs(<<Int1:32, Int2:32, Rest/binary>>, Pairs) ->
%% false - don't check the CRC before returning key & value %% false - don't check the CRC before returning key & value
%% loose_presence - confirm that the hash of the key is present %% loose_presence - confirm that the hash of the key is present
search_hash_table(Handle, Entries, Hash, Key, QuickCheck) -> search_hash_table(Handle, Entries, Hash, Key, QuickCheck, BinaryMode) ->
search_hash_table(Handle, Entries, Hash, Key, QuickCheck, 0). search_hash_table(Handle, Entries, Hash, Key, QuickCheck, BinaryMode, 0).
search_hash_table(_Handle, [], Hash, _Key, _QuickCheck, CycleCount) -> search_hash_table(_Handle, [], Hash, _Key,
_QuickCheck, _BinaryMode, CycleCount) ->
log_cyclecount(CycleCount, Hash, missing), log_cyclecount(CycleCount, Hash, missing),
missing; missing;
search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
QuickCheck, CycleCount) -> QuickCheck, BinaryMode, CycleCount) ->
{ok, _} = file:position(Handle, Entry), {ok, _} = file:position(Handle, Entry),
{StoredHash, DataLoc} = read_next_2_integers(Handle), {StoredHash, DataLoc} = read_next_2_integers(Handle),
case StoredHash of case StoredHash of
@ -1145,7 +1183,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
loose_presence -> loose_presence ->
probably; probably;
_ -> _ ->
extract_kvpair(Handle, [DataLoc], Key) extract_kvpair(Handle, [DataLoc], Key, BinaryMode)
end, end,
case KV of case KV of
missing -> missing ->
@ -1154,6 +1192,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
Hash, Hash,
Key, Key,
QuickCheck, QuickCheck,
BinaryMode,
CycleCount + 1); CycleCount + 1);
_ -> _ ->
log_cyclecount(CycleCount, Hash, found), log_cyclecount(CycleCount, Hash, found),
@ -1164,7 +1203,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
% missing; % missing;
_ -> _ ->
search_hash_table(Handle, RestOfEntries, Hash, Key, search_hash_table(Handle, RestOfEntries, Hash, Key,
QuickCheck, CycleCount + 1) QuickCheck, BinaryMode, CycleCount + 1)
end. end.
log_cyclecount(CycleCount, Hash, Result) -> log_cyclecount(CycleCount, Hash, Result) ->
@ -1441,15 +1480,15 @@ dump(FileName) ->
NumberOfPairs = lists:foldl(Fn, 0, lists:seq(0,255)) bsr 1, NumberOfPairs = lists:foldl(Fn, 0, lists:seq(0,255)) bsr 1,
io:format("Count of keys in db is ~w~n", [NumberOfPairs]), io:format("Count of keys in db is ~w~n", [NumberOfPairs]),
{ok, _} = file:position(Handle, {bof, 2048}), {ok, _} = file:position(Handle, {bof, 2048}),
Fn1 = fun(_I,Acc) -> Fn1 = fun(_I, Acc) ->
{KL,VL} = read_next_2_integers(Handle), {KL, VL} = read_next_2_integers(Handle),
Key = read_next_term(Handle, KL), Key = read_next_key(Handle, KL),
case read_next_term(Handle, VL, crc) of Value =
{_, Value} -> case read_next_value(Handle, VL, crc) of
{ok, CurrLoc} = file:position(Handle, cur), {true, V0} ->
{Key,Value} = get(Handle, Key) binary_to_term(V0)
end, end,
{ok, _} = file:position(Handle, CurrLoc), {Key, Value} = get(Handle, Key, false),
[{Key,Value} | Acc] [{Key,Value} | Acc]
end, end,
lists:foldr(Fn1, [], lists:seq(0, NumberOfPairs-1)). lists:foldr(Fn1, [], lists:seq(0, NumberOfPairs-1)).
@ -1647,16 +1686,13 @@ activewrite_singlewrite_test() ->
{LastPosition, KeyDict}), {LastPosition, KeyDict}),
io:format("New key and value added to active file ~n", []), io:format("New key and value added to active file ~n", []),
?assertMatch({Key, Value}, ?assertMatch({Key, Value},
get_mem(Key, "../test/test_mem.cdb", get_mem(Key, "../test/test_mem.cdb", UpdKeyDict, false)),
UpdKeyDict)),
?assertMatch(probably, ?assertMatch(probably,
get_mem(Key, "../test/test_mem.cdb", get_mem(Key, "../test/test_mem.cdb", UpdKeyDict,
UpdKeyDict, false, loose_presence)),
loose_presence)),
?assertMatch(missing, ?assertMatch(missing,
get_mem("not_present", "../test/test_mem.cdb", get_mem("not_present", "../test/test_mem.cdb", UpdKeyDict,
UpdKeyDict, false, loose_presence)),
loose_presence)),
ok = file:delete("../test/test_mem.cdb"). ok = file:delete("../test/test_mem.cdb").
search_hash_table_findinslot_test() -> search_hash_table_findinslot_test() ->
@ -1681,9 +1717,11 @@ search_hash_table_findinslot_test() ->
io:format("Slot 1 has Hash ~w Position ~w~n", [ReadH3, ReadP3]), io:format("Slot 1 has Hash ~w Position ~w~n", [ReadH3, ReadP3]),
io:format("Slot 2 has Hash ~w Position ~w~n", [ReadH4, ReadP4]), io:format("Slot 2 has Hash ~w Position ~w~n", [ReadH4, ReadP4]),
?assertMatch(0, ReadH4), ?assertMatch(0, ReadH4),
?assertMatch({"key1", "value1"}, get(Handle, Key1)), ?assertMatch({"key1", "value1"}, get(Handle, Key1, false)),
?assertMatch(probably, get(Handle, Key1, no_cache, loose_presence)), ?assertMatch(probably, get(Handle, Key1,
?assertMatch(missing, get(Handle, "Key99", no_cache, loose_presence)), no_cache, loose_presence, false)),
?assertMatch(missing, get(Handle, "Key99",
no_cache, loose_presence, false)),
{ok, _} = file:position(Handle, FirstHashPosition), {ok, _} = file:position(Handle, FirstHashPosition),
FlipH3 = endian_flip(ReadH3), FlipH3 = endian_flip(ReadH3),
FlipP3 = endian_flip(ReadP3), FlipP3 = endian_flip(ReadP3),
@ -1700,7 +1738,7 @@ search_hash_table_findinslot_test() ->
RBin), RBin),
ok = file:close(Handle), ok = file:close(Handle),
io:format("Find key following change to hash table~n"), io:format("Find key following change to hash table~n"),
?assertMatch(missing, get("../test/hashtable1_test.cdb", Key1)), ?assertMatch(missing, get("../test/hashtable1_test.cdb", Key1, false)),
ok = file:delete("../test/hashtable1_test.cdb"). ok = file:delete("../test/hashtable1_test.cdb").
getnextkey_inclemptyvalue_test() -> getnextkey_inclemptyvalue_test() ->
@ -1926,7 +1964,7 @@ hashclash_test() ->
?assertMatch(missing, cdb_get(P1, KeyNF)), ?assertMatch(missing, cdb_get(P1, KeyNF)),
{ok, FN} = cdb_complete(P1), {ok, FN} = cdb_complete(P1),
{ok, P2} = cdb_open_reader(FN), {ok, P2} = cdb_open_reader(FN, #cdb_options{binary_mode=false}),
?assertMatch(probably, cdb_keycheck(P2, Key1)), ?assertMatch(probably, cdb_keycheck(P2, Key1)),
?assertMatch(probably, cdb_keycheck(P2, Key99)), ?assertMatch(probably, cdb_keycheck(P2, Key99)),

View file

@ -52,7 +52,8 @@
compact_inkerkvc/2, compact_inkerkvc/2,
split_inkvalue/1, split_inkvalue/1,
check_forinkertype/2, check_forinkertype/2,
create_value_for_journal/1, maybe_compress/1,
create_value_for_journal/2,
build_metadata_object/2, build_metadata_object/2,
generate_ledgerkv/5, generate_ledgerkv/5,
get_size/2, get_size/2,
@ -185,20 +186,92 @@ to_inkerkv(LedgerKey, SQN, to_fetch, null) ->
{{SQN, ?INKT_STND, LedgerKey}, null, true}; {{SQN, ?INKT_STND, LedgerKey}, null, true};
to_inkerkv(LedgerKey, SQN, Object, KeyChanges) -> to_inkerkv(LedgerKey, SQN, Object, KeyChanges) ->
InkerType = check_forinkertype(LedgerKey, Object), InkerType = check_forinkertype(LedgerKey, Object),
Value = create_value_for_journal({Object, KeyChanges}), Value = create_value_for_journal({Object, KeyChanges}, false),
{{SQN, InkerType, LedgerKey}, Value}. {{SQN, InkerType, LedgerKey}, Value}.
%% Used when fetching objects, so only handles standard, hashable entries %% Used when fetching objects, so only handles standard, hashable entries
from_inkerkv(Object) -> from_inkerkv(Object) ->
case Object of case Object of
{{SQN, ?INKT_STND, PK}, Bin} when is_binary(Bin) -> {{SQN, ?INKT_STND, PK}, Bin} when is_binary(Bin) ->
{{SQN, PK}, binary_to_term(Bin)}; {{SQN, PK}, revert_value_from_journal(Bin)};
{{SQN, ?INKT_STND, PK}, Term} -> {{SQN, ?INKT_STND, PK}, Term} ->
{{SQN, PK}, Term}; {{SQN, PK}, Term};
_ -> _ ->
Object Object
end. end.
%% Build the binary that is stored as a value in the journal.
%% Layout, front to back:
%%   serialised object | serialised key changes |
%%   key-change length (32 bits) | type code (8 bits)
%% The key changes are always written with term_to_binary/2 using the
%% compressed option; the object is serialised according to Compress.
create_value_for_journal({Object, KeyChanges}, Compress) ->
    ObjectBin = serialise_object(Object, Compress),
    TypeCode = encode_valuetype(is_binary(Object), Compress),
    KeyChangeBin = term_to_binary(KeyChanges, [compressed]),
    Trailer = <<(byte_size(KeyChangeBin)):32/integer, TypeCode:8/integer>>,
    <<ObjectBin/binary, KeyChangeBin/binary, Trailer/binary>>.
%% Return a journal value in compressed form.
%% A {null, KeyChanges} tuple is encoded with the compress flag set to
%% false.  A journal binary already flagged as compressed is returned
%% untouched; otherwise it is decoded and re-encoded with compression on.
maybe_compress({null, KeyChanges}) ->
    create_value_for_journal({null, KeyChanges}, false);
maybe_compress(JournalBin) ->
    BodyLen = byte_size(JournalBin) - 1,
    %% The final byte of a journal value is the type code
    <<Body:BodyLen/binary, TypeCode:8/integer>> = JournalBin,
    case decode_valuetype(TypeCode) of
        {_IsBinary, true} ->
            JournalBin;
        {IsBinary, false} ->
            ObjectAndChanges =
                revert_value_from_journal(Body, BodyLen, IsBinary, false),
            create_value_for_journal(ObjectAndChanges, true)
    end.
%% Turn an object into the binary form written to the journal.
%% Binaries are stored as-is (or zlib-compressed when the flag is true);
%% any other term goes through term_to_binary, with the compressed option
%% when the flag is true.
serialise_object(Bin, false) when is_binary(Bin) ->
    Bin;
serialise_object(Term, false) ->
    term_to_binary(Term);
serialise_object(Bin, true) when is_binary(Bin) ->
    zlib:compress(Bin);
serialise_object(Term, true) ->
    term_to_binary(Term, [compressed]).
%% Decode a complete journal value binary back to {Object, KeyChanges}.
%% The trailing byte is the type code, which says how the object part was
%% serialised; the rest is handed to revert_value_from_journal/4.
revert_value_from_journal(JournalBin) ->
    PayloadLen = byte_size(JournalBin) - 1,
    <<Payload:PayloadLen/binary, TypeCode:8/integer>> = JournalBin,
    {IsBinary, IsCompressed} = decode_valuetype(TypeCode),
    revert_value_from_journal(Payload, PayloadLen, IsBinary, IsCompressed).
%% Decode a journal value whose type-code byte has already been removed.
%% ValueLen is the byte length of ValueBin.  The last 32 bits give the
%% length of the key-change binary, which sits directly before them;
%% whatever precedes that is the serialised object.
revert_value_from_journal(ValueBin, ValueLen, IsBinary, IsCompressed) ->
    BodyLen = ValueLen - 4,
    <<Body:BodyLen/binary, KeyChangeLen:32/integer>> = ValueBin,
    ObjectLen = BodyLen - KeyChangeLen,
    <<ObjectBin:ObjectLen/binary, KeyChangeBin:KeyChangeLen/binary>> = Body,
    Object = deserialise_object(ObjectBin, IsBinary, IsCompressed),
    KeyChanges = binary_to_term(KeyChangeBin),
    {Object, KeyChanges}.
%% Reverse serialise_object/2.
%% Arguments are (Binary, IsBinary, IsCompressed).  Non-binary objects are
%% always restored with binary_to_term, whatever the compression flag;
%% binary objects are returned as-is or zlib-uncompressed.
deserialise_object(TermBin, false, _IsCompressed) ->
    binary_to_term(TermBin);
deserialise_object(RawBin, true, false) ->
    RawBin;
deserialise_object(CompressedBin, true, true) ->
    zlib:uncompress(CompressedBin).
%% Pack the two value flags into a single integer type code:
%% the binary flag contributes 2 and the compressed flag contributes 1.
encode_valuetype(IsBinary, IsCompressed) ->
    BinaryFlag = case IsBinary of
                     false -> 0;
                     true -> 2
                 end,
    CompressedFlag = case IsCompressed of
                         false -> 0;
                         true -> 1
                     end,
    CompressedFlag + BinaryFlag.
%% Unpack a type code into {IsBinary, IsCompressed}.
%% Only the two lowest bits are inspected: the bit of weight 2 is the
%% binary flag and the bit of weight 1 is the compressed flag.
decode_valuetype(TypeInt) ->
    {(TypeInt band 2) =:= 2, (TypeInt band 1) =:= 1}.
from_journalkey({SQN, _Type, LedgerKey}) -> from_journalkey({SQN, _Type, LedgerKey}) ->
{SQN, LedgerKey}. {SQN, LedgerKey}.
@ -220,7 +293,7 @@ compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
skip -> skip ->
skip; skip;
retain -> retain ->
{_V, KeyDeltas} = split_inkvalue(V), {_V, KeyDeltas} = revert_value_from_journal(V),
{retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}}; {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
TagStrat -> TagStrat ->
{TagStrat, null} {TagStrat, null}
@ -245,7 +318,7 @@ get_tagstrategy(LK, Strategy) ->
split_inkvalue(VBin) -> split_inkvalue(VBin) ->
case is_binary(VBin) of case is_binary(VBin) of
true -> true ->
binary_to_term(VBin); revert_value_from_journal(VBin);
false -> false ->
VBin VBin
end. end.
@ -255,14 +328,6 @@ check_forinkertype(_LedgerKey, delete) ->
check_forinkertype(_LedgerKey, _Object) -> check_forinkertype(_LedgerKey, _Object) ->
?INKT_STND. ?INKT_STND.
create_value_for_journal(Value) ->
case Value of
{Object, KeyChanges} ->
term_to_binary({Object, KeyChanges}, [compressed]);
Value when is_binary(Value) ->
Value
end.
hash(Obj) -> hash(Obj) ->
erlang:phash2(term_to_binary(Obj)). erlang:phash2(term_to_binary(Obj)).

View file

@ -490,7 +490,8 @@ write_values([], _CDBopts, Journal0, ManSlice0) ->
{Journal0, ManSlice0}; {Journal0, ManSlice0};
write_values(KVCList, CDBopts, Journal0, ManSlice0) -> write_values(KVCList, CDBopts, Journal0, ManSlice0) ->
KVList = lists:map(fun({K, V, _C}) -> KVList = lists:map(fun({K, V, _C}) ->
{K, leveled_codec:create_value_for_journal(V)} % Compress the value as part of compaction
{K, leveled_codec:maybe_compress(V)}
end, end,
KVCList), KVCList),
{ok, Journal1} = case Journal0 of {ok, Journal1} = case Journal0 of
@ -639,11 +640,13 @@ test_ledgerkey(Key) ->
{o, "Bucket", Key, null}. {o, "Bucket", Key, null}.
test_inkerkv(SQN, Key, V, IdxSpecs) -> test_inkerkv(SQN, Key, V, IdxSpecs) ->
{{SQN, ?INKT_STND, test_ledgerkey(Key)}, term_to_binary({V, IdxSpecs})}. leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs).
fetch_testcdb(RP) -> fetch_testcdb(RP) ->
FN1 = leveled_inker:filepath(RP, 1, new_journal), FN1 = leveled_inker:filepath(RP, 1, new_journal),
{ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, #cdb_options{}), {ok,
CDB1} = leveled_cdb:cdb_open_writer(FN1,
#cdb_options{binary_mode=true}),
{K1, V1} = test_inkerkv(1, "Key1", "Value1", []), {K1, V1} = test_inkerkv(1, "Key1", "Value1", []),
{K2, V2} = test_inkerkv(2, "Key2", "Value2", []), {K2, V2} = test_inkerkv(2, "Key2", "Value2", []),
{K3, V3} = test_inkerkv(3, "Key3", "Value3", []), {K3, V3} = test_inkerkv(3, "Key3", "Value3", []),
@ -661,7 +664,7 @@ fetch_testcdb(RP) ->
ok = leveled_cdb:cdb_put(CDB1, K7, V7), ok = leveled_cdb:cdb_put(CDB1, K7, V7),
ok = leveled_cdb:cdb_put(CDB1, K8, V8), ok = leveled_cdb:cdb_put(CDB1, K8, V8),
{ok, FN2} = leveled_cdb:cdb_complete(CDB1), {ok, FN2} = leveled_cdb:cdb_complete(CDB1),
leveled_cdb:cdb_open_reader(FN2). leveled_cdb:cdb_open_reader(FN2, #cdb_options{binary_mode=true}).
check_single_file_test() -> check_single_file_test() ->
RP = "../test/journal", RP = "../test/journal",
@ -718,7 +721,7 @@ compact_single_file_recovr_test() ->
CDB} = compact_single_file_setup(), CDB} = compact_single_file_setup(),
[{LowSQN, FN, PidR, _LastKey}] = [{LowSQN, FN, PidR, _LastKey}] =
compact_files([Candidate], compact_files([Candidate],
#cdb_options{file_path=CompactFP}, #cdb_options{file_path=CompactFP, binary_mode=true},
LedgerFun1, LedgerFun1,
LedgerSrv1, LedgerSrv1,
9, 9,
@ -738,11 +741,11 @@ compact_single_file_recovr_test() ->
{1, {1,
stnd, stnd,
test_ledgerkey("Key1")})), test_ledgerkey("Key1")})),
{_RK1, RV1} = leveled_cdb:cdb_get(PidR, RKV1 = leveled_cdb:cdb_get(PidR,
{2, {2,
stnd, stnd,
test_ledgerkey("Key2")}), test_ledgerkey("Key2")}),
?assertMatch({"Value2", []}, binary_to_term(RV1)), ?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)),
ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB). ok = leveled_cdb:cdb_destroy(CDB).
@ -755,7 +758,7 @@ compact_single_file_retain_test() ->
CDB} = compact_single_file_setup(), CDB} = compact_single_file_setup(),
[{LowSQN, FN, PidR, _LK}] = [{LowSQN, FN, PidR, _LK}] =
compact_files([Candidate], compact_files([Candidate],
#cdb_options{file_path=CompactFP}, #cdb_options{file_path=CompactFP, binary_mode=true},
LedgerFun1, LedgerFun1,
LedgerSrv1, LedgerSrv1,
9, 9,
@ -775,11 +778,11 @@ compact_single_file_retain_test() ->
{1, {1,
stnd, stnd,
test_ledgerkey("Key1")})), test_ledgerkey("Key1")})),
{_RK1, RV1} = leveled_cdb:cdb_get(PidR, RKV1 = leveled_cdb:cdb_get(PidR,
{2, {2,
stnd, stnd,
test_ledgerkey("Key2")}), test_ledgerkey("Key2")}),
?assertMatch({"Value2", []}, binary_to_term(RV1)), ?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)),
ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB). ok = leveled_cdb:cdb_destroy(CDB).
@ -815,10 +818,9 @@ compact_singlefile_totwosmallfiles_test() ->
{ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBoptsLarge), {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBoptsLarge),
lists:foreach(fun(X) -> lists:foreach(fun(X) ->
LK = test_ledgerkey("Key" ++ integer_to_list(X)), LK = test_ledgerkey("Key" ++ integer_to_list(X)),
Value = term_to_binary({crypto:rand_bytes(1024), []}), Value = crypto:rand_bytes(1024),
ok = leveled_cdb:cdb_put(CDB1, {IK, IV} = leveled_codec:to_inkerkv(LK, X, Value, []),
{X, ?INKT_STND, LK}, ok = leveled_cdb:cdb_put(CDB1, IK, IV)
Value)
end, end,
lists:seq(1, 1000)), lists:seq(1, 1000)),
{ok, NewName} = leveled_cdb:cdb_complete(CDB1), {ok, NewName} = leveled_cdb:cdb_complete(CDB1),

View file

@ -724,6 +724,9 @@ initiate_penciller_snapshot(Bookie) ->
-ifdef(TEST). -ifdef(TEST).
%% Test-only shorthand: delegates to leveled_codec so the dummy journals
%% built in the tests use the same value encoding as the codec module.
create_value_for_journal(ObjectAndChanges, Compress) ->
    leveled_codec:create_value_for_journal(ObjectAndChanges, Compress).
build_dummy_journal() -> build_dummy_journal() ->
F = fun(X) -> X end, F = fun(X) -> X end,
build_dummy_journal(F). build_dummy_journal(F).
@ -740,8 +743,12 @@ build_dummy_journal(KeyConvertF) ->
{ok, J1} = leveled_cdb:cdb_open_writer(F1), {ok, J1} = leveled_cdb:cdb_open_writer(F1),
{K1, V1} = {KeyConvertF("Key1"), "TestValue1"}, {K1, V1} = {KeyConvertF("Key1"), "TestValue1"},
{K2, V2} = {KeyConvertF("Key2"), "TestValue2"}, {K2, V2} = {KeyConvertF("Key2"), "TestValue2"},
ok = leveled_cdb:cdb_put(J1, {1, stnd, K1}, term_to_binary({V1, []})), ok = leveled_cdb:cdb_put(J1,
ok = leveled_cdb:cdb_put(J1, {2, stnd, K2}, term_to_binary({V2, []})), {1, stnd, K1},
create_value_for_journal({V1, []}, false)),
ok = leveled_cdb:cdb_put(J1,
{2, stnd, K2},
create_value_for_journal({V2, []}, false)),
ok = leveled_cdb:cdb_roll(J1), ok = leveled_cdb:cdb_roll(J1),
LK1 = leveled_cdb:cdb_lastkey(J1), LK1 = leveled_cdb:cdb_lastkey(J1),
lists:foldl(fun(X, Closed) -> lists:foldl(fun(X, Closed) ->
@ -760,8 +767,12 @@ build_dummy_journal(KeyConvertF) ->
{ok, J2} = leveled_cdb:cdb_open_writer(F2), {ok, J2} = leveled_cdb:cdb_open_writer(F2),
{K1, V3} = {KeyConvertF("Key1"), "TestValue3"}, {K1, V3} = {KeyConvertF("Key1"), "TestValue3"},
{K4, V4} = {KeyConvertF("Key4"), "TestValue4"}, {K4, V4} = {KeyConvertF("Key4"), "TestValue4"},
ok = leveled_cdb:cdb_put(J2, {3, stnd, K1}, term_to_binary({V3, []})), ok = leveled_cdb:cdb_put(J2,
ok = leveled_cdb:cdb_put(J2, {4, stnd, K4}, term_to_binary({V4, []})), {3, stnd, K1},
create_value_for_journal({V3, []}, false)),
ok = leveled_cdb:cdb_put(J2,
{4, stnd, K4},
create_value_for_journal({V4, []}, false)),
LK2 = leveled_cdb:cdb_lastkey(J2), LK2 = leveled_cdb:cdb_lastkey(J2),
ok = leveled_cdb:cdb_close(J2), ok = leveled_cdb:cdb_close(J2),
Manifest = [{1, "../test/journal/journal_files/nursery_1", "pid1", LK1}, Manifest = [{1, "../test/journal/journal_files/nursery_1", "pid1", LK1},
@ -794,20 +805,22 @@ clean_subdir(DirPath) ->
simple_inker_test() -> simple_inker_test() ->
RootPath = "../test/journal", RootPath = "../test/journal",
build_dummy_journal(), build_dummy_journal(),
CDBopts = #cdb_options{max_size=300000}, CDBopts = #cdb_options{max_size=300000, binary_mode=true},
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
cdb_options=CDBopts}), cdb_options=CDBopts}),
Obj1 = ink_get(Ink1, "Key1", 1), Obj1 = ink_get(Ink1, "Key1", 1),
?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
Obj2 = ink_get(Ink1, "Key4", 4), Obj3 = ink_get(Ink1, "Key1", 3),
?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj2), ?assertMatch({{3, "Key1"}, {"TestValue3", []}}, Obj3),
Obj4 = ink_get(Ink1, "Key4", 4),
?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj4),
ink_close(Ink1), ink_close(Ink1),
clean_testdir(RootPath). clean_testdir(RootPath).
simple_inker_completeactivejournal_test() -> simple_inker_completeactivejournal_test() ->
RootPath = "../test/journal", RootPath = "../test/journal",
build_dummy_journal(), build_dummy_journal(),
CDBopts = #cdb_options{max_size=300000}, CDBopts = #cdb_options{max_size=300000, binary_mode=true},
JournalFP = filepath(RootPath, journal_dir), JournalFP = filepath(RootPath, journal_dir),
F2 = filename:join(JournalFP, "nursery_3.pnd"), F2 = filename:join(JournalFP, "nursery_3.pnd"),
{ok, PidW} = leveled_cdb:cdb_open_writer(F2), {ok, PidW} = leveled_cdb:cdb_open_writer(F2),