commit
43c5f2fb07
5 changed files with 254 additions and 122 deletions
|
@ -65,6 +65,7 @@
|
|||
-export([cdb_open_writer/1,
|
||||
cdb_open_writer/2,
|
||||
cdb_open_reader/1,
|
||||
cdb_open_reader/2,
|
||||
cdb_reopen_reader/2,
|
||||
cdb_get/2,
|
||||
cdb_put/3,
|
||||
|
@ -262,12 +263,19 @@ starting({open_reader, Filename, LastKey}, _From, State) ->
|
|||
|
||||
writer({get_kv, Key}, _From, State) ->
|
||||
{reply,
|
||||
get_mem(Key, State#state.handle, State#state.hashtree),
|
||||
get_mem(Key,
|
||||
State#state.handle,
|
||||
State#state.hashtree,
|
||||
State#state.binary_mode),
|
||||
writer,
|
||||
State};
|
||||
writer({key_check, Key}, _From, State) ->
|
||||
{reply,
|
||||
get_mem(Key, State#state.handle, State#state.hashtree, loose_presence),
|
||||
get_mem(Key,
|
||||
State#state.handle,
|
||||
State#state.hashtree,
|
||||
State#state.binary_mode,
|
||||
loose_presence),
|
||||
writer,
|
||||
State};
|
||||
writer({put_kv, Key, Value}, _From, State) ->
|
||||
|
@ -329,12 +337,19 @@ writer(cdb_roll, State) ->
|
|||
|
||||
rolling({get_kv, Key}, _From, State) ->
|
||||
{reply,
|
||||
get_mem(Key, State#state.handle, State#state.hashtree),
|
||||
get_mem(Key,
|
||||
State#state.handle,
|
||||
State#state.hashtree,
|
||||
State#state.binary_mode),
|
||||
rolling,
|
||||
State};
|
||||
rolling({key_check, Key}, _From, State) ->
|
||||
{reply,
|
||||
get_mem(Key, State#state.handle, State#state.hashtree, loose_presence),
|
||||
get_mem(Key,
|
||||
State#state.handle,
|
||||
State#state.hashtree,
|
||||
State#state.binary_mode,
|
||||
loose_presence),
|
||||
rolling,
|
||||
State};
|
||||
rolling({get_positions, _SampleSize}, _From, State) ->
|
||||
|
@ -374,7 +389,10 @@ rolling({delete_pending, ManSQN, Inker}, State) ->
|
|||
|
||||
reader({get_kv, Key}, _From, State) ->
|
||||
{reply,
|
||||
get_withcache(State#state.handle, Key, State#state.hash_index),
|
||||
get_withcache(State#state.handle,
|
||||
Key,
|
||||
State#state.hash_index,
|
||||
State#state.binary_mode),
|
||||
reader,
|
||||
State};
|
||||
reader({key_check, Key}, _From, State) ->
|
||||
|
@ -382,7 +400,8 @@ reader({key_check, Key}, _From, State) ->
|
|||
get_withcache(State#state.handle,
|
||||
Key,
|
||||
State#state.hash_index,
|
||||
loose_presence),
|
||||
loose_presence,
|
||||
State#state.binary_mode),
|
||||
reader,
|
||||
State};
|
||||
reader({get_positions, SampleSize}, _From, State) ->
|
||||
|
@ -430,9 +449,11 @@ reader({direct_fetch, PositionList, Info}, _From, State) ->
|
|||
FilterFalseKey(extract_key_size(H, P)) end,
|
||||
PositionList);
|
||||
key_value_check ->
|
||||
BM = State#state.binary_mode,
|
||||
lists:filtermap(
|
||||
fun(P) ->
|
||||
FilterFalseKey(extract_key_value_check(H, P)) end,
|
||||
FilterFalseKey(extract_key_value_check(H, P, BM))
|
||||
end,
|
||||
PositionList)
|
||||
end,
|
||||
{reply, Reply, reader, State};
|
||||
|
@ -456,7 +477,10 @@ reader({delete_pending, ManSQN, Inker}, State) ->
|
|||
|
||||
delete_pending({get_kv, Key}, _From, State) ->
|
||||
{reply,
|
||||
get_withcache(State#state.handle, Key, State#state.hash_index),
|
||||
get_withcache(State#state.handle,
|
||||
Key,
|
||||
State#state.hash_index,
|
||||
State#state.binary_mode),
|
||||
delete_pending,
|
||||
State,
|
||||
?DELETE_TIMEOUT};
|
||||
|
@ -465,7 +489,8 @@ delete_pending({key_check, Key}, _From, State) ->
|
|||
get_withcache(State#state.handle,
|
||||
Key,
|
||||
State#state.hash_index,
|
||||
loose_presence),
|
||||
loose_presence,
|
||||
State#state.binary_mode),
|
||||
delete_pending,
|
||||
State,
|
||||
?DELETE_TIMEOUT}.
|
||||
|
@ -687,19 +712,19 @@ put(FileName, Key, Value, {LastPosition, HashTree}) ->
|
|||
%%
|
||||
|
||||
|
||||
get_withcache(Handle, Key, Cache) ->
|
||||
get(Handle, Key, Cache, true).
|
||||
get_withcache(Handle, Key, Cache, BinaryMode) ->
|
||||
get(Handle, Key, Cache, true, BinaryMode).
|
||||
|
||||
get_withcache(Handle, Key, Cache, QuickCheck) ->
|
||||
get(Handle, Key, Cache, QuickCheck).
|
||||
get_withcache(Handle, Key, Cache, QuickCheck, BinaryMode) ->
|
||||
get(Handle, Key, Cache, QuickCheck, BinaryMode).
|
||||
|
||||
get(FileNameOrHandle, Key) ->
|
||||
get(FileNameOrHandle, Key, no_cache, true).
|
||||
get(FileNameOrHandle, Key, BinaryMode) ->
|
||||
get(FileNameOrHandle, Key, no_cache, true, BinaryMode).
|
||||
|
||||
get(FileName, Key, Cache, QuickCheck) when is_list(FileName) ->
|
||||
get(FileName, Key, Cache, QuickCheck, BinaryMode) when is_list(FileName) ->
|
||||
{ok, Handle} = file:open(FileName,[binary, raw, read]),
|
||||
get(Handle, Key, Cache, QuickCheck);
|
||||
get(Handle, Key, Cache, QuickCheck) when is_tuple(Handle) ->
|
||||
get(Handle, Key, Cache, QuickCheck, BinaryMode);
|
||||
get(Handle, Key, Cache, QuickCheck, BinaryMode) when is_tuple(Handle) ->
|
||||
Hash = hash(Key),
|
||||
Index = hash_to_index(Hash),
|
||||
{HashTable, Count} = get_index(Handle, Index, Cache),
|
||||
|
@ -722,7 +747,8 @@ get(Handle, Key, Cache, QuickCheck) when is_tuple(Handle) ->
|
|||
lists:append(L2, L1),
|
||||
Hash,
|
||||
Key,
|
||||
QuickCheck)
|
||||
QuickCheck,
|
||||
BinaryMode)
|
||||
end.
|
||||
|
||||
get_index(Handle, Index, no_cache) ->
|
||||
|
@ -736,13 +762,13 @@ get_index(_Handle, Index, Cache) ->
|
|||
%% Get a Key/Value pair from an active CDB file (with no hash table written)
|
||||
%% This requires a key dictionary to be passed in (mapping keys to positions)
|
||||
%% Will return {Key, Value} or missing
|
||||
get_mem(Key, FNOrHandle, HashTree) ->
|
||||
get_mem(Key, FNOrHandle, HashTree, true).
|
||||
get_mem(Key, FNOrHandle, HashTree, BinaryMode) ->
|
||||
get_mem(Key, FNOrHandle, HashTree, BinaryMode, true).
|
||||
|
||||
get_mem(Key, Filename, HashTree, QuickCheck) when is_list(Filename) ->
|
||||
get_mem(Key, Filename, HashTree, BinaryMode, QuickCheck) when is_list(Filename) ->
|
||||
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
||||
get_mem(Key, Handle, HashTree, QuickCheck);
|
||||
get_mem(Key, Handle, HashTree, QuickCheck) ->
|
||||
get_mem(Key, Handle, HashTree, BinaryMode, QuickCheck);
|
||||
get_mem(Key, Handle, HashTree, BinaryMode, QuickCheck) ->
|
||||
ListToCheck = get_hashtree(Key, HashTree),
|
||||
case {QuickCheck, ListToCheck} of
|
||||
{loose_presence, []} ->
|
||||
|
@ -750,7 +776,7 @@ get_mem(Key, Handle, HashTree, QuickCheck) ->
|
|||
{loose_presence, _L} ->
|
||||
probably;
|
||||
_ ->
|
||||
extract_kvpair(Handle, ListToCheck, Key)
|
||||
extract_kvpair(Handle, ListToCheck, Key, BinaryMode)
|
||||
end.
|
||||
|
||||
%% Get the next key at a position in the file (or the first key if no position
|
||||
|
@ -767,7 +793,7 @@ get_nextkey(Handle, {Position, FirstHashPosition}) ->
|
|||
{ok, Position} = file:position(Handle, Position),
|
||||
case read_next_2_integers(Handle) of
|
||||
{KeyLength, ValueLength} ->
|
||||
NextKey = read_next_term(Handle, KeyLength),
|
||||
NextKey = read_next_key(Handle, KeyLength),
|
||||
NextPosition = Position + KeyLength + ValueLength + ?DWORD_SIZE,
|
||||
case NextPosition of
|
||||
FirstHashPosition ->
|
||||
|
@ -830,7 +856,7 @@ find_lastkey(Handle, IndexCache) ->
|
|||
_ ->
|
||||
{ok, _} = file:position(Handle, LastPosition),
|
||||
{KeyLength, _ValueLength} = read_next_2_integers(Handle),
|
||||
read_next_term(Handle, KeyLength)
|
||||
read_next_key(Handle, KeyLength)
|
||||
end.
|
||||
|
||||
|
||||
|
@ -902,39 +928,49 @@ put_hashtree(Key, Position, HashTree) ->
|
|||
|
||||
%% Function to extract a Key-Value pair given a file handle and a position
|
||||
%% Will confirm that the key matches and do a CRC check
|
||||
extract_kvpair(_, [], _) ->
|
||||
extract_kvpair(_H, [], _K, _BinaryMode) ->
|
||||
missing;
|
||||
extract_kvpair(Handle, [Position|Rest], Key) ->
|
||||
extract_kvpair(Handle, [Position|Rest], Key, BinaryMode) ->
|
||||
{ok, _} = file:position(Handle, Position),
|
||||
{KeyLength, ValueLength} = read_next_2_integers(Handle),
|
||||
case safe_read_next_term(Handle, KeyLength) of
|
||||
case safe_read_next_key(Handle, KeyLength) of
|
||||
Key -> % If same key as passed in, then found!
|
||||
case read_next_term(Handle, ValueLength, crc) of
|
||||
case read_next_value(Handle, ValueLength, crc) of
|
||||
{false, _} ->
|
||||
crc_wonky;
|
||||
{_, Value} ->
|
||||
{Key,Value}
|
||||
case BinaryMode of
|
||||
true ->
|
||||
{Key, Value};
|
||||
false ->
|
||||
{Key, binary_to_term(Value)}
|
||||
end
|
||||
end;
|
||||
_ ->
|
||||
extract_kvpair(Handle, Rest, Key)
|
||||
extract_kvpair(Handle, Rest, Key, BinaryMode)
|
||||
end.
|
||||
|
||||
extract_key(Handle, Position) ->
|
||||
{ok, _} = file:position(Handle, Position),
|
||||
{KeyLength, _ValueLength} = read_next_2_integers(Handle),
|
||||
{safe_read_next_term(Handle, KeyLength)}.
|
||||
{safe_read_next_key(Handle, KeyLength)}.
|
||||
|
||||
extract_key_size(Handle, Position) ->
|
||||
{ok, _} = file:position(Handle, Position),
|
||||
{KeyLength, ValueLength} = read_next_2_integers(Handle),
|
||||
{safe_read_next_term(Handle, KeyLength), ValueLength}.
|
||||
{safe_read_next_key(Handle, KeyLength), ValueLength}.
|
||||
|
||||
extract_key_value_check(Handle, Position) ->
|
||||
extract_key_value_check(Handle, Position, BinaryMode) ->
|
||||
{ok, _} = file:position(Handle, Position),
|
||||
{KeyLength, ValueLength} = read_next_2_integers(Handle),
|
||||
K = safe_read_next_term(Handle, KeyLength),
|
||||
{Check, V} = read_next_term(Handle, ValueLength, crc),
|
||||
{K, V, Check}.
|
||||
K = safe_read_next_key(Handle, KeyLength),
|
||||
{Check, V} = read_next_value(Handle, ValueLength, crc),
|
||||
case BinaryMode of
|
||||
true ->
|
||||
{K, V, Check};
|
||||
false ->
|
||||
{K, binary_to_term(V), Check}
|
||||
end.
|
||||
|
||||
%% Scan through the file until there is a failure to crc check an input, and
|
||||
%% at that point return the position and the key dictionary scanned so far
|
||||
|
@ -1018,7 +1054,7 @@ saferead_keyvalue(Handle) ->
|
|||
eof ->
|
||||
false;
|
||||
{KeyL, ValueL} ->
|
||||
case safe_read_next_term(Handle, KeyL) of
|
||||
case safe_read_next_key(Handle, KeyL) of
|
||||
{error, _} ->
|
||||
false;
|
||||
eof ->
|
||||
|
@ -1041,8 +1077,8 @@ saferead_keyvalue(Handle) ->
|
|||
end.
|
||||
|
||||
|
||||
safe_read_next_term(Handle, Length) ->
|
||||
try read_next_term(Handle, Length) of
|
||||
safe_read_next_key(Handle, Length) ->
|
||||
try read_next_key(Handle, Length) of
|
||||
Term ->
|
||||
Term
|
||||
catch
|
||||
|
@ -1074,7 +1110,7 @@ calc_crc(Value) ->
|
|||
erlang:crc32(<<Value/bitstring,0:M>>)
|
||||
end.
|
||||
|
||||
read_next_term(Handle, Length) ->
|
||||
read_next_key(Handle, Length) ->
|
||||
case file:read(Handle, Length) of
|
||||
{ok, Bin} ->
|
||||
binary_to_term(Bin);
|
||||
|
@ -1082,13 +1118,14 @@ read_next_term(Handle, Length) ->
|
|||
ReadError
|
||||
end.
|
||||
|
||||
|
||||
%% Read next string where the string has a CRC prepended - stripping the crc
|
||||
%% and checking if requested
|
||||
read_next_term(Handle, Length, crc) ->
|
||||
read_next_value(Handle, Length, crc) ->
|
||||
{ok, <<CRC:32/integer, Bin/binary>>} = file:read(Handle, Length),
|
||||
case calc_crc(Bin) of
|
||||
CRC ->
|
||||
{true, binary_to_term(Bin)};
|
||||
{true, Bin};
|
||||
_ ->
|
||||
{false, crc_wonky}
|
||||
end.
|
||||
|
@ -1096,7 +1133,7 @@ read_next_term(Handle, Length, crc) ->
|
|||
%% Extract value and size from binary containing CRC
|
||||
extract_valueandsize(ValueAsBin) ->
|
||||
<<_CRC:32/integer, Bin/binary>> = ValueAsBin,
|
||||
{binary_to_term(Bin), byte_size(Bin)}.
|
||||
{Bin, byte_size(Bin)}.
|
||||
|
||||
|
||||
%% Used for reading lengths
|
||||
|
@ -1129,14 +1166,15 @@ read_integerpairs(<<Int1:32, Int2:32, Rest/binary>>, Pairs) ->
|
|||
%% false - don't check the CRC before returning key & value
|
||||
%% loose_presence - confirm that the hash of the key is present
|
||||
|
||||
search_hash_table(Handle, Entries, Hash, Key, QuickCheck) ->
|
||||
search_hash_table(Handle, Entries, Hash, Key, QuickCheck, 0).
|
||||
search_hash_table(Handle, Entries, Hash, Key, QuickCheck, BinaryMode) ->
|
||||
search_hash_table(Handle, Entries, Hash, Key, QuickCheck, BinaryMode, 0).
|
||||
|
||||
search_hash_table(_Handle, [], Hash, _Key, _QuickCheck, CycleCount) ->
|
||||
search_hash_table(_Handle, [], Hash, _Key,
|
||||
_QuickCheck, _BinaryMode, CycleCount) ->
|
||||
log_cyclecount(CycleCount, Hash, missing),
|
||||
missing;
|
||||
search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
|
||||
QuickCheck, CycleCount) ->
|
||||
QuickCheck, BinaryMode, CycleCount) ->
|
||||
{ok, _} = file:position(Handle, Entry),
|
||||
{StoredHash, DataLoc} = read_next_2_integers(Handle),
|
||||
case StoredHash of
|
||||
|
@ -1145,7 +1183,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
|
|||
loose_presence ->
|
||||
probably;
|
||||
_ ->
|
||||
extract_kvpair(Handle, [DataLoc], Key)
|
||||
extract_kvpair(Handle, [DataLoc], Key, BinaryMode)
|
||||
end,
|
||||
case KV of
|
||||
missing ->
|
||||
|
@ -1154,6 +1192,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
|
|||
Hash,
|
||||
Key,
|
||||
QuickCheck,
|
||||
BinaryMode,
|
||||
CycleCount + 1);
|
||||
_ ->
|
||||
log_cyclecount(CycleCount, Hash, found),
|
||||
|
@ -1164,7 +1203,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
|
|||
% missing;
|
||||
_ ->
|
||||
search_hash_table(Handle, RestOfEntries, Hash, Key,
|
||||
QuickCheck, CycleCount + 1)
|
||||
QuickCheck, BinaryMode, CycleCount + 1)
|
||||
end.
|
||||
|
||||
log_cyclecount(CycleCount, Hash, Result) ->
|
||||
|
@ -1441,15 +1480,15 @@ dump(FileName) ->
|
|||
NumberOfPairs = lists:foldl(Fn, 0, lists:seq(0,255)) bsr 1,
|
||||
io:format("Count of keys in db is ~w~n", [NumberOfPairs]),
|
||||
{ok, _} = file:position(Handle, {bof, 2048}),
|
||||
Fn1 = fun(_I,Acc) ->
|
||||
{KL,VL} = read_next_2_integers(Handle),
|
||||
Key = read_next_term(Handle, KL),
|
||||
case read_next_term(Handle, VL, crc) of
|
||||
{_, Value} ->
|
||||
{ok, CurrLoc} = file:position(Handle, cur),
|
||||
{Key,Value} = get(Handle, Key)
|
||||
end,
|
||||
{ok, _} = file:position(Handle, CurrLoc),
|
||||
Fn1 = fun(_I, Acc) ->
|
||||
{KL, VL} = read_next_2_integers(Handle),
|
||||
Key = read_next_key(Handle, KL),
|
||||
Value =
|
||||
case read_next_value(Handle, VL, crc) of
|
||||
{true, V0} ->
|
||||
binary_to_term(V0)
|
||||
end,
|
||||
{Key, Value} = get(Handle, Key, false),
|
||||
[{Key,Value} | Acc]
|
||||
end,
|
||||
lists:foldr(Fn1, [], lists:seq(0, NumberOfPairs-1)).
|
||||
|
@ -1647,16 +1686,13 @@ activewrite_singlewrite_test() ->
|
|||
{LastPosition, KeyDict}),
|
||||
io:format("New key and value added to active file ~n", []),
|
||||
?assertMatch({Key, Value},
|
||||
get_mem(Key, "../test/test_mem.cdb",
|
||||
UpdKeyDict)),
|
||||
get_mem(Key, "../test/test_mem.cdb", UpdKeyDict, false)),
|
||||
?assertMatch(probably,
|
||||
get_mem(Key, "../test/test_mem.cdb",
|
||||
UpdKeyDict,
|
||||
loose_presence)),
|
||||
get_mem(Key, "../test/test_mem.cdb", UpdKeyDict,
|
||||
false, loose_presence)),
|
||||
?assertMatch(missing,
|
||||
get_mem("not_present", "../test/test_mem.cdb",
|
||||
UpdKeyDict,
|
||||
loose_presence)),
|
||||
get_mem("not_present", "../test/test_mem.cdb", UpdKeyDict,
|
||||
false, loose_presence)),
|
||||
ok = file:delete("../test/test_mem.cdb").
|
||||
|
||||
search_hash_table_findinslot_test() ->
|
||||
|
@ -1681,9 +1717,11 @@ search_hash_table_findinslot_test() ->
|
|||
io:format("Slot 1 has Hash ~w Position ~w~n", [ReadH3, ReadP3]),
|
||||
io:format("Slot 2 has Hash ~w Position ~w~n", [ReadH4, ReadP4]),
|
||||
?assertMatch(0, ReadH4),
|
||||
?assertMatch({"key1", "value1"}, get(Handle, Key1)),
|
||||
?assertMatch(probably, get(Handle, Key1, no_cache, loose_presence)),
|
||||
?assertMatch(missing, get(Handle, "Key99", no_cache, loose_presence)),
|
||||
?assertMatch({"key1", "value1"}, get(Handle, Key1, false)),
|
||||
?assertMatch(probably, get(Handle, Key1,
|
||||
no_cache, loose_presence, false)),
|
||||
?assertMatch(missing, get(Handle, "Key99",
|
||||
no_cache, loose_presence, false)),
|
||||
{ok, _} = file:position(Handle, FirstHashPosition),
|
||||
FlipH3 = endian_flip(ReadH3),
|
||||
FlipP3 = endian_flip(ReadP3),
|
||||
|
@ -1700,7 +1738,7 @@ search_hash_table_findinslot_test() ->
|
|||
RBin),
|
||||
ok = file:close(Handle),
|
||||
io:format("Find key following change to hash table~n"),
|
||||
?assertMatch(missing, get("../test/hashtable1_test.cdb", Key1)),
|
||||
?assertMatch(missing, get("../test/hashtable1_test.cdb", Key1, false)),
|
||||
ok = file:delete("../test/hashtable1_test.cdb").
|
||||
|
||||
getnextkey_inclemptyvalue_test() ->
|
||||
|
@ -1926,7 +1964,7 @@ hashclash_test() ->
|
|||
?assertMatch(missing, cdb_get(P1, KeyNF)),
|
||||
|
||||
{ok, FN} = cdb_complete(P1),
|
||||
{ok, P2} = cdb_open_reader(FN),
|
||||
{ok, P2} = cdb_open_reader(FN, #cdb_options{binary_mode=false}),
|
||||
|
||||
?assertMatch(probably, cdb_keycheck(P2, Key1)),
|
||||
?assertMatch(probably, cdb_keycheck(P2, Key99)),
|
||||
|
|
|
@ -52,7 +52,8 @@
|
|||
compact_inkerkvc/2,
|
||||
split_inkvalue/1,
|
||||
check_forinkertype/2,
|
||||
create_value_for_journal/1,
|
||||
maybe_compress/1,
|
||||
create_value_for_journal/2,
|
||||
build_metadata_object/2,
|
||||
generate_ledgerkv/5,
|
||||
get_size/2,
|
||||
|
@ -185,20 +186,92 @@ to_inkerkv(LedgerKey, SQN, to_fetch, null) ->
|
|||
{{SQN, ?INKT_STND, LedgerKey}, null, true};
|
||||
to_inkerkv(LedgerKey, SQN, Object, KeyChanges) ->
|
||||
InkerType = check_forinkertype(LedgerKey, Object),
|
||||
Value = create_value_for_journal({Object, KeyChanges}),
|
||||
Value = create_value_for_journal({Object, KeyChanges}, false),
|
||||
{{SQN, InkerType, LedgerKey}, Value}.
|
||||
|
||||
%% Used when fetching objects, so only handles standard, hashable entries
|
||||
from_inkerkv(Object) ->
|
||||
case Object of
|
||||
{{SQN, ?INKT_STND, PK}, Bin} when is_binary(Bin) ->
|
||||
{{SQN, PK}, binary_to_term(Bin)};
|
||||
{{SQN, PK}, revert_value_from_journal(Bin)};
|
||||
{{SQN, ?INKT_STND, PK}, Term} ->
|
||||
{{SQN, PK}, Term};
|
||||
_ ->
|
||||
Object
|
||||
end.
|
||||
|
||||
create_value_for_journal({Object, KeyChanges}, Compress) ->
|
||||
KeyChangeBin = term_to_binary(KeyChanges, [compressed]),
|
||||
KeyChangeBinLen = byte_size(KeyChangeBin),
|
||||
ObjectBin = serialise_object(Object, Compress),
|
||||
TypeCode = encode_valuetype(is_binary(Object), Compress),
|
||||
<<ObjectBin/binary,
|
||||
KeyChangeBin/binary,
|
||||
KeyChangeBinLen:32/integer,
|
||||
TypeCode:8/integer>>.
|
||||
|
||||
maybe_compress({null, KeyChanges}) ->
|
||||
create_value_for_journal({null, KeyChanges}, false);
|
||||
maybe_compress(JournalBin) ->
|
||||
Length0 = byte_size(JournalBin) - 1,
|
||||
<<JBin0:Length0/binary, Type:8/integer>> = JournalBin,
|
||||
{IsBinary, IsCompressed} = decode_valuetype(Type),
|
||||
case IsCompressed of
|
||||
true ->
|
||||
JournalBin;
|
||||
false ->
|
||||
V0 = revert_value_from_journal(JBin0, Length0, IsBinary, false),
|
||||
create_value_for_journal(V0, true)
|
||||
end.
|
||||
|
||||
serialise_object(Object, false) when is_binary(Object) ->
|
||||
Object;
|
||||
serialise_object(Object, true) when is_binary(Object) ->
|
||||
zlib:compress(Object);
|
||||
serialise_object(Object, false) ->
|
||||
term_to_binary(Object);
|
||||
serialise_object(Object, true) ->
|
||||
term_to_binary(Object, [compressed]).
|
||||
|
||||
revert_value_from_journal(JournalBin) ->
|
||||
Length0 = byte_size(JournalBin) - 1,
|
||||
<<JBin0:Length0/binary, Type:8/integer>> = JournalBin,
|
||||
{IsBinary, IsCompressed} = decode_valuetype(Type),
|
||||
revert_value_from_journal(JBin0, Length0, IsBinary, IsCompressed).
|
||||
|
||||
revert_value_from_journal(ValueBin, ValueLen, IsBinary, IsCompressed) ->
|
||||
Length1 = ValueLen - 4,
|
||||
<<JBin1:Length1/binary, KeyChangeLength:32/integer>> = ValueBin,
|
||||
Length2 = Length1 - KeyChangeLength,
|
||||
<<OBin2:Length2/binary, KCBin2:KeyChangeLength/binary>> = JBin1,
|
||||
{deserialise_object(OBin2, IsBinary, IsCompressed),
|
||||
binary_to_term(KCBin2)}.
|
||||
|
||||
deserialise_object(Binary, true, true) ->
|
||||
zlib:uncompress(Binary);
|
||||
deserialise_object(Binary, true, false) ->
|
||||
Binary;
|
||||
deserialise_object(Binary, false, _) ->
|
||||
binary_to_term(Binary).
|
||||
|
||||
encode_valuetype(IsBinary, IsCompressed) ->
|
||||
Bit2 =
|
||||
case IsBinary of
|
||||
true -> 2;
|
||||
false -> 0
|
||||
end,
|
||||
Bit1 =
|
||||
case IsCompressed of
|
||||
true -> 1;
|
||||
false -> 0
|
||||
end,
|
||||
Bit1 + Bit2.
|
||||
|
||||
decode_valuetype(TypeInt) ->
|
||||
IsCompressed = TypeInt band 1 == 1,
|
||||
IsBinary = TypeInt band 2 == 2,
|
||||
{IsBinary, IsCompressed}.
|
||||
|
||||
from_journalkey({SQN, _Type, LedgerKey}) ->
|
||||
{SQN, LedgerKey}.
|
||||
|
||||
|
@ -220,7 +293,7 @@ compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
|
|||
skip ->
|
||||
skip;
|
||||
retain ->
|
||||
{_V, KeyDeltas} = split_inkvalue(V),
|
||||
{_V, KeyDeltas} = revert_value_from_journal(V),
|
||||
{retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
|
||||
TagStrat ->
|
||||
{TagStrat, null}
|
||||
|
@ -245,7 +318,7 @@ get_tagstrategy(LK, Strategy) ->
|
|||
split_inkvalue(VBin) ->
|
||||
case is_binary(VBin) of
|
||||
true ->
|
||||
binary_to_term(VBin);
|
||||
revert_value_from_journal(VBin);
|
||||
false ->
|
||||
VBin
|
||||
end.
|
||||
|
@ -255,14 +328,6 @@ check_forinkertype(_LedgerKey, delete) ->
|
|||
check_forinkertype(_LedgerKey, _Object) ->
|
||||
?INKT_STND.
|
||||
|
||||
create_value_for_journal(Value) ->
|
||||
case Value of
|
||||
{Object, KeyChanges} ->
|
||||
term_to_binary({Object, KeyChanges}, [compressed]);
|
||||
Value when is_binary(Value) ->
|
||||
Value
|
||||
end.
|
||||
|
||||
hash(Obj) ->
|
||||
erlang:phash2(term_to_binary(Obj)).
|
||||
|
||||
|
@ -368,8 +433,7 @@ riak_extract_metadata(ObjBin, Size) ->
|
|||
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
%%% VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
|
||||
|
||||
riak_metadata_to_binary(Vclock, SibData) ->
|
||||
VclockBin = term_to_binary(Vclock),
|
||||
riak_metadata_to_binary(VclockBin, SibData) ->
|
||||
VclockLen = byte_size(VclockBin),
|
||||
% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
% VclockBin:VclockLen/binary, SibData:32/integer>>.
|
||||
|
@ -390,7 +454,7 @@ riak_metadata_from_binary(V1Binary) ->
|
|||
SC when is_integer(SC) ->
|
||||
get_metadata_from_siblings(SibsBin, SibCount, [])
|
||||
end,
|
||||
{binary_to_term(VclockBin), SibMetaBinList}.
|
||||
{VclockBin, SibMetaBinList}.
|
||||
|
||||
% Fixes the value length for each sibling to be zero, and so includes no value
|
||||
slimbin_content(MetaBin) ->
|
||||
|
@ -517,11 +581,26 @@ corrupted_inker_tag_test() ->
|
|||
%% Test below proved that the overhead of performing hashes was trivial
|
||||
%% Maybe 5 microseconds per hash
|
||||
|
||||
%hashperf_test() ->
|
||||
% OL = lists:map(fun(_X) -> crypto:rand_bytes(8192) end, lists:seq(1, 10000)),
|
||||
% SW = os:timestamp(),
|
||||
% _HL = lists:map(fun(Obj) -> erlang:phash2(Obj) end, OL),
|
||||
% io:format(user, "10000 object hashes in ~w microseconds~n",
|
||||
% [timer:now_diff(os:timestamp(), SW)]).
|
||||
hashperf_test() ->
|
||||
OL = lists:map(fun(_X) -> crypto:rand_bytes(8192) end, lists:seq(1, 1000)),
|
||||
SW = os:timestamp(),
|
||||
_HL = lists:map(fun(Obj) -> erlang:phash2(Obj) end, OL),
|
||||
io:format(user, "1000 object hashes in ~w microseconds~n",
|
||||
[timer:now_diff(os:timestamp(), SW)]).
|
||||
|
||||
magichashperf_test() ->
|
||||
KeyFun =
|
||||
fun(X) ->
|
||||
K = {o, "Bucket", "Key" ++ integer_to_list(X), null},
|
||||
{K, X}
|
||||
end,
|
||||
KL = lists:map(KeyFun, lists:seq(1, 1000)),
|
||||
{TimeMH, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
|
||||
io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH]),
|
||||
{TimePH, _Hl2} = timer:tc(lists, map, [fun(K) -> erlang:phash2(K) end, KL]),
|
||||
io:format(user, "1000 keys phash2 hashed in ~w microseconds~n", [TimePH]),
|
||||
{TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
|
||||
io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]).
|
||||
|
||||
|
||||
-endif.
|
|
@ -490,7 +490,8 @@ write_values([], _CDBopts, Journal0, ManSlice0) ->
|
|||
{Journal0, ManSlice0};
|
||||
write_values(KVCList, CDBopts, Journal0, ManSlice0) ->
|
||||
KVList = lists:map(fun({K, V, _C}) ->
|
||||
{K, leveled_codec:create_value_for_journal(V)}
|
||||
% Compress the value as part of compaction
|
||||
{K, leveled_codec:maybe_compress(V)}
|
||||
end,
|
||||
KVCList),
|
||||
{ok, Journal1} = case Journal0 of
|
||||
|
@ -639,11 +640,13 @@ test_ledgerkey(Key) ->
|
|||
{o, "Bucket", Key, null}.
|
||||
|
||||
test_inkerkv(SQN, Key, V, IdxSpecs) ->
|
||||
{{SQN, ?INKT_STND, test_ledgerkey(Key)}, term_to_binary({V, IdxSpecs})}.
|
||||
leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs).
|
||||
|
||||
fetch_testcdb(RP) ->
|
||||
FN1 = leveled_inker:filepath(RP, 1, new_journal),
|
||||
{ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, #cdb_options{}),
|
||||
{ok,
|
||||
CDB1} = leveled_cdb:cdb_open_writer(FN1,
|
||||
#cdb_options{binary_mode=true}),
|
||||
{K1, V1} = test_inkerkv(1, "Key1", "Value1", []),
|
||||
{K2, V2} = test_inkerkv(2, "Key2", "Value2", []),
|
||||
{K3, V3} = test_inkerkv(3, "Key3", "Value3", []),
|
||||
|
@ -661,7 +664,7 @@ fetch_testcdb(RP) ->
|
|||
ok = leveled_cdb:cdb_put(CDB1, K7, V7),
|
||||
ok = leveled_cdb:cdb_put(CDB1, K8, V8),
|
||||
{ok, FN2} = leveled_cdb:cdb_complete(CDB1),
|
||||
leveled_cdb:cdb_open_reader(FN2).
|
||||
leveled_cdb:cdb_open_reader(FN2, #cdb_options{binary_mode=true}).
|
||||
|
||||
check_single_file_test() ->
|
||||
RP = "../test/journal",
|
||||
|
@ -718,7 +721,7 @@ compact_single_file_recovr_test() ->
|
|||
CDB} = compact_single_file_setup(),
|
||||
[{LowSQN, FN, PidR, _LastKey}] =
|
||||
compact_files([Candidate],
|
||||
#cdb_options{file_path=CompactFP},
|
||||
#cdb_options{file_path=CompactFP, binary_mode=true},
|
||||
LedgerFun1,
|
||||
LedgerSrv1,
|
||||
9,
|
||||
|
@ -738,11 +741,11 @@ compact_single_file_recovr_test() ->
|
|||
{1,
|
||||
stnd,
|
||||
test_ledgerkey("Key1")})),
|
||||
{_RK1, RV1} = leveled_cdb:cdb_get(PidR,
|
||||
{2,
|
||||
stnd,
|
||||
test_ledgerkey("Key2")}),
|
||||
?assertMatch({"Value2", []}, binary_to_term(RV1)),
|
||||
RKV1 = leveled_cdb:cdb_get(PidR,
|
||||
{2,
|
||||
stnd,
|
||||
test_ledgerkey("Key2")}),
|
||||
?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)),
|
||||
ok = leveled_cdb:cdb_deletepending(CDB),
|
||||
ok = leveled_cdb:cdb_destroy(CDB).
|
||||
|
||||
|
@ -755,7 +758,7 @@ compact_single_file_retain_test() ->
|
|||
CDB} = compact_single_file_setup(),
|
||||
[{LowSQN, FN, PidR, _LK}] =
|
||||
compact_files([Candidate],
|
||||
#cdb_options{file_path=CompactFP},
|
||||
#cdb_options{file_path=CompactFP, binary_mode=true},
|
||||
LedgerFun1,
|
||||
LedgerSrv1,
|
||||
9,
|
||||
|
@ -775,11 +778,11 @@ compact_single_file_retain_test() ->
|
|||
{1,
|
||||
stnd,
|
||||
test_ledgerkey("Key1")})),
|
||||
{_RK1, RV1} = leveled_cdb:cdb_get(PidR,
|
||||
RKV1 = leveled_cdb:cdb_get(PidR,
|
||||
{2,
|
||||
stnd,
|
||||
test_ledgerkey("Key2")}),
|
||||
?assertMatch({"Value2", []}, binary_to_term(RV1)),
|
||||
?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)),
|
||||
ok = leveled_cdb:cdb_deletepending(CDB),
|
||||
ok = leveled_cdb:cdb_destroy(CDB).
|
||||
|
||||
|
@ -815,10 +818,9 @@ compact_singlefile_totwosmallfiles_test() ->
|
|||
{ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBoptsLarge),
|
||||
lists:foreach(fun(X) ->
|
||||
LK = test_ledgerkey("Key" ++ integer_to_list(X)),
|
||||
Value = term_to_binary({crypto:rand_bytes(1024), []}),
|
||||
ok = leveled_cdb:cdb_put(CDB1,
|
||||
{X, ?INKT_STND, LK},
|
||||
Value)
|
||||
Value = crypto:rand_bytes(1024),
|
||||
{IK, IV} = leveled_codec:to_inkerkv(LK, X, Value, []),
|
||||
ok = leveled_cdb:cdb_put(CDB1, IK, IV)
|
||||
end,
|
||||
lists:seq(1, 1000)),
|
||||
{ok, NewName} = leveled_cdb:cdb_complete(CDB1),
|
||||
|
|
|
@ -724,6 +724,9 @@ initiate_penciller_snapshot(Bookie) ->
|
|||
|
||||
-ifdef(TEST).
|
||||
|
||||
create_value_for_journal(Obj, Comp) ->
|
||||
leveled_codec:create_value_for_journal(Obj, Comp).
|
||||
|
||||
build_dummy_journal() ->
|
||||
F = fun(X) -> X end,
|
||||
build_dummy_journal(F).
|
||||
|
@ -740,8 +743,12 @@ build_dummy_journal(KeyConvertF) ->
|
|||
{ok, J1} = leveled_cdb:cdb_open_writer(F1),
|
||||
{K1, V1} = {KeyConvertF("Key1"), "TestValue1"},
|
||||
{K2, V2} = {KeyConvertF("Key2"), "TestValue2"},
|
||||
ok = leveled_cdb:cdb_put(J1, {1, stnd, K1}, term_to_binary({V1, []})),
|
||||
ok = leveled_cdb:cdb_put(J1, {2, stnd, K2}, term_to_binary({V2, []})),
|
||||
ok = leveled_cdb:cdb_put(J1,
|
||||
{1, stnd, K1},
|
||||
create_value_for_journal({V1, []}, false)),
|
||||
ok = leveled_cdb:cdb_put(J1,
|
||||
{2, stnd, K2},
|
||||
create_value_for_journal({V2, []}, false)),
|
||||
ok = leveled_cdb:cdb_roll(J1),
|
||||
LK1 = leveled_cdb:cdb_lastkey(J1),
|
||||
lists:foldl(fun(X, Closed) ->
|
||||
|
@ -760,8 +767,12 @@ build_dummy_journal(KeyConvertF) ->
|
|||
{ok, J2} = leveled_cdb:cdb_open_writer(F2),
|
||||
{K1, V3} = {KeyConvertF("Key1"), "TestValue3"},
|
||||
{K4, V4} = {KeyConvertF("Key4"), "TestValue4"},
|
||||
ok = leveled_cdb:cdb_put(J2, {3, stnd, K1}, term_to_binary({V3, []})),
|
||||
ok = leveled_cdb:cdb_put(J2, {4, stnd, K4}, term_to_binary({V4, []})),
|
||||
ok = leveled_cdb:cdb_put(J2,
|
||||
{3, stnd, K1},
|
||||
create_value_for_journal({V3, []}, false)),
|
||||
ok = leveled_cdb:cdb_put(J2,
|
||||
{4, stnd, K4},
|
||||
create_value_for_journal({V4, []}, false)),
|
||||
LK2 = leveled_cdb:cdb_lastkey(J2),
|
||||
ok = leveled_cdb:cdb_close(J2),
|
||||
Manifest = [{1, "../test/journal/journal_files/nursery_1", "pid1", LK1},
|
||||
|
@ -794,20 +805,22 @@ clean_subdir(DirPath) ->
|
|||
simple_inker_test() ->
|
||||
RootPath = "../test/journal",
|
||||
build_dummy_journal(),
|
||||
CDBopts = #cdb_options{max_size=300000},
|
||||
CDBopts = #cdb_options{max_size=300000, binary_mode=true},
|
||||
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
||||
cdb_options=CDBopts}),
|
||||
Obj1 = ink_get(Ink1, "Key1", 1),
|
||||
?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
|
||||
Obj2 = ink_get(Ink1, "Key4", 4),
|
||||
?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj2),
|
||||
Obj3 = ink_get(Ink1, "Key1", 3),
|
||||
?assertMatch({{3, "Key1"}, {"TestValue3", []}}, Obj3),
|
||||
Obj4 = ink_get(Ink1, "Key4", 4),
|
||||
?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj4),
|
||||
ink_close(Ink1),
|
||||
clean_testdir(RootPath).
|
||||
|
||||
simple_inker_completeactivejournal_test() ->
|
||||
RootPath = "../test/journal",
|
||||
build_dummy_journal(),
|
||||
CDBopts = #cdb_options{max_size=300000},
|
||||
CDBopts = #cdb_options{max_size=300000, binary_mode=true},
|
||||
JournalFP = filepath(RootPath, journal_dir),
|
||||
F2 = filename:join(JournalFP, "nursery_3.pnd"),
|
||||
{ok, PidW} = leveled_cdb:cdb_open_writer(F2),
|
||||
|
|
|
@ -46,7 +46,7 @@ simple_put_fetch_head_delete(_Config) ->
|
|||
ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", "Value2",
|
||||
[{add, "Index1", "Term1"}]),
|
||||
{ok, "Value2"} = leveled_bookie:book_get(Bookie2, "Bucket1", "Key2"),
|
||||
{ok, {62888926, 56}} = leveled_bookie:book_head(Bookie2,
|
||||
{ok, {62888926, 60}} = leveled_bookie:book_head(Bookie2,
|
||||
"Bucket1",
|
||||
"Key2"),
|
||||
testutil:check_formissingobject(Bookie2, "Bucket1", "Key2"),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue