diff --git a/include/leveled.hrl b/include/leveled.hrl index 8c5d6f8..0e62cf3 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -47,7 +47,8 @@ {max_size :: integer(), file_path :: string(), waste_path :: string(), - binary_mode = false :: boolean()}). + binary_mode = false :: boolean(), + sync_strategy = sync}). -record(inker_options, {cdb_max_size :: integer(), diff --git a/priv/leveled.schema b/priv/leveled.schema new file mode 100644 index 0000000..14dc2e7 --- /dev/null +++ b/priv/leveled.schema @@ -0,0 +1,9 @@ +%% -*- erlang -*- + +%%%% leveled + +%% @doc A path under which leveled data files will be stored. +{mapping, "leveled.data_root", "leveled.data_root", [ + {default, "$(platform_data_dir)/leveled"}, + {datatype, directory} +]}. diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 46b4a29..f802c3d 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -118,7 +118,7 @@ terminate/2, code_change/3, book_start/1, - book_start/3, + book_start/4, book_put/5, book_put/6, book_tempput/7, @@ -149,7 +149,7 @@ -record(state, {inker :: pid(), penciller :: pid(), cache_size :: integer(), - ledger_cache :: gb_trees:tree(), + ledger_cache :: list(), % a skiplist is_snapshot :: boolean(), slow_offer = false :: boolean()}). @@ -159,10 +159,11 @@ %%% API %%%============================================================================ -book_start(RootPath, LedgerCacheSize, JournalSize) -> +book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) -> book_start([{root_path, RootPath}, {cache_size, LedgerCacheSize}, - {max_journalsize, JournalSize}]). + {max_journalsize, JournalSize}, + {sync_strategy, SyncStrategy}]). book_start(Opts) -> gen_server:start(?MODULE, [Opts], []). @@ -233,14 +234,14 @@ init([Opts]) -> {ok, #state{inker=Inker, penciller=Penciller, cache_size=CacheSize, - ledger_cache=gb_trees:empty(), + ledger_cache=leveled_skiplist:empty(), is_snapshot=false}}; Bookie -> {ok, {Penciller, LedgerCache}, Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT), ok = leveled_penciller:pcl_loadsnapshot(Penciller, - gb_trees:empty()), + leveled_skiplist:empty()), leveled_log:log("B0002", [Inker, Penciller]), {ok, #state{penciller=Penciller, inker=Inker, @@ -431,7 +432,7 @@ bucket_stats(State, Bucket, Tag) -> {LedgerSnapshot, LedgerCache}, _JournalSnapshot} = snapshot_store(State, ledger), Folder = fun() -> - leveled_log:log("B0004", [gb_trees:size(LedgerCache)]), + leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, LedgerCache), StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), @@ -454,7 +455,7 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) -> {LedgerSnapshot, LedgerCache}, _JournalSnapshot} = snapshot_store(State, ledger), Folder = fun() -> - leveled_log:log("B0004", [gb_trees:size(LedgerCache)]), + leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, LedgerCache), BucketAcc = get_nextbucket(null, @@ -509,7 +510,7 @@ index_query(State, {B, null} end, Folder = fun() -> - leveled_log:log("B0004", [gb_trees:size(LedgerCache)]), + leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, LedgerCache), StartKey = leveled_codec:to_ledgerkey(Bucket, @@ -551,7 +552,7 @@ hashtree_query(State, Tag, JournalCheck) -> {LedgerSnapshot, LedgerCache}, JournalSnapshot} = snapshot_store(State, SnapType), Folder = fun() -> -
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]), + leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, LedgerCache), StartKey = leveled_codec:to_ledgerkey(null, null, Tag), @@ -602,7 +603,7 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) -> {FoldObjectsFun, []} end, Folder = fun() -> - leveled_log:log("B0004", [gb_trees:size(LedgerCache)]), + leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, LedgerCache), AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag), @@ -623,7 +624,7 @@ bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) -> {LedgerSnapshot, LedgerCache}, _JournalSnapshot} = snapshot_store(State, ledger), Folder = fun() -> - leveled_log:log("B0004", [gb_trees:size(LedgerCache)]), + leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, LedgerCache), SK = leveled_codec:to_ledgerkey(Bucket, null, Tag), @@ -635,7 +636,7 @@ bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) -> AccFun, InitAcc), ok = leveled_penciller:pcl_close(LedgerSnapshot), - lists:reverse(Acc) + Acc end, {async, Folder}. @@ -661,7 +662,7 @@ snapshot_store(State, SnapType) -> set_options(Opts) -> MaxJournalSize = get_opt(max_journalsize, Opts, 10000000000), - + SyncStrat = get_opt(sync_strategy, Opts, sync), WRP = get_opt(waste_retention_period, Opts), AltStrategy = get_opt(reload_strategy, Opts, []), @@ -680,7 +681,8 @@ set_options(Opts) -> max_run_length = get_opt(max_run_length, Opts), waste_retention_period = WRP, cdb_options = #cdb_options{max_size=MaxJournalSize, - binary_mode=true}}, + binary_mode=true, + sync_strategy=SyncStrat}}, #penciller_options{root_path = LedgerFP, max_inmemory_tablesize = PCLL0CacheSize}}. @@ -697,7 +699,7 @@ startup(InkerOpts, PencillerOpts) -> fetch_head(Key, Penciller, LedgerCache) -> - case gb_trees:lookup(Key, LedgerCache) of + case leveled_skiplist:lookup(Key, LedgerCache) of {value, Head} -> Head; none -> @@ -863,18 +865,18 @@ preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) -> addto_ledgercache(Changes, Cache) -> - lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end, + lists:foldl(fun({K, V}, Acc) -> leveled_skiplist:enter(K, V, Acc) end, Cache, Changes). maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> - CacheSize = gb_trees:size(Cache), + CacheSize = leveled_skiplist:size(Cache), TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), if TimeToPush -> case leveled_penciller:pcl_pushmem(Penciller, Cache) of ok -> - {ok, gb_trees:empty()}; + {ok, leveled_skiplist:empty()}; returned -> {returned, Cache} end; @@ -966,112 +968,6 @@ generate_multiple_objects(Count, KeyNumber, ObjL) -> KeyNumber + 1, ObjL ++ [{Key, Value, IndexSpec}]). - -generate_multiple_robjects(Count, KeyNumber) -> - generate_multiple_robjects(Count, KeyNumber, []). - -generate_multiple_robjects(0, _KeyNumber, ObjL) -> - ObjL; -generate_multiple_robjects(Count, KeyNumber, ObjL) -> - Obj = {"Bucket", - "Key" ++ integer_to_list(KeyNumber), - crypto:rand_bytes(1024), - [], - [{"MDK", "MDV" ++ integer_to_list(KeyNumber)}, - {"MDK2", "MDV" ++ integer_to_list(KeyNumber)}]}, - {B1, K1, V1, Spec1, MD} = Obj, - Content = #r_content{metadata=MD, value=V1}, - Obj1 = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, - generate_multiple_robjects(Count - 1, KeyNumber + 1, ObjL ++ [{Obj1, Spec1}]). 
- - -single_key_test() -> - RootPath = reset_filestructure(), - {ok, Bookie1} = book_start([{root_path, RootPath}]), - {B1, K1, V1, Spec1, MD} = {"Bucket1", - "Key1", - "Value1", - [], - {"MDK1", "MDV1"}}, - Content = #r_content{metadata=MD, value=V1}, - Object = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, - ok = book_put(Bookie1, B1, K1, Object, Spec1, ?RIAK_TAG), - {ok, F1} = book_get(Bookie1, B1, K1, ?RIAK_TAG), - ?assertMatch(F1, Object), - ok = book_close(Bookie1), - {ok, Bookie2} = book_start([{root_path, RootPath}]), - {ok, F2} = book_get(Bookie2, B1, K1, ?RIAK_TAG), - ?assertMatch(F2, Object), - ok = book_close(Bookie2), - reset_filestructure(). - -multi_key_test() -> - RootPath = reset_filestructure(), - {ok, Bookie1} = book_start([{root_path, RootPath}]), - {B1, K1, V1, Spec1, MD1} = {"Bucket", - "Key1", - "Value1", - [], - {"MDK1", "MDV1"}}, - C1 = #r_content{metadata=MD1, value=V1}, - Obj1 = #r_object{bucket=B1, key=K1, contents=[C1], vclock=[{'a',1}]}, - {B2, K2, V2, Spec2, MD2} = {"Bucket", - "Key2", - "Value2", - [], - {"MDK2", "MDV2"}}, - C2 = #r_content{metadata=MD2, value=V2}, - Obj2 = #r_object{bucket=B2, key=K2, contents=[C2], vclock=[{'a',1}]}, - ok = book_put(Bookie1, B1, K1, Obj1, Spec1, ?RIAK_TAG), - ObjL1 = generate_multiple_robjects(20, 3), - SW1 = os:timestamp(), - lists:foreach(fun({O, S}) -> - {B, K} = leveled_codec:riakto_keydetails(O), - ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG) - end, - ObjL1), - io:format("PUT of 20 objects completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW1)]), - ok = book_put(Bookie1, B2, K2, Obj2, Spec2, ?RIAK_TAG), - {ok, F1A} = book_get(Bookie1, B1, K1, ?RIAK_TAG), - ?assertMatch(F1A, Obj1), - {ok, F2A} = book_get(Bookie1, B2, K2, ?RIAK_TAG), - ?assertMatch(F2A, Obj2), - ObjL2 = generate_multiple_robjects(20, 23), - SW2 = os:timestamp(), - lists:foreach(fun({O, S}) -> - {B, K} = leveled_codec:riakto_keydetails(O), - ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG) - end, - ObjL2), - io:format("PUT of 20 objects completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW2)]), - {ok, F1B} = book_get(Bookie1, B1, K1, ?RIAK_TAG), - ?assertMatch(F1B, Obj1), - {ok, F2B} = book_get(Bookie1, B2, K2, ?RIAK_TAG), - ?assertMatch(F2B, Obj2), - ok = book_close(Bookie1), - % Now reopen the file, and confirm that a fetch is still possible - {ok, Bookie2} = book_start([{root_path, RootPath}]), - {ok, F1C} = book_get(Bookie2, B1, K1, ?RIAK_TAG), - ?assertMatch(F1C, Obj1), - {ok, F2C} = book_get(Bookie2, B2, K2, ?RIAK_TAG), - ?assertMatch(F2C, Obj2), - ObjL3 = generate_multiple_robjects(20, 43), - SW3 = os:timestamp(), - lists:foreach(fun({O, S}) -> - {B, K} = leveled_codec:riakto_keydetails(O), - ok = book_put(Bookie2, B, K, O, S, ?RIAK_TAG) - end, - ObjL3), - io:format("PUT of 20 objects completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW3)]), - {ok, F1D} = book_get(Bookie2, B1, K1, ?RIAK_TAG), - ?assertMatch(F1D, Obj1), - {ok, F2D} = book_get(Bookie2, B2, K2, ?RIAK_TAG), - ?assertMatch(F2D, Obj2), - ok = book_close(Bookie2), - reset_filestructure(). ttl_test() -> RootPath = reset_filestructure(), diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index d6f1e96..1354571 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -107,7 +107,8 @@ delete_point = 0 :: integer(), inker :: pid(), deferred_delete = false :: boolean(), - waste_path :: string()}). + waste_path :: string(), + sync_strategy = none}). 
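The state record now carries the sync strategy alongside the waste path; the value is not set directly by callers but copied from #cdb_options{} at init (next hunk). A minimal sketch, assuming only the records defined in this diff, of the options an inker would build to request datasync-per-PUT behaviour (the max_size value is illustrative):

    %% Sketch only: cdb_options as extended in include/leveled.hrl above.
    CDBOpts = #cdb_options{max_size = 10000000,
                           binary_mode = true,
                           sync_strategy = riak_sync}.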
%%%============================================================================ @@ -222,12 +223,15 @@ init([Opts]) -> starting, #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode, - waste_path=Opts#cdb_options.waste_path}}. + waste_path=Opts#cdb_options.waste_path, + sync_strategy=Opts#cdb_options.sync_strategy}}. starting({open_writer, Filename}, _From, State) -> leveled_log:log("CDB01", [Filename]), {LastPosition, HashTree, LastKey} = open_active_file(Filename), - {ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]), + WriteOps = set_writeops(State#state.sync_strategy), + leveled_log:log("CDB13", [WriteOps]), + {ok, Handle} = file:open(Filename, WriteOps), {reply, ok, writer, State#state{handle=Handle, last_position=LastPosition, last_key=LastKey, @@ -263,6 +267,13 @@ writer({put_kv, Key, Value}, _From, State) -> %% Key and value could not be written {reply, roll, writer, State}; {UpdHandle, NewPosition, HashTree} -> + ok = + case State#state.sync_strategy of + riak_sync -> + file:datasync(UpdHandle); + _ -> + ok + end, {reply, ok, writer, State#state{handle=UpdHandle, last_position=NewPosition, last_key=Key, @@ -381,23 +392,32 @@ reader({get_positions, SampleSize}, _From, State) -> end; reader({direct_fetch, PositionList, Info}, _From, State) -> H = State#state.handle, - case Info of - key_only -> - KeyList = lists:map(fun(P) -> - extract_key(H, P) end, - PositionList), - {reply, KeyList, reader, State}; - key_size -> - KeySizeList = lists:map(fun(P) -> - extract_key_size(H, P) end, - PositionList), - {reply, KeySizeList, reader, State}; - key_value_check -> - KVCList = lists:map(fun(P) -> - extract_key_value_check(H, P) end, - PositionList), - {reply, KVCList, reader, State} - end; + FilterFalseKey = fun(Tpl) -> case element(1, Tpl) of + false -> + false; + _Key -> + {true, Tpl} + end end, + Reply = + case Info of + key_only -> + FM = lists:filtermap( + fun(P) -> + FilterFalseKey(extract_key(H, P)) end, + PositionList), + lists:map(fun(T) -> element(1, T) end, FM); + key_size -> + lists:filtermap( + fun(P) -> + FilterFalseKey(extract_key_size(H, P)) end, + PositionList); + key_value_check -> + lists:filtermap( + fun(P) -> + FilterFalseKey(extract_key_value_check(H, P)) end, + PositionList) + end, + {reply, Reply, reader, State}; reader(cdb_complete, _From, State) -> ok = file:close(State#state.handle), {stop, normal, {ok, State#state.filename}, State#state{handle=undefined}}; @@ -482,7 +502,8 @@ handle_sync_event(cdb_firstkey, _From, StateName, State) -> ?BASE_POSITION -> empty; _ -> - extract_key(State#state.handle, ?BASE_POSITION) + element(1, extract_key(State#state.handle, + ?BASE_POSITION)) end, {reply, FirstKey, StateName, State}; handle_sync_event(cdb_filename, _From, StateName, State) -> @@ -520,6 +541,23 @@ code_change(_OldVsn, StateName, State, _Extra) -> %%% Internal functions %%%============================================================================ +%% Assumption is that sync should be used - it is a transaction log. +%% +%% However this flag is not supported in OTP 16. Bitcask appears to pass an +%% o_sync flag, but this isn't supported either (maybe it works with the +%% bitcask nif fileops). +%% +%% To get round this will try and datasync on each PUT with riak_sync +set_writeops(SyncStrategy) -> + case SyncStrategy of + sync -> + [sync | ?WRITE_OPS]; + riak_sync -> + ?WRITE_OPS; + none -> + ?WRITE_OPS + end. 
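The strategy is chosen once at startup and fixed for the life of the journal file. A hedged usage sketch of the three strategies via the book_start/4 added in leveled_bookie above (paths and sizes are illustrative):

    %% sync: journal opened with [sync | ?WRITE_OPS] - every write is flushed.
    {ok, B1} = leveled_bookie:book_start("/tmp/ldb_sync", 2000, 10000000, sync),
    %% riak_sync: no sync flag on open; file:datasync/1 is called after each PUT.
    {ok, B2} = leveled_bookie:book_start("/tmp/ldb_rsync", 2000, 10000000, riak_sync),
    %% none: nothing beyond the OS page cache (fastest, least durable).
    {ok, B3} = leveled_bookie:book_start("/tmp/ldb_none", 2000, 10000000, none).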
+ %% from_dict(FileName,ListOfKeyValueTuples) %% Given a filename and a dictionary, create a cdb @@ -865,17 +903,17 @@ extract_kvpair(Handle, [Position|Rest], Key) -> extract_key(Handle, Position) -> {ok, _} = file:position(Handle, Position), {KeyLength, _ValueLength} = read_next_2_integers(Handle), - read_next_term(Handle, KeyLength). + {safe_read_next_term(Handle, KeyLength)}. extract_key_size(Handle, Position) -> {ok, _} = file:position(Handle, Position), {KeyLength, ValueLength} = read_next_2_integers(Handle), - {read_next_term(Handle, KeyLength), ValueLength}. + {safe_read_next_term(Handle, KeyLength), ValueLength}. extract_key_value_check(Handle, Position) -> {ok, _} = file:position(Handle, Position), {KeyLength, ValueLength} = read_next_2_integers(Handle), - K = read_next_term(Handle, KeyLength), + K = safe_read_next_term(Handle, KeyLength), {Check, V} = read_next_term(Handle, ValueLength, crc), {K, V, Check}. @@ -1648,21 +1686,20 @@ get_keys_byposition_simple_test() -> io:format("Position list of ~w~n", [PositionList]), ?assertMatch(3, length(PositionList)), R1 = cdb_directfetch(P2, PositionList, key_only), + io:format("R1 ~w~n", [R1]), ?assertMatch(3, length(R1)), lists:foreach(fun(Key) -> - Check = lists:member(Key, KeyList), - ?assertMatch(Check, true) end, + ?assertMatch(true, lists:member(Key, KeyList)) end, R1), R2 = cdb_directfetch(P2, PositionList, key_size), ?assertMatch(3, length(R2)), lists:foreach(fun({Key, _Size}) -> - Check = lists:member(Key, KeyList), - ?assertMatch(Check, true) end, + ?assertMatch(true, lists:member(Key, KeyList)) end, R2), R3 = cdb_directfetch(P2, PositionList, key_value_check), ?assertMatch(3, length(R3)), lists:foreach(fun({Key, Value, Check}) -> - ?assertMatch(Check, true), + ?assertMatch(true, Check), {K, V} = cdb_get(P2, Key), ?assertMatch(K, Key), ?assertMatch(V, Value) end, diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 817ef04..8903198 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -60,9 +60,12 @@ get_size/2, get_keyandhash/2, convert_indexspecs/5, - riakto_keydetails/1, generate_uuid/0, - integer_now/0]). + integer_now/0, + riak_extract_metadata/2]). + +-define(V1_VERS, 1). +-define(MAGIC, 53). % riak_kv -> riak_object %% Credit to @@ -325,44 +328,67 @@ get_keyandhash(LK, Value) -> build_metadata_object(PrimaryKey, MD) -> - {Tag, Bucket, Key, null} = PrimaryKey, + {Tag, _Bucket, _Key, null} = PrimaryKey, case Tag of ?RIAK_TAG -> - riak_metadata_object(Bucket, Key, MD); + {SibMetaBinList, Vclock, _Hash, _Size} = MD, + riak_metadata_to_binary(Vclock, SibMetaBinList); ?STD_TAG -> MD end. - - -riak_metadata_object(Bucket, Key, MD) -> - {RMD, VC, _Hash, _Size} = MD, - Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end, - [], - RMD), - #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}. - riak_extract_metadata(delete, Size) -> {delete, null, null, Size}; -riak_extract_metadata(Obj, Size) -> - {get_metadatas(Obj), vclock(Obj), riak_hash(Obj), Size}. +riak_extract_metadata(ObjBin, Size) -> + {Vclock, SibMetaBinList} = riak_metadata_from_binary(ObjBin), + {SibMetaBinList, Vclock, erlang:phash2(ObjBin), Size}. -riak_hash(Obj=#r_object{}) -> - Vclock = vclock(Obj), - UpdObj = set_vclock(Obj, lists:sort(Vclock)), - erlang:phash2(term_to_binary(UpdObj)). +%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer, VclockBin/binary, SibCount:32/integer, SibsBin/binary>>. -riakto_keydetails(Object) -> - {Object#r_object.bucket, Object#r_object.key}.
+riak_metadata_to_binary(Vclock, SibMetaBinList) -> + VclockBin = term_to_binary(Vclock), + VclockLen = byte_size(VclockBin), + SibCount = length(SibMetaBinList), + SibsBin = slimbin_contents(SibMetaBinList), + <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer, VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>>. + +% Fixes the value length for each sibling to be zero, and so includes no value +slimbin_content(MetaBin) -> + MetaLen = byte_size(MetaBin), + <<0:32/integer, MetaLen:32/integer, MetaBin:MetaLen/binary>>. -get_metadatas(#r_object{contents=Contents}) -> - [Content#r_content.metadata || Content <- Contents]. - -set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. - -vclock(#r_object{vclock=VClock}) -> VClock. +slimbin_contents(SibMetaBinList) -> + F = fun(MetaBin, Acc) -> + <<Acc/binary, (slimbin_content(MetaBin))/binary>> + end, + lists:foldl(F, <<>>, SibMetaBinList). +riak_metadata_from_binary(V1Binary) -> + <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer, Rest/binary>> = V1Binary, + <<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest, + SibMetaBinList = + case SibCount of + 0 -> + []; + SC when is_integer(SC) -> + get_metadata_from_siblings(SibsBin, SibCount, []) + end, + {binary_to_term(VclockBin), SibMetaBinList}. + +get_metadata_from_siblings(<<>>, 0, SibMetaBinList) -> + SibMetaBinList; +get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>, + SibCount, + SibMetaBinList) -> + <<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0, + <<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1, + get_metadata_from_siblings(Rest2, + SibCount - 1, + [MetaBin|SibMetaBinList]). diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index acaad5f..cb00883 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -633,7 +633,7 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) -> load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos, FN, Rest) -> leveled_log:log("I0014", [FN, MinSQN]), - InitAcc = {MinSQN, MaxSQN, gb_trees:empty()}, + InitAcc = {MinSQN, MaxSQN, leveled_skiplist:empty()}, Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of {eof, {AccMinSQN, _AccMaxSQN, AccKL}} -> ok = push_to_penciller(Penciller, AccKL), diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 9a9c668..b4c63f2 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -256,7 +256,9 @@ {"CDB11", {info, "CRC check failed due to size"}}, {"CDB12", - {inof, "HashTree written"}} + {info, "HashTree written"}}, + {"CDB13", + {info, "Write options of ~w"}} ])). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 5705a61..94bac54 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -212,15 +212,16 @@ levelzero_pending = false :: boolean(), levelzero_constructor :: pid(), - levelzero_cache = [] :: list(), % a list of gb_trees - levelzero_index :: array:array(), + levelzero_cache = [] :: list(), % a list of skiplists + levelzero_index, + % is an array - but the type cannot be specified due to OTP 16 compatibility levelzero_size = 0 :: integer(), levelzero_maxcachesize :: integer(), is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), source_penciller :: pid(), - levelzero_astree :: gb_trees:tree(), + levelzero_astree :: list(), % skiplist ongoing_work = [] :: list(), work_backlog = false :: boolean()}).
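To make the metadata codec above concrete, a round-trip sketch; ObjBin is assumed to be a riak_object v1 binary (such as those built by testutil:to_binary/2 later in this diff), B and K stand for the object's bucket and key, and build_metadata_object/2 is assumed to remain exported from leveled_codec:

    %% PUT side: strip sibling values, keeping the vclock and per-sibling metadata.
    MD = leveled_codec:riak_extract_metadata(ObjBin, byte_size(ObjBin)),
    {_SibMetaBinList, _Vclock, _ObjHash, _Size} = MD,
    %% HEAD side: rebuild a value-free v1 binary from the slimmed metadata.
    HeadBin = leveled_codec:build_metadata_object({?RIAK_TAG, B, K, null}, MD).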
@@ -366,25 +367,24 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, _From, State=#state{snapshot_fully_loaded=Ready}) when Ready == true -> - L0AsTree = + L0AsList = case State#state.levelzero_astree of undefined -> leveled_pmem:merge_trees(StartKey, EndKey, State#state.levelzero_cache, - gb_trees:empty()); - Tree -> - Tree + leveled_skiplist:empty()); + List -> + List end, - L0iter = gb_trees:iterator(L0AsTree), SFTiter = initiate_rangequery_frommanifest(StartKey, EndKey, State#state.manifest), - Acc = keyfolder({L0iter, SFTiter}, + Acc = keyfolder({L0AsList, SFTiter}, {StartKey, EndKey}, {AccFun, InitAcc}, MaxKeys), - {reply, Acc, State#state{levelzero_astree = L0AsTree}}; + {reply, Acc, State#state{levelzero_astree = L0AsList}}; handle_call(work_for_clerk, From, State) -> {UpdState, Work} = return_work(State, From), {reply, Work, UpdState}; @@ -985,77 +985,73 @@ keyfolder(IMMiter, SFTiter, StartKey, EndKey, {AccFun, Acc}) -> keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 -> Acc; -keyfolder({null, SFTiter}, KeyRange, {AccFun, Acc}, MaxKeys) -> +keyfolder({[], SFTiter}, KeyRange, {AccFun, Acc}, MaxKeys) -> {StartKey, EndKey} = KeyRange, case find_nextkey(SFTiter, StartKey, EndKey) of no_more_keys -> Acc; {NxSFTiter, {SFTKey, SFTVal}} -> Acc1 = AccFun(SFTKey, SFTVal, Acc), - keyfolder({null, NxSFTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1) + keyfolder({[], NxSFTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1) end; -keyfolder({IMMiterator, SFTiterator}, KeyRange, {AccFun, Acc}, MaxKeys) -> +keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange, + {AccFun, Acc}, MaxKeys) -> {StartKey, EndKey} = KeyRange, - case gb_trees:next(IMMiterator) of - none -> - % There are no more keys in the in-memory iterator, so now - % iterate only over the remaining keys in the SFT iterator - keyfolder({null, SFTiterator}, KeyRange, {AccFun, Acc}, MaxKeys); - {IMMKey, _IMMVal, NxIMMiterator} when IMMKey < StartKey -> + case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of + {true, _} -> + % Normally everything is pre-filtered, but the IMM iterator can - % be re-used and do may be behind the StartKey if the StartKey has + % be re-used and so may be behind the StartKey if the StartKey has % advanced from the previous use keyfolder({NxIMMiterator, SFTiterator}, KeyRange, {AccFun, Acc}, MaxKeys); - {IMMKey, IMMVal, NxIMMiterator} -> - case leveled_codec:endkey_passed(EndKey, IMMKey) of - true -> - % There are no more keys in-range in the in-memory - % iterator, so take action as if this iterator is empty - % (see above) - keyfolder({null, SFTiterator}, + {false, true} -> + % There are no more keys in-range in the in-memory + % iterator, so take action as if this iterator is empty + % (see above) + keyfolder({[], SFTiterator}, + KeyRange, + {AccFun, Acc}, + MaxKeys); + {false, false} -> + case find_nextkey(SFTiterator, StartKey, EndKey) of + no_more_keys -> + % No more keys in range in the persisted store, so use the + % in-memory KV as the next + Acc1 = AccFun(IMMKey, IMMVal, Acc), + keyfolder({NxIMMiterator, SFTiterator}, KeyRange, - {AccFun, Acc}, - MaxKeys); - false -> - case find_nextkey(SFTiterator, StartKey, EndKey) of - no_more_keys -> - % No more keys in range in the persisted store, so use the - % in-memory KV as the next + {AccFun, Acc1}, + MaxKeys - 1); + {NxSFTiterator, {SFTKey, SFTVal}} -> + % There is a next key, so need to know which is the + % next key between the two (and handle two keys + % with different sequence
numbers). + case leveled_codec:key_dominates({IMMKey, + IMMVal}, + {SFTKey, + SFTVal}) of + left_hand_first -> Acc1 = AccFun(IMMKey, IMMVal, Acc), keyfolder({NxIMMiterator, SFTiterator}, KeyRange, {AccFun, Acc1}, MaxKeys - 1); - {NxSFTiterator, {SFTKey, SFTVal}} -> - % There is a next key, so need to know which is the - % next key between the two (and handle two keys - % with different sequence numbers). - case leveled_codec:key_dominates({IMMKey, - IMMVal}, - {SFTKey, - SFTVal}) of - left_hand_first -> - Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder({NxIMMiterator, SFTiterator}, - KeyRange, - {AccFun, Acc1}, - MaxKeys - 1); - right_hand_first -> - Acc1 = AccFun(SFTKey, SFTVal, Acc), - keyfolder({IMMiterator, NxSFTiterator}, - KeyRange, - {AccFun, Acc1}, - MaxKeys - 1); - left_hand_dominant -> - Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder({NxIMMiterator, NxSFTiterator}, - KeyRange, - {AccFun, Acc1}, - MaxKeys - 1) - end + right_hand_first -> + Acc1 = AccFun(SFTKey, SFTVal, Acc), + keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], + NxSFTiterator}, + KeyRange, + {AccFun, Acc1}, + MaxKeys - 1); + left_hand_dominant -> + Acc1 = AccFun(IMMKey, IMMVal, Acc), + keyfolder({NxIMMiterator, NxSFTiterator}, + KeyRange, + {AccFun, Acc1}, + MaxKeys - 1) end end end. @@ -1267,8 +1263,8 @@ confirm_delete_test() -> maybe_pause_push(PCL, KL) -> - T0 = gb_trees:empty(), - T1 = lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end, + T0 = leveled_skiplist:empty(), + T1 = lists:foldl(fun({K, V}, Acc) -> leveled_skiplist:enter(K, V, Acc) end, T0, KL), case pcl_pushmem(PCL, T1) of @@ -1335,7 +1331,7 @@ simple_server_test() -> SnapOpts = #penciller_options{start_snapshot = true, source_penciller = PCLr}, {ok, PclSnap} = pcl_start(SnapOpts), - ok = pcl_loadsnapshot(PclSnap, gb_trees:empty()), + ok = pcl_loadsnapshot(PclSnap, leveled_skiplist:empty()), ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})), @@ -1384,7 +1380,7 @@ simple_server_test() -> term_to_binary("Hello")), {ok, PclSnap2} = pcl_start(SnapOpts), - ok = pcl_loadsnapshot(PclSnap2, gb_trees:empty()), + ok = pcl_loadsnapshot(PclSnap2, leveled_skiplist:empty()), ?assertMatch(false, pcl_checksequencenumber(PclSnap2, {o, "Bucket0001", @@ -1543,16 +1539,16 @@ foldwithimm_simple_test() -> {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}]}, {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}]} ], - IMM0 = gb_trees:enter({o, "Bucket1", "Key6"}, - {7, {active, infinity}, null}, - gb_trees:empty()), - IMM1 = gb_trees:enter({o, "Bucket1", "Key1"}, - {8, {active, infinity}, null}, - IMM0), - IMM2 = gb_trees:enter({o, "Bucket1", "Key8"}, - {9, {active, infinity}, null}, - IMM1), - IMMiter = gb_trees:iterator_from({o, "Bucket1", "Key1"}, IMM2), + IMM0 = leveled_skiplist:enter({o, "Bucket1", "Key6"}, + {7, {active, infinity}, null}, + leveled_skiplist:empty()), + IMM1 = leveled_skiplist:enter({o, "Bucket1", "Key1"}, + {8, {active, infinity}, null}, + IMM0), + IMM2 = leveled_skiplist:enter({o, "Bucket1", "Key8"}, + {9, {active, infinity}, null}, + IMM1), + IMMiter = leveled_skiplist:to_range(IMM2, {o, "Bucket1", "Key1"}), AccFun = fun(K, V, Acc) -> SQN = leveled_codec:strip_to_seqonly({K, V}), Acc ++ [{K, SQN}] end, Acc = keyfolder(IMMiter, @@ -1564,10 +1560,10 @@ foldwithimm_simple_test() -> {{o, "Bucket1", "Key5"}, 2}, {{o, "Bucket1", "Key6"}, 7}], Acc), - IMM1A = 
gb_trees:enter({o, "Bucket1", "Key1"}, - {8, {active, infinity}, null}, - gb_trees:empty()), - IMMiterA = gb_trees:iterator_from({o, "Bucket1", "Key1"}, IMM1A), + IMM1A = leveled_skiplist:enter({o, "Bucket1", "Key1"}, + {8, {active, infinity}, null}, + leveled_skiplist:empty()), + IMMiterA = leveled_skiplist:to_range(IMM1A, {o, "Bucket1", "Key1"}), AccA = keyfolder(IMMiterA, QueryArray, {o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"}, @@ -1576,10 +1572,10 @@ foldwithimm_simple_test() -> {{o, "Bucket1", "Key3"}, 3}, {{o, "Bucket1", "Key5"}, 2}], AccA), - IMM3 = gb_trees:enter({o, "Bucket1", "Key4"}, - {10, {active, infinity}, null}, - IMM2), - IMMiterB = gb_trees:iterator_from({o, "Bucket1", "Key1"}, IMM3), + IMM3 = leveled_skiplist:enter({o, "Bucket1", "Key4"}, + {10, {active, infinity}, null}, + IMM2), + IMMiterB = leveled_skiplist:to_range(IMM3, {o, "Bucket1", "Key1"}), AccB = keyfolder(IMMiterB, QueryArray, {o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"}, @@ -1594,7 +1590,7 @@ create_file_test() -> Filename = "../test/new_file.sft", ok = file:write_file(Filename, term_to_binary("hello")), KVL = lists:usort(leveled_sft:generate_randomkeys(10000)), - Tree = gb_trees:from_orddict(KVL), + Tree = leveled_skiplist:from_list(KVL), FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, {ok, SP, diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index d12425b..39dd0c6 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -76,7 +76,7 @@ add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) -> Count0, array:set(Slot, [{Hash, SlotInTreeList}|L], HashIndex)} end, - LM1List = gb_trees:to_list(LevelMinus1), + LM1List = leveled_skiplist:to_list(LevelMinus1), StartingT = {infinity, 0, L0Size, L0Index}, {MinSQN, MaxSQN, NewL0Size, UpdL0Index} = lists:foldl(FoldFun, StartingT, @@ -96,7 +96,7 @@ to_list(Slots, FetchFun) -> SlotList = lists:reverse(lists:seq(1, Slots)), FullList = lists:foldl(fun(Slot, Acc) -> Tree = FetchFun(Slot), - L = gb_trees:to_list(Tree), + L = leveled_skiplist:to_list(Tree), lists:ukeymerge(1, Acc, L) end, [], @@ -128,7 +128,7 @@ check_levelzero(Key, L0Index, TreeList) -> {Found, KV}; false -> CheckTree = lists:nth(SlotToCheck, TreeList), - case gb_trees:lookup(Key, CheckTree) of + case leveled_skiplist:lookup(Key, CheckTree) of none -> {Found, KV}; {value, Value} -> @@ -139,12 +139,15 @@ check_levelzero(Key, L0Index, TreeList) -> {false, not_found}, lists:reverse(lists:usort(SlotList))). - -merge_trees(StartKey, EndKey, TreeList, LevelMinus1) -> - lists:foldl(fun(Tree, TreeAcc) -> - merge_nexttree(Tree, TreeAcc, StartKey, EndKey) end, - gb_trees:empty(), - lists:append(TreeList, [LevelMinus1])). + +merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) -> + lists:foldl(fun(SkipList, Acc) -> + R = leveled_skiplist:to_range(SkipList, + StartKey, + EndKey), + lists:ukeymerge(1, Acc, R) end, + [], + [LevelMinus1|lists:reverse(SkipListList)]). %%%============================================================================ %%% Internal Functions @@ -155,24 +158,6 @@ hash_to_slot(Key) -> H = erlang:phash2(Key), {H bsr element(2, ?SLOT_WIDTH), H band (element(1, ?SLOT_WIDTH) - 1)}. -merge_nexttree(Tree, TreeAcc, StartKey, EndKey) -> - Iter = gb_trees:iterator_from(StartKey, Tree), - merge_nexttree(Iter, TreeAcc, EndKey). 
- -merge_nexttree(Iter, TreeAcc, EndKey) -> - case gb_trees:next(Iter) of - none -> - TreeAcc; - {Key, Value, NewIter} -> - case leveled_codec:endkey_passed(EndKey, Key) of - true -> - TreeAcc; - false -> - merge_nexttree(NewIter, - gb_trees:enter(Key, Value, TreeAcc), - EndKey) - end - end. %%%============================================================================ %%% Test %%%============================================================================ @@ -183,7 +168,7 @@ merge_nexttree(Iter, TreeAcc, EndKey) -> generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) -> generate_randomkeys(Seqn, Count, - gb_trees:empty(), + leveled_skiplist:empty(), BucketRangeLow, BucketRangeHigh). @@ -197,7 +182,7 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) -> {Seqn, {active, infinity}, null}}, generate_randomkeys(Seqn + 1, Count - 1, - gb_trees:enter(K, V, Acc), + leveled_skiplist:enter(K, V, Acc), BucketLow, BRange). @@ -216,29 +201,29 @@ compare_method_test() -> ?assertMatch(32000, SQN), ?assertMatch(true, Size =< 32000), - TestList = gb_trees:to_list(generate_randomkeys(1, 2000, 1, 800)), + TestList = leveled_skiplist:to_list(generate_randomkeys(1, 2000, 1, 800)), S0 = lists:foldl(fun({Key, _V}, Acc) -> - R0 = lists:foldr(fun(Tree, {Found, KV}) -> - case Found of - true -> - {true, KV}; - false -> - L0 = gb_trees:lookup(Key, Tree), - case L0 of - none -> - {false, not_found}; - {value, Value} -> - {true, {Key, Value}} - end + R0 = lists:foldr(fun(Tree, {Found, KV}) -> + case Found of + true -> + {true, KV}; + false -> + L0 = leveled_skiplist:lookup(Key, Tree), + case L0 of + none -> + {false, not_found}; + {value, Value} -> + {true, {Key, Value}} end - end, - {false, not_found}, - TreeList), - [R0|Acc] - end, - [], - TestList), + end + end, + {false, not_found}, + TreeList), + [R0|Acc] + end, + [], + TestList), S1 = lists:foldl(fun({Key, _V}, Acc) -> R0 = check_levelzero(Key, Index, TreeList), @@ -258,20 +243,20 @@ compare_method_test() -> P = leveled_codec:endkey_passed(EndKey, K), case {K, P} of {K, false} when K >= StartKey -> - gb_trees:enter(K, V, Acc); + leveled_skiplist:enter(K, V, Acc); _ -> Acc end end, - gb_trees:empty(), + leveled_skiplist:empty(), DumpList), - Sz0 = gb_trees:size(Q0), + Sz0 = leveled_skiplist:size(Q0), io:format("Crude method took ~w microseconds resulting in tree of " ++ "size ~w~n", [timer:now_diff(os:timestamp(), SWa), Sz0]), SWb = os:timestamp(), - Q1 = merge_trees(StartKey, EndKey, TreeList, gb_trees:empty()), - Sz1 = gb_trees:size(Q1), + Q1 = merge_trees(StartKey, EndKey, TreeList, leveled_skiplist:empty()), + Sz1 = length(Q1), io:format("Merge method took ~w microseconds resulting in tree of " ++ "size ~w~n", [timer:now_diff(os:timestamp(), SWb), Sz1]), diff --git a/src/leveled_skiplist.erl b/src/leveled_skiplist.erl new file mode 100644 index 0000000..b9d9af4 --- /dev/null +++ b/src/leveled_skiplist.erl @@ -0,0 +1,539 @@ +%% -------- SKIPLIST --------- +%% +%% For storing small numbers of {K, V} pairs where reasonable insertion and +%% fetch times are required, but with fast support for flattening to a list +%% or a sublist within a certain key range +%% +%% Used instead of gb_trees to retain compatibility with OTP 16 (and Riak's +%% ongoing dependency on OTP 16) +%% +%% Not a proper skip list. Only supports a fixed depth. Good enough for the +%% purposes of leveled. Also uses the peculiar endkey_passed function within +%% leveled. Not tested beyond a depth of 2. + +-module(leveled_skiplist). + +-include("include/leveled.hrl").
+ +-export([ + from_list/1, + from_sortedlist/1, + to_list/1, + enter/3, + to_range/2, + to_range/3, + lookup/2, + empty/0, + size/1 + ]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(SKIP_WIDTH, 16). +-define(LIST_HEIGHT, 2). +-define(INFINITY_KEY, {null, null, null, null, null}). + + +%%%============================================================================ +%%% SkipList API +%%%============================================================================ + +enter(Key, Value, SkipList) -> + enter(Key, Value, SkipList, ?SKIP_WIDTH, ?LIST_HEIGHT). + +from_list(UnsortedKVL) -> + KVL = lists:ukeysort(1, UnsortedKVL), + from_list(KVL, ?SKIP_WIDTH, ?LIST_HEIGHT). + +from_sortedlist(SortedKVL) -> + from_list(SortedKVL, ?SKIP_WIDTH, ?LIST_HEIGHT). + +lookup(Key, SkipList) -> + lookup(Key, SkipList, ?LIST_HEIGHT). + + +%% Rather than support iterator_from like gb_trees, this will just output a +%% key-sorted list for the desired range, which can then be iterated over as +%% normal +to_range(SkipList, Start) -> + to_range(SkipList, Start, ?INFINITY_KEY, ?LIST_HEIGHT). + +to_range(SkipList, Start, End) -> + to_range(SkipList, Start, End, ?LIST_HEIGHT). + +to_list(SkipList) -> + to_list(SkipList, ?LIST_HEIGHT). + +empty() -> + empty([], ?LIST_HEIGHT). + +size(SkipList) -> + size(SkipList, ?LIST_HEIGHT). + + +%%%============================================================================ +%%% SkipList Base Functions +%%%============================================================================ + +enter(Key, Value, SkipList, Width, 1) -> + Hash = erlang:phash2(Key), + {MarkerKey, SubList} = find_mark(Key, SkipList), + case Hash rem Width of + 0 -> + {LHS, RHS} = lists:splitwith(fun({K, _V}) -> + K =< Key end, + SubList), + SkpL1 = lists:keyreplace(MarkerKey, 1, SkipList, {MarkerKey, RHS}), + SkpL2 = [{Key, lists:ukeysort(1, [{Key, Value}|LHS])}|SkpL1], + lists:ukeysort(1, SkpL2); + _ -> + {LHS, RHS} = lists:splitwith(fun({K, _V}) -> K < Key end, SubList), + UpdSubList = + case RHS of + [] -> + LHS ++ [{Key, Value}]; + [{FirstKey, _V}|RHSTail] -> + case FirstKey of + Key -> + LHS ++ [{Key, Value}] ++ RHSTail; + _ -> + LHS ++ [{Key, Value}] ++ RHS + end + end, + lists:keyreplace(MarkerKey, 1, SkipList, {MarkerKey, UpdSubList}) + end; +enter(Key, Value, SkipList, Width, Level) -> + Hash = erlang:phash2(Key), + HashMatch = width(Level, Width), + {MarkerKey, SubSkipList} = find_mark(Key, SkipList), + UpdSubSkipList = enter(Key, Value, SubSkipList, Width, Level - 1), + case Hash rem HashMatch of + 0 -> + % Promote the key to be a marker at this level as well + {LHS, RHS} = lists:splitwith(fun({K, _V}) -> + K =< Key end, + UpdSubSkipList), + SkpL1 = lists:keyreplace(MarkerKey, 1, SkipList, {MarkerKey, RHS}), + lists:ukeysort(1, [{Key, LHS}|SkpL1]); + _ -> + % Need to replace Marker Key with sublist + lists:keyreplace(MarkerKey, + 1, + SkipList, + {MarkerKey, UpdSubSkipList}) end.
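Promotion in enter/5 is driven purely by the key's hash, so marker placement is deterministic and stable across rebuilds. A sketch of the rule with the constants defined above (?SKIP_WIDTH = 16 and ?LIST_HEIGHT = 2, so width(2, 16) = 16 * 16 = 256):

    %% Roughly 1 in 16 keys becomes a level-1 marker, and 1 in 256 keys
    %% is also promoted to mark the top level.
    IsLevel1Marker = fun(Key) -> erlang:phash2(Key) rem 16 =:= 0 end,
    IsTopMarker = fun(Key) -> erlang:phash2(Key) rem 256 =:= 0 end.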
+ + +from_list(KVL, Width, 1) -> + Slots = length(KVL) div Width, + SkipList0 = lists:map(fun(X) -> + N = X * Width, + {K, _V} = lists:nth(N, KVL), + {K, lists:sublist(KVL, + N - Width + 1, + Width)} + end, + lists:seq(1, length(KVL) div Width)), + case Slots * Width < length(KVL) of + true -> + {LastK, _V} = lists:last(KVL), + SkipList0 ++ [{LastK, lists:nthtail(Slots * Width, KVL)}]; + false -> + SkipList0 + end; +from_list(KVL, Width, Level) -> + SkipWidth = width(Level, Width), + LoftSlots = length(KVL) div SkipWidth, + case LoftSlots of + 0 -> + {K, _V} = lists:last(KVL), + [{K, from_list(KVL, Width, Level - 1)}]; + _ -> + SkipList0 = + lists:map(fun(X) -> + N = X * SkipWidth, + {K, _V} = lists:nth(N, KVL), + SL = lists:sublist(KVL, + N - SkipWidth + 1, + SkipWidth), + {K, from_list(SL, Width, Level - 1)} + end, + lists:seq(1, LoftSlots)), + case LoftSlots * SkipWidth < length(KVL) of + true -> + {LastK, _V} = lists:last(KVL), + TailList = lists:nthtail(LoftSlots * SkipWidth, KVL), + SkipList0 ++ [{LastK, from_list(TailList, + Width, + Level - 1)}]; + false -> + SkipList0 + end + end. + + +lookup(Key, SkipList, 1) -> + SubList = get_sublist(Key, SkipList), + case lists:keyfind(Key, 1, SubList) of + false -> + none; + {Key, V} -> + {value, V} + end; +lookup(Key, SkipList, Level) -> + SubList = get_sublist(Key, SkipList), + case SubList of + null -> + none; + _ -> + lookup(Key, SubList, Level - 1) + end. + + +to_list(SkipList, 1) -> + lists:foldl(fun({_Mark, SL}, Acc) -> Acc ++ SL end, [], SkipList); +to_list(SkipList, Level) -> + lists:foldl(fun({_Mark, SL}, Acc) -> Acc ++ to_list(SL, Level - 1) end, + [], + SkipList). + +to_range(SkipList, Start, End, 1) -> + R = lists:foldl(fun({Mark, SL}, {PassedStart, PassedEnd, Acc, PrevList}) -> + + case {PassedStart, PassedEnd} of + {true, true} -> + {true, true, Acc, null}; + {false, false} -> + case Start > Mark of + true -> + {false, false, Acc, SL}; + false -> + RHS = splitlist_start(Start, PrevList ++ SL), + case leveled_codec:endkey_passed(End, Mark) of + true -> + EL = splitlist_end(End, RHS), + {true, true, EL, null}; + false -> + {true, false, RHS, null} + end + end; + {true, false} -> + case leveled_codec:endkey_passed(End, Mark) of + true -> + EL = splitlist_end(End, SL), + {true, true, Acc ++ EL, null}; + false -> + {true, false, Acc ++ SL, null} + end + end end, + + {false, false, [], []}, + SkipList), + {_Bool1, _Bool2, SubList, _PrevList} = R, + SubList; +to_range(SkipList, Start, End, Level) -> + R = lists:foldl(fun({Mark, SL}, {PassedStart, PassedEnd, Acc, PrevList}) -> + + case {PassedStart, PassedEnd} of + {true, true} -> + {true, true, Acc, null}; + {false, false} -> + case Start > Mark of + true -> + {false, false, Acc, SL}; + false -> + SkipLRange = to_range(PrevList, + Start, End, + Level - 1) ++ + to_range(SL, + Start, End, + Level - 1), + case leveled_codec:endkey_passed(End, Mark) of + true -> + {true, true, SkipLRange, null}; + false -> + {true, false, SkipLRange, null} + end + end; + {true, false} -> + SkipLRange = to_range(SL, Start, End, Level - 1), + case leveled_codec:endkey_passed(End, Mark) of + true -> + {true, true, Acc ++ SkipLRange, null}; + false -> + {true, false, Acc ++ SkipLRange, null} + end + end end, + + {false, false, [], []}, + SkipList), + {_Bool1, _Bool2, SubList, _PrevList} = R, + SubList. + + +empty(SkipList, 1) -> + [{?INFINITY_KEY, SkipList}]; +empty(SkipList, Level) -> + empty([{?INFINITY_KEY, SkipList}], Level - 1). 
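A short usage sketch of the public API defined above; the keys follow leveled's ledger-key shape and the values are arbitrary terms:

    KVL = [{{o, "Bucket", "Key" ++ integer_to_list(X), null}, X}
              || X <- lists:seq(1, 100)],
    SL = leveled_skiplist:from_list(KVL),
    {value, 10} = leveled_skiplist:lookup({o, "Bucket", "Key10", null}, SL),
    100 = leveled_skiplist:size(SL),
    %% to_range/3 returns a plain key-sorted list rather than an iterator.
    Range = leveled_skiplist:to_range(SL,
                                      {o, "Bucket", "Key10", null},
                                      {o, "Bucket", "Key20", null}).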
+ +size(SkipList, 1) -> + lists:foldl(fun({_Mark, SL}, Acc) -> length(SL) + Acc end, 0, SkipList); +size(SkipList, Level) -> + lists:foldl(fun({_Mark, SL}, Acc) -> size(SL, Level - 1) + Acc end, + 0, + SkipList). + + +%%%============================================================================ +%%% Internal Functions +%%%============================================================================ + +width(1, Width) -> + Width; +width(N, Width) -> + width(N - 1, Width * Width). + +find_mark(Key, SkipList) -> + lists:foldl(fun({Marker, SL}, Acc) -> + case Acc of + false -> + case Marker >= Key of + true -> + {Marker, SL}; + false -> + Acc + end; + _ -> + Acc + end end, + false, + SkipList). + +get_sublist(Key, SkipList) -> + lists:foldl(fun({SkipKey, SL}, Acc) -> + case {Acc, SkipKey} of + {null, SkipKey} when SkipKey >= Key -> + SL; + _ -> + Acc + end end, + null, + SkipList). + +splitlist_start(StartKey, SL) -> + {_LHS, RHS} = lists:splitwith(fun({K, _V}) -> K < StartKey end, SL), + RHS. + +splitlist_end(EndKey, SL) -> + {LHS, _RHS} = lists:splitwith(fun({K, _V}) -> + not leveled_codec:endkey_passed(EndKey, K) + end, + SL), + LHS. + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + +generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) -> + generate_randomkeys(Seqn, + Count, + [], + BucketRangeLow, + BucketRangeHigh). + +generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) -> + Acc; +generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) -> + BNumber = + case BRange of + 0 -> + string:right(integer_to_list(BucketLow), 4, $0); + _ -> + BRand = random:uniform(BRange), + string:right(integer_to_list(BucketLow + BRand), 4, $0) + end, + KNumber = string:right(integer_to_list(random:uniform(1000)), 4, $0), + {K, V} = {{o, "Bucket" ++ BNumber, "Key" ++ KNumber, null}, + {Seqn, {active, infinity}, null}}, + generate_randomkeys(Seqn + 1, + Count - 1, + [{K, V}|Acc], + BucketLow, + BRange). + +skiplist_small_test() -> + % Check nothing bad happens with very small lists + lists:foreach(fun(N) -> dotest_skiplist_small(N) end, lists:seq(1, 32)). + + +dotest_skiplist_small(N) -> + KL = generate_randomkeys(1, N, 1, 2), + SkipList1 = + lists:foldl(fun({K, V}, SL) -> + enter(K, V, SL) + end, + empty(), + KL), + SkipList2 = from_list(lists:reverse(KL)), + lists:foreach(fun({K, V}) -> ?assertMatch({value, V}, lookup(K, SkipList1)) + end, + lists:ukeysort(1, lists:reverse(KL))), + lists:foreach(fun({K, V}) -> ?assertMatch({value, V}, lookup(K, SkipList2)) + end, + lists:ukeysort(1, lists:reverse(KL))). 
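Tracing from_list/3 with a single pair shows the shape the small-list test above relies on: the lone key serves as its own marker at both levels, while a dynamically grown list hangs off ?INFINITY_KEY markers instead, and lookup/2 behaves identically for either construction path:

    K = {o, "Bucket", "Key1", null},
    [{K, [{K, [{K, v}]}]}] = leveled_skiplist:from_list([{K, v}]),
    {value, v} = leveled_skiplist:lookup(K, leveled_skiplist:from_list([{K, v}])).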
+ +skiplist_test() -> + N = 8000, + KL = generate_randomkeys(1, N, 1, N div 5), + + SWaGSL = os:timestamp(), + SkipList = from_list(lists:reverse(KL)), + io:format(user, "Generating skip list with ~w keys in ~w microseconds~n" ++ + "Top level key count of ~w~n", + [N, timer:now_diff(os:timestamp(), SWaGSL), length(SkipList)]), + io:format(user, "Second tier key counts of ~w~n", + [lists:map(fun({_L, SL}) -> length(SL) end, SkipList)]), + KLSorted = lists:ukeysort(1, lists:reverse(KL)), + + SWaGSL2 = os:timestamp(), + SkipList = from_sortedlist(KLSorted), + io:format(user, "Generating skip list with ~w sorted keys in ~w " ++ + "microseconds~n", + [N, timer:now_diff(os:timestamp(), SWaGSL2)]), + + SWaDSL = os:timestamp(), + SkipList1 = + lists:foldl(fun({K, V}, SL) -> + enter(K, V, SL) + end, + empty(), + KL), + io:format(user, "Dynamic load of skiplist with ~w keys took ~w " ++ + "microseconds~n" ++ + "Top level key count of ~w~n", + [N, timer:now_diff(os:timestamp(), SWaDSL), length(SkipList1)]), + io:format(user, "Second tier key counts of ~w~n", + [lists:map(fun({_L, SL}) -> length(SL) end, SkipList1)]), + + io:format(user, "~nRunning timing tests for generated skiplist:~n", []), + skiplist_timingtest(KLSorted, SkipList, N), + + io:format(user, "~nRunning timing tests for dynamic skiplist:~n", []), + skiplist_timingtest(KLSorted, SkipList1, N). + + +skiplist_timingtest(KL, SkipList, N) -> + io:format(user, "Timing tests on skiplist of size ~w~n", + [leveled_skiplist:size(SkipList)]), + CheckList1 = lists:sublist(KL, N div 4, 200), + CheckList2 = lists:sublist(KL, N div 3, 200), + CheckList3 = lists:sublist(KL, N div 2, 200), + CheckList4 = lists:sublist(KL, N - 1000, 200), + CheckList5 = lists:sublist(KL, N - 500, 200), + CheckList6 = lists:sublist(KL, 1, 10), + CheckList7 = lists:nthtail(N - 200, KL), + CheckList8 = lists:sublist(KL, N div 2, 1), + CheckAll = CheckList1 ++ CheckList2 ++ CheckList3 ++ + CheckList4 ++ CheckList5 ++ CheckList6 ++ CheckList7, + + SWb = os:timestamp(), + lists:foreach(fun({K, V}) -> + ?assertMatch({value, V}, lookup(K, SkipList)) + end, + CheckAll), + io:format(user, "Finding 1020 keys took ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWb)]), + + RangeFun = + fun(SkipListToQuery, CheckListForQ, Assert) -> + KR = + to_range(SkipListToQuery, + element(1, lists:nth(1, CheckListForQ)), + element(1, lists:last(CheckListForQ))), + case Assert of + true -> + CompareL = length(lists:usort(CheckListForQ)), + ?assertMatch(CompareL, length(KR)); + false -> + KR + end + end, + + SWc = os:timestamp(), + RangeFun(SkipList, CheckList1, true), + RangeFun(SkipList, CheckList2, true), + RangeFun(SkipList, CheckList3, true), + RangeFun(SkipList, CheckList4, true), + RangeFun(SkipList, CheckList5, true), + RangeFun(SkipList, CheckList6, true), + RangeFun(SkipList, CheckList7, true), + RangeFun(SkipList, CheckList8, true), + + KL_OOR1 = generate_randomkeys(1, 4, N div 5 + 1, N div 5 + 10), + KR9 = RangeFun(SkipList, KL_OOR1, false), + ?assertMatch([], KR9), + + KL_OOR2 = generate_randomkeys(1, 4, 0, 0), + KR10 = RangeFun(SkipList, KL_OOR2, false), + ?assertMatch([], KR10), + + io:format(user, "Finding 10 ranges took ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWc)]), + + AltKL1 = generate_randomkeys(1, 1000, 1, 200), + SWd = os:timestamp(), + lists:foreach(fun({K, _V}) -> + lookup(K, SkipList) + end, + AltKL1), + io:format(user, "Getting 1000 mainly missing keys took ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWd)]), + AltKL2 = 
generate_randomkeys(1, 1000, N div 5 + 1, N div 5 + 300), + SWe = os:timestamp(), + lists:foreach(fun({K, _V}) -> + none = lookup(K, SkipList) + end, + AltKL2), + io:format(user, "Getting 1000 missing keys above range took ~w " ++ + "microseconds~n", + [timer:now_diff(os:timestamp(), SWe)]), + AltKL3 = generate_randomkeys(1, 1000, 0, 0), + SWf = os:timestamp(), + lists:foreach(fun({K, _V}) -> + none = lookup(K, SkipList) + end, + AltKL3), + io:format(user, "Getting 1000 missing keys below range took ~w " ++ + "microseconds~n", + [timer:now_diff(os:timestamp(), SWf)]), + + SWg = os:timestamp(), + FlatList = to_list(SkipList), + io:format(user, "Flattening skiplist took ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWg)]), + ?assertMatch(KL, FlatList). + +define_kv(X) -> + {{o, "Bucket", "Key" ++ string:right(integer_to_list(X), 6), null}, + {X, {active, infinity}, null}}. + +skiplist_roundsize_test() -> + KVL = lists:map(fun(X) -> define_kv(X) end, lists:seq(1, 4096)), + SkipList = from_list(KVL), + lists:foreach(fun({K, V}) -> + ?assertMatch({value, V}, lookup(K, SkipList)) end, + KVL), + lists:foreach(fun(X) -> + {KS, _VS} = define_kv(X * 32 + 1), + {KE, _VE} = define_kv((X + 1) * 32), + R = to_range(SkipList, KS, KE), + L = lists:sublist(KVL, + X * 32 + 1, + 32), + ?assertMatch(L, R) end, + lists:seq(0, 24)). + + +-endif. \ No newline at end of file diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 5dab283..4a7486a 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -24,7 +24,8 @@ all() -> [ simple_put_fetch_head_delete(_Config) -> RootPath = testutil:reset_filestructure(), - StartOpts1 = [{root_path, RootPath}], + StartOpts1 = [{root_path, RootPath}, + {sync_strategy, testutil:sync_strategy()}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {TestObject, TestSpec} = testutil:generate_testobject(), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), @@ -32,7 +33,8 @@ simple_put_fetch_head_delete(_Config) -> testutil:check_formissingobject(Bookie1, "Bucket1", "Key2"), ok = leveled_bookie:book_close(Bookie1), StartOpts2 = [{root_path, RootPath}, - {max_journalsize, 3000000}], + {max_journalsize, 3000000}, + {sync_strategy, testutil:sync_strategy()}], {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), testutil:check_forobject(Bookie2, TestObject), ObjList1 = testutil:generate_objects(5000, 2), @@ -66,7 +68,9 @@ simple_put_fetch_head_delete(_Config) -> many_put_fetch_head(_Config) -> RootPath = testutil:reset_filestructure(), - StartOpts1 = [{root_path, RootPath}, {max_pencillercachesize, 16000}], + StartOpts1 = [{root_path, RootPath}, + {max_pencillercachesize, 16000}, + {sync_strategy, riak_sync}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {TestObject, TestSpec} = testutil:generate_testobject(), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), @@ -74,7 +78,8 @@ many_put_fetch_head(_Config) -> ok = leveled_bookie:book_close(Bookie1), StartOpts2 = [{root_path, RootPath}, {max_journalsize, 1000000000}, - {max_pencillercachesize, 32000}], + {max_pencillercachesize, 32000}, + {sync_strategy, testutil:sync_strategy()}], {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), testutil:check_forobject(Bookie2, TestObject), GenList = [2, 20002, 40002, 60002, 80002, @@ -103,7 +108,8 @@ journal_compaction(_Config) -> RootPath = testutil:reset_filestructure(), StartOpts1 = [{root_path, RootPath}, {max_journalsize, 10000000}, - {max_run_length, 1}], + {max_run_length, 1}, + 
{sync_strategy, testutil:sync_strategy()}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), ok = leveled_bookie:book_compactjournal(Bookie1, 30000), {TestObject, TestSpec} = testutil:generate_testobject(), @@ -118,7 +124,7 @@ journal_compaction(_Config) -> "Key1", "Value1", [], - {"MDK1", "MDV1"}}, + [{"MDK1", "MDV1"}]}, {TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2, V2, Spec2, MD), ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2), @@ -193,7 +199,8 @@ journal_compaction(_Config) -> StartOpts2 = [{root_path, RootPath}, {max_journalsize, 10000000}, {max_run_length, 1}, - {waste_retention_period, 1}], + {waste_retention_period, 1}, + {sync_strategy, testutil:sync_strategy()}], {ok, Bookie3} = leveled_bookie:book_start(StartOpts2), ok = leveled_bookie:book_compactjournal(Bookie3, 30000), testutil:wait_for_compaction(Bookie3), @@ -208,7 +215,9 @@ journal_compaction(_Config) -> fetchput_snapshot(_Config) -> RootPath = testutil:reset_filestructure(), - StartOpts1 = [{root_path, RootPath}, {max_journalsize, 30000000}], + StartOpts1 = [{root_path, RootPath}, + {max_journalsize, 30000000}, + {sync_strategy, none}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {TestObject, TestSpec} = testutil:generate_testobject(), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), @@ -309,7 +318,9 @@ load_and_count(_Config) -> % Use artificially small files, and the load keys, counting they're all % present RootPath = testutil:reset_filestructure(), - StartOpts1 = [{root_path, RootPath}, {max_journalsize, 50000000}], + StartOpts1 = [{root_path, RootPath}, + {max_journalsize, 50000000}, + {sync_strategy, testutil:sync_strategy()}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {TestObject, TestSpec} = testutil:generate_testobject(), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), @@ -392,7 +403,9 @@ load_and_count(_Config) -> load_and_count_withdelete(_Config) -> RootPath = testutil:reset_filestructure(), - StartOpts1 = [{root_path, RootPath}, {max_journalsize, 50000000}], + StartOpts1 = [{root_path, RootPath}, + {max_journalsize, 50000000}, + {sync_strategy, testutil:sync_strategy()}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {TestObject, TestSpec} = testutil:generate_testobject(), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), @@ -415,7 +428,8 @@ load_and_count_withdelete(_Config) -> 0, lists:seq(1, 20)), testutil:check_forobject(Bookie1, TestObject), - {BucketD, KeyD} = leveled_codec:riakto_keydetails(TestObject), + {BucketD, KeyD} = {TestObject#r_object.bucket, + TestObject#r_object.key}, {_, 1} = testutil:check_bucket_stats(Bookie1, BucketD), ok = testutil:book_riakdelete(Bookie1, BucketD, KeyD, []), not_found = testutil:book_riakget(Bookie1, BucketD, KeyD), @@ -448,7 +462,9 @@ load_and_count_withdelete(_Config) -> space_clear_ondelete(_Config) -> RootPath = testutil:reset_filestructure(), - StartOpts1 = [{root_path, RootPath}, {max_journalsize, 20000000}], + StartOpts1 = [{root_path, RootPath}, + {max_journalsize, 20000000}, + {sync_strategy, testutil:sync_strategy()}], {ok, Book1} = leveled_bookie:book_start(StartOpts1), G2 = fun testutil:generate_compressibleobjects/2, testutil:load_objects(20000, @@ -475,8 +491,8 @@ space_clear_ondelete(_Config) -> [length(FNsA_J), length(FNsA_L)]), % Get an iterator to lock the inker during compaction - FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc] - end, + FoldObjectsFun = fun(B, K, ObjBin, Acc) -> + [{B, K, erlang:phash2(ObjBin)}|Acc] end, 
{async, HTreeF1} = leveled_bookie:book_returnfolder(Book1, {foldobjects_allkeys, ?RIAK_TAG, diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index 4e3c41e..8c0246e 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -6,21 +6,74 @@ -define(KEY_ONLY, {false, undefined}). -export([all/0]). --export([small_load_with2i/1, +-export([single_object_with2i/1, + small_load_with2i/1, query_count/1, rotating_objects/1]). all() -> [ + single_object_with2i, small_load_with2i, query_count, rotating_objects ]. +single_object_with2i(_Config) -> + % Load a single object with an integer and a binary + % index and query for it + RootPath = testutil:reset_filestructure(), + StartOpts1 = [{root_path, RootPath}, + {max_journalsize, 5000000}, + {sync_strategy, testutil:sync_strategy()}], + % low journal size to make sure > 1 created + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + {TestObject, _TestSpec} = testutil:generate_testobject(), + TestSpec = [{add, list_to_binary("integer_int"), 100}, + {add, list_to_binary("binary_bin"), <<100:32/integer>>}], + ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), + + IdxQ1 = {index_query, + "Bucket1", + {fun testutil:foldkeysfun/3, []}, + {list_to_binary("binary_bin"), + <<99:32/integer>>, <<101:32/integer>>}, + {true, undefined}}, + {async, IdxFolder1} = leveled_bookie:book_returnfolder(Bookie1, IdxQ1), + R1 = IdxFolder1(), + io:format("R1 of ~w~n", [R1]), + true = [{<<100:32/integer>>,"Key1"}] == R1, + + IdxQ2 = {index_query, + "Bucket1", + {fun testutil:foldkeysfun/3, []}, + {list_to_binary("integer_int"), + 99, 101}, + {true, undefined}}, + {async, IdxFolder2} = leveled_bookie:book_returnfolder(Bookie1, IdxQ2), + R2 = IdxFolder2(), + io:format("R2 of ~w~n", [R2]), + true = [{100,"Key1"}] == R2, + + IdxQ3 = {index_query, + {"Bucket1", "Key1"}, + {fun testutil:foldkeysfun/3, []}, + {list_to_binary("integer_int"), + 99, 101}, + {true, undefined}}, + {async, IdxFolder3} = leveled_bookie:book_returnfolder(Bookie1, IdxQ3), + R3 = IdxFolder3(), + io:format("R3 of ~w~n", [R3]), + true = [{100,"Key1"}] == R3, + + ok = leveled_bookie:book_close(Bookie1), + testutil:reset_filestructure().
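These queries pass {fun testutil:foldkeysfun/3, []} as the fold; the function body is not shown in this diff, but it is assumed to be a simple accumulator along these lines:

    %% Assumed shape: appends each result (Key, or {Term, Key} when
    %% return_terms is requested) to the accumulator.
    foldkeysfun(_Bucket, Item, Acc) ->
        Acc ++ [Item].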
+ small_load_with2i(_Config) -> RootPath = testutil:reset_filestructure(), StartOpts1 = [{root_path, RootPath}, - {max_journalsize, 5000000}], + {max_journalsize, 5000000}, + {sync_strategy, testutil:sync_strategy()}], % low journal size to make sure > 1 created {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {TestObject, TestSpec} = testutil:generate_testobject(), @@ -65,12 +118,12 @@ small_load_with2i(_Config) -> DSpc = lists:map(fun({add, F, T}) -> {remove, F, T} end, Spc), - {B, K} = leveled_codec:riakto_keydetails(Obj), + {B, K} = {Obj#r_object.bucket, Obj#r_object.key}, testutil:book_riakdelete(Bookie1, B, K, DSpc) end, ChkList1), %% Get the Buckets Keys and Hashes for the whole bucket - FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc] + FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc] end, {async, HTreeF1} = leveled_bookie:book_returnfolder(Bookie1, {foldobjects_allkeys, @@ -95,9 +148,8 @@ small_load_with2i(_Config) -> true = 9900 == length(KeyHashList2), true = 9900 == length(KeyHashList3), - SumIntFun = fun(_B, _K, V, Acc) -> - [C] = V#r_object.contents, - {I, _Bin} = C#r_content.value, + SumIntFun = fun(_B, _K, Obj, Acc) -> + {I, _Bin} = testutil:get_value(Obj), Acc + I end, BucketObjQ = {foldobjects_bybucket, ?RIAK_TAG, "Bucket", {SumIntFun, 0}}, @@ -128,13 +180,16 @@ small_load_with2i(_Config) -> query_count(_Config) -> RootPath = testutil:reset_filestructure(), - {ok, Book1} = leveled_bookie:book_start(RootPath, 2000, 50000000), + {ok, Book1} = leveled_bookie:book_start(RootPath, + 2000, + 50000000, + testutil:sync_strategy()), BucketBin = list_to_binary("Bucket"), {TestObject, TestSpec} = testutil:generate_testobject(BucketBin, "Key1", "Value1", [], - {"MDK1", "MDV1"}), + [{"MDK1", "MDV1"}]), ok = testutil:book_riakput(Book1, TestObject, TestSpec), testutil:check_forobject(Book1, TestObject), testutil:check_formissingobject(Book1, "Bucket1", "Key2"), @@ -177,7 +232,10 @@ query_count(_Config) -> Book1, ?KEY_ONLY), ok = leveled_bookie:book_close(Book1), - {ok, Book2} = leveled_bookie:book_start(RootPath, 1000, 50000000), + {ok, Book2} = leveled_bookie:book_start(RootPath, + 1000, + 50000000, + testutil:sync_strategy()), Index1Count = count_termsonindex(BucketBin, "idx1_bin", Book2, @@ -288,7 +346,10 @@ query_count(_Config) -> end, R9), ok = leveled_bookie:book_close(Book2), - {ok, Book3} = leveled_bookie:book_start(RootPath, 2000, 50000000), + {ok, Book3} = leveled_bookie:book_start(RootPath, + 2000, + 50000000, + testutil:sync_strategy()), lists:foreach(fun({IdxF, IdxT, X}) -> Q = {index_query, BucketBin, @@ -305,7 +366,10 @@ query_count(_Config) -> R9), ok = testutil:book_riakput(Book3, Obj9, Spc9), ok = leveled_bookie:book_close(Book3), - {ok, Book4} = leveled_bookie:book_start(RootPath, 2000, 50000000), + {ok, Book4} = leveled_bookie:book_start(RootPath, + 2000, + 50000000, + testutil:sync_strategy()), lists:foreach(fun({IdxF, IdxT, X}) -> Q = {index_query, BucketBin, @@ -365,7 +429,10 @@ query_count(_Config) -> ok = leveled_bookie:book_close(Book4), - {ok, Book5} = leveled_bookie:book_start(RootPath, 2000, 50000000), + {ok, Book5} = leveled_bookie:book_start(RootPath, + 2000, + 50000000, + testutil:sync_strategy()), {async, BLF3} = leveled_bookie:book_returnfolder(Book5, BucketListQuery), SW_QC = os:timestamp(), BucketSet3 = BLF3(), diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 79d9da7..a7af93e 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ 
@@ -20,10 +20,12 @@ retain_strategy(_Config) ->
     BookOpts = [{root_path, RootPath},
                 {cache_size, 1000},
                 {max_journalsize, 5000000},
+                {sync_strategy, testutil:sync_strategy()},
                 {reload_strategy, [{?RIAK_TAG, retain}]}],
     BookOptsAlt = [{root_path, RootPath},
                     {cache_size, 1000},
                     {max_journalsize, 100000},
+                    {sync_strategy, testutil:sync_strategy()},
                     {reload_strategy, [{?RIAK_TAG, retain}]},
                     {max_run_length, 8}],
     {ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 800),
@@ -47,6 +49,7 @@ recovr_strategy(_Config) ->
     BookOpts = [{root_path, RootPath},
                 {cache_size, 1000},
                 {max_journalsize, 5000000},
+                {sync_strategy, testutil:sync_strategy()},
                 {reload_strategy, [{?RIAK_TAG, recovr}]}],
 
     R6 = rotating_object_check(BookOpts, "Bucket6", 6400),
@@ -64,10 +67,12 @@ recovr_strategy(_Config) ->
     lists:foreach(fun({K, _SpcL}) ->
                         {ok, OH} = testutil:book_riakhead(Book1, "Bucket6", K),
-                        K = OH#r_object.key,
+                        VCH = testutil:get_vclock(OH),
                         {ok, OG} = testutil:book_riakget(Book1, "Bucket6", K),
                         V = testutil:get_value(OG),
-                        true = V == V4
+                        VCG = testutil:get_vclock(OG),
+                        true = V == V4,
+                        true = VCH == VCG
                     end,
                     lists:nthtail(6400, AllSpcL)),
     Q = fun(RT) -> {index_query,
@@ -84,13 +89,16 @@ recovr_strategy(_Config) ->
                 [length(KeyList), length(KeyTermList)]),
     true = length(KeyList) == 6400,
     true = length(KeyList) < length(KeyTermList),
-    true = length(KeyTermList) < 25600.
+    true = length(KeyTermList) < 25600,
+    ok = leveled_bookie:book_close(Book1),
+    testutil:reset_filestructure().
 
 
 aae_bustedjournal(_Config) ->
     RootPath = testutil:reset_filestructure(),
     StartOpts = [{root_path, RootPath},
-                    {max_journalsize, 20000000}],
+                    {max_journalsize, 20000000},
+                    {sync_strategy, testutil:sync_strategy()}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts),
     {TestObject, TestSpec} = testutil:generate_testobject(),
     ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@@ -150,7 +158,7 @@ aae_bustedjournal(_Config) ->
     % Will need to remove the file or corrupt the hashtree to get presence to
     % fail
 
-    FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc]
+    FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc]
                         end,
     SW = os:timestamp(),
     {async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2,
@@ -243,7 +251,8 @@ journal_compaction_bustedjournal(_Config) ->
     RootPath = testutil:reset_filestructure(),
     StartOpts1 = [{root_path, RootPath},
                     {max_journalsize, 10000000},
-                    {max_run_length, 10}],
+                    {max_run_length, 10},
+                    {sync_strategy, testutil:sync_strategy()}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
     ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index 768758f..e53eb4f 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -25,6 +25,7 @@
             set_object/5,
             get_key/1,
             get_value/1,
+            get_vclock/1,
             get_compressiblevalue/0,
             get_compressiblevalue_andinteger/0,
             get_randomindexes_generator/1,
@@ -39,18 +40,112 @@
             restore_file/2,
             restore_topending/2,
             find_journals/1,
-            riak_hash/1,
             wait_for_compaction/1,
-            foldkeysfun/3]).
+            foldkeysfun/3,
+            sync_strategy/0]).
 
 -define(RETURN_TERMS, {true, undefined}).
 -define(SLOWOFFER_DELAY, 5).
+-define(V1_VERS, 1).
+-define(MAGIC, 53). % riak_kv -> riak_object
+-define(MD_VTAG, <<"X-Riak-VTag">>).
+-define(MD_LASTMOD, <<"X-Riak-Last-Modified">>).
+-define(MD_DELETED, <<"X-Riak-Deleted">>).
+-define(EMPTY_VTAG_BIN, <<"e">>).
+%% =================================================
+%% From riak_object
+
+to_binary(v1, #r_object{contents=Contents, vclock=VClock}) ->
+    new_v1(VClock, Contents).
+
+new_v1(Vclock, Siblings) ->
+    VclockBin = term_to_binary(Vclock),
+    VclockLen = byte_size(VclockBin),
+    SibCount = length(Siblings),
+    SibsBin = bin_contents(Siblings),
+    <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
+        VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
+
+bin_content(#r_content{metadata=Meta, value=Val}) ->
+    ValBin = encode_maybe_binary(Val),
+    ValLen = byte_size(ValBin),
+    MetaBin = meta_bin(Meta),
+    MetaLen = byte_size(MetaBin),
+    <<ValLen:32/integer, ValBin:ValLen/binary,
+        MetaLen:32/integer, MetaBin:MetaLen/binary>>.
+
+bin_contents(Contents) ->
+    F = fun(Content, Acc) ->
+                <<Acc/binary, (bin_content(Content))/binary>>
+        end,
+    lists:foldl(F, <<>>, Contents).
+
+meta_bin(MD) ->
+    {{VTagVal, Deleted, LastModVal}, RestBin} =
+        dict:fold(fun fold_meta_to_bin/3,
+                    {{undefined, <<0>>, undefined}, <<>>},
+                    MD),
+    VTagBin = case VTagVal of
+                    undefined -> ?EMPTY_VTAG_BIN;
+                    _ -> list_to_binary(VTagVal)
+                end,
+    VTagLen = byte_size(VTagBin),
+    LastModBin = case LastModVal of
+                        undefined ->
+                            <<0:32/integer, 0:32/integer, 0:32/integer>>;
+                        {Mega, Secs, Micro} ->
+                            <<Mega:32/integer, Secs:32/integer, Micro:32/integer>>
+                    end,
+    <<LastModBin/binary, VTagLen:8/integer, VTagBin:VTagLen/binary,
+        Deleted:1/binary-unit:8, RestBin/binary>>.
+
+fold_meta_to_bin(?MD_VTAG, Value, {{_Vt,Del,Lm},RestBin}) ->
+    {{Value, Del, Lm}, RestBin};
+fold_meta_to_bin(?MD_LASTMOD, Value, {{Vt,Del,_Lm},RestBin}) ->
+    {{Vt, Del, Value}, RestBin};
+fold_meta_to_bin(?MD_DELETED, true, {{Vt,_Del,Lm},RestBin})->
+    {{Vt, <<1>>, Lm}, RestBin};
+fold_meta_to_bin(?MD_DELETED, "true", Acc) ->
+    fold_meta_to_bin(?MD_DELETED, true, Acc);
+fold_meta_to_bin(?MD_DELETED, _, {{Vt,_Del,Lm},RestBin}) ->
+    {{Vt, <<0>>, Lm}, RestBin};
+fold_meta_to_bin(Key, Value, {{_Vt,_Del,_Lm}=Elems,RestBin}) ->
+    ValueBin = encode_maybe_binary(Value),
+    ValueLen = byte_size(ValueBin),
+    KeyBin = encode_maybe_binary(Key),
+    KeyLen = byte_size(KeyBin),
+    MetaBin = <<KeyLen:32/integer, KeyBin/binary,
+                ValueLen:32/integer, ValueBin/binary>>,
+    {Elems, <<RestBin/binary, MetaBin/binary>>}.
+
+encode_maybe_binary(Bin) when is_binary(Bin) ->
+    <<1, Bin/binary>>;
+encode_maybe_binary(Bin) ->
+    <<0, (term_to_binary(Bin))/binary>>.
+
+%% =================================================
+
+sync_strategy() ->
+    case erlang:system_info(otp_release) of
+        "17" ->
+            sync;
+        "18" ->
+            sync;
+        "19" ->
+            sync;
+        "16" ->
+            none
+    end.
 
 book_riakput(Pid, RiakObject, IndexSpecs) ->
-    {Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
-    leveled_bookie:book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG).
+    leveled_bookie:book_put(Pid,
+                            RiakObject#r_object.bucket,
+                            RiakObject#r_object.key,
+                            to_binary(v1, RiakObject),
+                            IndexSpecs,
+                            ?RIAK_TAG).
 
 book_riakdelete(Pid, Bucket, Key, IndexSpecs) ->
     leveled_bookie:book_put(Pid, Bucket, Key, delete, IndexSpecs, ?RIAK_TAG).
@@ -131,9 +226,9 @@ check_forlist(Bookie, ChkList, Log) ->
                         R = book_riakget(Bookie,
                                             Obj#r_object.bucket,
                                             Obj#r_object.key),
-                        ok = case R of
-                                    {ok, Obj} ->
-                                        ok;
+                        true = case R of
+                                    {ok, Val} ->
+                                        to_binary(v1, Obj) == Val;
                                     not_found ->
                                         io:format("Object not found for key ~s~n",
                                                     [Obj#r_object.key]),
@@ -156,20 +251,18 @@ check_formissinglist(Bookie, ChkList) ->
                 [timer:now_diff(os:timestamp(), SW), length(ChkList)]).
 check_forobject(Bookie, TestObject) ->
-    {ok, TestObject} = book_riakget(Bookie,
+    TestBinary = to_binary(v1, TestObject),
+    {ok, TestBinary} = book_riakget(Bookie,
                                     TestObject#r_object.bucket,
                                     TestObject#r_object.key),
-    {ok, HeadObject} = book_riakhead(Bookie,
+    {ok, HeadBinary} = book_riakhead(Bookie,
                                         TestObject#r_object.bucket,
                                         TestObject#r_object.key),
-    ok = case {HeadObject#r_object.bucket,
-                HeadObject#r_object.key,
-                HeadObject#r_object.vclock} of
-            {B1, K1, VC1} when B1 == TestObject#r_object.bucket,
-                                K1 == TestObject#r_object.key,
-                                VC1 == TestObject#r_object.vclock ->
-                ok
-        end.
+    {_SibMetaBinList,
+        Vclock,
+        _Hash,
+        size} = leveled_codec:riak_extract_metadata(HeadBinary, size),
+    true = Vclock == TestObject#r_object.vclock.
 
 check_formissingobject(Bookie, Bucket, Key) ->
     not_found = book_riakget(Bookie, Bucket, Key),
@@ -181,11 +274,11 @@ generate_testobject() ->
                 "Key1",
                 "Value1",
                 [],
-                {"MDK1", "MDV1"}},
+                [{"MDK1", "MDV1"}]},
     generate_testobject(B1, K1, V1, Spec1, MD).
 
 generate_testobject(B, K, V, Spec, MD) ->
-    Content = #r_content{metadata=MD, value=V},
+    Content = #r_content{metadata=dict:from_list(MD), value=V},
     {#r_object{bucket=B, key=K, contents=[Content], vclock=[{'a',1}]},
         Spec}.
 
@@ -269,6 +362,7 @@ set_object(Bucket, Key, Value, IndexGen) ->
     set_object(Bucket, Key, Value, IndexGen, []).
 
 set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
+
     Obj = {Bucket,
             Key,
             Value,
@@ -278,16 +372,52 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
             [{"MDK", "MDV" ++ Key},
                 {"MDK2", "MDV" ++ Key}]},
     {B1, K1, V1, Spec1, MD} = Obj,
-    Content = #r_content{metadata=MD, value=V1},
-    {#r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
+    Content = #r_content{metadata=dict:from_list(MD), value=V1},
+    {#r_object{bucket=B1,
+                key=K1,
+                contents=[Content],
+                vclock=generate_vclock()},
         Spec1}.
+
+generate_vclock() ->
+    lists:map(fun(X) ->
+                    {_, Actor} = lists:keyfind(random:uniform(10),
+                                                1,
+                                                actor_list()),
+                    {Actor, X} end,
+                lists:seq(1, random:uniform(8))).
+
+
+actor_list() ->
+    [{1, albert}, {2, bertie}, {3, clara}, {4, dave}, {5, elton},
+        {6, fred}, {7, george}, {8, harry}, {9, isaac}, {10, leila}].
 
 get_key(Object) ->
     Object#r_object.key.
 
-get_value(Object) ->
-    [Content] = Object#r_object.contents,
-    Content#r_content.value.
+get_value(ObjectBin) ->
+    <<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer,
+        Rest1/binary>> = ObjectBin,
+    <<_VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest1,
+    case SibCount of
+        1 ->
+            <<SibLength:32/integer, Rest2/binary>> = SibsBin,
+            <<ContentBin:SibLength/binary, _MetaBin/binary>> = Rest2,
+            case ContentBin of
+                <<0, ContentBin0/binary>> ->
+                    binary_to_term(ContentBin0)
+            end;
+        N ->
+            io:format("SibCount of ~w with ObjectBin ~w~n", [N, ObjectBin]),
+            error
+    end.
+
+get_vclock(ObjectBin) ->
+    <<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer,
+        Rest1/binary>> = ObjectBin,
+    <<VclockBin:VclockLen/binary, _Bin/binary>> = Rest1,
+    binary_to_term(VclockBin).
 load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
     lists:map(fun(KN) ->
@@ -431,7 +561,8 @@ put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) ->
 rotating_object_check(RootPath, B, NumberOfObjects) ->
     BookOpts = [{root_path, RootPath},
                 {cache_size, 1000},
-                {max_journalsize, 5000000}],
+                {max_journalsize, 5000000},
+                {sync_strategy, sync_strategy()}],
     {ok, Book1} = leveled_bookie:book_start(BookOpts),
     {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
     ok = testutil:check_indexed_objects(Book1, B, KSpcL1, V1),
@@ -453,6 +584,10 @@ corrupt_journal(RootPath, FileName, Corruptions, BasePosition, GapSize) ->
     OriginalPath = RootPath ++ "/journal/journal_files/" ++ FileName,
     BackupPath = RootPath ++ "/journal/journal_files/" ++
                     filename:basename(FileName, ".cdb") ++ ".bak",
+    io:format("Corruption attempt to be made to filename ~s ~w ~w~n",
+                [FileName,
+                    filelib:is_file(OriginalPath),
+                    filelib:is_file(BackupPath)]),
     {ok, _BytesCopied} = file:copy(OriginalPath, BackupPath),
     {ok, Handle} = file:open(OriginalPath, [binary, raw, read, write]),
     lists:foreach(fun(X) ->
@@ -490,11 +625,3 @@ find_journals(RootPath) ->
                             FNsA_J),
     CDBFiles.
-
-riak_hash(Obj=#r_object{}) ->
-    Vclock = vclock(Obj),
-    UpdObj = set_vclock(Obj, lists:sort(Vclock)),
-    erlang:phash2(term_to_binary(UpdObj)).
-
-set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
-vclock(#r_object{vclock=VClock}) -> VClock.
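
The testutil changes above mean the suites now store riak_kv v1 binaries rather than whole #r_object{} records, with get_value/1 and get_vclock/1 parsing those binaries back out. The function below is a minimal round-trip sketch, not part of the diff: roundtrip_example/0 is a hypothetical name, and it assumes the #r_object{}/#r_content{} records plus the to_binary/2, get_value/1 and get_vclock/1 functions from testutil.erl are in scope.

    %% Illustrative sketch only; assumes the records and encode/decode
    %% helpers defined in testutil.erl above.
    roundtrip_example() ->
        MD = dict:from_list([{"MDK1", "MDV1"}]),
        %% A non-binary value, so encode_maybe_binary/1 tags it with <<0>>,
        %% which is the branch get_value/1 decodes:
        Content = #r_content{metadata=MD, value={42, "Value42"}},
        Obj = #r_object{bucket="Bucket1",
                        key="Key1",
                        contents=[Content],
                        vclock=[{a, 1}]},
        ObjBin = to_binary(v1, Obj),
        %% Single sibling: get_value/1 reads past the vclock, takes the
        %% length-prefixed content and binary_to_term's the 0-tagged value:
        {42, "Value42"} = get_value(ObjBin),
        %% The vclock slot round-trips independently of the siblings:
        [{a, 1}] = get_vclock(ObjBin),
        ok.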
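Every suite set-up now passes {sync_strategy, ...}, with testutil:sync_strategy/0 returning sync on OTP 17-19 and none on OTP 16, so the same tests run across releases. The sketch below is illustrative only (start_store_example/1 is a hypothetical helper), but the leveled_bookie:book_start calls mirror the two forms used in the suites above: the proplist form and the extended positional book_start/4.

    %% Illustrative sketch of the two start-up forms used in the suites.
    start_store_example(RootPath) ->
        %% Proplist form, as in the SUITE set-ups:
        {ok, Bookie1} =
            leveled_bookie:book_start([{root_path, RootPath},
                                        {max_journalsize, 5000000},
                                        {sync_strategy,
                                            testutil:sync_strategy()}]),
        ok = leveled_bookie:book_close(Bookie1),
        %% Positional form, via the extended book_start/4:
        {ok, Bookie2} = leveled_bookie:book_start(RootPath,
                                                    2000,
                                                    50000000,
                                                    testutil:sync_strategy()),
        ok = leveled_bookie:book_close(Bookie2),
        ok.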