diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 4999f88..1bc170c 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -255,7 +255,7 @@ book_start(Opts) -> %% %% Put an item in the store but with a Time To Live - the time when the object %% should expire, in gregorian_seconds (add the required number of seconds to -%% leveled_codec:integer_time/1). +%% leveled_util:integer_time/1). %% %% There exists the possibility of per object expiry times, not just whole %% store expiry times as has traditionally been the feature in Riak. Care @@ -654,7 +654,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State) tomb -> not_found; {active, TS} -> - case TS >= leveled_codec:integer_now() of + case TS >= leveled_util:integer_now() of false -> not_found; true -> @@ -695,7 +695,7 @@ handle_call({head, Bucket, Key, Tag}, _From, State) {_SeqN, tomb, _MH, _MD} -> {reply, not_found, State}; {_SeqN, {active, TS}, _MH, MD} -> - case TS >= leveled_codec:integer_now() of + case TS >= leveled_util:integer_now() of true -> {SWr, UpdTimingsP} = update_timings(SWp, @@ -1649,7 +1649,7 @@ ttl_test() -> {ok, Bookie1} = book_start([{root_path, RootPath}]), ObjL1 = generate_multiple_objects(100, 1), % Put in all the objects with a TTL in the future - Future = leveled_codec:integer_now() + 300, + Future = leveled_util:integer_now() + 300, lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1, "Bucket", K, V, S, ?STD_TAG, @@ -1665,7 +1665,7 @@ ttl_test() -> ObjL1), ObjL2 = generate_multiple_objects(100, 101), - Past = leveled_codec:integer_now() - 300, + Past = leveled_util:integer_now() - 300, lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1, "Bucket", K, V, S, ?STD_TAG, @@ -1741,7 +1741,7 @@ hashlist_query_testto() -> {cache_size, 500}]), ObjL1 = generate_multiple_objects(1200, 1), % Put in all the objects with a TTL in the future - Future = leveled_codec:integer_now() + 300, + Future = leveled_util:integer_now() + 300, lists:foreach(fun({K, V, S}) -> 
ok = book_tempput(Bookie1, "Bucket", K, V, S, ?STD_TAG, @@ -1749,7 +1749,7 @@ hashlist_query_testto() -> ObjL1), ObjL2 = generate_multiple_objects(20, 1201), % Put in a few objects with a TTL in the past - Past = leveled_codec:integer_now() - 300, + Past = leveled_util:integer_now() - 300, lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1, "Bucket", K, V, S, ?STD_TAG, @@ -1793,7 +1793,7 @@ hashlist_query_withjournalcheck_testto() -> {cache_size, 500}]), ObjL1 = generate_multiple_objects(800, 1), % Put in all the objects with a TTL in the future - Future = leveled_codec:integer_now() + 300, + Future = leveled_util:integer_now() + 300, lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1, "Bucket", K, V, S, ?STD_TAG, @@ -1822,7 +1822,7 @@ foldobjects_vs_hashtree_testto() -> {cache_size, 500}]), ObjL1 = generate_multiple_objects(800, 1), % Put in all the objects with a TTL in the future - Future = leveled_codec:integer_now() + 300, + Future = leveled_util:integer_now() + 300, lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1, "Bucket", K, V, S, ?STD_TAG, @@ -1896,7 +1896,7 @@ foldobjects_vs_foldheads_bybucket_testto() -> ObjL1 = generate_multiple_objects(400, 1), ObjL2 = generate_multiple_objects(400, 1), % Put in all the objects with a TTL in the future - Future = leveled_codec:integer_now() + 300, + Future = leveled_util:integer_now() + 300, lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1, "BucketA", K, V, S, ?STD_TAG, @@ -2034,7 +2034,7 @@ is_empty_test() -> {max_journalsize, 1000000}, {cache_size, 500}]), % Put in an object with a TTL in the future - Future = leveled_codec:integer_now() + 300, + Future = leveled_util:integer_now() + 300, ?assertMatch(true, leveled_bookie:book_isempty(Bookie1, ?STD_TAG)), ok = book_tempput(Bookie1, <<"B">>, <<"K">>, {value, <<"V">>}, [], diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 9adff0e..431710d 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -1543,7 +1543,7 @@ 
endian_flip(Int) -> X. hash(Key) -> - leveled_codec:magic_hash(Key). + leveled_util:magic_hash(Key). % Get the least significant 8 bits from the hash. hash_to_index(Hash) -> diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index f335099..e8bee97 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -47,7 +47,7 @@ to_ledgerkey/5, from_ledgerkey/1, from_ledgerkey/2, - to_inkerkv/3, + to_inkerkey/2, to_inkerkv/6, from_inkerkv/1, from_inkerkv/2, @@ -64,10 +64,7 @@ idx_indexspecs/5, obj_objectspecs/3, aae_indexspecs/6, - generate_uuid/0, - integer_now/0, riak_extract_metadata/2, - magic_hash/1, segment_hash/1, to_lookup/1, riak_metadata_to_binary/2]). @@ -84,8 +81,31 @@ integer()|null, % Hash of vclock - non-exportable integer()}. % Size in bytes of real object +-type tag() :: ?STD_TAG|?RIAK_TAG|?IDX_TAG|?HEAD_TAG. +-type segment_hash() :: + {integer(), integer()}. +-type ledger_status() :: + tomb|{active, non_neg_integer()|infinity}. +-type ledger_key() :: + {tag(), any(), any(), any()}. +-type ledger_value() :: + {integer(), ledger_status(), segment_hash(), tuple()}. +-type ledger_kv() :: + {ledger_key(), ledger_value()}. +-type compaction_strategy() :: + list({tag(), retain|skip|recalc}). +-type journal_key_tag() :: + ?INKT_STND|?INKT_TOMB|?INKT_MPUT|?INKT_KEYD. +-type journal_key() :: + {integer(), journal_key_tag(), ledger_key()}. +-type compression_method() :: + lz4|native. --spec segment_hash(any()) -> {integer(), integer()}. +%%%============================================================================ +%%% Ledger Key Manipulation +%%%============================================================================ + +-spec segment_hash(ledger_key()|binary()) -> {integer(), integer()}. %% @doc %% Return two 16 bit integers - the segment ID and a second integer for spare %% entropy. The hashed should be used in blooms or indexes such that some @@ -107,28 +127,7 @@ segment_hash(Key) -> segment_hash(term_to_binary(Key)). 
--spec magic_hash(any()) -> integer(). -%% @doc -%% Use DJ Bernstein magic hash function. Note, this is more expensive than -%% phash2 but provides a much more balanced result. -%% -%% Hash function contains mysterious constants, some explanation here as to -%% what they are - -%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function -magic_hash({binary, BinaryKey}) -> - H = 5381, - hash1(H, BinaryKey) band 16#FFFFFFFF; -magic_hash(AnyKey) -> - BK = term_to_binary(AnyKey), - magic_hash({binary, BK}). - -hash1(H, <<>>) -> - H; -hash1(H, <>) -> - H1 = H * 33, - H2 = H1 bxor B, - hash1(H2, Rest). - +-spec to_lookup(ledger_key()) -> lookup|no_lookup. %% @doc %% Should it be possible to lookup a key in the merge tree. This is not true %% For keys that should only be read through range queries. Direct lookup @@ -141,36 +140,30 @@ to_lookup(Key) -> lookup end. --spec generate_uuid() -> list(). + %% @doc -%% Generate a new globally unique ID as a string. -%% Credit to -%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl -generate_uuid() -> - <> = leveled_rand:rand_bytes(16), - L = io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", - [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]), - binary_to_list(list_to_binary(L)). - -inker_reload_strategy(AltList) -> - ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}], - lists:foldl(fun({X, Y}, SList) -> - lists:keyreplace(X, 1, SList, {X, Y}) - end, - ReloadStrategy0, - AltList). +%% Some helper functions to get a sub_components of the key/value +-spec strip_to_statusonly(ledger_kv()) -> ledger_status(). strip_to_statusonly({_, {_, St, _, _}}) -> St. +-spec strip_to_seqonly(ledger_kv()) -> non_neg_integer(). strip_to_seqonly({_, {SeqN, _, _, _}}) -> SeqN. +-spec strip_to_keyseqonly(ledger_kv()) -> {ledger_key(), integer()}. strip_to_keyseqonly({LK, {SeqN, _, _, _}}) -> {LK, SeqN}. +-spec strip_to_seqnhashonly(ledger_kv()) -> {integer(), segment_hash()}. 
strip_to_seqnhashonly({_, {SeqN, _, MH, _}}) -> {SeqN, MH}. +-spec striphead_to_details(ledger_value()) -> ledger_value(). striphead_to_details({SeqN, St, MH, MD}) -> {SeqN, St, MH, MD}. - +-spec key_dominates(ledger_kv(), ledger_kv()) -> + left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant. +%% @doc +%% When comparing two keys in the ledger need to find if one key comes before +%% the other, or if the match, which key is "better" and should be the winner key_dominates(LeftKey, RightKey) -> case {LeftKey, RightKey} of {{LK, _LVAL}, {RK, _RVAL}} when LK < RK -> @@ -185,7 +178,11 @@ key_dominates(LeftKey, RightKey) -> right_hand_dominant end. - +-spec maybe_reap_expiredkey(ledger_kv(), {boolean(), integer()}) -> boolean(). +%% @doc +%% Make a reap decision based on the level in the ledger (needs to be expired +%% and in the basement). the level is a tuple of the is_basement boolean, and +%% a timestamp passed into the calling function maybe_reap_expiredkey(KV, LevelD) -> Status = strip_to_statusonly(KV), maybe_reap(Status, LevelD). @@ -199,6 +196,9 @@ maybe_reap(tomb, {true, _CurrTS}) -> maybe_reap(_, _) -> false. +-spec is_active(ledger_key(), ledger_value(), non_neg_integer()) -> boolean(). +%% @doc +%% Is this an active KV pair or has the timestamp expired is_active(Key, Value, Now) -> case strip_to_statusonly({Key, Value}) of {active, infinity} -> @@ -231,21 +231,116 @@ from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) -> from_ledgerkey({_Tag, Bucket, Key, _SubKey}) -> {Bucket, Key}. +-spec to_ledgerkey(any(), any(), tag(), any(), any()) -> ledger_key(). +%% @doc +%% Convert something into a ledger key to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG -> {?IDX_TAG, Bucket, {Field, Value}, Key}. +-spec to_ledgerkey(any(), any(), tag()) -> ledger_key(). 
+%% @doc +%% Convert something into a ledger key to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG) -> {?HEAD_TAG, Bucket, Key, SubKey}; to_ledgerkey(Bucket, Key, Tag) -> {Tag, Bucket, Key, null}. +-spec endkey_passed(ledger_key(), ledger_key()) -> boolean(). +%% @doc +%% Compare a key against a query key, only comparing elements that are non-null +%% in the Query key. This is used for comparing against end keys in queries. +endkey_passed(all, _) -> + false; +endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) -> + EK1 < CK1; +endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) -> + {EK1, EK2} < {CK1, CK2}; +endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) -> + {EK1, EK2, EK3} < {CK1, CK2, CK3}; +endkey_passed(EndKey, CheckingKey) -> + EndKey < CheckingKey. -%% Return the Key, Value and Hash Option for this object. The hash option -%% indicates whether the key would ever be looked up directly, and so if it -%% requires an entry in the hash table -to_inkerkv(LedgerKey, SQN, to_fetch) -> - {{SQN, ?INKT_STND, LedgerKey}, null, true}. +%%%============================================================================ +%%% Journal Compaction functions +%%%============================================================================ + +-spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy(). +%% @doc +%% Take the default strategy for compaction, and override the approach for any +%% tags passed in +inker_reload_strategy(AltList) -> + ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}], + lists:foldl(fun({X, Y}, SList) -> + lists:keyreplace(X, 1, SList, {X, Y}) + end, + ReloadStrategy0, + AltList). + + +-spec compact_inkerkvc({journal_key(), any(), boolean()}, + compaction_strategy()) -> + skip|{retain, any()}|{recalc, null}. +%% @doc +%% Decide whether a superseded object should be replicated in the compacted +%% file and in what format.
+compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) -> + skip; +compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) -> + skip; +compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) -> + case get_tagstrategy(LK, Strategy) of + skip -> + skip; + retain -> + {retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}}; + TagStrat -> + {TagStrat, null} + end; +compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) -> + case get_tagstrategy(LK, Strategy) of + skip -> + skip; + retain -> + {_V, KeyDeltas} = revert_value_from_journal(V), + {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}}; + TagStrat -> + {TagStrat, null} + end. + +-spec get_tagstrategy(ledger_key(), compaction_strategy()) + -> skip|retain|recalc. +%% @doc +%% Work out the compaction strategy for the key +get_tagstrategy(LK, Strategy) -> + case LK of + {Tag, _, _, _} -> + case lists:keyfind(Tag, 1, Strategy) of + {Tag, TagStrat} -> + TagStrat; + false -> + leveled_log:log("IC012", [Tag, Strategy]), + skip + end; + _ -> + skip + end. + +%%%============================================================================ +%%% Manipulate Journal Key and Value +%%%============================================================================ + +-spec to_inkerkey(ledger_key(), non_neg_integer()) -> journal_key(). +%% @doc +%% Conversion from ledger_key to journal_key to allow for the key to be fetched +to_inkerkey(LedgerKey, SQN) -> + {SQN, ?INKT_STND, LedgerKey}. + + +-spec to_inkerkv(ledger_key(), non_neg_integer(), any(), list(), + compression_method(), boolean()) -> {journal_key(), any()}. +%% @doc +%% Convert to the correct format of a Journal key and value to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) -> InkerType = check_forinkertype(LedgerKey, Object), Value =
-compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) -> - skip; -compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) -> - skip; -compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) -> - case get_tagstrategy(LK, Strategy) of - skip -> - skip; - retain -> - {retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}}; - TagStrat -> - {TagStrat, null} - end; -compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) -> - case get_tagstrategy(LK, Strategy) of - skip -> - skip; - retain -> - {_V, KeyDeltas} = revert_value_from_journal(V), - {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}}; - TagStrat -> - {TagStrat, null} - end; -compact_inkerkvc(_KVC, _Strategy) -> - skip. - -get_tagstrategy(LK, Strategy) -> - case LK of - {Tag, _, _, _} -> - case lists:keyfind(Tag, 1, Strategy) of - {Tag, TagStrat} -> - TagStrat; - false -> - leveled_log:log("IC012", [Tag, Strategy]), - skip - end; - _ -> - skip - end. split_inkvalue(VBin) when is_binary(VBin) -> revert_value_from_journal(VBin). @@ -422,18 +478,10 @@ hash(Obj) -> erlang:phash2(term_to_binary(Obj)). -% Compare a key against a query key, only comparing elements that are non-null -% in the Query key. This is used for comparing against end keys in queries. -endkey_passed(all, _) -> - false; -endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) -> - EK1 < CK1; -endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) -> - {EK1, EK2} < {CK1, CK2}; -endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) -> - {EK1, EK2, EK3} < {CK1, CK2, CK3}; -endkey_passed(EndKey, CheckingKey) -> - EndKey < CheckingKey. 
+ +%%%============================================================================ +%%% Other functions +%%%============================================================================ obj_objectspecs(ObjectSpecs, SQN, TTL) -> @@ -528,7 +576,7 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) -> Dates = parse_date(LMD0, AAE#recent_aae.unit_minutes, AAE#recent_aae.limit_minutes, - integer_now()), + leveled_util:integer_now()), case Dates of no_index -> Acc; @@ -557,12 +605,12 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) -> -spec parse_date(tuple(), integer(), integer(), integer()) -> no_index|{binary(), integer()}. %% @doc -%% Parse the lat modified date and the AAE date configuration to return a +%% Parse the last modified date and the AAE date configuration to return a %% binary to be used as the last modified date part of the index, and an %% integer to be used as the TTL of the index entry. %% Return no_index if the change is not recent. parse_date(LMD, UnitMins, LimitMins, Now) -> - LMDsecs = integer_time(LMD), + LMDsecs = leveled_util:integer_time(LMD), Recent = (LMDsecs + LimitMins * 60) > Now, case Recent of false -> @@ -614,13 +662,6 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> {Bucket, Key, Value, {Hash, ObjHash}, LastMods}. -integer_now() -> - integer_time(os:timestamp()). - -integer_time(TS) -> - DT = calendar:now_to_universal_time(TS), - calendar:datetime_to_gregorian_seconds(DT). - extract_metadata(Obj, Size, ?RIAK_TAG) -> riak_extract_metadata(Obj, Size); extract_metadata(Obj, Size, ?STD_TAG) -> @@ -782,37 +823,18 @@ endkey_passed_test() -> ?assertMatch(true, endkey_passed(TestKey, K2)). -corrupted_ledgerkey_test() -> - % When testing for compacted journal which has been corrupted, there may - % be a corruptes ledger key. 
Always skip these keys - % Key has become a 3-tuple not a 4-tuple - TagStrat1 = compact_inkerkvc({{1, - ?INKT_STND, - {?STD_TAG, "B1", "K1andSK"}}, - {}, - true}, - [{?STD_TAG, retain}]), - ?assertMatch(skip, TagStrat1), - TagStrat2 = compact_inkerkvc({{1, - ?INKT_KEYD, - {?STD_TAG, "B1", "K1andSK"}}, - {}, - true}, - [{?STD_TAG, retain}]), - ?assertMatch(skip, TagStrat2). - general_skip_strategy_test() -> % Confirm that we will skip if the strategy says so TagStrat1 = compact_inkerkvc({{1, ?INKT_STND, - {?STD_TAG, "B1", "K1andSK"}}, + {?STD_TAG, "B1", "K1andSK", null}}, {}, true}, [{?STD_TAG, skip}]), ?assertMatch(skip, TagStrat1), TagStrat2 = compact_inkerkvc({{1, ?INKT_KEYD, - {?STD_TAG, "B1", "K1andSK"}}, + {?STD_TAG, "B1", "K1andSK", null}}, {}, true}, [{?STD_TAG, skip}]), @@ -839,15 +861,6 @@ general_skip_strategy_test() -> [{?STD_TAG, skip}, {?IDX_TAG, recalc}]), ?assertMatch(skip, TagStrat5). -corrupted_inker_tag_test() -> - % Confirm that we will skip on unknown inker tag - TagStrat1 = compact_inkerkvc({{1, - foo, - {?STD_TAG, "B1", "K1andSK"}}, - {}, - true}, - [{?STD_TAG, retain}]), - ?assertMatch(skip, TagStrat1). %% Test below proved that the overhead of performing hashes was trivial %% Maybe 5 microseconds per hash @@ -859,24 +872,10 @@ hashperf_test() -> io:format(user, "1000 object hashes in ~w microseconds~n", [timer:now_diff(os:timestamp(), SW)]). 
-magichashperf_test() -> - KeyFun = - fun(X) -> - K = {o, "Bucket", "Key" ++ integer_to_list(X), null}, - {K, X} - end, - KL = lists:map(KeyFun, lists:seq(1, 1000)), - {TimeMH, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]), - io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH]), - {TimePH, _Hl2} = timer:tc(lists, map, [fun(K) -> erlang:phash2(K) end, KL]), - io:format(user, "1000 keys phash2 hashed in ~w microseconds~n", [TimePH]), - {TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]), - io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]). - parsedate_test() -> {MeS, S, MiS} = os:timestamp(), timer:sleep(100), - Now = integer_now(), + Now = leveled_util:integer_now(), UnitMins = 5, LimitMins = 60, PD = parse_date({MeS, S, MiS}, UnitMins, LimitMins, Now), @@ -898,7 +897,7 @@ check_pd(PD, UnitMins) -> parseolddate_test() -> LMD = os:timestamp(), timer:sleep(100), - Now = integer_now() + 60 * 60, + Now = leveled_util:integer_now() + 60 * 60, UnitMins = 5, LimitMins = 60, PD = parse_date(LMD, UnitMins, LimitMins, Now), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 5000b5d..717688e 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -736,8 +736,7 @@ get_object(LedgerKey, SQN, Manifest) -> get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) -> JournalP = leveled_imanifest:find_entry(SQN, Manifest), - {InkerKey, _V, true} = - leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch), + InkerKey = leveled_codec:to_inkerkey(LedgerKey, SQN), Obj = leveled_cdb:cdb_get(JournalP, InkerKey), leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges). 
@@ -752,8 +751,7 @@ get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) -> %% the positive answer is 'probably' not 'true' key_check(LedgerKey, SQN, Manifest) -> JournalP = leveled_imanifest:find_entry(SQN, Manifest), - {InkerKey, _V, true} = - leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch), + InkerKey = leveled_codec:to_inkerkey(LedgerKey, SQN), leveled_cdb:cdb_keycheck(JournalP, InkerKey). @@ -1013,12 +1011,12 @@ filepath(RootPath, journal_waste_dir) -> filepath(RootPath, NewSQN, new_journal) -> filename:join(filepath(RootPath, journal_dir), integer_to_list(NewSQN) ++ "_" - ++ leveled_codec:generate_uuid() + ++ leveled_util:generate_uuid() ++ "." ++ ?PENDING_FILEX); filepath(CompactFilePath, NewSQN, compact_journal) -> filename:join(CompactFilePath, integer_to_list(NewSQN) ++ "_" - ++ leveled_codec:generate_uuid() + ++ leveled_util:generate_uuid() ++ "." ++ ?PENDING_FILEX). diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 81cc7f2..1b65f7d 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -266,7 +266,7 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) -> BNumber = string:right(integer_to_list(BucketLow + leveled_rand:uniform(BRange)), 4, $0), KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 4, $0), - K = {o, "Bucket" ++ BNumber, "Key" ++ KNumber}, + K = {o, "Bucket" ++ BNumber, "Key" ++ KNumber, null}, RandKey = {K, {Count + 1, {active, infinity}, leveled_codec:segment_hash(K), diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 74ef45a..f896d40 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -403,7 +403,7 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) -> get_nextbucket(_NextB, _NextK, _Tag, _LS, BKList, {Limit, Limit}) -> BKList; get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> - Now = leveled_codec:integer_now(), + Now = leveled_util:integer_now(), StartKey = leveled_codec:to_ledgerkey(NextBucket, 
NextKey, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), ExtractFun = @@ -420,7 +420,7 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> leveled_log:log("B0008",[]), BKList; {{B, K}, V} when is_binary(B), is_binary(K) -> - case leveled_codec:is_active({B, K}, V, Now) of + case leveled_codec:is_active({Tag, B, K, null}, V, Now) of true -> leveled_log:log("B0009",[B]), get_nextbucket(<>, @@ -497,7 +497,7 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> accumulate_size() -> - Now = leveled_codec:integer_now(), + Now = leveled_util:integer_now(), AccFun = fun(Key, Value, {Size, Count}) -> case leveled_codec:is_active(Key, Value, Now) of true -> @@ -533,7 +533,7 @@ accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) -> AddKeyFun). get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) -> - Now = leveled_codec:integer_now(), + Now = leveled_util:integer_now(), AccFun = fun(LK, V, Acc) -> case leveled_codec:is_active(LK, V, Now) of @@ -559,7 +559,7 @@ get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) -> accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> - Now = leveled_codec:integer_now(), + Now = leveled_util:integer_now(), AccFun = fun(LK, V, Acc) -> % The function takes the Ledger Key and the value from the @@ -642,7 +642,7 @@ check_presence(Key, Value, InkerClone) -> end. accumulate_keys(FoldKeysFun) -> - Now = leveled_codec:integer_now(), + Now = leveled_util:integer_now(), AccFun = fun(Key, Value, Acc) -> case leveled_codec:is_active(Key, Value, Now) of true -> @@ -661,7 +661,7 @@ add_terms(ObjKey, IdxValue) -> {IdxValue, ObjKey}. 
accumulate_index(TermRe, AddFun, FoldKeysFun) -> - Now = leveled_codec:integer_now(), + Now = leveled_util:integer_now(), case TermRe of undefined -> fun(Key, Value, Acc) -> diff --git a/src/leveled_util.erl b/src/leveled_util.erl new file mode 100644 index 0000000..0539a22 --- /dev/null +++ b/src/leveled_util.erl @@ -0,0 +1,91 @@ +%% -------- Utility Functions --------- +%% +%% Generally helpful functions within leveled +%% + +-module(leveled_util). + + +-include("include/leveled.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + + +-export([generate_uuid/0, + integer_now/0, + integer_time/1, + magic_hash/1]). + + +-spec generate_uuid() -> list(). +%% @doc +%% Generate a new globally unique ID as a string. +%% Credit to +%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl +generate_uuid() -> + <<A:32, B:16, C:16, D:16, E:48>> = leveled_rand:rand_bytes(16), + L = io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", + [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]), + binary_to_list(list_to_binary(L)). + +-spec integer_now() -> non_neg_integer(). +%% @doc +%% Return now in gregorian seconds +integer_now() -> + integer_time(os:timestamp()). + +-spec integer_time(erlang:timestamp()) -> non_neg_integer(). +%% @doc +%% Return a given time in gregorian seconds +integer_time(TS) -> + DT = calendar:now_to_universal_time(TS), + calendar:datetime_to_gregorian_seconds(DT). + + +-spec magic_hash(any()) -> integer(). +%% @doc +%% Use DJ Bernstein magic hash function. Note, this is more expensive than +%% phash2 but provides a much more balanced result. +%% +%% Hash function contains mysterious constants, some explanation here as to +%% what they are - +%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function +magic_hash({binary, BinaryKey}) -> + H = 5381, + hash1(H, BinaryKey) band 16#FFFFFFFF; +magic_hash(AnyKey) -> + BK = term_to_binary(AnyKey), + magic_hash({binary, BK}).
+ +hash1(H, <<>>) -> + H; +hash1(H, <<B:8/integer, Rest/bitstring>>) -> + H1 = H * 33, + H2 = H1 bxor B, + hash1(H2, Rest). + + + + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + + +magichashperf_test() -> + KeyFun = + fun(X) -> + K = {o, "Bucket", "Key" ++ integer_to_list(X), null}, + {K, X} + end, + KL = lists:map(KeyFun, lists:seq(1, 1000)), + {TimeMH, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]), + io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH]), + {TimePH, _Hl2} = timer:tc(lists, map, [fun(K) -> erlang:phash2(K) end, KL]), + io:format(user, "1000 keys phash2 hashed in ~w microseconds~n", [TimePH]), + {TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]), + io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]). + +-endif. \ No newline at end of file diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 45b4a83..d6b4db1 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -184,7 +184,7 @@ stdload(_Bookie, 0, Acc) -> Acc; stdload(Bookie, Count, Acc) -> B = "Bucket", - K = leveled_codec:generate_uuid(), + K = leveled_util:generate_uuid(), V = get_compressiblevalue(), R = leveled_bookie:book_put(Bookie, B, K, V, [], ?STD_TAG), case R of @@ -377,7 +377,7 @@ generate_objects(0, _KeyNumber, ObjL, _Value, _IndexGen, _Bucket) -> ObjL; generate_objects(Count, binary_uuid, ObjL, Value, IndexGen, Bucket) -> {Obj1, Spec1} = set_object(list_to_binary(Bucket), - list_to_binary(leveled_codec:generate_uuid()), + list_to_binary(leveled_util:generate_uuid()), Value, IndexGen), generate_objects(Count - 1,