Add functionality to delete keys.  No tombstone reaping yet.
This commit is contained in:
martinsumner 2016-10-16 15:41:09 +01:00
parent ed17e44f52
commit e3ce372f31
5 changed files with 122 additions and 31 deletions

View file

@ -1,4 +1,8 @@
-define(RIAK_TAG, o_rkv).
-define(STD_TAG, o).
-define(IDX_TAG, i).
-record(sft_options,
{wait = true :: boolean(),
expire_tombstones = false :: boolean()}).

View file

@ -71,7 +71,7 @@
%% The Bookie should generate a series of ledger key changes from this
%% information, using a function passed in at startup. For Riak this will be
%% of the form:
%% {{o_rkv@v1, Bucket, Key, SubKey|null},
%% {{o_rkv, Bucket, Key, SubKey|null},
%% SQN,
%% {Hash, Size, {Riak_Metadata}},
%% {active, TS}|{tomb, TS}} or
@ -134,9 +134,11 @@
code_change/3,
book_start/1,
book_riakput/3,
book_riakdelete/4,
book_riakget/3,
book_riakhead/3,
book_put/5,
book_delete/4,
book_get/3,
book_head/3,
book_returnfolder/2,
@ -172,22 +174,28 @@ book_start(Opts) ->
book_riakput(Pid, RiakObject, IndexSpecs) ->
{Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, o_rkv@v1).
book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG).
book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, o).
book_put(Pid, Bucket, Key, Object, IndexSpecs, ?STD_TAG).
book_riakdelete(Pid, Bucket, Key, IndexSpecs) ->
book_put(Pid, Bucket, Key, delete, IndexSpecs, ?RIAK_TAG).
book_delete(Pid, Bucket, Key, IndexSpecs) ->
book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).
book_riakget(Pid, Bucket, Key) ->
book_get(Pid, Bucket, Key, o_rkv@v1).
book_get(Pid, Bucket, Key, ?RIAK_TAG).
book_get(Pid, Bucket, Key) ->
book_get(Pid, Bucket, Key, o).
book_get(Pid, Bucket, Key, ?STD_TAG).
book_riakhead(Pid, Bucket, Key) ->
book_head(Pid, Bucket, Key, o_rkv@v1).
book_head(Pid, Bucket, Key, ?RIAK_TAG).
book_head(Pid, Bucket, Key) ->
book_head(Pid, Bucket, Key, o).
book_head(Pid, Bucket, Key, ?STD_TAG).
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
gen_server:call(Pid, {put, Bucket, Key, Object, IndexSpecs, Tag}, infinity).
@ -281,7 +289,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
Head ->
{Seqn, Status, _MD} = leveled_codec:striphead_to_details(Head),
case Status of
{tomb, _} ->
tomb ->
{reply, not_found, State};
{active, _} ->
case fetch_value(LedgerKey, Seqn, State#state.inker) of
@ -302,7 +310,7 @@ handle_call({head, Bucket, Key, Tag}, _From, State) ->
Head ->
{_Seqn, Status, MD} = leveled_codec:striphead_to_details(Head),
case Status of
{tomb, _} ->
tomb ->
{reply, not_found, State};
{active, _} ->
OMD = leveled_codec:build_metadata_object(LedgerKey, MD),
@ -339,7 +347,7 @@ handle_call({return_folder, FolderType}, _From, State) ->
bucket_stats(State#state.penciller,
State#state.ledger_cache,
Bucket,
o_rkv@v1),
?RIAK_TAG),
State}
end;
handle_call({compact_journal, Timeout}, _From, State) ->
@ -461,7 +469,12 @@ fetch_value(Key, SQN, Inker) ->
end.
accumulate_size(Key, Value, {Size, Count}) ->
{Size + leveled_codec:get_size(Key, Value), Count + 1}.
case leveled_codec:is_active(Key, Value) of
true ->
{Size + leveled_codec:get_size(Key, Value), Count + 1};
false ->
{Size, Count}
end.
preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
{Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK,

View file

@ -22,7 +22,7 @@
%%
%% Currently the only tags supported are:
%% - o (standard objects)
%% - o_rkv@v1 (riak objects)
%% - o_rkv (riak objects)
%% - i (index entries)
@ -38,6 +38,7 @@
strip_to_statusonly/1,
strip_to_keyseqstatusonly/1,
striphead_to_details/1,
is_active/2,
endkey_passed/2,
key_dominates/2,
print_key/1,
@ -78,6 +79,14 @@ key_dominates(LeftKey, RightKey) ->
right_hand_dominant
end.
is_active(Key, Value) ->
case strip_to_statusonly({Key, Value}) of
{active, infinity} ->
true;
tomb ->
false
end.
to_ledgerkey(Bucket, Key, Tag) ->
{Tag, Bucket, Key, null}.
@ -87,11 +96,11 @@ hash(Obj) ->
% Return a tuple of strings to ease the printing of keys to logs
print_key(Key) ->
{A_STR, B_TERM, C_TERM} = case Key of
{o, B, K, _SK} ->
{?STD_TAG, B, K, _SK} ->
{"Object", B, K};
{o_rkv@v1, B, K, _SK} ->
{?RIAK_TAG, B, K, _SK} ->
{"RiakObject", B, K};
{i, B, {F, _V}, _K} ->
{?IDX_TAG, B, {F, _V}, _K} ->
{"Index", B, F}
end,
{B_STR, FB} = check_for_string(B_TERM),
@ -142,24 +151,32 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size) ->
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
{Tag, Bucket, Key, _} = PrimaryKey,
Status = case Obj of
delete ->
tomb;
_ ->
{active, TS}
end,
{Bucket,
Key,
{PrimaryKey, {SQN, {active, TS}, extract_metadata(Obj, Size, Tag)}}}.
{PrimaryKey, {SQN, Status, extract_metadata(Obj, Size, Tag)}}}.
extract_metadata(Obj, Size, o_rkv@v1) ->
extract_metadata(Obj, Size, ?RIAK_TAG) ->
riak_extract_metadata(Obj, Size);
extract_metadata(Obj, Size, o) ->
extract_metadata(Obj, Size, ?STD_TAG) ->
{hash(Obj), Size}.
get_size(PK, Value) ->
{Tag, _Bucket, _Key, _} = PK,
{_, _, MD} = Value,
case Tag of
o_rkv@v1 ->
?RIAK_TAG ->
{_RMD, _VC, _Hash, Size} = MD,
Size;
o ->
?STD_TAG ->
{_Hash, Size} = MD,
Size
end.
@ -168,9 +185,9 @@ get_size(PK, Value) ->
build_metadata_object(PrimaryKey, MD) ->
{Tag, Bucket, Key, null} = PrimaryKey,
case Tag of
o_rkv@v1 ->
?RIAK_TAG ->
riak_metadata_object(Bucket, Key, MD);
o ->
?STD_TAG ->
MD
end.
@ -184,6 +201,8 @@ riak_metadata_object(Bucket, Key, MD) ->
RMD),
#r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}.
riak_extract_metadata(delete, Size) ->
{delete, null, null, Size};
riak_extract_metadata(Obj, Size) ->
{get_metadatas(Obj), vclock(Obj), riak_hash(Obj), Size}.

View file

@ -712,8 +712,6 @@ get_nearestkey(KVList, all) ->
end;
get_nearestkey(KVList, Key) ->
case Key of
{first, K} ->
get_firstkeytomatch(KVList, K, not_found);
{next, K} ->
get_nextkeyaftermatch(KVList, K, not_found);
_ ->

View file

@ -2,21 +2,23 @@
-include_lib("common_test/include/ct.hrl").
-include("../include/leveled.hrl").
-export([all/0]).
-export([simple_put_fetch_head/1,
-export([simple_put_fetch_head_delete/1,
many_put_fetch_head/1,
journal_compaction/1,
fetchput_snapshot/1,
load_and_count/1]).
load_and_count/1,
load_and_count_withdelete/1]).
all() -> [simple_put_fetch_head,
all() -> [simple_put_fetch_head_delete,
many_put_fetch_head,
journal_compaction,
fetchput_snapshot,
load_and_count
load_and_count,
load_and_count_withdelete
].
simple_put_fetch_head(_Config) ->
simple_put_fetch_head_delete(_Config) ->
RootPath = reset_filestructure(),
StartOpts1 = #bookie_options{root_path=RootPath},
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
@ -51,7 +53,13 @@ simple_put_fetch_head(_Config) ->
ok = leveled_bookie:book_close(Bookie2),
{ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
{ok, <<"Value2">>} = leveled_bookie:book_get(Bookie3, "Bucket1", "Key2"),
ok = leveled_bookie:book_delete(Bookie3, "Bucket1", "Key2",
[{remove, "Index1", "Term1"}]),
not_found = leveled_bookie:book_get(Bookie3, "Bucket1", "Key2"),
ok = leveled_bookie:book_close(Bookie3),
{ok, Bookie4} = leveled_bookie:book_start(StartOpts2),
not_found = leveled_bookie:book_get(Bookie4, "Bucket1", "Key2"),
ok = leveled_bookie:book_close(Bookie4),
reset_filestructure().
many_put_fetch_head(_Config) ->
@ -300,6 +308,50 @@ load_and_count(_Config) ->
ok = leveled_bookie:book_close(Bookie2),
reset_filestructure().
load_and_count_withdelete(_Config) ->
RootPath = reset_filestructure(),
StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=50000000},
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = generate_testobject(),
ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
check_bookie_forobject(Bookie1, TestObject),
io:format("Loading initial small objects~n"),
lists:foldl(fun(_X, Acc) ->
load_objects(5000, [Acc + 2], Bookie1, TestObject,
fun generate_multiple_smallobjects/2),
{_Size, Count} = check_bucket_stats(Bookie1, "Bucket"),
if
Acc + 5000 == Count ->
ok
end,
Acc + 5000 end,
0,
lists:seq(1, 20)),
check_bookie_forobject(Bookie1, TestObject),
{BucketD, KeyD} = leveled_codec:riakto_keydetails(TestObject),
{_, 1} = check_bucket_stats(Bookie1, BucketD),
ok = leveled_bookie:book_riakdelete(Bookie1, BucketD, KeyD, []),
not_found = leveled_bookie:book_riakget(Bookie1, BucketD, KeyD),
{_, 0} = check_bucket_stats(Bookie1, BucketD),
io:format("Loading larger compressible objects~n"),
lists:foldl(fun(_X, Acc) ->
load_objects(5000, [Acc + 2], Bookie1, no_check,
fun generate_multiple_compressibleobjects/2),
{_Size, Count} = check_bucket_stats(Bookie1, "Bucket"),
if
Acc + 5000 == Count ->
ok
end,
Acc + 5000 end,
100000,
lists:seq(1, 20)),
not_found = leveled_bookie:book_riakget(Bookie1, BucketD, KeyD),
ok = leveled_bookie:book_close(Bookie1),
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
check_bookie_formissingobject(Bookie2, BucketD, KeyD),
{_BSize, 0} = check_bucket_stats(Bookie2, BucketD),
ok = leveled_bookie:book_close(Bookie2).
reset_filestructure() ->
RootPath = "test",
@ -445,6 +497,11 @@ load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
Time = timer:now_diff(os:timestamp(), StartWatchA),
io:format("~w objects loaded in ~w seconds~n",
[ChunkSize, Time/1000000]),
check_bookie_forobject(Bookie, TestObject),
if
TestObject == no_check ->
ok;
true ->
check_bookie_forobject(Bookie, TestObject)
end,
lists:sublist(ObjListA, 1000) end,
GenList).