Merge pull request #6 from martinsumner/OTP16

OTP16
This commit is contained in:
martinsumner 2016-12-03 15:01:28 +00:00 committed by GitHub
commit 441fff13e5
14 changed files with 1098 additions and 388 deletions


@ -47,7 +47,8 @@
{max_size :: integer(),
file_path :: string(),
waste_path :: string(),
binary_mode = false :: boolean()}).
binary_mode = false :: boolean(),
sync_strategy = sync}).
-record(inker_options,
{cdb_max_size :: integer(),

priv/leveled.schema Normal file (9 lines added)

@ -0,0 +1,9 @@
%% -*- erlang -*-
%%%% leveled
%% @doc A path under which leveled data files will be stored.
{mapping, "leveled.data_root", "leveled.data_root", [
{default, "$(platform_data_dir)/leveled"},
{datatype, directory}
]}.


@ -118,7 +118,7 @@
terminate/2,
code_change/3,
book_start/1,
book_start/3,
book_start/4,
book_put/5,
book_put/6,
book_tempput/7,
@ -149,7 +149,7 @@
-record(state, {inker :: pid(),
penciller :: pid(),
cache_size :: integer(),
ledger_cache :: gb_trees:tree(),
ledger_cache :: list(), % a skiplist
is_snapshot :: boolean(),
slow_offer = false :: boolean()}).
@ -159,10 +159,11 @@
%%% API
%%%============================================================================
book_start(RootPath, LedgerCacheSize, JournalSize) ->
book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
book_start([{root_path, RootPath},
{cache_size, LedgerCacheSize},
{max_journalsize, JournalSize}]).
{max_journalsize, JournalSize},
{sync_strategy, SyncStrategy}]).
book_start(Opts) ->
gen_server:start(?MODULE, [Opts], []).
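As a usage sketch (the path and sizes here are illustrative, not taken from this diff), the extended book_start/4 threads the sync strategy through to the journal's CDB options:
%% SyncStrategy is one of sync | riak_sync | none (see leveled_cdb)
{ok, Bookie} = leveled_bookie:book_start("/tmp/leveled_data",
                                         2000,        % ledger cache size
                                         500000000,   % max journal size
                                         none),
ok = leveled_bookie:book_close(Bookie).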
@ -233,14 +234,14 @@ init([Opts]) ->
{ok, #state{inker=Inker,
penciller=Penciller,
cache_size=CacheSize,
ledger_cache=gb_trees:empty(),
ledger_cache=leveled_skiplist:empty(),
is_snapshot=false}};
Bookie ->
{ok,
{Penciller, LedgerCache},
Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
ok = leveled_penciller:pcl_loadsnapshot(Penciller,
gb_trees:empty()),
leveled_skiplist:empty()),
leveled_log:log("B0002", [Inker, Penciller]),
{ok, #state{penciller=Penciller,
inker=Inker,
@ -431,7 +432,7 @@ bucket_stats(State, Bucket, Tag) ->
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() ->
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache),
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
@ -454,7 +455,7 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() ->
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache),
BucketAcc = get_nextbucket(null,
@ -509,7 +510,7 @@ index_query(State,
{B, null}
end,
Folder = fun() ->
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache),
StartKey = leveled_codec:to_ledgerkey(Bucket,
@ -551,7 +552,7 @@ hashtree_query(State, Tag, JournalCheck) ->
{LedgerSnapshot, LedgerCache},
JournalSnapshot} = snapshot_store(State, SnapType),
Folder = fun() ->
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache),
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
@ -602,7 +603,7 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
{FoldObjectsFun, []}
end,
Folder = fun() ->
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache),
AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag),
@ -623,7 +624,7 @@ bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() ->
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache),
SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
@ -635,7 +636,7 @@ bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
AccFun,
InitAcc),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
lists:reverse(Acc)
Acc
end,
{async, Folder}.
@ -661,7 +662,7 @@ snapshot_store(State, SnapType) ->
set_options(Opts) ->
MaxJournalSize = get_opt(max_journalsize, Opts, 10000000000),
SyncStrat = get_opt(sync_strategy, Opts, sync),
WRP = get_opt(waste_retention_period, Opts),
AltStrategy = get_opt(reload_strategy, Opts, []),
@ -680,7 +681,8 @@ set_options(Opts) ->
max_run_length = get_opt(max_run_length, Opts),
waste_retention_period = WRP,
cdb_options = #cdb_options{max_size=MaxJournalSize,
binary_mode=true}},
binary_mode=true,
sync_strategy=SyncStrat}},
#penciller_options{root_path = LedgerFP,
max_inmemory_tablesize = PCLL0CacheSize}}.
@ -697,7 +699,7 @@ startup(InkerOpts, PencillerOpts) ->
fetch_head(Key, Penciller, LedgerCache) ->
case gb_trees:lookup(Key, LedgerCache) of
case leveled_skiplist:lookup(Key, LedgerCache) of
{value, Head} ->
Head;
none ->
@ -863,18 +865,18 @@ preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
addto_ledgercache(Changes, Cache) ->
lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end,
lists:foldl(fun({K, V}, Acc) -> leveled_skiplist:enter(K, V, Acc) end,
Cache,
Changes).
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
CacheSize = gb_trees:size(Cache),
CacheSize = leveled_skiplist:size(Cache),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
if
TimeToPush ->
case leveled_penciller:pcl_pushmem(Penciller, Cache) of
ok ->
{ok, gb_trees:empty()};
{ok, leveled_skiplist:empty()};
returned ->
{returned, Cache}
end;
@ -966,112 +968,6 @@ generate_multiple_objects(Count, KeyNumber, ObjL) ->
KeyNumber + 1,
ObjL ++ [{Key, Value, IndexSpec}]).
generate_multiple_robjects(Count, KeyNumber) ->
generate_multiple_robjects(Count, KeyNumber, []).
generate_multiple_robjects(0, _KeyNumber, ObjL) ->
ObjL;
generate_multiple_robjects(Count, KeyNumber, ObjL) ->
Obj = {"Bucket",
"Key" ++ integer_to_list(KeyNumber),
crypto:rand_bytes(1024),
[],
[{"MDK", "MDV" ++ integer_to_list(KeyNumber)},
{"MDK2", "MDV" ++ integer_to_list(KeyNumber)}]},
{B1, K1, V1, Spec1, MD} = Obj,
Content = #r_content{metadata=MD, value=V1},
Obj1 = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
generate_multiple_robjects(Count - 1, KeyNumber + 1, ObjL ++ [{Obj1, Spec1}]).
single_key_test() ->
RootPath = reset_filestructure(),
{ok, Bookie1} = book_start([{root_path, RootPath}]),
{B1, K1, V1, Spec1, MD} = {"Bucket1",
"Key1",
"Value1",
[],
{"MDK1", "MDV1"}},
Content = #r_content{metadata=MD, value=V1},
Object = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
ok = book_put(Bookie1, B1, K1, Object, Spec1, ?RIAK_TAG),
{ok, F1} = book_get(Bookie1, B1, K1, ?RIAK_TAG),
?assertMatch(F1, Object),
ok = book_close(Bookie1),
{ok, Bookie2} = book_start([{root_path, RootPath}]),
{ok, F2} = book_get(Bookie2, B1, K1, ?RIAK_TAG),
?assertMatch(F2, Object),
ok = book_close(Bookie2),
reset_filestructure().
multi_key_test() ->
RootPath = reset_filestructure(),
{ok, Bookie1} = book_start([{root_path, RootPath}]),
{B1, K1, V1, Spec1, MD1} = {"Bucket",
"Key1",
"Value1",
[],
{"MDK1", "MDV1"}},
C1 = #r_content{metadata=MD1, value=V1},
Obj1 = #r_object{bucket=B1, key=K1, contents=[C1], vclock=[{'a',1}]},
{B2, K2, V2, Spec2, MD2} = {"Bucket",
"Key2",
"Value2",
[],
{"MDK2", "MDV2"}},
C2 = #r_content{metadata=MD2, value=V2},
Obj2 = #r_object{bucket=B2, key=K2, contents=[C2], vclock=[{'a',1}]},
ok = book_put(Bookie1, B1, K1, Obj1, Spec1, ?RIAK_TAG),
ObjL1 = generate_multiple_robjects(20, 3),
SW1 = os:timestamp(),
lists:foreach(fun({O, S}) ->
{B, K} = leveled_codec:riakto_keydetails(O),
ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG)
end,
ObjL1),
io:format("PUT of 20 objects completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(),SW1)]),
ok = book_put(Bookie1, B2, K2, Obj2, Spec2, ?RIAK_TAG),
{ok, F1A} = book_get(Bookie1, B1, K1, ?RIAK_TAG),
?assertMatch(F1A, Obj1),
{ok, F2A} = book_get(Bookie1, B2, K2, ?RIAK_TAG),
?assertMatch(F2A, Obj2),
ObjL2 = generate_multiple_robjects(20, 23),
SW2 = os:timestamp(),
lists:foreach(fun({O, S}) ->
{B, K} = leveled_codec:riakto_keydetails(O),
ok = book_put(Bookie1, B, K, O, S, ?RIAK_TAG)
end,
ObjL2),
io:format("PUT of 20 objects completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(),SW2)]),
{ok, F1B} = book_get(Bookie1, B1, K1, ?RIAK_TAG),
?assertMatch(F1B, Obj1),
{ok, F2B} = book_get(Bookie1, B2, K2, ?RIAK_TAG),
?assertMatch(F2B, Obj2),
ok = book_close(Bookie1),
% Now reopen the file, and confirm that a fetch is still possible
{ok, Bookie2} = book_start([{root_path, RootPath}]),
{ok, F1C} = book_get(Bookie2, B1, K1, ?RIAK_TAG),
?assertMatch(F1C, Obj1),
{ok, F2C} = book_get(Bookie2, B2, K2, ?RIAK_TAG),
?assertMatch(F2C, Obj2),
ObjL3 = generate_multiple_robjects(20, 43),
SW3 = os:timestamp(),
lists:foreach(fun({O, S}) ->
{B, K} = leveled_codec:riakto_keydetails(O),
ok = book_put(Bookie2, B, K, O, S, ?RIAK_TAG)
end,
ObjL3),
io:format("PUT of 20 objects completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(),SW3)]),
{ok, F1D} = book_get(Bookie2, B1, K1, ?RIAK_TAG),
?assertMatch(F1D, Obj1),
{ok, F2D} = book_get(Bookie2, B2, K2, ?RIAK_TAG),
?assertMatch(F2D, Obj2),
ok = book_close(Bookie2),
reset_filestructure().
ttl_test() ->
RootPath = reset_filestructure(),


@ -107,7 +107,8 @@
delete_point = 0 :: integer(),
inker :: pid(),
deferred_delete = false :: boolean(),
waste_path :: string()}).
waste_path :: string(),
sync_strategy = none}).
%%%============================================================================
@ -222,12 +223,15 @@ init([Opts]) ->
starting,
#state{max_size=MaxSize,
binary_mode=Opts#cdb_options.binary_mode,
waste_path=Opts#cdb_options.waste_path}}.
waste_path=Opts#cdb_options.waste_path,
sync_strategy=Opts#cdb_options.sync_strategy}}.
starting({open_writer, Filename}, _From, State) ->
leveled_log:log("CDB01", [Filename]),
{LastPosition, HashTree, LastKey} = open_active_file(Filename),
{ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]),
WriteOps = set_writeops(State#state.sync_strategy),
leveled_log:log("CDB13", [WriteOps]),
{ok, Handle} = file:open(Filename, WriteOps),
{reply, ok, writer, State#state{handle=Handle,
last_position=LastPosition,
last_key=LastKey,
@ -263,6 +267,13 @@ writer({put_kv, Key, Value}, _From, State) ->
%% Key and value could not be written
{reply, roll, writer, State};
{UpdHandle, NewPosition, HashTree} ->
ok =
case State#state.sync_strategy of
riak_sync ->
file:datasync(UpdHandle);
_ ->
ok
end,
{reply, ok, writer, State#state{handle=UpdHandle,
last_position=NewPosition,
last_key=Key,
@ -381,23 +392,32 @@ reader({get_positions, SampleSize}, _From, State) ->
end;
reader({direct_fetch, PositionList, Info}, _From, State) ->
H = State#state.handle,
case Info of
key_only ->
KeyList = lists:map(fun(P) ->
extract_key(H, P) end,
PositionList),
{reply, KeyList, reader, State};
key_size ->
KeySizeList = lists:map(fun(P) ->
extract_key_size(H, P) end,
PositionList),
{reply, KeySizeList, reader, State};
key_value_check ->
KVCList = lists:map(fun(P) ->
extract_key_value_check(H, P) end,
PositionList),
{reply, KVCList, reader, State}
end;
FilterFalseKey = fun(Tpl) -> case element(1, Tpl) of
false ->
false;
_Key ->
{true, Tpl}
end end,
Reply =
case Info of
key_only ->
FM = lists:filtermap(
fun(P) ->
FilterFalseKey(extract_key(H, P)) end,
PositionList),
lists:map(fun(T) -> element(1, T) end, FM);
key_size ->
lists:filtermap(
fun(P) ->
FilterFalseKey(extract_key_size(H, P)) end,
PositionList);
key_value_check ->
lists:filtermap(
fun(P) ->
FilterFalseKey(extract_key_value_check(H, P)) end,
PositionList)
end,
{reply, Reply, reader, State};
reader(cdb_complete, _From, State) ->
ok = file:close(State#state.handle),
{stop, normal, {ok, State#state.filename}, State#state{handle=undefined}};
@ -482,7 +502,8 @@ handle_sync_event(cdb_firstkey, _From, StateName, State) ->
?BASE_POSITION ->
empty;
_ ->
extract_key(State#state.handle, ?BASE_POSITION)
element(1, extract_key(State#state.handle,
?BASE_POSITION))
end,
{reply, FirstKey, StateName, State};
handle_sync_event(cdb_filename, _From, StateName, State) ->
@ -520,6 +541,23 @@ code_change(_OldVsn, StateName, State, _Extra) ->
%%% Internal functions
%%%============================================================================
%% Assumption is that sync should be used - it is a transaction log.
%%
%% However the sync flag is not supported in OTP 16. Bitcask appears to pass
%% an o_sync flag, but this isn't supported either (maybe it works with the
%% bitcask nif fileops).
%%
%% To get round this, the riak_sync strategy will instead attempt a datasync
%% on each PUT
set_writeops(SyncStrategy) ->
case SyncStrategy of
sync ->
[sync | ?WRITE_OPS];
riak_sync ->
?WRITE_OPS;
none ->
?WRITE_OPS
end.
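A sketch of the effective behaviour per strategy, assuming ?WRITE_OPS is the base list of file:open/2 modes (its definition is not shown in this diff):
%% sync      -> open with [sync | ?WRITE_OPS] (where supported; not OTP 16)
%% riak_sync -> open with ?WRITE_OPS only, but each successful put_kv is
%%              followed by ok = file:datasync(Handle) (see writer above)
%% none      -> open with ?WRITE_OPS; no explicit flushing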
%% from_dict(FileName,ListOfKeyValueTuples)
%% Given a filename and a dictionary, create a cdb
@ -865,17 +903,17 @@ extract_kvpair(Handle, [Position|Rest], Key) ->
extract_key(Handle, Position) ->
{ok, _} = file:position(Handle, Position),
{KeyLength, _ValueLength} = read_next_2_integers(Handle),
read_next_term(Handle, KeyLength).
{safe_read_next_term(Handle, KeyLength)}.
extract_key_size(Handle, Position) ->
{ok, _} = file:position(Handle, Position),
{KeyLength, ValueLength} = read_next_2_integers(Handle),
{read_next_term(Handle, KeyLength), ValueLength}.
{safe_read_next_term(Handle, KeyLength), ValueLength}.
extract_key_value_check(Handle, Position) ->
{ok, _} = file:position(Handle, Position),
{KeyLength, ValueLength} = read_next_2_integers(Handle),
K = read_next_term(Handle, KeyLength),
K = safe_read_next_term(Handle, KeyLength),
{Check, V} = read_next_term(Handle, ValueLength, crc),
{K, V, Check}.
@ -1648,21 +1686,20 @@ get_keys_byposition_simple_test() ->
io:format("Position list of ~w~n", [PositionList]),
?assertMatch(3, length(PositionList)),
R1 = cdb_directfetch(P2, PositionList, key_only),
io:format("R1 ~w~n", [R1]),
?assertMatch(3, length(R1)),
lists:foreach(fun(Key) ->
Check = lists:member(Key, KeyList),
?assertMatch(Check, true) end,
?assertMatch(true, lists:member(Key, KeyList)) end,
R1),
R2 = cdb_directfetch(P2, PositionList, key_size),
?assertMatch(3, length(R2)),
lists:foreach(fun({Key, _Size}) ->
Check = lists:member(Key, KeyList),
?assertMatch(Check, true) end,
?assertMatch(true, lists:member(Key, KeyList)) end,
R2),
R3 = cdb_directfetch(P2, PositionList, key_value_check),
?assertMatch(3, length(R3)),
lists:foreach(fun({Key, Value, Check}) ->
?assertMatch(Check, true),
?assertMatch(true, Check),
{K, V} = cdb_get(P2, Key),
?assertMatch(K, Key),
?assertMatch(V, Value) end,


@ -60,9 +60,12 @@
get_size/2,
get_keyandhash/2,
convert_indexspecs/5,
riakto_keydetails/1,
generate_uuid/0,
integer_now/0]).
integer_now/0,
riak_extract_metadata/2]).
-define(V1_VERS, 1).
-define(MAGIC, 53). % riak_kv -> riak_object
%% Credit to
@ -325,44 +328,67 @@ get_keyandhash(LK, Value) ->
build_metadata_object(PrimaryKey, MD) ->
{Tag, Bucket, Key, null} = PrimaryKey,
{Tag, _Bucket, _Key, null} = PrimaryKey,
case Tag of
?RIAK_TAG ->
riak_metadata_object(Bucket, Key, MD);
{SibMetaBinList, Vclock, _Hash, _Size} = MD,
riak_metadata_to_binary(Vclock, SibMetaBinList);
?STD_TAG ->
MD
end.
riak_metadata_object(Bucket, Key, MD) ->
{RMD, VC, _Hash, _Size} = MD,
Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end,
[],
RMD),
#r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}.
riak_extract_metadata(delete, Size) ->
{delete, null, null, Size};
riak_extract_metadata(Obj, Size) ->
{get_metadatas(Obj), vclock(Obj), riak_hash(Obj), Size}.
riak_extract_metadata(ObjBin, Size) ->
{Vclock, SibMetaBinList} = riak_metadata_from_binary(ObjBin),
{SibMetaBinList, Vclock, erlang:phash2(ObjBin), Size}.
riak_hash(Obj=#r_object{}) ->
Vclock = vclock(Obj),
UpdObj = set_vclock(Obj, lists:sort(Vclock)),
erlang:phash2(term_to_binary(UpdObj)).
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
%%% VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
riakto_keydetails(Object) ->
{Object#r_object.bucket, Object#r_object.key}.
riak_metadata_to_binary(Vclock, SibMetaBinList) ->
VclockBin = term_to_binary(Vclock),
VclockLen = byte_size(VclockBin),
SibCount = length(SibMetaBinList),
SibsBin = slimbin_contents(SibMetaBinList),
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>>.
% Fixes the value length for each sibling to be zero, and so includes no value
slimbin_content(MetaBin) ->
MetaLen = byte_size(MetaBin),
<<0:32/integer, MetaLen:32/integer, MetaBin:MetaLen/binary>>.
get_metadatas(#r_object{contents=Contents}) ->
[Content#r_content.metadata || Content <- Contents].
set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
vclock(#r_object{vclock=VClock}) -> VClock.
slimbin_contents(SibMetaBinList) ->
F = fun(MetaBin, Acc) ->
<<Acc/binary, (slimbin_content(MetaBin))/binary>>
end,
lists:foldl(F, <<>>, SibMetaBinList).
riak_metadata_from_binary(V1Binary) ->
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
Rest/binary>> = V1Binary,
<<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
SibMetaBinList =
case SibCount of
0 ->
[];
SC when is_integer(SC) ->
get_metadata_from_siblings(SibsBin, SibCount, [])
end,
{binary_to_term(VclockBin), SibMetaBinList}.
get_metadata_from_siblings(<<>>, 0, SibMetaBinList) ->
SibMetaBinList;
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
SibCount,
SibMetaBinList) ->
<<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
<<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
get_metadata_from_siblings(Rest2,
SibCount - 1,
[MetaBin|SibMetaBinList]).
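A hedged round-trip sketch of the format above (hypothetical values; note that get_metadata_from_siblings/3 prepends as it parses, so sibling order comes back reversed):
%% Vclock = [{a, 1}],
%% Bin = riak_metadata_to_binary(Vclock, [<<"md1">>, <<"md2">>]),
%% {Vclock, [<<"md2">>, <<"md1">>]} = riak_metadata_from_binary(Bin).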


@ -633,7 +633,7 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) ->
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
CDBpid, StartPos, FN, Rest) ->
leveled_log:log("I0014", [FN, MinSQN]),
InitAcc = {MinSQN, MaxSQN, gb_trees:empty()},
InitAcc = {MinSQN, MaxSQN, leveled_skiplist:empty()},
Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
{eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
ok = push_to_penciller(Penciller, AccKL),


@ -256,7 +256,9 @@
{"CDB11",
{info, "CRC check failed due to size"}},
{"CDB12",
{inof, "HashTree written"}}
{info, "HashTree written"}},
{"CDB13",
{info, "Write options of ~w"}}
])).


@ -212,15 +212,16 @@
levelzero_pending = false :: boolean(),
levelzero_constructor :: pid(),
levelzero_cache = [] :: list(), % a list of gb_trees
levelzero_index :: array:array(),
levelzero_cache = [] :: list(), % a list of skiplists
levelzero_index,
% is an array - but cannot specify the type due to OTP compatibility
levelzero_size = 0 :: integer(),
levelzero_maxcachesize :: integer(),
is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(),
source_penciller :: pid(),
levelzero_astree :: gb_trees:tree(),
levelzero_astree :: list(), % skiplist
ongoing_work = [] :: list(),
work_backlog = false :: boolean()}).
@ -366,25 +367,24 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
_From,
State=#state{snapshot_fully_loaded=Ready})
when Ready == true ->
L0AsTree =
L0AsList =
case State#state.levelzero_astree of
undefined ->
leveled_pmem:merge_trees(StartKey,
EndKey,
State#state.levelzero_cache,
gb_trees:empty());
Tree ->
Tree
leveled_skiplist:empty());
List ->
List
end,
L0iter = gb_trees:iterator(L0AsTree),
SFTiter = initiate_rangequery_frommanifest(StartKey,
EndKey,
State#state.manifest),
Acc = keyfolder({L0iter, SFTiter},
Acc = keyfolder({L0AsList, SFTiter},
{StartKey, EndKey},
{AccFun, InitAcc},
MaxKeys),
{reply, Acc, State#state{levelzero_astree = L0AsTree}};
{reply, Acc, State#state{levelzero_astree = L0AsList}};
handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From),
{reply, Work, UpdState};
@ -985,77 +985,73 @@ keyfolder(IMMiter, SFTiter, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 ->
Acc;
keyfolder({null, SFTiter}, KeyRange, {AccFun, Acc}, MaxKeys) ->
keyfolder({[], SFTiter}, KeyRange, {AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange,
case find_nextkey(SFTiter, StartKey, EndKey) of
no_more_keys ->
Acc;
{NxSFTiter, {SFTKey, SFTVal}} ->
Acc1 = AccFun(SFTKey, SFTVal, Acc),
keyfolder({null, NxSFTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1)
keyfolder({[], NxSFTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1)
end;
keyfolder({IMMiterator, SFTiterator}, KeyRange, {AccFun, Acc}, MaxKeys) ->
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange,
{AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange,
case gb_trees:next(IMMiterator) of
none ->
% There are no more keys in the in-memory iterator, so now
% iterate only over the remaining keys in the SFT iterator
keyfolder({null, SFTiterator}, KeyRange, {AccFun, Acc}, MaxKeys);
{IMMKey, _IMMVal, NxIMMiterator} when IMMKey < StartKey ->
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
{true, _} ->
% Normally everything is pre-filtered, but the IMM iterator can
% be re-used and do may be behind the StartKey if the StartKey has
% be re-used and so may be behind the StartKey if the StartKey has
% advanced from the previous use
keyfolder({NxIMMiterator, SFTiterator},
KeyRange,
{AccFun, Acc},
MaxKeys);
{IMMKey, IMMVal, NxIMMiterator} ->
case leveled_codec:endkey_passed(EndKey, IMMKey) of
true ->
% There are no more keys in-range in the in-memory
% iterator, so take action as if this iterator is empty
% (see above)
keyfolder({null, SFTiterator},
{false, true} ->
% There are no more keys in-range in the in-memory
% iterator, so take action as if this iterator is empty
% (see above)
keyfolder({[], SFTiterator},
KeyRange,
{AccFun, Acc},
MaxKeys);
{false, false} ->
case find_nextkey(SFTiterator, StartKey, EndKey) of
no_more_keys ->
% No more keys in range in the persisted store, so use the
% in-memory KV as the next
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, SFTiterator},
KeyRange,
{AccFun, Acc},
MaxKeys);
false ->
case find_nextkey(SFTiterator, StartKey, EndKey) of
no_more_keys ->
% No more keys in range in the persisted store, so use the
% in-memory KV as the next
{AccFun, Acc1},
MaxKeys - 1);
{NxSFTiterator, {SFTKey, SFTVal}} ->
% There is a next key, so need to know which is the
% next key between the two (and handle two keys
% with different sequence numbers).
case leveled_codec:key_dominates({IMMKey,
IMMVal},
{SFTKey,
SFTVal}) of
left_hand_first ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, SFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
{NxSFTiterator, {SFTKey, SFTVal}} ->
% There is a next key, so need to know which is the
% next key between the two (and handle two keys
% with different sequence numbers).
case leveled_codec:key_dominates({IMMKey,
IMMVal},
{SFTKey,
SFTVal}) of
left_hand_first ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, SFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
right_hand_first ->
Acc1 = AccFun(SFTKey, SFTVal, Acc),
keyfolder({IMMiterator, NxSFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
left_hand_dominant ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, NxSFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1)
end
right_hand_first ->
Acc1 = AccFun(SFTKey, SFTVal, Acc),
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
NxSFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
left_hand_dominant ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, NxSFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1)
end
end
end.
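To illustrate the merge above (hypothetical keys and sequence numbers; when both iterators offer the same key, key_dominates keeps the higher-SQN version and both iterators advance):
%% IMM (skiplist range) : [{k1, V@sqn8}, {k3, V@sqn9}]
%% SFT iterator yields  : {k2, V@sqn5}, then {k3, V@sqn2}
%% Fold order           : k1 (IMM), k2 (SFT), then k3 - here the in-memory
%%                        version dominates and both iterators move on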
@ -1267,8 +1263,8 @@ confirm_delete_test() ->
maybe_pause_push(PCL, KL) ->
T0 = gb_trees:empty(),
T1 = lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end,
T0 = leveled_skiplist:empty(),
T1 = lists:foldl(fun({K, V}, Acc) -> leveled_skiplist:enter(K, V, Acc) end,
T0,
KL),
case pcl_pushmem(PCL, T1) of
@ -1335,7 +1331,7 @@ simple_server_test() ->
SnapOpts = #penciller_options{start_snapshot = true,
source_penciller = PCLr},
{ok, PclSnap} = pcl_start(SnapOpts),
ok = pcl_loadsnapshot(PclSnap, gb_trees:empty()),
ok = pcl_loadsnapshot(PclSnap, leveled_skiplist:empty()),
?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})),
@ -1384,7 +1380,7 @@ simple_server_test() ->
term_to_binary("Hello")),
{ok, PclSnap2} = pcl_start(SnapOpts),
ok = pcl_loadsnapshot(PclSnap2, gb_trees:empty()),
ok = pcl_loadsnapshot(PclSnap2, leveled_skiplist:empty()),
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
{o,
"Bucket0001",
@ -1543,16 +1539,16 @@ foldwithimm_simple_test() ->
{3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}]},
{5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}]}
],
IMM0 = gb_trees:enter({o, "Bucket1", "Key6"},
{7, {active, infinity}, null},
gb_trees:empty()),
IMM1 = gb_trees:enter({o, "Bucket1", "Key1"},
{8, {active, infinity}, null},
IMM0),
IMM2 = gb_trees:enter({o, "Bucket1", "Key8"},
{9, {active, infinity}, null},
IMM1),
IMMiter = gb_trees:iterator_from({o, "Bucket1", "Key1"}, IMM2),
IMM0 = leveled_skiplist:enter({o, "Bucket1", "Key6"},
{7, {active, infinity}, null},
leveled_skiplist:empty()),
IMM1 = leveled_skiplist:enter({o, "Bucket1", "Key1"},
{8, {active, infinity}, null},
IMM0),
IMM2 = leveled_skiplist:enter({o, "Bucket1", "Key8"},
{9, {active, infinity}, null},
IMM1),
IMMiter = leveled_skiplist:to_range(IMM2, {o, "Bucket1", "Key1"}),
AccFun = fun(K, V, Acc) -> SQN = leveled_codec:strip_to_seqonly({K, V}),
Acc ++ [{K, SQN}] end,
Acc = keyfolder(IMMiter,
@ -1564,10 +1560,10 @@ foldwithimm_simple_test() ->
{{o, "Bucket1", "Key5"}, 2},
{{o, "Bucket1", "Key6"}, 7}], Acc),
IMM1A = gb_trees:enter({o, "Bucket1", "Key1"},
{8, {active, infinity}, null},
gb_trees:empty()),
IMMiterA = gb_trees:iterator_from({o, "Bucket1", "Key1"}, IMM1A),
IMM1A = leveled_skiplist:enter({o, "Bucket1", "Key1"},
{8, {active, infinity}, null},
leveled_skiplist:empty()),
IMMiterA = leveled_skiplist:to_range(IMM1A, {o, "Bucket1", "Key1"}),
AccA = keyfolder(IMMiterA,
QueryArray,
{o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"},
@ -1576,10 +1572,10 @@ foldwithimm_simple_test() ->
{{o, "Bucket1", "Key3"}, 3},
{{o, "Bucket1", "Key5"}, 2}], AccA),
IMM3 = gb_trees:enter({o, "Bucket1", "Key4"},
{10, {active, infinity}, null},
IMM2),
IMMiterB = gb_trees:iterator_from({o, "Bucket1", "Key1"}, IMM3),
IMM3 = leveled_skiplist:enter({o, "Bucket1", "Key4"},
{10, {active, infinity}, null},
IMM2),
IMMiterB = leveled_skiplist:to_range(IMM3, {o, "Bucket1", "Key1"}),
AccB = keyfolder(IMMiterB,
QueryArray,
{o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"},
@ -1594,7 +1590,7 @@ create_file_test() ->
Filename = "../test/new_file.sft",
ok = file:write_file(Filename, term_to_binary("hello")),
KVL = lists:usort(leveled_sft:generate_randomkeys(10000)),
Tree = gb_trees:from_orddict(KVL),
Tree = leveled_skiplist:from_list(KVL),
FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
{ok,
SP,


@ -76,7 +76,7 @@ add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) ->
Count0,
array:set(Slot, [{Hash, SlotInTreeList}|L], HashIndex)}
end,
LM1List = gb_trees:to_list(LevelMinus1),
LM1List = leveled_skiplist:to_list(LevelMinus1),
StartingT = {infinity, 0, L0Size, L0Index},
{MinSQN, MaxSQN, NewL0Size, UpdL0Index} = lists:foldl(FoldFun,
StartingT,
@ -96,7 +96,7 @@ to_list(Slots, FetchFun) ->
SlotList = lists:reverse(lists:seq(1, Slots)),
FullList = lists:foldl(fun(Slot, Acc) ->
Tree = FetchFun(Slot),
L = gb_trees:to_list(Tree),
L = leveled_skiplist:to_list(Tree),
lists:ukeymerge(1, Acc, L)
end,
[],
@ -128,7 +128,7 @@ check_levelzero(Key, L0Index, TreeList) ->
{Found, KV};
false ->
CheckTree = lists:nth(SlotToCheck, TreeList),
case gb_trees:lookup(Key, CheckTree) of
case leveled_skiplist:lookup(Key, CheckTree) of
none ->
{Found, KV};
{value, Value} ->
@ -139,12 +139,15 @@ check_levelzero(Key, L0Index, TreeList) ->
{false, not_found},
lists:reverse(lists:usort(SlotList))).
merge_trees(StartKey, EndKey, TreeList, LevelMinus1) ->
lists:foldl(fun(Tree, TreeAcc) ->
merge_nexttree(Tree, TreeAcc, StartKey, EndKey) end,
gb_trees:empty(),
lists:append(TreeList, [LevelMinus1])).
merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) ->
lists:foldl(fun(SkipList, Acc) ->
R = leveled_skiplist:to_range(SkipList,
StartKey,
EndKey),
lists:ukeymerge(1, Acc, R) end,
[],
[LevelMinus1|lists:reverse(SkipListList)]).
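A note on precedence, assuming TreeList arrives ordered oldest first: lists:ukeymerge/3 keeps the element from its first list on a key clash, so folding [LevelMinus1 | reversed trees] lets the most recent write for a key win, e.g.:
%% lists:ukeymerge(1, [{k, new}], [{k, old}]) =:= [{k, new}]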
%%%============================================================================
%%% Internal Functions
@ -155,24 +158,6 @@ hash_to_slot(Key) ->
H = erlang:phash2(Key),
{H bsr element(2, ?SLOT_WIDTH), H band (element(1, ?SLOT_WIDTH) - 1)}.
merge_nexttree(Tree, TreeAcc, StartKey, EndKey) ->
Iter = gb_trees:iterator_from(StartKey, Tree),
merge_nexttree(Iter, TreeAcc, EndKey).
merge_nexttree(Iter, TreeAcc, EndKey) ->
case gb_trees:next(Iter) of
none ->
TreeAcc;
{Key, Value, NewIter} ->
case leveled_codec:endkey_passed(EndKey, Key) of
true ->
TreeAcc;
false ->
merge_nexttree(NewIter,
gb_trees:enter(Key, Value, TreeAcc),
EndKey)
end
end.
%%%============================================================================
%%% Test
@ -183,7 +168,7 @@ merge_nexttree(Iter, TreeAcc, EndKey) ->
generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
generate_randomkeys(Seqn,
Count,
gb_trees:empty(),
leveled_skiplist:empty(),
BucketRangeLow,
BucketRangeHigh).
@ -197,7 +182,7 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
{Seqn, {active, infinity}, null}},
generate_randomkeys(Seqn + 1,
Count - 1,
gb_trees:enter(K, V, Acc),
leveled_skiplist:enter(K, V, Acc),
BucketLow,
BRange).
@ -216,29 +201,29 @@ compare_method_test() ->
?assertMatch(32000, SQN),
?assertMatch(true, Size =< 32000),
TestList = gb_trees:to_list(generate_randomkeys(1, 2000, 1, 800)),
TestList = leveled_skiplist:to_list(generate_randomkeys(1, 2000, 1, 800)),
S0 = lists:foldl(fun({Key, _V}, Acc) ->
R0 = lists:foldr(fun(Tree, {Found, KV}) ->
case Found of
true ->
{true, KV};
false ->
L0 = gb_trees:lookup(Key, Tree),
case L0 of
none ->
{false, not_found};
{value, Value} ->
{true, {Key, Value}}
end
R0 = lists:foldr(fun(Tree, {Found, KV}) ->
case Found of
true ->
{true, KV};
false ->
L0 = leveled_skiplist:lookup(Key, Tree),
case L0 of
none ->
{false, not_found};
{value, Value} ->
{true, {Key, Value}}
end
end,
{false, not_found},
TreeList),
[R0|Acc]
end,
[],
TestList),
end
end,
{false, not_found},
TreeList),
[R0|Acc]
end,
[],
TestList),
S1 = lists:foldl(fun({Key, _V}, Acc) ->
R0 = check_levelzero(Key, Index, TreeList),
@ -258,20 +243,20 @@ compare_method_test() ->
P = leveled_codec:endkey_passed(EndKey, K),
case {K, P} of
{K, false} when K >= StartKey ->
gb_trees:enter(K, V, Acc);
leveled_skiplist:enter(K, V, Acc);
_ ->
Acc
end
end,
gb_trees:empty(),
leveled_skiplist:empty(),
DumpList),
Sz0 = gb_trees:size(Q0),
Sz0 = leveled_skiplist:size(Q0),
io:format("Crude method took ~w microseconds resulting in tree of " ++
"size ~w~n",
[timer:now_diff(os:timestamp(), SWa), Sz0]),
SWb = os:timestamp(),
Q1 = merge_trees(StartKey, EndKey, TreeList, gb_trees:empty()),
Sz1 = gb_trees:size(Q1),
Q1 = merge_trees(StartKey, EndKey, TreeList, leveled_skiplist:empty()),
Sz1 = length(Q1),
io:format("Merge method took ~w microseconds resulting in tree of " ++
"size ~w~n",
[timer:now_diff(os:timestamp(), SWb), Sz1]),

src/leveled_skiplist.erl Normal file (539 lines added)

@ -0,0 +1,539 @@
%% -------- SKIPLIST ---------
%%
%% For storing small numbers of {K, V} pairs where reasonable insertion and
%% fetch times are needed, but with fast support for flattening to a list or
%% a sublist within a certain key range
%%
%% Used instead of gb_trees to retain compatibility with OTP 16 (and Riak's
%% ongoing dependency on OTP 16)
%%
%% Not a proper skip list. Only supports a fixed depth. Good enough for the
%% purposes of leveled. Also uses leveled's peculiar endkey_passed function.
%% Not tested beyond a depth of 2.
-module(leveled_skiplist).
-include("include/leveled.hrl").
-export([
from_list/1,
from_sortedlist/1,
to_list/1,
enter/3,
to_range/2,
to_range/3,
lookup/2,
empty/0,
size/1
]).
-include_lib("eunit/include/eunit.hrl").
-define(SKIP_WIDTH, 16).
-define(LIST_HEIGHT, 2).
-define(INFINITY_KEY, {null, null, null, null, null}).
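To make the shape concrete (illustrative keys; depth of 2 as per ?LIST_HEIGHT): a skiplist is a sorted list of {MarkerKey, SubSkipList} pairs whose level-1 sublists hold sorted {K, V} pairs, and empty/0 below yields the nested form directly:
%% empty() =:= [{?INFINITY_KEY, [{?INFINITY_KEY, []}]}]
%% Populated, level 2: [{Marker1, Level1SkipList}, {?INFINITY_KEY, ...}]
%% Level 1:            [{MarkerA, [{K, V}, ...]}, {Marker1, [{K, V}, ...]}]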
%%%============================================================================
%%% SkipList API
%%%============================================================================
enter(Key, Value, SkipList) ->
enter(Key, Value, SkipList, ?SKIP_WIDTH, ?LIST_HEIGHT).
from_list(UnsortedKVL) ->
KVL = lists:ukeysort(1, UnsortedKVL),
from_list(KVL, ?SKIP_WIDTH, ?LIST_HEIGHT).
from_sortedlist(SortedKVL) ->
from_list(SortedKVL, ?SKIP_WIDTH, ?LIST_HEIGHT).
lookup(Key, SkipList) ->
lookup(Key, SkipList, ?LIST_HEIGHT).
%% Rather than support iterator_from like gb_trees, this will just output a
%% key-sorted list for the desired range, which can then be iterated over as
%% normal
to_range(SkipList, Start) ->
to_range(SkipList, Start, ?INFINITY_KEY, ?LIST_HEIGHT).
to_range(SkipList, Start, End) ->
to_range(SkipList, Start, End, ?LIST_HEIGHT).
to_list(SkipList) ->
to_list(SkipList, ?LIST_HEIGHT).
empty() ->
empty([], ?LIST_HEIGHT).
size(SkipList) ->
size(SkipList, ?LIST_HEIGHT).
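A usage sketch of the iterator replacement described above (handle/2 is a hypothetical callback, not part of this module):
%% KVsInRange = leveled_skiplist:to_range(SkipList, StartKey, EndKey),
%% lists:foreach(fun({K, V}) -> handle(K, V) end, KVsInRange).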
%%%============================================================================
%%% SkipList Base Functions
%%%============================================================================
enter(Key, Value, SkipList, Width, 1) ->
Hash = erlang:phash2(Key),
{MarkerKey, SubList} = find_mark(Key, SkipList),
case Hash rem Width of
0 ->
{LHS, RHS} = lists:splitwith(fun({K, _V}) ->
K =< Key end,
SubList),
SkpL1 = lists:keyreplace(MarkerKey, 1, SkipList, {MarkerKey, RHS}),
SkpL2 = [{Key, lists:ukeysort(1, [{Key, Value}|LHS])}|SkpL1],
lists:ukeysort(1, SkpL2);
_ ->
{LHS, RHS} = lists:splitwith(fun({K, _V}) -> K < Key end, SubList),
UpdSubList =
case RHS of
[] ->
LHS ++ [{Key, Value}];
[{FirstKey, _V}|RHSTail] ->
case FirstKey of
Key ->
LHS ++ [{Key, Value}] ++ RHSTail;
_ ->
LHS ++ [{Key, Value}] ++ RHS
end
end,
lists:keyreplace(MarkerKey, 1, SkipList, {MarkerKey, UpdSubList})
end;
enter(Key, Value, SkipList, Width, Level) ->
Hash = erlang:phash2(Key),
HashMatch = width(Level, Width),
{MarkerKey, SubSkipList} = find_mark(Key, SkipList),
UpdSubSkipList = enter(Key, Value, SubSkipList, Width, Level - 1),
case Hash rem HashMatch of
0 ->
% Key is also a marker at this level - split the updated sublist around it
{LHS, RHS} = lists:splitwith(fun({K, _V}) ->
K =< Key end,
UpdSubSkipList),
SkpL1 = lists:keyreplace(MarkerKey, 1, SkipList, {MarkerKey, RHS}),
lists:ukeysort(1, [{Key, LHS}|SkpL1]);
_ ->
% Need to replace Marker Key with sublist
lists:keyreplace(MarkerKey,
1,
SkipList,
{MarkerKey, UpdSubSkipList})
end.
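Roughly, the promotion rule implemented above: with ?SKIP_WIDTH of 16 and ?LIST_HEIGHT of 2, about one key in 16 becomes a level-1 marker and about one in 256 a top-level marker:
%% level-1 marker when erlang:phash2(Key) rem 16 =:= 0
%% level-2 marker when erlang:phash2(Key) rem 256 =:= 0 (width(2, 16) =:= 256)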
from_list(KVL, Width, 1) ->
Slots = length(KVL) div Width,
SkipList0 = lists:map(fun(X) ->
N = X * Width,
{K, _V} = lists:nth(N, KVL),
{K, lists:sublist(KVL,
N - Width + 1,
Width)}
end,
lists:seq(1, length(KVL) div Width)),
case Slots * Width < length(KVL) of
true ->
{LastK, _V} = lists:last(KVL),
SkipList0 ++ [{LastK, lists:nthtail(Slots * Width, KVL)}];
false ->
SkipList0
end;
from_list(KVL, Width, Level) ->
SkipWidth = width(Level, Width),
LoftSlots = length(KVL) div SkipWidth,
case LoftSlots of
0 ->
{K, _V} = lists:last(KVL),
[{K, from_list(KVL, Width, Level - 1)}];
_ ->
SkipList0 =
lists:map(fun(X) ->
N = X * SkipWidth,
{K, _V} = lists:nth(N, KVL),
SL = lists:sublist(KVL,
N - SkipWidth + 1,
SkipWidth),
{K, from_list(SL, Width, Level - 1)}
end,
lists:seq(1, LoftSlots)),
case LoftSlots * SkipWidth < length(KVL) of
true ->
{LastK, _V} = lists:last(KVL),
TailList = lists:nthtail(LoftSlots * SkipWidth, KVL),
SkipList0 ++ [{LastK, from_list(TailList,
Width,
Level - 1)}];
false ->
SkipList0
end
end.
lookup(Key, SkipList, 1) ->
SubList = get_sublist(Key, SkipList),
case lists:keyfind(Key, 1, SubList) of
false ->
none;
{Key, V} ->
{value, V}
end;
lookup(Key, SkipList, Level) ->
SubList = get_sublist(Key, SkipList),
case SubList of
null ->
none;
_ ->
lookup(Key, SubList, Level - 1)
end.
to_list(SkipList, 1) ->
lists:foldl(fun({_Mark, SL}, Acc) -> Acc ++ SL end, [], SkipList);
to_list(SkipList, Level) ->
lists:foldl(fun({_Mark, SL}, Acc) -> Acc ++ to_list(SL, Level - 1) end,
[],
SkipList).
to_range(SkipList, Start, End, 1) ->
R = lists:foldl(fun({Mark, SL}, {PassedStart, PassedEnd, Acc, PrevList}) ->
case {PassedStart, PassedEnd} of
{true, true} ->
{true, true, Acc, null};
{false, false} ->
case Start > Mark of
true ->
{false, false, Acc, SL};
false ->
RHS = splitlist_start(Start, PrevList ++ SL),
case leveled_codec:endkey_passed(End, Mark) of
true ->
EL = splitlist_end(End, RHS),
{true, true, EL, null};
false ->
{true, false, RHS, null}
end
end;
{true, false} ->
case leveled_codec:endkey_passed(End, Mark) of
true ->
EL = splitlist_end(End, SL),
{true, true, Acc ++ EL, null};
false ->
{true, false, Acc ++ SL, null}
end
end end,
{false, false, [], []},
SkipList),
{_Bool1, _Bool2, SubList, _PrevList} = R,
SubList;
to_range(SkipList, Start, End, Level) ->
R = lists:foldl(fun({Mark, SL}, {PassedStart, PassedEnd, Acc, PrevList}) ->
case {PassedStart, PassedEnd} of
{true, true} ->
{true, true, Acc, null};
{false, false} ->
case Start > Mark of
true ->
{false, false, Acc, SL};
false ->
SkipLRange = to_range(PrevList,
Start, End,
Level - 1) ++
to_range(SL,
Start, End,
Level - 1),
case leveled_codec:endkey_passed(End, Mark) of
true ->
{true, true, SkipLRange, null};
false ->
{true, false, SkipLRange, null}
end
end;
{true, false} ->
SkipLRange = to_range(SL, Start, End, Level - 1),
case leveled_codec:endkey_passed(End, Mark) of
true ->
{true, true, Acc ++ SkipLRange, null};
false ->
{true, false, Acc ++ SkipLRange, null}
end
end end,
{false, false, [], []},
SkipList),
{_Bool1, _Bool2, SubList, _PrevList} = R,
SubList.
empty(SkipList, 1) ->
[{?INFINITY_KEY, SkipList}];
empty(SkipList, Level) ->
empty([{?INFINITY_KEY, SkipList}], Level - 1).
size(SkipList, 1) ->
lists:foldl(fun({_Mark, SL}, Acc) -> length(SL) + Acc end, 0, SkipList);
size(SkipList, Level) ->
lists:foldl(fun({_Mark, SL}, Acc) -> size(SL, Level - 1) + Acc end,
0,
SkipList).
%%%============================================================================
%%% Internal Functions
%%%============================================================================
width(1, Width) ->
Width;
width(N, Width) ->
width(N - 1, Width * Width).
find_mark(Key, SkipList) ->
lists:foldl(fun({Marker, SL}, Acc) ->
case Acc of
false ->
case Marker >= Key of
true ->
{Marker, SL};
false ->
Acc
end;
_ ->
Acc
end end,
false,
SkipList).
get_sublist(Key, SkipList) ->
lists:foldl(fun({SkipKey, SL}, Acc) ->
case {Acc, SkipKey} of
{null, SkipKey} when SkipKey >= Key ->
SL;
_ ->
Acc
end end,
null,
SkipList).
splitlist_start(StartKey, SL) ->
{_LHS, RHS} = lists:splitwith(fun({K, _V}) -> K < StartKey end, SL),
RHS.
splitlist_end(EndKey, SL) ->
{LHS, _RHS} = lists:splitwith(fun({K, _V}) ->
not leveled_codec:endkey_passed(EndKey, K)
end,
SL),
LHS.
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
generate_randomkeys(Seqn,
Count,
[],
BucketRangeLow,
BucketRangeHigh).
generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
Acc;
generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
BNumber =
case BRange of
0 ->
string:right(integer_to_list(BucketLow), 4, $0);
_ ->
BRand = random:uniform(BRange),
string:right(integer_to_list(BucketLow + BRand), 4, $0)
end,
KNumber = string:right(integer_to_list(random:uniform(1000)), 4, $0),
{K, V} = {{o, "Bucket" ++ BNumber, "Key" ++ KNumber, null},
{Seqn, {active, infinity}, null}},
generate_randomkeys(Seqn + 1,
Count - 1,
[{K, V}|Acc],
BucketLow,
BRange).
skiplist_small_test() ->
% Check nothing bad happens with very small lists
lists:foreach(fun(N) -> dotest_skiplist_small(N) end, lists:seq(1, 32)).
dotest_skiplist_small(N) ->
KL = generate_randomkeys(1, N, 1, 2),
SkipList1 =
lists:foldl(fun({K, V}, SL) ->
enter(K, V, SL)
end,
empty(),
KL),
SkipList2 = from_list(lists:reverse(KL)),
lists:foreach(fun({K, V}) -> ?assertMatch({value, V}, lookup(K, SkipList1))
end,
lists:ukeysort(1, lists:reverse(KL))),
lists:foreach(fun({K, V}) -> ?assertMatch({value, V}, lookup(K, SkipList2))
end,
lists:ukeysort(1, lists:reverse(KL))).
skiplist_test() ->
N = 8000,
KL = generate_randomkeys(1, N, 1, N div 5),
SWaGSL = os:timestamp(),
SkipList = from_list(lists:reverse(KL)),
io:format(user, "Generating skip list with ~w keys in ~w microseconds~n" ++
"Top level key count of ~w~n",
[N, timer:now_diff(os:timestamp(), SWaGSL), length(SkipList)]),
io:format(user, "Second tier key counts of ~w~n",
[lists:map(fun({_L, SL}) -> length(SL) end, SkipList)]),
KLSorted = lists:ukeysort(1, lists:reverse(KL)),
SWaGSL2 = os:timestamp(),
SkipList = from_sortedlist(KLSorted),
io:format(user, "Generating skip list with ~w sorted keys in ~w " ++
"microseconds~n",
[N, timer:now_diff(os:timestamp(), SWaGSL2)]),
SWaDSL = os:timestamp(),
SkipList1 =
lists:foldl(fun({K, V}, SL) ->
enter(K, V, SL)
end,
empty(),
KL),
io:format(user, "Dynamic load of skiplist with ~w keys took ~w " ++
"microseconds~n" ++
"Top level key count of ~w~n",
[N, timer:now_diff(os:timestamp(), SWaDSL), length(SkipList1)]),
io:format(user, "Second tier key counts of ~w~n",
[lists:map(fun({_L, SL}) -> length(SL) end, SkipList1)]),
io:format(user, "~nRunning timing tests for generated skiplist:~n", []),
skiplist_timingtest(KLSorted, SkipList, N),
io:format(user, "~nRunning timing tests for dynamic skiplist:~n", []),
skiplist_timingtest(KLSorted, SkipList1, N).
skiplist_timingtest(KL, SkipList, N) ->
io:format(user, "Timing tests on skiplist of size ~w~n",
[leveled_skiplist:size(SkipList)]),
CheckList1 = lists:sublist(KL, N div 4, 200),
CheckList2 = lists:sublist(KL, N div 3, 200),
CheckList3 = lists:sublist(KL, N div 2, 200),
CheckList4 = lists:sublist(KL, N - 1000, 200),
CheckList5 = lists:sublist(KL, N - 500, 200),
CheckList6 = lists:sublist(KL, 1, 10),
CheckList7 = lists:nthtail(N - 200, KL),
CheckList8 = lists:sublist(KL, N div 2, 1),
CheckAll = CheckList1 ++ CheckList2 ++ CheckList3 ++
CheckList4 ++ CheckList5 ++ CheckList6 ++ CheckList7,
SWb = os:timestamp(),
lists:foreach(fun({K, V}) ->
?assertMatch({value, V}, lookup(K, SkipList))
end,
CheckAll),
io:format(user, "Finding 1020 keys took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWb)]),
RangeFun =
fun(SkipListToQuery, CheckListForQ, Assert) ->
KR =
to_range(SkipListToQuery,
element(1, lists:nth(1, CheckListForQ)),
element(1, lists:last(CheckListForQ))),
case Assert of
true ->
CompareL = length(lists:usort(CheckListForQ)),
?assertMatch(CompareL, length(KR));
false ->
KR
end
end,
SWc = os:timestamp(),
RangeFun(SkipList, CheckList1, true),
RangeFun(SkipList, CheckList2, true),
RangeFun(SkipList, CheckList3, true),
RangeFun(SkipList, CheckList4, true),
RangeFun(SkipList, CheckList5, true),
RangeFun(SkipList, CheckList6, true),
RangeFun(SkipList, CheckList7, true),
RangeFun(SkipList, CheckList8, true),
KL_OOR1 = generate_randomkeys(1, 4, N div 5 + 1, N div 5 + 10),
KR9 = RangeFun(SkipList, KL_OOR1, false),
?assertMatch([], KR9),
KL_OOR2 = generate_randomkeys(1, 4, 0, 0),
KR10 = RangeFun(SkipList, KL_OOR2, false),
?assertMatch([], KR10),
io:format(user, "Finding 10 ranges took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWc)]),
AltKL1 = generate_randomkeys(1, 1000, 1, 200),
SWd = os:timestamp(),
lists:foreach(fun({K, _V}) ->
lookup(K, SkipList)
end,
AltKL1),
io:format(user, "Getting 1000 mainly missing keys took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWd)]),
AltKL2 = generate_randomkeys(1, 1000, N div 5 + 1, N div 5 + 300),
SWe = os:timestamp(),
lists:foreach(fun({K, _V}) ->
none = lookup(K, SkipList)
end,
AltKL2),
io:format(user, "Getting 1000 missing keys above range took ~w " ++
"microseconds~n",
[timer:now_diff(os:timestamp(), SWe)]),
AltKL3 = generate_randomkeys(1, 1000, 0, 0),
SWf = os:timestamp(),
lists:foreach(fun({K, _V}) ->
none = lookup(K, SkipList)
end,
AltKL3),
io:format(user, "Getting 1000 missing keys below range took ~w " ++
"microseconds~n",
[timer:now_diff(os:timestamp(), SWf)]),
SWg = os:timestamp(),
FlatList = to_list(SkipList),
io:format(user, "Flattening skiplist took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWg)]),
?assertMatch(KL, FlatList).
define_kv(X) ->
{{o, "Bucket", "Key" ++ string:right(integer_to_list(X), 6), null},
{X, {active, infinity}, null}}.
skiplist_roundsize_test() ->
KVL = lists:map(fun(X) -> define_kv(X) end, lists:seq(1, 4096)),
SkipList = from_list(KVL),
lists:foreach(fun({K, V}) ->
?assertMatch({value, V}, lookup(K, SkipList)) end,
KVL),
lists:foreach(fun(X) ->
{KS, _VS} = define_kv(X * 32 + 1),
{KE, _VE} = define_kv((X + 1) * 32),
R = to_range(SkipList, KS, KE),
L = lists:sublist(KVL,
X * 32 + 1,
32),
?assertMatch(L, R) end,
lists:seq(0, 24)).
-endif.


@ -24,7 +24,8 @@ all() -> [
simple_put_fetch_head_delete(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}],
StartOpts1 = [{root_path, RootPath},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@ -32,7 +33,8 @@ simple_put_fetch_head_delete(_Config) ->
testutil:check_formissingobject(Bookie1, "Bucket1", "Key2"),
ok = leveled_bookie:book_close(Bookie1),
StartOpts2 = [{root_path, RootPath},
{max_journalsize, 3000000}],
{max_journalsize, 3000000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
testutil:check_forobject(Bookie2, TestObject),
ObjList1 = testutil:generate_objects(5000, 2),
@ -66,7 +68,9 @@ simple_put_fetch_head_delete(_Config) ->
many_put_fetch_head(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}, {max_pencillercachesize, 16000}],
StartOpts1 = [{root_path, RootPath},
{max_pencillercachesize, 16000},
{sync_strategy, riak_sync}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@ -74,7 +78,8 @@ many_put_fetch_head(_Config) ->
ok = leveled_bookie:book_close(Bookie1),
StartOpts2 = [{root_path, RootPath},
{max_journalsize, 1000000000},
{max_pencillercachesize, 32000}],
{max_pencillercachesize, 32000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
testutil:check_forobject(Bookie2, TestObject),
GenList = [2, 20002, 40002, 60002, 80002,
@ -103,7 +108,8 @@ journal_compaction(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 10000000},
{max_run_length, 1}],
{max_run_length, 1},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
{TestObject, TestSpec} = testutil:generate_testobject(),
@ -118,7 +124,7 @@ journal_compaction(_Config) ->
"Key1",
"Value1",
[],
{"MDK1", "MDV1"}},
[{"MDK1", "MDV1"}]},
{TestObject2, TestSpec2} = testutil:generate_testobject(B2, K2,
V2, Spec2, MD),
ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2),
@ -193,7 +199,8 @@ journal_compaction(_Config) ->
StartOpts2 = [{root_path, RootPath},
{max_journalsize, 10000000},
{max_run_length, 1},
{waste_retention_period, 1}],
{waste_retention_period, 1},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
ok = leveled_bookie:book_compactjournal(Bookie3, 30000),
testutil:wait_for_compaction(Bookie3),
@ -208,7 +215,9 @@ journal_compaction(_Config) ->
fetchput_snapshot(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}, {max_journalsize, 30000000}],
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 30000000},
{sync_strategy, none}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@ -309,7 +318,9 @@ load_and_count(_Config) ->
% Use artificially small files, and the load keys, counting they're all
% present
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}, {max_journalsize, 50000000}],
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 50000000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@ -392,7 +403,9 @@ load_and_count(_Config) ->
load_and_count_withdelete(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}, {max_journalsize, 50000000}],
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 50000000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@ -415,7 +428,8 @@ load_and_count_withdelete(_Config) ->
0,
lists:seq(1, 20)),
testutil:check_forobject(Bookie1, TestObject),
{BucketD, KeyD} = leveled_codec:riakto_keydetails(TestObject),
{BucketD, KeyD} = {TestObject#r_object.bucket,
TestObject#r_object.key},
{_, 1} = testutil:check_bucket_stats(Bookie1, BucketD),
ok = testutil:book_riakdelete(Bookie1, BucketD, KeyD, []),
not_found = testutil:book_riakget(Bookie1, BucketD, KeyD),
@ -448,7 +462,9 @@ load_and_count_withdelete(_Config) ->
space_clear_ondelete(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}, {max_journalsize, 20000000}],
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 20000000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Book1} = leveled_bookie:book_start(StartOpts1),
G2 = fun testutil:generate_compressibleobjects/2,
testutil:load_objects(20000,
@ -475,8 +491,8 @@ space_clear_ondelete(_Config) ->
[length(FNsA_J), length(FNsA_L)]),
% Get an iterator to lock the inker during compaction
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc]
end,
FoldObjectsFun = fun(B, K, ObjBin, Acc) ->
[{B, K, erlang:phash2(ObjBin)}|Acc] end,
{async, HTreeF1} = leveled_bookie:book_returnfolder(Book1,
{foldobjects_allkeys,
?RIAK_TAG,


@ -6,21 +6,74 @@
-define(KEY_ONLY, {false, undefined}).
-export([all/0]).
-export([small_load_with2i/1,
-export([single_object_with2i/1,
small_load_with2i/1,
query_count/1,
rotating_objects/1]).
all() -> [
single_object_with2i,
small_load_with2i,
query_count,
rotating_objects
].
single_object_with2i(_Config) ->
% Load a single object with an integer and a binary
% index and query for it
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 5000000},
{sync_strategy, testutil:sync_strategy()}],
% low journal size to make sure > 1 created
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, _TestSpec} = testutil:generate_testobject(),
TestSpec = [{add, list_to_binary("integer_int"), 100},
{add, list_to_binary("binary_bin"), <<100:32/integer>>}],
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
IdxQ1 = {index_query,
"Bucket1",
{fun testutil:foldkeysfun/3, []},
{list_to_binary("binary_bin"),
<<99:32/integer>>, <<101:32/integer>>},
{true, undefined}},
{async, IdxFolder1} = leveled_bookie:book_returnfolder(Bookie1, IdxQ1),
R1 = IdxFolder1(),
io:format("R1 of ~w~n", [R1]),
true = [{<<100:32/integer>>,"Key1"}] == R1,
IdxQ2 = {index_query,
"Bucket1",
{fun testutil:foldkeysfun/3, []},
{list_to_binary("integer_int"),
99, 101},
{true, undefined}},
{async, IdxFolder2} = leveled_bookie:book_returnfolder(Bookie1, IdxQ2),
R2 = IdxFolder2(),
io:format("R2 of ~w~n", [R2]),
true = [{100,"Key1"}] == R2,
IdxQ3 = {index_query,
{"Bucket1", "Key1"},
{fun testutil:foldkeysfun/3, []},
{list_to_binary("integer_int"),
99, 101},
{true, undefined}},
{async, IdxFolder3} = leveled_bookie:book_returnfolder(Bookie1, IdxQ3),
R3 = IdxFolder3(),
io:format("R2 of ~w~n", [R3]),
true = [{100,"Key1"}] == R3,
ok = leveled_bookie:book_close(Bookie1),
testutil:reset_filestructure().
small_load_with2i(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 5000000}],
{max_journalsize, 5000000},
{sync_strategy, testutil:sync_strategy()}],
% low journal size to make sure > 1 created
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(),
@ -65,12 +118,12 @@ small_load_with2i(_Config) ->
DSpc = lists:map(fun({add, F, T}) -> {remove, F, T}
end,
Spc),
{B, K} = leveled_codec:riakto_keydetails(Obj),
{B, K} = {Obj#r_object.bucket, Obj#r_object.key},
testutil:book_riakdelete(Bookie1, B, K, DSpc)
end,
ChkList1),
%% Get the Buckets Keys and Hashes for the whole bucket
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc]
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc]
end,
{async, HTreeF1} = leveled_bookie:book_returnfolder(Bookie1,
{foldobjects_allkeys,
@ -95,9 +148,8 @@ small_load_with2i(_Config) ->
true = 9900 == length(KeyHashList2),
true = 9900 == length(KeyHashList3),
SumIntFun = fun(_B, _K, V, Acc) ->
[C] = V#r_object.contents,
{I, _Bin} = C#r_content.value,
SumIntFun = fun(_B, _K, Obj, Acc) ->
{I, _Bin} = testutil:get_value(Obj),
Acc + I
end,
BucketObjQ = {foldobjects_bybucket, ?RIAK_TAG, "Bucket", {SumIntFun, 0}},
@ -128,13 +180,16 @@ small_load_with2i(_Config) ->
query_count(_Config) ->
RootPath = testutil:reset_filestructure(),
{ok, Book1} = leveled_bookie:book_start(RootPath, 2000, 50000000),
{ok, Book1} = leveled_bookie:book_start(RootPath,
2000,
50000000,
testutil:sync_strategy()),
BucketBin = list_to_binary("Bucket"),
{TestObject, TestSpec} = testutil:generate_testobject(BucketBin,
"Key1",
"Value1",
[],
{"MDK1", "MDV1"}),
[{"MDK1", "MDV1"}]),
ok = testutil:book_riakput(Book1, TestObject, TestSpec),
testutil:check_forobject(Book1, TestObject),
testutil:check_formissingobject(Book1, "Bucket1", "Key2"),
@ -177,7 +232,10 @@ query_count(_Config) ->
Book1,
?KEY_ONLY),
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(RootPath, 1000, 50000000),
{ok, Book2} = leveled_bookie:book_start(RootPath,
1000,
50000000,
testutil:sync_strategy()),
Index1Count = count_termsonindex(BucketBin,
"idx1_bin",
Book2,
@ -288,7 +346,10 @@ query_count(_Config) ->
end,
R9),
ok = leveled_bookie:book_close(Book2),
{ok, Book3} = leveled_bookie:book_start(RootPath, 2000, 50000000),
{ok, Book3} = leveled_bookie:book_start(RootPath,
2000,
50000000,
testutil:sync_strategy()),
lists:foreach(fun({IdxF, IdxT, X}) ->
Q = {index_query,
BucketBin,
@ -305,7 +366,10 @@ query_count(_Config) ->
R9),
ok = testutil:book_riakput(Book3, Obj9, Spc9),
ok = leveled_bookie:book_close(Book3),
{ok, Book4} = leveled_bookie:book_start(RootPath, 2000, 50000000),
{ok, Book4} = leveled_bookie:book_start(RootPath,
2000,
50000000,
testutil:sync_strategy()),
lists:foreach(fun({IdxF, IdxT, X}) ->
Q = {index_query,
BucketBin,
@ -365,7 +429,10 @@ query_count(_Config) ->
ok = leveled_bookie:book_close(Book4),
{ok, Book5} = leveled_bookie:book_start(RootPath, 2000, 50000000),
{ok, Book5} = leveled_bookie:book_start(RootPath,
2000,
50000000,
testutil:sync_strategy()),
{async, BLF3} = leveled_bookie:book_returnfolder(Book5, BucketListQuery),
SW_QC = os:timestamp(),
BucketSet3 = BLF3(),


@ -20,10 +20,12 @@ retain_strategy(_Config) ->
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalsize, 5000000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, retain}]}],
BookOptsAlt = [{root_path, RootPath},
{cache_size, 1000},
{max_journalsize, 100000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, retain}]},
{max_run_length, 8}],
{ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 800),
@ -47,6 +49,7 @@ recovr_strategy(_Config) ->
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalsize, 5000000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, recovr}]}],
R6 = rotating_object_check(BookOpts, "Bucket6", 6400),
@@ -64,10 +67,12 @@ recovr_strategy(_Config) ->
lists:foreach(fun({K, _SpcL}) ->
{ok, OH} = testutil:book_riakhead(Book1, "Bucket6", K),
K = OH#r_object.key,
VCH = testutil:get_vclock(OH),
{ok, OG} = testutil:book_riakget(Book1, "Bucket6", K),
V = testutil:get_value(OG),
true = V == V4
VCG = testutil:get_vclock(OG),
true = V == V4,
true = VCH == VCG
end,
lists:nthtail(6400, AllSpcL)),
Q = fun(RT) -> {index_query,
@@ -84,13 +89,16 @@ recovr_strategy(_Config) ->
[length(KeyList), length(KeyTermList)]),
true = length(KeyList) == 6400,
true = length(KeyList) < length(KeyTermList),
true = length(KeyTermList) < 25600.
true = length(KeyTermList) < 25600,
ok = leveled_bookie:book_close(Book1),
testutil:reset_filestructure().
aae_bustedjournal(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts = [{root_path, RootPath},
{max_journalsize, 20000000}],
{max_journalsize, 20000000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts),
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@@ -150,7 +158,7 @@ aae_bustedjournal(_Config) ->
% Will need to remove the file or corrupt the hashtree to get presence to
% fail
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc]
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, erlang:phash2(V)}|Acc]
end,
SW = os:timestamp(),
{async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2,
@@ -243,7 +251,8 @@ journal_compaction_bustedjournal(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 10000000},
{max_run_length, 10}],
{max_run_length, 10},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
View file

@@ -25,6 +25,7 @@
set_object/5,
get_key/1,
get_value/1,
get_vclock/1,
get_compressiblevalue/0,
get_compressiblevalue_andinteger/0,
get_randomindexes_generator/1,
@@ -39,18 +40,112 @@
restore_file/2,
restore_topending/2,
find_journals/1,
riak_hash/1,
wait_for_compaction/1,
foldkeysfun/3]).
foldkeysfun/3,
sync_strategy/0]).
-define(RETURN_TERMS, {true, undefined}).
-define(SLOWOFFER_DELAY, 5).
-define(V1_VERS, 1).
-define(MAGIC, 53). % riak_kv -> riak_object
-define(MD_VTAG, <<"X-Riak-VTag">>).
-define(MD_LASTMOD, <<"X-Riak-Last-Modified">>).
-define(MD_DELETED, <<"X-Riak-Deleted">>).
-define(EMPTY_VTAG_BIN, <<"e">>).
%% =================================================
%% From riak_object
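%% to_binary/2 below reproduces riak_object's v1 binary format:
%%   <<?MAGIC:8, ?V1_VERS:8, VclockLen:32, VclockBin:VclockLen/binary,
%%     SibCount:32, SibsBin/binary>>
%% with each sibling serialised by bin_content/1.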
to_binary(v1, #r_object{contents=Contents, vclock=VClock}) ->
new_v1(VClock, Contents).
new_v1(Vclock, Siblings) ->
VclockBin = term_to_binary(Vclock),
VclockLen = byte_size(VclockBin),
SibCount = length(Siblings),
SibsBin = bin_contents(Siblings),
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
bin_content(#r_content{metadata=Meta, value=Val}) ->
ValBin = encode_maybe_binary(Val),
ValLen = byte_size(ValBin),
MetaBin = meta_bin(Meta),
MetaLen = byte_size(MetaBin),
<<ValLen:32/integer, ValBin:ValLen/binary,
MetaLen:32/integer, MetaBin:MetaLen/binary>>.
bin_contents(Contents) ->
F = fun(Content, Acc) ->
<<Acc/binary, (bin_content(Content))/binary>>
end,
lists:foldl(F, <<>>, Contents).
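%% meta_bin/1 packs the per-sibling metadata as:
%%   <<LastMod:12/binary, VTagLen:8, VTag:VTagLen/binary, Deleted:1/binary,
%%     RestBin/binary>>
%% where RestBin accumulates any remaining {Key, Value} metadata pairs.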
meta_bin(MD) ->
{{VTagVal, Deleted, LastModVal}, RestBin} =
dict:fold(fun fold_meta_to_bin/3,
{{undefined, <<0>>, undefined}, <<>>},
MD),
VTagBin = case VTagVal of
undefined -> ?EMPTY_VTAG_BIN;
_ -> list_to_binary(VTagVal)
end,
VTagLen = byte_size(VTagBin),
LastModBin = case LastModVal of
undefined ->
<<0:32/integer, 0:32/integer, 0:32/integer>>;
{Mega,Secs,Micro} ->
<<Mega:32/integer, Secs:32/integer, Micro:32/integer>>
end,
<<LastModBin/binary, VTagLen:8/integer, VTagBin:VTagLen/binary,
Deleted:1/binary-unit:8, RestBin/binary>>.
fold_meta_to_bin(?MD_VTAG, Value, {{_Vt,Del,Lm},RestBin}) ->
{{Value, Del, Lm}, RestBin};
fold_meta_to_bin(?MD_LASTMOD, Value, {{Vt,Del,_Lm},RestBin}) ->
{{Vt, Del, Value}, RestBin};
fold_meta_to_bin(?MD_DELETED, true, {{Vt,_Del,Lm},RestBin})->
{{Vt, <<1>>, Lm}, RestBin};
fold_meta_to_bin(?MD_DELETED, "true", Acc) ->
fold_meta_to_bin(?MD_DELETED, true, Acc);
fold_meta_to_bin(?MD_DELETED, _, {{Vt,_Del,Lm},RestBin}) ->
{{Vt, <<0>>, Lm}, RestBin};
fold_meta_to_bin(Key, Value, {{_Vt,_Del,_Lm}=Elems,RestBin}) ->
ValueBin = encode_maybe_binary(Value),
ValueLen = byte_size(ValueBin),
KeyBin = encode_maybe_binary(Key),
KeyLen = byte_size(KeyBin),
MetaBin = <<KeyLen:32/integer, KeyBin/binary,
ValueLen:32/integer, ValueBin/binary>>,
{Elems, <<RestBin/binary, MetaBin/binary>>}.
encode_maybe_binary(Bin) when is_binary(Bin) ->
<<1, Bin/binary>>;
encode_maybe_binary(Bin) ->
<<0, (term_to_binary(Bin))/binary>>.
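%% Values are tagged on encode: <<1, Bin/binary>> for binaries and
%% <<0, TermBin/binary>> for other terms; get_value/1 below decodes only the
%% <<0>> (term) form.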
%% =================================================
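%% Assumption: the sync option to file:open is not available on OTP 16, so
%% the tests fall back to a sync_strategy of none there, and use sync on
%% releases 17 to 19.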
sync_strategy() ->
case erlang:system_info(otp_release) of
"17" ->
sync;
"18" ->
sync;
"19" ->
sync;
"16" ->
none
end.
book_riakput(Pid, RiakObject, IndexSpecs) ->
{Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
leveled_bookie:book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG).
leveled_bookie:book_put(Pid,
RiakObject#r_object.bucket,
RiakObject#r_object.key,
to_binary(v1, RiakObject),
IndexSpecs,
?RIAK_TAG).
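%% Objects are now written as v1 riak_object binaries (to_binary/2 above),
%% so get and fold paths see the binary form rather than the #r_object
%% record.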
book_riakdelete(Pid, Bucket, Key, IndexSpecs) ->
leveled_bookie:book_put(Pid, Bucket, Key, delete, IndexSpecs, ?RIAK_TAG).
@@ -131,9 +226,9 @@ check_forlist(Bookie, ChkList, Log) ->
R = book_riakget(Bookie,
Obj#r_object.bucket,
Obj#r_object.key),
ok = case R of
{ok, Obj} ->
ok;
true = case R of
{ok, Val} ->
to_binary(v1, Obj) == Val;
not_found ->
io:format("Object not found for key ~s~n",
[Obj#r_object.key]),
@@ -156,20 +251,18 @@ check_formissinglist(Bookie, ChkList) ->
[timer:now_diff(os:timestamp(), SW), length(ChkList)]).
check_forobject(Bookie, TestObject) ->
{ok, TestObject} = book_riakget(Bookie,
TestBinary = to_binary(v1, TestObject),
{ok, TestBinary} = book_riakget(Bookie,
TestObject#r_object.bucket,
TestObject#r_object.key),
{ok, HeadObject} = book_riakhead(Bookie,
{ok, HeadBinary} = book_riakhead(Bookie,
TestObject#r_object.bucket,
TestObject#r_object.key),
ok = case {HeadObject#r_object.bucket,
HeadObject#r_object.key,
HeadObject#r_object.vclock} of
{B1, K1, VC1} when B1 == TestObject#r_object.bucket,
K1 == TestObject#r_object.key,
VC1 == TestObject#r_object.vclock ->
ok
end.
{_SibMetaBinList,
Vclock,
_Hash,
size} = leveled_codec:riak_extract_metadata(HeadBinary, size),
true = Vclock == TestObject#r_object.vclock.
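%% book_riakhead/3 now returns the stored metadata binary, so the vclock is
%% recovered via leveled_codec:riak_extract_metadata/2 rather than read from
%% an #r_object record.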
check_formissingobject(Bookie, Bucket, Key) ->
not_found = book_riakget(Bookie, Bucket, Key),
@@ -181,11 +274,11 @@ generate_testobject() ->
{B1, K1, V1, Spec1, MD} = {"Bucket1",
"Key1",
"Value1",
[],
{"MDK1", "MDV1"}},
[{"MDK1", "MDV1"}]},
generate_testobject(B1, K1, V1, Spec1, MD).
generate_testobject(B, K, V, Spec, MD) ->
Content = #r_content{metadata=MD, value=V},
Content = #r_content{metadata=dict:from_list(MD), value=V},
{#r_object{bucket=B, key=K, contents=[Content], vclock=[{'a',1}]},
Spec}.
@@ -269,6 +362,7 @@ set_object(Bucket, Key, Value, IndexGen) ->
set_object(Bucket, Key, Value, IndexGen, []).
set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
Obj = {Bucket,
Key,
Value,
@@ -278,16 +372,52 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
[{"MDK", "MDV" ++ Key},
{"MDK2", "MDV" ++ Key}]},
{B1, K1, V1, Spec1, MD} = Obj,
Content = #r_content{metadata=MD, value=V1},
{#r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
Content = #r_content{metadata=dict:from_list(MD), value=V1},
{#r_object{bucket=B1,
key=K1,
contents=[Content],
vclock=generate_vclock()},
Spec1}.
generate_vclock() ->
lists:map(fun(X) ->
{_, Actor} = lists:keyfind(random:uniform(10),
1,
actor_list()),
{Actor, X} end,
lists:seq(1, random:uniform(8))).
actor_list() ->
[{1, albert}, {2, bertie}, {3, clara}, {4, dave}, {5, elton},
{6, fred}, {7, george}, {8, harry}, {9, isaac}, {10, leila}].
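%% For example, a generated vclock might be [{harry,1}, {clara,2}] - up to
%% eight entries, with actors drawn at random from actor_list/0.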
get_key(Object) ->
Object#r_object.key.
get_value(Object) ->
[Content] = Object#r_object.contents,
Content#r_content.value.
get_value(ObjectBin) ->
<<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer,
Rest1/binary>> = ObjectBin,
<<_VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest1,
case SibCount of
1 ->
<<SibLength:32/integer, Rest2/binary>> = SibsBin,
<<ContentBin:SibLength/binary, _MetaBin/binary>> = Rest2,
case ContentBin of
<<0, ContentBin0/binary>> ->
binary_to_term(ContentBin0)
end;
N ->
io:format("SibCount of ~w with ObjectBin ~w~n", [N, ObjectBin]),
error
end.
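%% get_value/1 assumes a single sibling whose value was term-encoded (the
%% <<0>> tag from encode_maybe_binary/1); multi-sibling objects return error
%% and binary-tagged values are not handled.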
get_vclock(ObjectBin) ->
<<_Magic:8/integer, _Vers:8/integer, VclockLen:32/integer,
Rest1/binary>> = ObjectBin,
<<VclockBin:VclockLen/binary, _Bin/binary>> = Rest1,
binary_to_term(VclockBin).
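%% A minimal round-trip sketch over the helpers above
%% (binary_roundtrip_sketch is illustrative only, and not part of the suite):
binary_roundtrip_sketch() ->
    {Obj, _Spc} = generate_testobject(),
    ObjBin = to_binary(v1, Obj),
    "Value1" = get_value(ObjBin),
    true = get_vclock(ObjBin) == Obj#r_object.vclock.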
load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
lists:map(fun(KN) ->
@@ -431,7 +561,8 @@ put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) ->
rotating_object_check(RootPath, B, NumberOfObjects) ->
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalsize, 5000000}],
{max_journalsize, 5000000},
{sync_strategy, sync_strategy()}],
{ok, Book1} = leveled_bookie:book_start(BookOpts),
{KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
ok = testutil:check_indexed_objects(Book1, B, KSpcL1, V1),
@@ -453,6 +584,10 @@ corrupt_journal(RootPath, FileName, Corruptions, BasePosition, GapSize) ->
OriginalPath = RootPath ++ "/journal/journal_files/" ++ FileName,
BackupPath = RootPath ++ "/journal/journal_files/" ++
filename:basename(FileName, ".cdb") ++ ".bak",
io:format("Corruption attempt to be made to filename ~s ~w ~w~n",
[FileName,
filelib:is_file(OriginalPath),
filelib:is_file(BackupPath)]),
{ok, _BytesCopied} = file:copy(OriginalPath, BackupPath),
{ok, Handle} = file:open(OriginalPath, [binary, raw, read, write]),
lists:foreach(fun(X) ->
@@ -490,11 +625,3 @@ find_journals(RootPath) ->
FNsA_J),
CDBFiles.
riak_hash(Obj=#r_object{}) ->
Vclock = vclock(Obj),
UpdObj = set_vclock(Obj, lists:sort(Vclock)),
erlang:phash2(term_to_binary(UpdObj)).
set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
vclock(#r_object{vclock=VClock}) -> VClock.