diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 9d61661..acd0d9e 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -366,7 +366,13 @@ handle_call({return_folder, FolderType}, _From, State) ->
                             Bucket,
                             {IdxField, StartValue, EndValue},
                             {ReturnTerms, TermRegex}),
-                State}
+                State};
+        {keylist, Tag} ->
+            {reply,
+                allkey_query(State#state.penciller,
+                                State#state.ledger_cache,
+                                Tag),
+                State}
     end;
 handle_call({compact_journal, Timeout}, _From, State) ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
@@ -460,6 +466,28 @@ index_query(Penciller, LedgerCache,
             end,
     {async, Folder}.
 
+allkey_query(Penciller, LedgerCache, Tag) ->
+    PCLopts = #penciller_options{start_snapshot=true,
+                                    source_penciller=Penciller},
+    {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
+    Folder = fun() ->
+                Increment = gb_trees:to_list(LedgerCache),
+                io:format("Length of increment in snapshot is ~w~n",
+                            [length(Increment)]),
+                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
+                                                        {infinity, Increment}),
+                SK = leveled_codec:to_ledgerkey(null, null, Tag),
+                EK = leveled_codec:to_ledgerkey(null, null, Tag),
+                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                        SK,
+                                                        EK,
+                                                        fun accumulate_keys/3,
+                                                        []),
+                ok = leveled_penciller:pcl_close(LedgerSnapshot),
+                lists:reverse(Acc)
+                end,
+    {async, Folder}.
+
 shutdown_wait([], _Inker) ->
     false;
@@ -529,6 +557,14 @@ accumulate_size(Key, Value, {Size, Count}) ->
             {Size, Count}
     end.
 
+accumulate_keys(Key, Value, KeyList) ->
+    case leveled_codec:is_active(Key, Value) of
+        true ->
+            [leveled_codec:from_ledgerkey(Key)|KeyList];
+        false ->
+            KeyList
+    end.
+
 add_keys(ObjKey, _IdxValue, Acc) ->
     Acc ++ [ObjKey].
 
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index a1c20b4..a5474dc 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -111,7 +111,9 @@ is_active(Key, Value) ->
 
 from_ledgerkey({Tag, Bucket, {_IdxField, IdxValue}, Key})
                                                 when Tag == ?IDX_TAG ->
-    {Bucket, Key, IdxValue}.
+    {Bucket, Key, IdxValue};
+from_ledgerkey({_Tag, Bucket, Key, null}) ->
+    {Bucket, Key}.
 
 to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
     {?IDX_TAG, Bucket, {Field, Value}, Key}.
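Note on the bookie change above: the new {keylist, Tag} folder mirrors the snapshot-then-fold pattern of the existing index_query/5. The bookie replies {async, Folder} straight away; the penciller snapshot is only loaded when the caller evaluates the closure. A minimal usage sketch follows (hedged: list_all_keys/2 is a hypothetical wrapper, Bookie is assumed to be a started bookie, and the o_rkv tag is taken from the test further down):

    %% Hypothetical helper, not part of this change. Evaluating Folder()
    %% loads the ledger snapshot, folds the tag's keyspace through
    %% accumulate_keys/3, and returns the {Bucket, Key} pairs that
    %% from_ledgerkey/1 now produces for object keys.
    list_all_keys(Bookie, Tag) ->
        {async, Folder} = leveled_bookie:book_returnfolder(Bookie,
                                                            {keylist, Tag}),
        Folder().
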
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index ca1a6f3..8fb0579 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -405,7 +405,8 @@ write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) ->
                         _ ->
                             {ok, Journal0}
                     end,
-    R = leveled_cdb:cdb_put(Journal1, {SQN, PK}, V),
+    ValueToStore = leveled_inker:create_value_for_cdb(V),
+    R = leveled_cdb:cdb_put(Journal1, {SQN, PK}, ValueToStore),
     case R of
         ok ->
             write_values(Rest, CDBopts, Journal1, ManSlice0);
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 79a0cd8..e46ddfa 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -97,6 +97,7 @@
         ink_print_manifest/1,
         ink_close/1,
         ink_forceclose/1,
+        create_value_for_cdb/1,
         build_dummy_journal/0,
         simple_manifest_reader/2,
         clean_testdir/1,
@@ -375,7 +376,7 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
     %% as the CDB will also do the same conversion
     %% Perhaps have CDB started up in a pure binary mode, when it doesn't
     %% receive terms?
-    Bin1 = term_to_binary({Object, KeyChanges}, [compressed]),
+    Bin1 = create_value_for_cdb({Object, KeyChanges}),
     ObjSize = byte_size(Bin1),
     case leveled_cdb:cdb_put(State#state.active_journaldb,
                                 {NewSQN, PrimaryKey},
@@ -406,6 +407,15 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
     end.
 
+create_value_for_cdb(Value) ->
+    case Value of
+        {Object, KeyChanges} ->
+            term_to_binary({Object, KeyChanges}, [compressed]);
+        Value when is_binary(Value) ->
+            Value
+    end.
+
+
 get_object(PrimaryKey, SQN, Manifest) ->
     JournalP = find_in_manifest(SQN, Manifest),
     Obj = leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}),
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index 718587c..6887556 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -7,7 +7,8 @@
             journal_compaction/1,
             fetchput_snapshot/1,
             load_and_count/1,
-            load_and_count_withdelete/1
+            load_and_count_withdelete/1,
+            space_clear_ondelete_test/1
             ]).
 
 all() -> [
@@ -16,7 +17,8 @@ all() -> [
             journal_compaction,
             fetchput_snapshot,
             load_and_count,
-            load_and_count_withdelete
+            load_and_count_withdelete,
+            space_clear_ondelete_test
             ].
 
@@ -395,3 +397,75 @@ load_and_count_withdelete(_Config) ->
     ok = leveled_bookie:book_close(Bookie2),
     testutil:reset_filestructure().
 
+
+space_clear_ondelete_test(_Config) ->
+    % Test is a work in progress
+    RootPath = testutil:reset_filestructure(),
+    StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=20000000},
+    {ok, Book1} = leveled_bookie:book_start(StartOpts1),
+    G2 = fun testutil:generate_compressibleobjects/2,
+    testutil:load_objects(20000,
+                            [uuid, uuid, uuid, uuid],
+                            Book1,
+                            no_check,
+                            G2),
+
+    {async, F1} = leveled_bookie:book_returnfolder(Book1, {keylist, o_rkv}),
+    SW1 = os:timestamp(),
+    KL1 = F1(),
+    ok = case length(KL1) of
+                80000 ->
+                    io:format("Key list took ~w microseconds for 80K keys~n",
+                                [timer:now_diff(os:timestamp(), SW1)]),
+                    ok
+            end,
+    timer:sleep(10000), % Allow for any L0 file to be rolled
+    {ok, FNsA_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
+    {ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
+    io:format("Bookie created ~w journal files and ~w ledger files~n",
+                    [length(FNsA_J), length(FNsA_L)]),
+    SW2 = os:timestamp(),
+    lists:foreach(fun({Bucket, Key}) ->
+                        ok = leveled_bookie:book_riakdelete(Book1,
+                                                            Bucket,
+                                                            Key,
+                                                            [])
+                        end,
+                    KL1),
+    io:format("Deletion took ~w microseconds for 80K keys~n",
+                    [timer:now_diff(os:timestamp(), SW2)]),
+    ok = leveled_bookie:book_compactjournal(Book1, 30000),
+    timer:sleep(30000), % Allow the journal compaction to complete
+    {ok, FNsB_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
+    {ok, FNsB_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
+    io:format("Bookie has ~w journal files and ~w ledger files " ++
+                    "after deletes~n",
+                [length(FNsB_J), length(FNsB_L)]),
+
+    {async, F2} = leveled_bookie:book_returnfolder(Book1, {keylist, o_rkv}),
+    SW3 = os:timestamp(),
+    KL2 = F2(),
+    ok = case length(KL2) of
+                0 ->
+                    io:format("Key list took ~w microseconds for no keys~n",
+                                [timer:now_diff(os:timestamp(), SW3)]),
+                    ok
+            end,
+    ok = leveled_bookie:book_close(Book1),
+
+    {ok, Book2} = leveled_bookie:book_start(StartOpts1),
+    {async, F3} = leveled_bookie:book_returnfolder(Book2, {keylist, o_rkv}),
+    SW4 = os:timestamp(),
+    KL3 = F3(),
+    ok = case length(KL3) of
+                0 ->
+                    io:format("Key list took ~w microseconds for no keys~n",
+                                [timer:now_diff(os:timestamp(), SW4)]),
+                    ok
+            end,
+    ok = leveled_bookie:book_close(Book2),
+    {ok, FNsC_L} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
+    {ok, FNsC_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
+    io:format("Bookie has ~w journal files and ~w ledger files " ++
+                    "after restart~n",
+                [length(FNsC_J), length(FNsC_L)]).
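Note on create_value_for_cdb/1: the intent is that put_object/4 serialises a fresh {Object, KeyChanges} tuple exactly once, while the compaction clerk in leveled_iclerk can hand over a value that is already a binary without re-encoding it. A small eunit-style sketch of that contract (hedged: the test itself is hypothetical and assumes -include_lib("eunit/include/eunit.hrl") plus the export added above):

    %% Hypothetical test, not part of this change.
    create_value_for_cdb_roundtrip_test() ->
        Term = {<<"obj">>, [{<<"idx">>, <<"val">>}]},
        %% A {Object, KeyChanges} tuple is compressed to a binary once...
        Bin = leveled_inker:create_value_for_cdb(Term),
        Term = binary_to_term(Bin),
        %% ...and a value that is already a binary passes through
        %% unchanged, so rewritten journal values are not double-encoded.
        Bin = leveled_inker:create_value_for_cdb(Bin).
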
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index c836f65..b0a4707 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -28,7 +28,7 @@
 
 reset_filestructure() ->
     % io:format("Waiting ~w ms to give a chance for all file closes " ++
-                    "to complete~n", [Wait]),
+    %                 "to complete~n", [Wait]),
     % timer:sleep(Wait),
     RootPath = "test",
     filelib:ensure_dir(RootPath ++ "/journal/"),
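Note on space_clear_ondelete_test/1: the test is flagged as a work in progress and only prints the before/after file counts. If it graduates to a real assertion, something along these lines might be expected to hold (hedged: check_space_cleared/4 is a hypothetical helper and the bounds are assumptions, since compaction timing and L0 file rolling make the exact counts non-deterministic):

    %% Hypothetical follow-up assertion. After compacting away the deleted
    %% objects the journal should have shed files, and the ledger should
    %% not have grown by the time the store has been reopened and closed.
    check_space_cleared(FNsA_J, FNsB_J, FNsA_L, FNsC_L) ->
        true = length(FNsB_J) < length(FNsA_J),
        true = length(FNsC_L) =< length(FNsA_L),
        ok.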