More spec/doc work in leveled_codec
Note that at some stage KeyChanges got overloaded to mean {KeyChanges, TTL}, and the spec now tries to make this a bit clearer
This commit is contained in:
parent
aa34ffda5b
commit
2063cacd8f
3 changed files with 73 additions and 35 deletions
|
@ -84,13 +84,13 @@
|
|||
-type tag() ::
|
||||
?STD_TAG|?RIAK_TAG|?IDX_TAG|?HEAD_TAG.
|
||||
-type segment_hash() ::
|
||||
{integer(), integer()}.
|
||||
{integer(), integer()}|no_lookup.
|
||||
-type ledger_status() ::
|
||||
tomb|{active, non_neg_integer()|infinity}.
|
||||
-type ledger_key() ::
|
||||
{tag(), any(), any(), any()}.
|
||||
-type ledger_value() ::
|
||||
{integer(), ledger_status(), segment_hash(), tuple()}.
|
||||
{integer(), ledger_status(), segment_hash(), tuple()|null}.
|
||||
-type ledger_kv() ::
|
||||
{ledger_key(), ledger_value()}.
|
||||
-type compaction_strategy() ::
|
||||
|
@ -101,6 +101,8 @@
|
|||
{integer(), journal_key_tag(), ledger_key()}.
|
||||
-type compression_method() ::
|
||||
lz4|native.
|
||||
-type journal_keychanges() ::
|
||||
{list(), infinity|integer()}. % {KeyChanges, TTL}
|
||||
|
||||
%%%============================================================================
|
||||
%%% Ledger Key Manipulation
|
||||
|
@ -333,7 +335,7 @@ to_inkerkey(LedgerKey, SQN) ->
|
|||
{SQN, ?INKT_STND, LedgerKey}.
|
||||
|
||||
|
||||
-spec to_inkerkv(ledger_key(), non_neg_integer(), any(), list(),
|
||||
-spec to_inkerkv(ledger_key(), non_neg_integer(), any(), journal_keychanges(),
|
||||
compression_method(), boolean()) -> {journal_key(), any()}.
|
||||
%% @doc
|
||||
%% Convert to the correct format of a Journal key and value
|
||||
|
@ -355,6 +357,11 @@ from_inkerkv(Object, ToIgnoreKeyChanges) ->
|
|||
Object
|
||||
end.
|
||||
|
||||
|
||||
-spec create_value_for_journal({any(), journal_keychanges()|binary()},
|
||||
boolean(), compression_method()) -> binary().
|
||||
%% @doc
|
||||
%% Serialise the value to be stored in the Journal
|
||||
create_value_for_journal({Object, KeyChanges}, Compress, Method)
|
||||
when not is_binary(KeyChanges) ->
|
||||
KeyChangeBin = term_to_binary(KeyChanges, [compressed]),
|
||||
|
@ -402,6 +409,10 @@ serialise_object(Object, false, _Method) ->
|
|||
serialise_object(Object, true, _Method) ->
|
||||
term_to_binary(Object, [compressed]).
|
||||
|
||||
-spec revert_value_from_journal(binary()) -> {any(), journal_keychanges()}.
|
||||
%% @doc
|
||||
%% Revert the object back to its deserialised state, along with the list of
|
||||
%% key changes associated with the change
|
||||
revert_value_from_journal(JournalBin) ->
|
||||
revert_value_from_journal(JournalBin, false).
|
||||
|
||||
|
@ -415,7 +426,8 @@ revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) ->
|
|||
case ToIgnoreKeyChanges of
|
||||
true ->
|
||||
<<OBin2:Length1/binary, _KCBin2:KeyChangeLength/binary>> = JBin0,
|
||||
{deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), []};
|
||||
{deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
|
||||
{[], infinity}};
|
||||
false ->
|
||||
<<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
|
||||
{deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
|
||||
|
@ -450,12 +462,19 @@ encode_valuetype(IsBinary, IsCompressed, Method) ->
|
|||
end,
|
||||
Bit1 + Bit2 + Bit3.
|
||||
|
||||
|
||||
-spec decode_valuetype(integer()) -> {boolean(), boolean(), boolean()}.
|
||||
%% @doc
|
||||
%% Check bit flags to confirm how the object has been serialised
|
||||
decode_valuetype(TypeInt) ->
|
||||
IsCompressed = TypeInt band 1 == 1,
|
||||
IsBinary = TypeInt band 2 == 2,
|
||||
IsLz4 = TypeInt band 4 == 4,
|
||||
{IsBinary, IsCompressed, IsLz4}.
|
||||
|
||||
-spec from_journalkey(journal_key()) -> {integer(), ledger_key()}.
|
||||
%% @doc
|
||||
%% Return just SQN and Ledger Key
|
||||
from_journalkey({SQN, _Type, LedgerKey}) ->
|
||||
{SQN, LedgerKey}.
|
||||
|
||||
|
@ -480,13 +499,21 @@ hash(Obj) ->
|
|||
%%%============================================================================
|
||||
|
||||
|
||||
-spec obj_objectspecs(list(tuple()), integer(), integer()|infinity)
|
||||
-> list(ledger_kv()).
|
||||
%% @doc
|
||||
%% Convert object specs to KV entries ready for the ledger
|
||||
obj_objectspecs(ObjectSpecs, SQN, TTL) ->
|
||||
lists:map(fun({IdxOp, Bucket, Key, SubKey, Value}) ->
|
||||
gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL)
|
||||
end,
|
||||
ObjectSpecs).
|
||||
|
||||
|
||||
-spec idx_indexspecs(list(tuple()),
|
||||
any(), any(), integer(), integer()|infinity)
|
||||
-> list(ledger_kv()).
|
||||
%% @doc
|
||||
%% Convert index specs to KV entries ready for the ledger
|
||||
idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
|
||||
lists:map(
|
||||
fun({IdxOp, IdxFld, IdxTrm}) ->
|
||||
|
|
|
@ -821,14 +821,14 @@ fetch_testcdb(RP) ->
|
|||
{ok,
|
||||
CDB1} = leveled_cdb:cdb_open_writer(FN1,
|
||||
#cdb_options{binary_mode=true}),
|
||||
{K1, V1} = test_inkerkv(1, "Key1", "Value1", []),
|
||||
{K2, V2} = test_inkerkv(2, "Key2", "Value2", []),
|
||||
{K3, V3} = test_inkerkv(3, "Key3", "Value3", []),
|
||||
{K4, V4} = test_inkerkv(4, "Key1", "Value4", []),
|
||||
{K5, V5} = test_inkerkv(5, "Key1", "Value5", []),
|
||||
{K6, V6} = test_inkerkv(6, "Key1", "Value6", []),
|
||||
{K7, V7} = test_inkerkv(7, "Key1", "Value7", []),
|
||||
{K8, V8} = test_inkerkv(8, "Key1", "Value8", []),
|
||||
{K1, V1} = test_inkerkv(1, "Key1", "Value1", {[], infinity}),
|
||||
{K2, V2} = test_inkerkv(2, "Key2", "Value2", {[], infinity}),
|
||||
{K3, V3} = test_inkerkv(3, "Key3", "Value3", {[], infinity}),
|
||||
{K4, V4} = test_inkerkv(4, "Key1", "Value4", {[], infinity}),
|
||||
{K5, V5} = test_inkerkv(5, "Key1", "Value5", {[], infinity}),
|
||||
{K6, V6} = test_inkerkv(6, "Key1", "Value6", {[], infinity}),
|
||||
{K7, V7} = test_inkerkv(7, "Key1", "Value7", {[], infinity}),
|
||||
{K8, V8} = test_inkerkv(8, "Key1", "Value8", {[], infinity}),
|
||||
ok = leveled_cdb:cdb_put(CDB1, K1, V1),
|
||||
ok = leveled_cdb:cdb_put(CDB1, K2, V2),
|
||||
ok = leveled_cdb:cdb_put(CDB1, K3, V3),
|
||||
|
@ -920,7 +920,8 @@ compact_single_file_recovr_test() ->
|
|||
{2,
|
||||
stnd,
|
||||
test_ledgerkey("Key2")}),
|
||||
?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)),
|
||||
?assertMatch({{_, _}, {"Value2", {[], infinity}}},
|
||||
leveled_codec:from_inkerkv(RKV1)),
|
||||
ok = leveled_cdb:cdb_deletepending(CDB),
|
||||
ok = leveled_cdb:cdb_destroy(CDB).
|
||||
|
||||
|
@ -958,7 +959,8 @@ compact_single_file_retain_test() ->
|
|||
{2,
|
||||
stnd,
|
||||
test_ledgerkey("Key2")}),
|
||||
?assertMatch({{_, _}, {"Value2", []}}, leveled_codec:from_inkerkv(RKV1)),
|
||||
?assertMatch({{_, _}, {"Value2", {[], infinity}}},
|
||||
leveled_codec:from_inkerkv(RKV1)),
|
||||
ok = leveled_cdb:cdb_deletepending(CDB),
|
||||
ok = leveled_cdb:cdb_destroy(CDB).
|
||||
|
||||
|
@ -1001,7 +1003,8 @@ compact_singlefile_totwosmallfiles_testto() ->
|
|||
LK = test_ledgerkey("Key" ++ integer_to_list(X)),
|
||||
Value = leveled_rand:rand_bytes(1024),
|
||||
{IK, IV} =
|
||||
leveled_codec:to_inkerkv(LK, X, Value, [],
|
||||
leveled_codec:to_inkerkv(LK, X, Value,
|
||||
{[], infinity},
|
||||
native, true),
|
||||
ok = leveled_cdb:cdb_put(CDB1, IK, IV)
|
||||
end,
|
||||
|
|
|
@ -127,6 +127,7 @@
|
|||
-define(PENDING_FILEX, "pnd").
|
||||
-define(LOADING_PAUSE, 1000).
|
||||
-define(LOADING_BATCH, 1000).
|
||||
-define(TEST_KC, {[], infinity}).
|
||||
|
||||
-record(state, {manifest = [] :: list(),
|
||||
manifest_sqn = 0 :: integer(),
|
||||
|
@ -671,7 +672,10 @@ get_cdbopts(InkOpts)->
|
|||
CDBopts#cdb_options{waste_path = WasteFP}.
|
||||
|
||||
|
||||
-spec put_object(tuple(), any(), list(), ink_state())
|
||||
-spec put_object(leveled_codec:ledger_key(),
|
||||
any(),
|
||||
leveled_codec:journal_keychanges(),
|
||||
ink_state())
|
||||
-> {ok|rolling, ink_state(), integer()}.
|
||||
%% @doc
|
||||
%% Add the object to the current journal if it fits. If it doesn't fit, a new
|
||||
|
@ -1051,12 +1055,14 @@ build_dummy_journal(KeyConvertF) ->
|
|||
{ok, J1} = leveled_cdb:cdb_open_writer(F1),
|
||||
{K1, V1} = {KeyConvertF("Key1"), "TestValue1"},
|
||||
{K2, V2} = {KeyConvertF("Key2"), "TestValue2"},
|
||||
ok = leveled_cdb:cdb_put(J1,
|
||||
ok =
|
||||
leveled_cdb:cdb_put(J1,
|
||||
{1, stnd, K1},
|
||||
create_value_for_journal({V1, []}, false)),
|
||||
ok = leveled_cdb:cdb_put(J1,
|
||||
create_value_for_journal({V1, ?TEST_KC}, false)),
|
||||
ok =
|
||||
leveled_cdb:cdb_put(J1,
|
||||
{2, stnd, K2},
|
||||
create_value_for_journal({V2, []}, false)),
|
||||
create_value_for_journal({V2, ?TEST_KC}, false)),
|
||||
ok = leveled_cdb:cdb_roll(J1),
|
||||
LK1 = leveled_cdb:cdb_lastkey(J1),
|
||||
lists:foldl(fun(X, Closed) ->
|
||||
|
@ -1075,12 +1081,14 @@ build_dummy_journal(KeyConvertF) ->
|
|||
{ok, J2} = leveled_cdb:cdb_open_writer(F2),
|
||||
{K1, V3} = {KeyConvertF("Key1"), "TestValue3"},
|
||||
{K4, V4} = {KeyConvertF("Key4"), "TestValue4"},
|
||||
ok = leveled_cdb:cdb_put(J2,
|
||||
ok =
|
||||
leveled_cdb:cdb_put(J2,
|
||||
{3, stnd, K1},
|
||||
create_value_for_journal({V3, []}, false)),
|
||||
ok = leveled_cdb:cdb_put(J2,
|
||||
create_value_for_journal({V3, ?TEST_KC}, false)),
|
||||
ok =
|
||||
leveled_cdb:cdb_put(J2,
|
||||
{4, stnd, K4},
|
||||
create_value_for_journal({V4, []}, false)),
|
||||
create_value_for_journal({V4, ?TEST_KC}, false)),
|
||||
LK2 = leveled_cdb:cdb_lastkey(J2),
|
||||
ok = leveled_cdb:cdb_close(J2),
|
||||
Manifest = [{1, "../test/journal/journal_files/nursery_1", "pid1", LK1},
|
||||
|
@ -1119,11 +1127,11 @@ simple_inker_test() ->
|
|||
compression_method=native,
|
||||
compress_on_receipt=true}),
|
||||
Obj1 = ink_get(Ink1, "Key1", 1),
|
||||
?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
|
||||
?assertMatch({{1, "Key1"}, {"TestValue1", ?TEST_KC}}, Obj1),
|
||||
Obj3 = ink_get(Ink1, "Key1", 3),
|
||||
?assertMatch({{3, "Key1"}, {"TestValue3", []}}, Obj3),
|
||||
?assertMatch({{3, "Key1"}, {"TestValue3", ?TEST_KC}}, Obj3),
|
||||
Obj4 = ink_get(Ink1, "Key4", 4),
|
||||
?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj4),
|
||||
?assertMatch({{4, "Key4"}, {"TestValue4", ?TEST_KC}}, Obj4),
|
||||
ink_close(Ink1),
|
||||
clean_testdir(RootPath).
|
||||
|
||||
|
@ -1143,9 +1151,9 @@ simple_inker_completeactivejournal_test() ->
|
|||
compression_method=native,
|
||||
compress_on_receipt=true}),
|
||||
Obj1 = ink_get(Ink1, "Key1", 1),
|
||||
?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
|
||||
?assertMatch({{1, "Key1"}, {"TestValue1", ?TEST_KC}}, Obj1),
|
||||
Obj2 = ink_get(Ink1, "Key4", 4),
|
||||
?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj2),
|
||||
?assertMatch({{4, "Key4"}, {"TestValue4", ?TEST_KC}}, Obj2),
|
||||
ink_close(Ink1),
|
||||
clean_testdir(RootPath).
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue