diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index ba4e589..2485395 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -65,6 +65,7 @@ -export([cdb_open_writer/1, cdb_open_writer/2, cdb_open_reader/1, + cdb_open_reader/2, cdb_reopen_reader/2, cdb_get/2, cdb_put/3, @@ -262,12 +263,19 @@ starting({open_reader, Filename, LastKey}, _From, State) -> writer({get_kv, Key}, _From, State) -> {reply, - get_mem(Key, State#state.handle, State#state.hashtree), + get_mem(Key, + State#state.handle, + State#state.hashtree, + State#state.binary_mode), writer, State}; writer({key_check, Key}, _From, State) -> {reply, - get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), + get_mem(Key, + State#state.handle, + State#state.hashtree, + State#state.binary_mode, + loose_presence), writer, State}; writer({put_kv, Key, Value}, _From, State) -> @@ -329,12 +337,19 @@ writer(cdb_roll, State) -> rolling({get_kv, Key}, _From, State) -> {reply, - get_mem(Key, State#state.handle, State#state.hashtree), + get_mem(Key, + State#state.handle, + State#state.hashtree, + State#state.binary_mode), rolling, State}; rolling({key_check, Key}, _From, State) -> {reply, - get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), + get_mem(Key, + State#state.handle, + State#state.hashtree, + State#state.binary_mode, + loose_presence), rolling, State}; rolling({get_positions, _SampleSize}, _From, State) -> @@ -374,7 +389,10 @@ rolling({delete_pending, ManSQN, Inker}, State) -> reader({get_kv, Key}, _From, State) -> {reply, - get_withcache(State#state.handle, Key, State#state.hash_index), + get_withcache(State#state.handle, + Key, + State#state.hash_index, + State#state.binary_mode), reader, State}; reader({key_check, Key}, _From, State) -> @@ -382,7 +400,8 @@ reader({key_check, Key}, _From, State) -> get_withcache(State#state.handle, Key, State#state.hash_index, - loose_presence), + loose_presence, + State#state.binary_mode), reader, State}; reader({get_positions, SampleSize}, _From, State) -> @@ -430,9 +449,11 @@ reader({direct_fetch, PositionList, Info}, _From, State) -> FilterFalseKey(extract_key_size(H, P)) end, PositionList); key_value_check -> + BM = State#state.binary_mode, lists:filtermap( fun(P) -> - FilterFalseKey(extract_key_value_check(H, P)) end, + FilterFalseKey(extract_key_value_check(H, P, BM)) + end, PositionList) end, {reply, Reply, reader, State}; @@ -456,7 +477,10 @@ reader({delete_pending, ManSQN, Inker}, State) -> delete_pending({get_kv, Key}, _From, State) -> {reply, - get_withcache(State#state.handle, Key, State#state.hash_index), + get_withcache(State#state.handle, + Key, + State#state.hash_index, + State#state.binary_mode), delete_pending, State, ?DELETE_TIMEOUT}; @@ -465,7 +489,8 @@ delete_pending({key_check, Key}, _From, State) -> get_withcache(State#state.handle, Key, State#state.hash_index, - loose_presence), + loose_presence, + State#state.binary_mode), delete_pending, State, ?DELETE_TIMEOUT}. @@ -687,19 +712,19 @@ put(FileName, Key, Value, {LastPosition, HashTree}) -> %% -get_withcache(Handle, Key, Cache) -> - get(Handle, Key, Cache, true). +get_withcache(Handle, Key, Cache, BinaryMode) -> + get(Handle, Key, Cache, true, BinaryMode). -get_withcache(Handle, Key, Cache, QuickCheck) -> - get(Handle, Key, Cache, QuickCheck). +get_withcache(Handle, Key, Cache, QuickCheck, BinaryMode) -> + get(Handle, Key, Cache, QuickCheck, BinaryMode). -get(FileNameOrHandle, Key) -> - get(FileNameOrHandle, Key, no_cache, true). +get(FileNameOrHandle, Key, BinaryMode) -> + get(FileNameOrHandle, Key, no_cache, true, BinaryMode). -get(FileName, Key, Cache, QuickCheck) when is_list(FileName) -> +get(FileName, Key, Cache, QuickCheck, BinaryMode) when is_list(FileName) -> {ok, Handle} = file:open(FileName,[binary, raw, read]), - get(Handle, Key, Cache, QuickCheck); -get(Handle, Key, Cache, QuickCheck) when is_tuple(Handle) -> + get(Handle, Key, Cache, QuickCheck, BinaryMode); +get(Handle, Key, Cache, QuickCheck, BinaryMode) when is_tuple(Handle) -> Hash = hash(Key), Index = hash_to_index(Hash), {HashTable, Count} = get_index(Handle, Index, Cache), @@ -722,7 +747,8 @@ get(Handle, Key, Cache, QuickCheck) when is_tuple(Handle) -> lists:append(L2, L1), Hash, Key, - QuickCheck) + QuickCheck, + BinaryMode) end. get_index(Handle, Index, no_cache) -> @@ -736,13 +762,13 @@ get_index(_Handle, Index, Cache) -> %% Get a Key/Value pair from an active CDB file (with no hash table written) %% This requires a key dictionary to be passed in (mapping keys to positions) %% Will return {Key, Value} or missing -get_mem(Key, FNOrHandle, HashTree) -> - get_mem(Key, FNOrHandle, HashTree, true). +get_mem(Key, FNOrHandle, HashTree, BinaryMode) -> + get_mem(Key, FNOrHandle, HashTree, BinaryMode, true). -get_mem(Key, Filename, HashTree, QuickCheck) when is_list(Filename) -> +get_mem(Key, Filename, HashTree, BinaryMode, QuickCheck) when is_list(Filename) -> {ok, Handle} = file:open(Filename, [binary, raw, read]), - get_mem(Key, Handle, HashTree, QuickCheck); -get_mem(Key, Handle, HashTree, QuickCheck) -> + get_mem(Key, Handle, HashTree, BinaryMode, QuickCheck); +get_mem(Key, Handle, HashTree, BinaryMode, QuickCheck) -> ListToCheck = get_hashtree(Key, HashTree), case {QuickCheck, ListToCheck} of {loose_presence, []} -> @@ -750,7 +776,7 @@ get_mem(Key, Handle, HashTree, QuickCheck) -> {loose_presence, _L} -> probably; _ -> - extract_kvpair(Handle, ListToCheck, Key) + extract_kvpair(Handle, ListToCheck, Key, BinaryMode) end. %% Get the next key at a position in the file (or the first key if no position @@ -767,7 +793,7 @@ get_nextkey(Handle, {Position, FirstHashPosition}) -> {ok, Position} = file:position(Handle, Position), case read_next_2_integers(Handle) of {KeyLength, ValueLength} -> - NextKey = read_next_term(Handle, KeyLength), + NextKey = read_next_key(Handle, KeyLength), NextPosition = Position + KeyLength + ValueLength + ?DWORD_SIZE, case NextPosition of FirstHashPosition -> @@ -830,7 +856,7 @@ find_lastkey(Handle, IndexCache) -> _ -> {ok, _} = file:position(Handle, LastPosition), {KeyLength, _ValueLength} = read_next_2_integers(Handle), - read_next_term(Handle, KeyLength) + read_next_key(Handle, KeyLength) end. @@ -902,39 +928,49 @@ put_hashtree(Key, Position, HashTree) -> %% Function to extract a Key-Value pair given a file handle and a position %% Will confirm that the key matches and do a CRC check -extract_kvpair(_, [], _) -> +extract_kvpair(_H, [], _K, _BinaryMode) -> missing; -extract_kvpair(Handle, [Position|Rest], Key) -> +extract_kvpair(Handle, [Position|Rest], Key, BinaryMode) -> {ok, _} = file:position(Handle, Position), {KeyLength, ValueLength} = read_next_2_integers(Handle), - case safe_read_next_term(Handle, KeyLength) of + case safe_read_next_key(Handle, KeyLength) of Key -> % If same key as passed in, then found! - case read_next_term(Handle, ValueLength, crc) of + case read_next_value(Handle, ValueLength, crc) of {false, _} -> crc_wonky; {_, Value} -> - {Key,Value} + case BinaryMode of + true -> + {Key, Value}; + false -> + {Key, binary_to_term(Value)} + end end; _ -> - extract_kvpair(Handle, Rest, Key) + extract_kvpair(Handle, Rest, Key, BinaryMode) end. extract_key(Handle, Position) -> {ok, _} = file:position(Handle, Position), {KeyLength, _ValueLength} = read_next_2_integers(Handle), - {safe_read_next_term(Handle, KeyLength)}. + {safe_read_next_key(Handle, KeyLength)}. extract_key_size(Handle, Position) -> {ok, _} = file:position(Handle, Position), {KeyLength, ValueLength} = read_next_2_integers(Handle), - {safe_read_next_term(Handle, KeyLength), ValueLength}. + {safe_read_next_key(Handle, KeyLength), ValueLength}. -extract_key_value_check(Handle, Position) -> +extract_key_value_check(Handle, Position, BinaryMode) -> {ok, _} = file:position(Handle, Position), {KeyLength, ValueLength} = read_next_2_integers(Handle), - K = safe_read_next_term(Handle, KeyLength), - {Check, V} = read_next_term(Handle, ValueLength, crc), - {K, V, Check}. + K = safe_read_next_key(Handle, KeyLength), + {Check, V} = read_next_value(Handle, ValueLength, crc), + case BinaryMode of + true -> + {K, V, Check}; + false -> + {K, binary_to_term(V), Check} + end. %% Scan through the file until there is a failure to crc check an input, and %% at that point return the position and the key dictionary scanned so far @@ -1018,7 +1054,7 @@ saferead_keyvalue(Handle) -> eof -> false; {KeyL, ValueL} -> - case safe_read_next_term(Handle, KeyL) of + case safe_read_next_key(Handle, KeyL) of {error, _} -> false; eof -> @@ -1041,8 +1077,8 @@ saferead_keyvalue(Handle) -> end. -safe_read_next_term(Handle, Length) -> - try read_next_term(Handle, Length) of +safe_read_next_key(Handle, Length) -> + try read_next_key(Handle, Length) of Term -> Term catch @@ -1074,7 +1110,7 @@ calc_crc(Value) -> erlang:crc32(<>) end. -read_next_term(Handle, Length) -> +read_next_key(Handle, Length) -> case file:read(Handle, Length) of {ok, Bin} -> binary_to_term(Bin); @@ -1082,13 +1118,14 @@ read_next_term(Handle, Length) -> ReadError end. + %% Read next string where the string has a CRC prepended - stripping the crc %% and checking if requested -read_next_term(Handle, Length, crc) -> +read_next_value(Handle, Length, crc) -> {ok, <>} = file:read(Handle, Length), case calc_crc(Bin) of CRC -> - {true, binary_to_term(Bin)}; + {true, Bin}; _ -> {false, crc_wonky} end. @@ -1096,7 +1133,7 @@ read_next_term(Handle, Length, crc) -> %% Extract value and size from binary containing CRC extract_valueandsize(ValueAsBin) -> <<_CRC:32/integer, Bin/binary>> = ValueAsBin, - {binary_to_term(Bin), byte_size(Bin)}. + {Bin, byte_size(Bin)}. %% Used for reading lengths @@ -1129,14 +1166,15 @@ read_integerpairs(<>, Pairs) -> %% false - don't check the CRC before returning key & value %% loose_presence - confirm that the hash of the key is present -search_hash_table(Handle, Entries, Hash, Key, QuickCheck) -> - search_hash_table(Handle, Entries, Hash, Key, QuickCheck, 0). +search_hash_table(Handle, Entries, Hash, Key, QuickCheck, BinaryMode) -> + search_hash_table(Handle, Entries, Hash, Key, QuickCheck, BinaryMode, 0). -search_hash_table(_Handle, [], Hash, _Key, _QuickCheck, CycleCount) -> +search_hash_table(_Handle, [], Hash, _Key, + _QuickCheck, _BinaryMode, CycleCount) -> log_cyclecount(CycleCount, Hash, missing), missing; search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, - QuickCheck, CycleCount) -> + QuickCheck, BinaryMode, CycleCount) -> {ok, _} = file:position(Handle, Entry), {StoredHash, DataLoc} = read_next_2_integers(Handle), case StoredHash of @@ -1145,7 +1183,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, loose_presence -> probably; _ -> - extract_kvpair(Handle, [DataLoc], Key) + extract_kvpair(Handle, [DataLoc], Key, BinaryMode) end, case KV of missing -> @@ -1154,6 +1192,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, Hash, Key, QuickCheck, + BinaryMode, CycleCount + 1); _ -> log_cyclecount(CycleCount, Hash, found), @@ -1164,7 +1203,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, % missing; _ -> search_hash_table(Handle, RestOfEntries, Hash, Key, - QuickCheck, CycleCount + 1) + QuickCheck, BinaryMode, CycleCount + 1) end. log_cyclecount(CycleCount, Hash, Result) -> @@ -1441,15 +1480,15 @@ dump(FileName) -> NumberOfPairs = lists:foldl(Fn, 0, lists:seq(0,255)) bsr 1, io:format("Count of keys in db is ~w~n", [NumberOfPairs]), {ok, _} = file:position(Handle, {bof, 2048}), - Fn1 = fun(_I,Acc) -> - {KL,VL} = read_next_2_integers(Handle), - Key = read_next_term(Handle, KL), - case read_next_term(Handle, VL, crc) of - {_, Value} -> - {ok, CurrLoc} = file:position(Handle, cur), - {Key,Value} = get(Handle, Key) - end, - {ok, _} = file:position(Handle, CurrLoc), + Fn1 = fun(_I, Acc) -> + {KL, VL} = read_next_2_integers(Handle), + Key = read_next_key(Handle, KL), + Value = + case read_next_value(Handle, VL, crc) of + {true, V0} -> + binary_to_term(V0) + end, + {Key, Value} = get(Handle, Key, false), [{Key,Value} | Acc] end, lists:foldr(Fn1, [], lists:seq(0, NumberOfPairs-1)). @@ -1647,16 +1686,13 @@ activewrite_singlewrite_test() -> {LastPosition, KeyDict}), io:format("New key and value added to active file ~n", []), ?assertMatch({Key, Value}, - get_mem(Key, "../test/test_mem.cdb", - UpdKeyDict)), + get_mem(Key, "../test/test_mem.cdb", UpdKeyDict, false)), ?assertMatch(probably, - get_mem(Key, "../test/test_mem.cdb", - UpdKeyDict, - loose_presence)), + get_mem(Key, "../test/test_mem.cdb", UpdKeyDict, + false, loose_presence)), ?assertMatch(missing, - get_mem("not_present", "../test/test_mem.cdb", - UpdKeyDict, - loose_presence)), + get_mem("not_present", "../test/test_mem.cdb", UpdKeyDict, + false, loose_presence)), ok = file:delete("../test/test_mem.cdb"). search_hash_table_findinslot_test() -> @@ -1681,9 +1717,11 @@ search_hash_table_findinslot_test() -> io:format("Slot 1 has Hash ~w Position ~w~n", [ReadH3, ReadP3]), io:format("Slot 2 has Hash ~w Position ~w~n", [ReadH4, ReadP4]), ?assertMatch(0, ReadH4), - ?assertMatch({"key1", "value1"}, get(Handle, Key1)), - ?assertMatch(probably, get(Handle, Key1, no_cache, loose_presence)), - ?assertMatch(missing, get(Handle, "Key99", no_cache, loose_presence)), + ?assertMatch({"key1", "value1"}, get(Handle, Key1, false)), + ?assertMatch(probably, get(Handle, Key1, + no_cache, loose_presence, false)), + ?assertMatch(missing, get(Handle, "Key99", + no_cache, loose_presence, false)), {ok, _} = file:position(Handle, FirstHashPosition), FlipH3 = endian_flip(ReadH3), FlipP3 = endian_flip(ReadP3), @@ -1700,7 +1738,7 @@ search_hash_table_findinslot_test() -> RBin), ok = file:close(Handle), io:format("Find key following change to hash table~n"), - ?assertMatch(missing, get("../test/hashtable1_test.cdb", Key1)), + ?assertMatch(missing, get("../test/hashtable1_test.cdb", Key1, false)), ok = file:delete("../test/hashtable1_test.cdb"). getnextkey_inclemptyvalue_test() -> @@ -1926,7 +1964,7 @@ hashclash_test() -> ?assertMatch(missing, cdb_get(P1, KeyNF)), {ok, FN} = cdb_complete(P1), - {ok, P2} = cdb_open_reader(FN), + {ok, P2} = cdb_open_reader(FN, #cdb_options{binary_mode=false}), ?assertMatch(probably, cdb_keycheck(P2, Key1)), ?assertMatch(probably, cdb_keycheck(P2, Key99)), diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 0e5d17f..500c8e1 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -52,7 +52,8 @@ compact_inkerkvc/2, split_inkvalue/1, check_forinkertype/2, - create_value_for_journal/1, + maybe_compress/1, + create_value_for_journal/2, build_metadata_object/2, generate_ledgerkv/5, get_size/2, @@ -185,20 +186,92 @@ to_inkerkv(LedgerKey, SQN, to_fetch, null) -> {{SQN, ?INKT_STND, LedgerKey}, null, true}; to_inkerkv(LedgerKey, SQN, Object, KeyChanges) -> InkerType = check_forinkertype(LedgerKey, Object), - Value = create_value_for_journal({Object, KeyChanges}), + Value = create_value_for_journal({Object, KeyChanges}, false), {{SQN, InkerType, LedgerKey}, Value}. %% Used when fetching objects, so only handles standard, hashable entries from_inkerkv(Object) -> case Object of {{SQN, ?INKT_STND, PK}, Bin} when is_binary(Bin) -> - {{SQN, PK}, binary_to_term(Bin)}; + {{SQN, PK}, revert_value_from_journal(Bin)}; {{SQN, ?INKT_STND, PK}, Term} -> {{SQN, PK}, Term}; _ -> Object end. +create_value_for_journal({Object, KeyChanges}, Compress) -> + KeyChangeBin = term_to_binary(KeyChanges, [compressed]), + KeyChangeBinLen = byte_size(KeyChangeBin), + ObjectBin = serialise_object(Object, Compress), + TypeCode = encode_valuetype(is_binary(Object), Compress), + <>. + +maybe_compress({null, KeyChanges}) -> + create_value_for_journal({null, KeyChanges}, false); +maybe_compress(JournalBin) -> + Length0 = byte_size(JournalBin) - 1, + <> = JournalBin, + {IsBinary, IsCompressed} = decode_valuetype(Type), + case IsCompressed of + true -> + JournalBin; + false -> + V0 = revert_value_from_journal(JBin0, Length0, IsBinary, false), + create_value_for_journal(V0, true) + end. + +serialise_object(Object, false) when is_binary(Object) -> + Object; +serialise_object(Object, true) when is_binary(Object) -> + zlib:compress(Object); +serialise_object(Object, false) -> + term_to_binary(Object); +serialise_object(Object, true) -> + term_to_binary(Object, [compressed]). + +revert_value_from_journal(JournalBin) -> + Length0 = byte_size(JournalBin) - 1, + <> = JournalBin, + {IsBinary, IsCompressed} = decode_valuetype(Type), + revert_value_from_journal(JBin0, Length0, IsBinary, IsCompressed). + +revert_value_from_journal(ValueBin, ValueLen, IsBinary, IsCompressed) -> + Length1 = ValueLen - 4, + <> = ValueBin, + Length2 = Length1 - KeyChangeLength, + <> = JBin1, + {deserialise_object(OBin2, IsBinary, IsCompressed), + binary_to_term(KCBin2)}. + +deserialise_object(Binary, true, true) -> + zlib:uncompress(Binary); +deserialise_object(Binary, true, false) -> + Binary; +deserialise_object(Binary, false, _) -> + binary_to_term(Binary). + +encode_valuetype(IsBinary, IsCompressed) -> + Bit2 = + case IsBinary of + true -> 2; + false -> 0 + end, + Bit1 = + case IsCompressed of + true -> 1; + false -> 0 + end, + Bit1 + Bit2. + +decode_valuetype(TypeInt) -> + IsCompressed = TypeInt band 1 == 1, + IsBinary = TypeInt band 2 == 2, + {IsBinary, IsCompressed}. + from_journalkey({SQN, _Type, LedgerKey}) -> {SQN, LedgerKey}. @@ -220,7 +293,7 @@ compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) -> skip -> skip; retain -> - {_V, KeyDeltas} = split_inkvalue(V), + {_V, KeyDeltas} = revert_value_from_journal(V), {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}}; TagStrat -> {TagStrat, null} @@ -245,7 +318,7 @@ get_tagstrategy(LK, Strategy) -> split_inkvalue(VBin) -> case is_binary(VBin) of true -> - binary_to_term(VBin); + revert_value_from_journal(VBin); false -> VBin end. @@ -255,14 +328,6 @@ check_forinkertype(_LedgerKey, delete) -> check_forinkertype(_LedgerKey, _Object) -> ?INKT_STND. -create_value_for_journal(Value) -> - case Value of - {Object, KeyChanges} -> - term_to_binary({Object, KeyChanges}, [compressed]); - Value when is_binary(Value) -> - Value - end. - hash(Obj) -> erlang:phash2(term_to_binary(Obj)). diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 9b47884..7b0dc08 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -490,7 +490,8 @@ write_values([], _CDBopts, Journal0, ManSlice0) -> {Journal0, ManSlice0}; write_values(KVCList, CDBopts, Journal0, ManSlice0) -> KVList = lists:map(fun({K, V, _C}) -> - {K, leveled_codec:create_value_for_journal(V)} + % Compress the value as part of compaction + {K, leveled_codec:maybe_compress(V)} end, KVCList), {ok, Journal1} = case Journal0 of @@ -639,11 +640,13 @@ test_ledgerkey(Key) -> {o, "Bucket", Key, null}. test_inkerkv(SQN, Key, V, IdxSpecs) -> - {{SQN, ?INKT_STND, test_ledgerkey(Key)}, term_to_binary({V, IdxSpecs})}. + leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs). fetch_testcdb(RP) -> FN1 = leveled_inker:filepath(RP, 1, new_journal), - {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, #cdb_options{}), + {ok, + CDB1} = leveled_cdb:cdb_open_writer(FN1, + #cdb_options{binary_mode=true}), {K1, V1} = test_inkerkv(1, "Key1", "Value1", []), {K2, V2} = test_inkerkv(2, "Key2", "Value2", []), {K3, V3} = test_inkerkv(3, "Key3", "Value3", []), @@ -661,7 +664,7 @@ fetch_testcdb(RP) -> ok = leveled_cdb:cdb_put(CDB1, K7, V7), ok = leveled_cdb:cdb_put(CDB1, K8, V8), {ok, FN2} = leveled_cdb:cdb_complete(CDB1), - leveled_cdb:cdb_open_reader(FN2). + leveled_cdb:cdb_open_reader(FN2, #cdb_options{binary_mode=true}). check_single_file_test() -> RP = "../test/journal", @@ -718,7 +721,7 @@ compact_single_file_recovr_test() -> CDB} = compact_single_file_setup(), [{LowSQN, FN, PidR, _LastKey}] = compact_files([Candidate], - #cdb_options{file_path=CompactFP}, + #cdb_options{file_path=CompactFP, binary_mode=true}, LedgerFun1, LedgerSrv1, 9, @@ -738,11 +741,11 @@ compact_single_file_recovr_test() -> {1, stnd, test_ledgerkey("Key1")})), - {_RK1, RV1} = leveled_cdb:cdb_get(PidR, - {2, - stnd, - test_ledgerkey("Key2")}), - ?assertMatch({"Value2", []}, binary_to_term(RV1)), + RKV1 = leveled_cdb:cdb_get(PidR, + {2, + stnd, + test_ledgerkey("Key2")}), + ?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -755,7 +758,7 @@ compact_single_file_retain_test() -> CDB} = compact_single_file_setup(), [{LowSQN, FN, PidR, _LK}] = compact_files([Candidate], - #cdb_options{file_path=CompactFP}, + #cdb_options{file_path=CompactFP, binary_mode=true}, LedgerFun1, LedgerSrv1, 9, @@ -775,11 +778,11 @@ compact_single_file_retain_test() -> {1, stnd, test_ledgerkey("Key1")})), - {_RK1, RV1} = leveled_cdb:cdb_get(PidR, + RKV1 = leveled_cdb:cdb_get(PidR, {2, stnd, test_ledgerkey("Key2")}), - ?assertMatch({"Value2", []}, binary_to_term(RV1)), + ?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -815,10 +818,9 @@ compact_singlefile_totwosmallfiles_test() -> {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBoptsLarge), lists:foreach(fun(X) -> LK = test_ledgerkey("Key" ++ integer_to_list(X)), - Value = term_to_binary({crypto:rand_bytes(1024), []}), - ok = leveled_cdb:cdb_put(CDB1, - {X, ?INKT_STND, LK}, - Value) + Value = crypto:rand_bytes(1024), + {IK, IV} = leveled_codec:to_inkerkv(LK, X, Value, []), + ok = leveled_cdb:cdb_put(CDB1, IK, IV) end, lists:seq(1, 1000)), {ok, NewName} = leveled_cdb:cdb_complete(CDB1), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index f4b2f33..01b755d 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -724,6 +724,9 @@ initiate_penciller_snapshot(Bookie) -> -ifdef(TEST). +create_value_for_journal(Obj, Comp) -> + leveled_codec:create_value_for_journal(Obj, Comp). + build_dummy_journal() -> F = fun(X) -> X end, build_dummy_journal(F). @@ -740,8 +743,12 @@ build_dummy_journal(KeyConvertF) -> {ok, J1} = leveled_cdb:cdb_open_writer(F1), {K1, V1} = {KeyConvertF("Key1"), "TestValue1"}, {K2, V2} = {KeyConvertF("Key2"), "TestValue2"}, - ok = leveled_cdb:cdb_put(J1, {1, stnd, K1}, term_to_binary({V1, []})), - ok = leveled_cdb:cdb_put(J1, {2, stnd, K2}, term_to_binary({V2, []})), + ok = leveled_cdb:cdb_put(J1, + {1, stnd, K1}, + create_value_for_journal({V1, []}, false)), + ok = leveled_cdb:cdb_put(J1, + {2, stnd, K2}, + create_value_for_journal({V2, []}, false)), ok = leveled_cdb:cdb_roll(J1), LK1 = leveled_cdb:cdb_lastkey(J1), lists:foldl(fun(X, Closed) -> @@ -760,8 +767,12 @@ build_dummy_journal(KeyConvertF) -> {ok, J2} = leveled_cdb:cdb_open_writer(F2), {K1, V3} = {KeyConvertF("Key1"), "TestValue3"}, {K4, V4} = {KeyConvertF("Key4"), "TestValue4"}, - ok = leveled_cdb:cdb_put(J2, {3, stnd, K1}, term_to_binary({V3, []})), - ok = leveled_cdb:cdb_put(J2, {4, stnd, K4}, term_to_binary({V4, []})), + ok = leveled_cdb:cdb_put(J2, + {3, stnd, K1}, + create_value_for_journal({V3, []}, false)), + ok = leveled_cdb:cdb_put(J2, + {4, stnd, K4}, + create_value_for_journal({V4, []}, false)), LK2 = leveled_cdb:cdb_lastkey(J2), ok = leveled_cdb:cdb_close(J2), Manifest = [{1, "../test/journal/journal_files/nursery_1", "pid1", LK1}, @@ -794,20 +805,22 @@ clean_subdir(DirPath) -> simple_inker_test() -> RootPath = "../test/journal", build_dummy_journal(), - CDBopts = #cdb_options{max_size=300000}, + CDBopts = #cdb_options{max_size=300000, binary_mode=true}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts}), Obj1 = ink_get(Ink1, "Key1", 1), ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), - Obj2 = ink_get(Ink1, "Key4", 4), - ?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj2), + Obj3 = ink_get(Ink1, "Key1", 3), + ?assertMatch({{3, "Key1"}, {"TestValue3", []}}, Obj3), + Obj4 = ink_get(Ink1, "Key4", 4), + ?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj4), ink_close(Ink1), clean_testdir(RootPath). simple_inker_completeactivejournal_test() -> RootPath = "../test/journal", build_dummy_journal(), - CDBopts = #cdb_options{max_size=300000}, + CDBopts = #cdb_options{max_size=300000, binary_mode=true}, JournalFP = filepath(RootPath, journal_dir), F2 = filename:join(JournalFP, "nursery_3.pnd"), {ok, PidW} = leveled_cdb:cdb_open_writer(F2),