From e3ce372f31f8ac44eabe90887e29c52dc180dce6 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Sun, 16 Oct 2016 15:41:09 +0100
Subject: [PATCH] Delete

Add functionality to delete keys. A delete is put as a tombstone against
the key in the ledger: GET and HEAD requests on a tombstoned key return
not_found, and tombstoned keys are excluded from bucket size and count
folds. No tombstone reaping yet.
---
 include/leveled.hrl             |  4 ++
 src/leveled_bookie.erl          | 35 +++++++++++------
 src/leveled_codec.erl           | 43 ++++++++++++++------
 src/leveled_sft.erl             |  2 -
 test/end_to_end/basic_SUITE.erl | 69 ++++++++++++++++++++++++++++++---
 5 files changed, 122 insertions(+), 31 deletions(-)

diff --git a/include/leveled.hrl b/include/leveled.hrl
index e421500..481b8db 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -1,4 +1,8 @@
+-define(RIAK_TAG, o_rkv).
+-define(STD_TAG, o).
+-define(IDX_TAG, i).
+
 -record(sft_options,
         {wait = true :: boolean(),
          expire_tombstones = false :: boolean()}).
 
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 28b628d..72a6966 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -71,7 +71,7 @@
 %% The Bookie should generate a series of ledger key changes from this
 %% information, using a function passed in at startup. For Riak this will be
 %% of the form:
-%% {{o_rkv@v1, Bucket, Key, SubKey|null},
+%% {{o_rkv, Bucket, Key, SubKey|null},
 %%      SQN,
 %%      {Hash, Size, {Riak_Metadata}},
 %%      {active, TS}|{tomb, TS}} or
@@ -134,9 +134,11 @@
         code_change/3,
         book_start/1,
         book_riakput/3,
+        book_riakdelete/4,
         book_riakget/3,
         book_riakhead/3,
         book_put/5,
+        book_delete/4,
         book_get/3,
         book_head/3,
         book_returnfolder/2,
@@ -172,22 +174,28 @@ book_start(Opts) ->
 
 book_riakput(Pid, RiakObject, IndexSpecs) ->
     {Bucket, Key} = leveled_codec:riakto_keydetails(RiakObject),
-    book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, o_rkv@v1).
+    book_put(Pid, Bucket, Key, RiakObject, IndexSpecs, ?RIAK_TAG).
 
 book_put(Pid, Bucket, Key, Object, IndexSpecs) ->
-    book_put(Pid, Bucket, Key, Object, IndexSpecs, o).
+    book_put(Pid, Bucket, Key, Object, IndexSpecs, ?STD_TAG).
+
+book_riakdelete(Pid, Bucket, Key, IndexSpecs) ->
+    book_put(Pid, Bucket, Key, delete, IndexSpecs, ?RIAK_TAG).
+
+book_delete(Pid, Bucket, Key, IndexSpecs) ->
+    book_put(Pid, Bucket, Key, delete, IndexSpecs, ?STD_TAG).
 
 book_riakget(Pid, Bucket, Key) ->
-    book_get(Pid, Bucket, Key, o_rkv@v1).
+    book_get(Pid, Bucket, Key, ?RIAK_TAG).
 
 book_get(Pid, Bucket, Key) ->
-    book_get(Pid, Bucket, Key, o).
+    book_get(Pid, Bucket, Key, ?STD_TAG).
 
 book_riakhead(Pid, Bucket, Key) ->
-    book_head(Pid, Bucket, Key, o_rkv@v1).
+    book_head(Pid, Bucket, Key, ?RIAK_TAG).
 
 book_head(Pid, Bucket, Key) ->
-    book_head(Pid, Bucket, Key, o).
+    book_head(Pid, Bucket, Key, ?STD_TAG).
 
 book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
     gen_server:call(Pid, {put, Bucket, Key, Object, IndexSpecs, Tag}, infinity).
@@ -281,7 +289,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
         Head ->
             {Seqn, Status, _MD} = leveled_codec:striphead_to_details(Head),
             case Status of
-                {tomb, _} ->
+                tomb ->
                     {reply, not_found, State};
                 {active, _} ->
                     case fetch_value(LedgerKey, Seqn, State#state.inker) of
@@ -302,7 +310,7 @@ handle_call({head, Bucket, Key, Tag}, _From, State) ->
         Head ->
             {_Seqn, Status, MD} = leveled_codec:striphead_to_details(Head),
             case Status of
-                {tomb, _} ->
+                tomb ->
                     {reply, not_found, State};
                 {active, _} ->
                     OMD = leveled_codec:build_metadata_object(LedgerKey, MD),
@@ -339,7 +347,7 @@ handle_call({return_folder, FolderType}, _From, State) ->
                         bucket_stats(State#state.penciller,
                                         State#state.ledger_cache,
                                         Bucket,
-                                        o_rkv@v1),
+                                        ?RIAK_TAG),
                 State}
     end;
 handle_call({compact_journal, Timeout}, _From, State) ->
@@ -461,7 +469,12 @@ fetch_value(Key, SQN, Inker) ->
     end.
 
 accumulate_size(Key, Value, {Size, Count}) ->
-    {Size + leveled_codec:get_size(Key, Value), Count + 1}.
+    case leveled_codec:is_active(Key, Value) of
+        true ->
+            {Size + leveled_codec:get_size(Key, Value), Count + 1};
+        false ->
+            {Size, Count}
+    end.
 
 preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
     {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK,
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 9d9a6cf..b8170c6 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -22,7 +22,7 @@
 %%
 %% Currently the only tags supported are:
 %% - o (standard objects)
-%% - o_rkv@v1 (riak objects)
+%% - o_rkv (riak objects)
 %% - i (index entries)
 
 
@@ -38,6 +38,7 @@
         strip_to_statusonly/1,
         strip_to_keyseqstatusonly/1,
         striphead_to_details/1,
+        is_active/2,
         endkey_passed/2,
         key_dominates/2,
         print_key/1,
@@ -77,7 +78,15 @@ key_dominates(LeftKey, RightKey) ->
             when LK == RK, LSN < RSN ->
                 right_hand_dominant
     end.
-    
+
+is_active(Key, Value) ->
+    case strip_to_statusonly({Key, Value}) of
+        {active, infinity} ->
+            true;
+        tomb ->
+            false
+    end.
+
 to_ledgerkey(Bucket, Key, Tag) ->
     {Tag, Bucket, Key, null}.
 
@@ -87,11 +96,11 @@ hash(Obj) ->
 % Return a tuple of strings to ease the printing of keys to logs
 print_key(Key) ->
     {A_STR, B_TERM, C_TERM} = case Key of
-                                    {o, B, K, _SK} ->
+                                    {?STD_TAG, B, K, _SK} ->
                                         {"Object", B, K};
-                                    {o_rkv@v1, B, K, _SK} ->
+                                    {?RIAK_TAG, B, K, _SK} ->
                                         {"RiakObject", B, K};
-                                    {i, B, {F, _V}, _K} ->
+                                    {?IDX_TAG, B, {F, _V}, _K} ->
                                         {"Index", B, F}
                                 end,
     {B_STR, FB} = check_for_string(B_TERM),
@@ -142,24 +151,32 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size) ->
 
 generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
     {Tag, Bucket, Key, _} = PrimaryKey,
+    Status = case Obj of
+                    delete ->
+                        tomb;
+                    _ ->
+                        {active, TS}
+                end,
     {Bucket,
         Key,
-        {PrimaryKey, {SQN, {active, TS}, extract_metadata(Obj, Size, Tag)}}}.
+        {PrimaryKey, {SQN, Status, extract_metadata(Obj, Size, Tag)}}}.
 
-extract_metadata(Obj, Size, o_rkv@v1) ->
+
+
+
+extract_metadata(Obj, Size, ?RIAK_TAG) ->
     riak_extract_metadata(Obj, Size);
-extract_metadata(Obj, Size, o) ->
+extract_metadata(Obj, Size, ?STD_TAG) ->
     {hash(Obj), Size}.
 
 get_size(PK, Value) ->
     {Tag, _Bucket, _Key, _} = PK,
     {_, _, MD} = Value,
     case Tag of
-        o_rkv@v1 ->
+        ?RIAK_TAG ->
             {_RMD, _VC, _Hash, Size} = MD,
             Size;
-        o ->
+        ?STD_TAG ->
             {_Hash, Size} = MD,
             Size
     end.
 
@@ -168,9 +185,9 @@ build_metadata_object(PrimaryKey, MD) ->
     {Tag, Bucket, Key, null} = PrimaryKey,
     case Tag of
-        o_rkv@v1 ->
+        ?RIAK_TAG ->
             riak_metadata_object(Bucket, Key, MD);
-        o ->
+        ?STD_TAG ->
             MD
     end.
 
@@ -184,6 +201,8 @@ riak_metadata_object(Bucket, Key, MD) ->
                             RMD),
     #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}.
 
+riak_extract_metadata(delete, Size) ->
+    {delete, null, null, Size};
 riak_extract_metadata(Obj, Size) ->
     {get_metadatas(Obj), vclock(Obj), riak_hash(Obj), Size}.
 
diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl
index 95a3f63..6df0b02 100644
--- a/src/leveled_sft.erl
+++ b/src/leveled_sft.erl
@@ -712,8 +712,6 @@ get_nearestkey(KVList, all) ->
     end;
 get_nearestkey(KVList, Key) ->
     case Key of
-        {first, K} ->
-            get_firstkeytomatch(KVList, K, not_found);
         {next, K} ->
             get_nextkeyaftermatch(KVList, K, not_found);
         _ ->
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index fa0e54d..8f8f8f1 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -2,21 +2,23 @@
 -include_lib("common_test/include/ct.hrl").
 -include("../include/leveled.hrl").
 -export([all/0]).
--export([simple_put_fetch_head/1,
+-export([simple_put_fetch_head_delete/1,
         many_put_fetch_head/1,
         journal_compaction/1,
         fetchput_snapshot/1,
-        load_and_count/1]).
+        load_and_count/1,
+        load_and_count_withdelete/1]).
 
-all() -> [simple_put_fetch_head,
+all() -> [simple_put_fetch_head_delete,
             many_put_fetch_head,
             journal_compaction,
             fetchput_snapshot,
-            load_and_count
+            load_and_count,
+            load_and_count_withdelete
             ].
 
-simple_put_fetch_head(_Config) ->
+simple_put_fetch_head_delete(_Config) ->
     RootPath = reset_filestructure(),
     StartOpts1 = #bookie_options{root_path=RootPath},
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
 
@@ -51,7 +53,13 @@
     ok = leveled_bookie:book_close(Bookie2),
     {ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
     {ok, <<"Value2">>} = leveled_bookie:book_get(Bookie3, "Bucket1", "Key2"),
+    ok = leveled_bookie:book_delete(Bookie3, "Bucket1", "Key2",
+                                    [{remove, "Index1", "Term1"}]),
+    not_found = leveled_bookie:book_get(Bookie3, "Bucket1", "Key2"),
     ok = leveled_bookie:book_close(Bookie3),
+    {ok, Bookie4} = leveled_bookie:book_start(StartOpts2),
+    not_found = leveled_bookie:book_get(Bookie4, "Bucket1", "Key2"),
+    ok = leveled_bookie:book_close(Bookie4),
     reset_filestructure().
 
 many_put_fetch_head(_Config) ->
@@ -300,6 +308,50 @@ load_and_count(_Config) ->
     ok = leveled_bookie:book_close(Bookie2),
     reset_filestructure().
 
+load_and_count_withdelete(_Config) ->
+    RootPath = reset_filestructure(),
+    StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=50000000},
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    {TestObject, TestSpec} = generate_testobject(),
+    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    check_bookie_forobject(Bookie1, TestObject),
+    io:format("Loading initial small objects~n"),
+    lists:foldl(fun(_X, Acc) ->
+                        load_objects(5000, [Acc + 2], Bookie1, TestObject,
+                                        fun generate_multiple_smallobjects/2),
+                        {_Size, Count} = check_bucket_stats(Bookie1, "Bucket"),
+                        if
+                            Acc + 5000 == Count ->
+                                ok
+                        end,
+                        Acc + 5000 end,
+                    0,
+                    lists:seq(1, 20)),
+    check_bookie_forobject(Bookie1, TestObject),
+    {BucketD, KeyD} = leveled_codec:riakto_keydetails(TestObject),
+    {_, 1} = check_bucket_stats(Bookie1, BucketD),
+    ok = leveled_bookie:book_riakdelete(Bookie1, BucketD, KeyD, []),
+    not_found = leveled_bookie:book_riakget(Bookie1, BucketD, KeyD),
+    {_, 0} = check_bucket_stats(Bookie1, BucketD),
+    io:format("Loading larger compressible objects~n"),
+    lists:foldl(fun(_X, Acc) ->
+                        load_objects(5000, [Acc + 2], Bookie1, no_check,
+                                        fun generate_multiple_compressibleobjects/2),
+                        {_Size, Count} = check_bucket_stats(Bookie1, "Bucket"),
+                        if
+                            Acc + 5000 == Count ->
+                                ok
+                        end,
+                        Acc + 5000 end,
+                    100000,
+                    lists:seq(1, 20)),
+    not_found = leveled_bookie:book_riakget(Bookie1, BucketD, KeyD),
+    ok = leveled_bookie:book_close(Bookie1),
+    {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
+    check_bookie_formissingobject(Bookie2, BucketD, KeyD),
+    {_BSize, 0} = check_bucket_stats(Bookie2, BucketD),
+    ok = leveled_bookie:book_close(Bookie2).
+
 reset_filestructure() ->
     RootPath = "test",
@@ -445,6 +497,11 @@ load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
         Time = timer:now_diff(os:timestamp(), StartWatchA),
         io:format("~w objects loaded in ~w seconds~n",
                     [ChunkSize, Time/1000000]),
-        check_bookie_forobject(Bookie, TestObject),
+        if
+            TestObject == no_check ->
+                ok;
+            true ->
+                check_bookie_forobject(Bookie, TestObject)
+        end,
         lists:sublist(ObjListA, 1000) end,
         GenList).
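
A minimal usage sketch of the new delete API (not part of the diff above;
the #bookie_options{} shape and index specs follow the test suite, and the
{add, ...} spec is assumed to mirror the {remove, ...} spec used in
simple_put_fetch_head_delete):

    {ok, Bookie} = leveled_bookie:book_start(#bookie_options{root_path="test"}),
    %% PUT an object with a secondary index entry, then read it back
    ok = leveled_bookie:book_put(Bookie, "Bucket1", "Key1", <<"Value1">>,
                                    [{add, "Index1", "Term1"}]),
    {ok, <<"Value1">>} = leveled_bookie:book_get(Bookie, "Bucket1", "Key1"),
    %% book_delete/4 is a book_put/6 of the atom 'delete' under ?STD_TAG;
    %% the {remove, ...} spec reverses the index entry added at PUT time
    ok = leveled_bookie:book_delete(Bookie, "Bucket1", "Key1",
                                    [{remove, "Index1", "Term1"}]),
    %% both GET and HEAD now see the tombstone as not_found
    not_found = leveled_bookie:book_get(Bookie, "Bucket1", "Key1"),
    not_found = leveled_bookie:book_head(Bookie, "Bucket1", "Key1"),
    ok = leveled_bookie:book_close(Bookie).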
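
Ledger-side, the delete is a status change rather than a removal; a sketch
of the key/value shapes involved (illustrative values, using the ?STD_TAG
metadata form {Hash, Size} from leveled_codec and assuming the default TS
of infinity):

    %% A normal put yields an active status in the ledger value:
    %% generate_ledgerkv({?STD_TAG, "B", "K", null}, SQN, Obj, Size)
    %%   -> {"B", "K",
    %%       {{?STD_TAG, "B", "K", null},
    %%           {SQN, {active, infinity}, {Hash, Size}}}}
    %%
    %% Putting the atom 'delete' yields a tombstone at a higher SQN:
    %% generate_ledgerkv({?STD_TAG, "B", "K", null}, SQN + 1, delete, Size)
    %%   -> {"B", "K",
    %%       {{?STD_TAG, "B", "K", null},
    %%           {SQN + 1, tomb, {Hash, Size}}}}
    %%
    %% key_dominates/2 lets the higher-SQN tombstone supersede the active
    %% value on merge, GET/HEAD map tomb to not_found, and folds such as
    %% bucket_stats drop tombstones via is_active/2.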