From c78b5bca7d4cb30ba7f8ac8bb566867534b3a521 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Sun, 23 Oct 2016 22:45:43 +0100
Subject: [PATCH] Basement Tombstones

Further progress towards the tidying up of basement tombstones in the
Ledger, with support added for key-listing to help with testing (and as
a potentially required feature).

The test is incomplete, but committing at this stage as the last commit
broke some tests (within the test code).

There are some outstanding questions about the handling of tombstones
in the Journal during compaction.  There is a condition whereby deleted
values could reappear: a recent journal is compacted and its tombstones
removed (as the objects they mask are no longer present in that
journal), while older journals still holding the original PUTs have not
been compacted.  On a stop/start, if the Ledger is wiped, the removal
of the keys will be forgotten but the original PUTs will still remain.

The safest thing may be to have a rule that tombstones are never
deleted from the Inker's Journal - and accept the build-up of garbage.
Alternatively, the compaction process could check back through all the
Inker's files to confirm that the Key of a tombstone is not present in
the past, before the tombstone is removed in the compaction (a sketch
of such a check is included after the patch).
---
 src/leveled_bookie.erl          | 38 +++++++++++++++-
 src/leveled_codec.erl           |  4 +-
 src/leveled_iclerk.erl          |  3 +-
 src/leveled_inker.erl           | 12 ++++-
 test/end_to_end/basic_SUITE.erl | 78 ++++++++++++++++++++++++++++++++-
 test/end_to_end/testutil.erl    |  2 +-
 6 files changed, 130 insertions(+), 7 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 9d61661..acd0d9e 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -366,7 +366,13 @@ handle_call({return_folder, FolderType}, _From, State) ->
                                 Bucket,
                                 {IdxField, StartValue, EndValue},
                                 {ReturnTerms, TermRegex}),
-                State}
+                State};
+        {keylist, Tag} ->
+            {reply,
+                allkey_query(State#state.penciller,
+                                State#state.ledger_cache,
+                                Tag),
+                State}
     end;
 handle_call({compact_journal, Timeout}, _From, State) ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
@@ -460,6 +466,28 @@ index_query(Penciller, LedgerCache,
                 end,
     {async, Folder}.
 
+allkey_query(Penciller, LedgerCache, Tag) ->
+    PCLopts = #penciller_options{start_snapshot=true,
+                                    source_penciller=Penciller},
+    {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
+    Folder = fun() ->
+                Increment = gb_trees:to_list(LedgerCache),
+                io:format("Length of increment in snapshot is ~w~n",
+                            [length(Increment)]),
+                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
+                                                        {infinity, Increment}),
+                SK = leveled_codec:to_ledgerkey(null, null, Tag),
+                EK = leveled_codec:to_ledgerkey(null, null, Tag),
+                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                        SK,
+                                                        EK,
+                                                        fun accumulate_keys/3,
+                                                        []),
+                ok = leveled_penciller:pcl_close(LedgerSnapshot),
+                lists:reverse(Acc)
+                end,
+    {async, Folder}.
+
 shutdown_wait([], _Inker) ->
     false;
@@ -529,6 +557,14 @@ accumulate_size(Key, Value, {Size, Count}) ->
             {Size, Count}
     end.
 
+accumulate_keys(Key, Value, KeyList) ->
+    case leveled_codec:is_active(Key, Value) of
+        true ->
+            [leveled_codec:from_ledgerkey(Key)|KeyList];
+        false ->
+            KeyList
+    end.
+
 add_keys(ObjKey, _IdxValue, Acc) ->
     Acc ++ [ObjKey].
 
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index a1c20b4..a5474dc 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -111,7 +111,9 @@ is_active(Key, Value) ->
 
 from_ledgerkey({Tag, Bucket, {_IdxField, IdxValue}, Key})
                                                 when Tag == ?IDX_TAG ->
-    {Bucket, Key, IdxValue}.
+    {Bucket, Key, IdxValue};
+from_ledgerkey({_Tag, Bucket, Key, null}) ->
+    {Bucket, Key}.
 
 to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
     {?IDX_TAG, Bucket, {Field, Value}, Key}.
 
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index ca1a6f3..8fb0579 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -405,7 +405,8 @@ write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) ->
                 _ ->
                     {ok, Journal0}
             end,
-    R = leveled_cdb:cdb_put(Journal1, {SQN, PK}, V),
+    ValueToStore = leveled_inker:create_value_for_cdb(V),
+    R = leveled_cdb:cdb_put(Journal1, {SQN, PK}, ValueToStore),
     case R of
         ok ->
             write_values(Rest, CDBopts, Journal1, ManSlice0);
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 79a0cd8..e46ddfa 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -97,6 +97,7 @@
         ink_print_manifest/1,
         ink_close/1,
         ink_forceclose/1,
+        create_value_for_cdb/1,
         build_dummy_journal/0,
         simple_manifest_reader/2,
         clean_testdir/1,
@@ -375,7 +376,7 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
     %% as the CDB will also do the same conversion
     %% Perhaps have CDB started up in a pure binary mode, when it doesn't
     %% receive terms?
-    Bin1 = term_to_binary({Object, KeyChanges}, [compressed]),
+    Bin1 = create_value_for_cdb({Object, KeyChanges}),
     ObjSize = byte_size(Bin1),
     case leveled_cdb:cdb_put(State#state.active_journaldb,
                                 {NewSQN, PrimaryKey},
@@ -406,6 +407,15 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
     end.
 
+create_value_for_cdb(Value) ->
+    case Value of
+        {Object, KeyChanges} ->
+            term_to_binary({Object, KeyChanges}, [compressed]);
+        Value when is_binary(Value) ->
+            Value
+    end.
+
+
 get_object(PrimaryKey, SQN, Manifest) ->
     JournalP = find_in_manifest(SQN, Manifest),
     Obj = leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}),
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index 718587c..6887556 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -7,7 +7,8 @@
             journal_compaction/1,
             fetchput_snapshot/1,
             load_and_count/1,
-            load_and_count_withdelete/1
+            load_and_count_withdelete/1,
+            space_clear_ondelete_test/1
             ]).
 
 all() -> [
@@ -16,7 +17,8 @@ all() -> [
             journal_compaction,
             fetchput_snapshot,
             load_and_count,
-            load_and_count_withdelete
+            load_and_count_withdelete,
+            space_clear_ondelete_test
             ].
 
@@ -395,3 +397,75 @@ load_and_count_withdelete(_Config) ->
     ok = leveled_bookie:book_close(Bookie2),
     testutil:reset_filestructure().
+
+space_clear_ondelete_test(_Config) ->
+    % Test is a work in progress
+    RootPath = testutil:reset_filestructure(),
+    StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=20000000},
+    {ok, Book1} = leveled_bookie:book_start(StartOpts1),
+    G2 = fun testutil:generate_compressibleobjects/2,
+    testutil:load_objects(20000,
+                            [uuid, uuid, uuid, uuid],
+                            Book1,
+                            no_check,
+                            G2),
+
+    {async, F1} = leveled_bookie:book_returnfolder(Book1, {keylist, o_rkv}),
+    SW1 = os:timestamp(),
+    KL1 = F1(),
+    ok = case length(KL1) of
+                80000 ->
+                    io:format("Key list took ~w microseconds for 80K keys~n",
+                                [timer:now_diff(os:timestamp(), SW1)]),
+                    ok
+            end,
+    timer:sleep(10000), % Allow for any L0 file to be rolled
+    {ok, FNsA_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
+    {ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
+    io:format("Bookie created ~w journal files and ~w ledger files~n",
+                [length(FNsA_J), length(FNsA_L)]),
+    SW2 = os:timestamp(),
+    lists:foreach(fun({Bucket, Key}) ->
+                        ok = leveled_bookie:book_riakdelete(Book1,
+                                                            Bucket,
+                                                            Key,
+                                                            [])
+                        end,
+                    KL1),
+    io:format("Deletion took ~w microseconds for 80K keys~n",
+                [timer:now_diff(os:timestamp(), SW2)]),
+    ok = leveled_bookie:book_compactjournal(Book1, 30000),
+    timer:sleep(30000), % Allow for the journal compaction to complete
+    {ok, FNsB_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
+    {ok, FNsB_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
+    io:format("Bookie has ~w journal files and ~w ledger files " ++
+                    "after deletes~n",
+                [length(FNsB_J), length(FNsB_L)]),
+
+    {async, F2} = leveled_bookie:book_returnfolder(Book1, {keylist, o_rkv}),
+    SW3 = os:timestamp(),
+    KL2 = F2(),
+    ok = case length(KL2) of
+                0 ->
+                    io:format("Key list took ~w microseconds for no keys~n",
+                                [timer:now_diff(os:timestamp(), SW3)]),
+                    ok
+            end,
+    ok = leveled_bookie:book_close(Book1),
+
+    {ok, Book2} = leveled_bookie:book_start(StartOpts1),
+    {async, F3} = leveled_bookie:book_returnfolder(Book2, {keylist, o_rkv}),
+    SW4 = os:timestamp(),
+    KL3 = F3(),
+    ok = case length(KL3) of
+                0 ->
+                    io:format("Key list took ~w microseconds for no keys~n",
+                                [timer:now_diff(os:timestamp(), SW4)]),
+                    ok
+            end,
+    ok = leveled_bookie:book_close(Book2),
+    {ok, FNsC_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
+    {ok, FNsC_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
+    io:format("Bookie has ~w journal files and ~w ledger files " ++
+                    "after deletes and restart~n",
+                [length(FNsC_J), length(FNsC_L)]).
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index c836f65..b0a4707 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -28,7 +28,7 @@
 
 reset_filestructure() ->
 %    io:format("Waiting ~w ms to give a chance for all file closes " ++
-                    "to complete~n", [Wait]),
+    %                     "to complete~n", [Wait]),
 %    timer:sleep(Wait),
     RootPath  = "test",
     filelib:ensure_dir(RootPath ++ "/journal/"),
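
Note on the new keylist folder: it follows the same async folder pattern as
the existing index queries.  A minimal usage sketch, taken from the pattern
in the test above (Bookie is assumed here to be a started leveled_bookie
instance, and o_rkv is the tag used in the test):

    %% Request the folder, then run it to get the key list
    {async, KeyFolder} = leveled_bookie:book_returnfolder(Bookie,
                                                            {keylist, o_rkv}),
    BucketKeyList = KeyFolder().  % [{Bucket, Key}, ...] - active keys only,
                                  % as accumulate_keys filters on is_active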
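
On the open compaction question: a minimal sketch of the second option - 
confirming that a tombstone's Key is not present in any older journal before
compaction removes it - is given below.  This is illustrative only and not
part of the patch.  The {SQN, FileName, JournalPid} manifest shape matches
its use elsewhere in leveled_inker, but journal_has_key/2 is a hypothetical
helper: journal keys are {SQN, PrimaryKey} pairs, so a real check would have
to fold over a file's keys rather than fetch by exact key.

-module(tombstone_check_sketch).
-export([maybe_remove_tombstone/3]).

%% Decide whether a tombstone found during compaction may be removed, by
%% confirming that no older journal still holds a PUT for the same key.
%% If one does, dropping the tombstone could resurrect that PUT when the
%% Ledger is rebuilt from the Journal, so the tombstone must be retained.
maybe_remove_tombstone(PrimaryKey, TombstoneSQN, Manifest) ->
    %% Manifest assumed to be [{SQN, FileName, JournalPid}]; any journal
    %% whose base SQN is below the tombstone's SQN may hold an older PUT
    OlderJournals = [Pid || {SQN, _FN, Pid} <- Manifest,
                            SQN < TombstoneSQN],
    case lists:any(fun(J) -> journal_has_key(J, PrimaryKey) end,
                    OlderJournals) of
        true ->
            retain;  % an older PUT may exist - keep the tombstone
        false ->
            remove   % no earlier trace of the key - safe to compact out
    end.

%% Hypothetical helper: true if any {SQN, Key} entry in the journal file
%% is for the given key.  Placeholder body only - a real implementation
%% would fold over the CDB file's keys, as the SQN of any earlier PUT is
%% not known in advance.
journal_has_key(_JournalPid, _PrimaryKey) ->
    false.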