diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index c11c8e0..ba1a333 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -159,8 +159,6 @@ -record(state, {inker :: pid(), penciller :: pid(), - metadata_extractor :: function(), - indexspec_converter :: function(), cache_size :: integer(), back_pressure :: boolean(), ledger_cache :: gb_trees:tree(), @@ -209,18 +207,6 @@ init([Opts]) -> % Start from file not snapshot {InkerOpts, PencillerOpts} = set_options(Opts), {Inker, Penciller} = startup(InkerOpts, PencillerOpts), - Extractor = if - Opts#bookie_options.metadata_extractor == undefined -> - fun extract_metadata/2; - true -> - Opts#bookie_options.metadata_extractor - end, - Converter = if - Opts#bookie_options.indexspec_converter == undefined -> - fun convert_indexspecs/3; - true -> - Opts#bookie_options.indexspec_converter - end, CacheSize = if Opts#bookie_options.cache_size == undefined -> ?CACHE_SIZE; @@ -229,8 +215,6 @@ init([Opts]) -> end, {ok, #state{inker=Inker, penciller=Penciller, - metadata_extractor=Extractor, - indexspec_converter=Converter, cache_size=CacheSize, ledger_cache=gb_trees:empty(), is_snapshot=false}}; @@ -311,19 +295,21 @@ handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) -> {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts), {reply, {ok, - {LedgerSnapshot, State#state.ledger_cache}, + {LedgerSnapshot, + State#state.ledger_cache}, JournalSnapshot}, State}; ledger -> {reply, {ok, - {LedgerSnapshot, State#state.ledger_cache}, + {LedgerSnapshot, + State#state.ledger_cache}, null}, State} end; handle_call({compact_journal, Timeout}, _From, State) -> ok = leveled_inker:ink_compactjournal(State#state.inker, - State#state.penciller, + self(), Timeout), {reply, ok, State}; handle_call(close, _From, State) -> @@ -510,7 +496,6 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> {Obj, IndexSpecs} = binary_to_term(ExtractFun(ValueInLedger)), case SQN of SQN when SQN < MinSQN -> - io:format("Skipping due to low SQN ~w~n", [SQN]), {loop, Acc0}; SQN when SQN =< MaxSQN -> %% TODO - get correct size in a more efficient manner @@ -631,4 +616,16 @@ multi_key_test() -> ok = book_close(Bookie2), reset_filestructure(). +indexspecs_test() -> + IndexSpecs = [{add, "t1_int", 456}, + {add, "t1_bin", "adbc123"}, + {remove, "t1_bin", "abdc456"}], + Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2"}), + ?assertMatch({{i, "Bucket", "t1_int", 456, "Key2"}, + {1, {active, infinity}, null}}, lists:nth(1, Changes)), + ?assertMatch({{i, "Bucket", "t1_bin", "adbc123", "Key2"}, + {1, {active, infinity}, null}}, lists:nth(2, Changes)), + ?assertMatch({{i, "Bucket", "t1_bin", "abdc456", "Key2"}, + {1, {tomb, infinity}, null}}, lists:nth(3, Changes)). + -endif. \ No newline at end of file diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 756d96b..6f187a3 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -13,7 +13,6 @@ terminate/2, clerk_new/1, clerk_compact/6, - clerk_remove/2, clerk_stop/1, code_change/3]). @@ -47,10 +46,6 @@ clerk_new(InkerClerkOpts) -> gen_server:start(?MODULE, [InkerClerkOpts], []). -clerk_remove(Pid, Removals) -> - gen_server:cast(Pid, {remove, Removals}), - ok. - clerk_compact(Pid, Checker, InitiateFun, FilterFun, Inker, Timeout) -> gen_server:cast(Pid, {compact, @@ -88,12 +83,12 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, % Don't want to process a queued call waiting on an old manifest Manifest = leveled_inker:ink_getmanifest(Inker), MaxRunLength = State#state.max_run_length, - FilterServer = InitiateFun(Checker), + {FilterServer, MaxSQN} = InitiateFun(Checker), CDBopts = State#state.cdb_options, FP = CDBopts#cdb_options.file_path, ok = filelib:ensure_dir(FP), - Candidates = scan_all_files(Manifest, FilterFun, FilterServer), + Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN), BestRun = assess_candidates(Candidates, MaxRunLength), case score_run(BestRun, MaxRunLength) of Score when Score > 0 -> @@ -102,7 +97,8 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, PromptDelete} = compact_files(BestRun, CDBopts, FilterFun, - FilterServer), + FilterServer, + MaxSQN), FilesToDelete = lists:map(fun(C) -> {C#candidate.low_sqn, C#candidate.filename, @@ -127,8 +123,6 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, ok = leveled_inker:ink_compactioncomplete(Inker), {noreply, State} end; -handle_cast({remove, _Removals}, State) -> - {noreply, State}; handle_cast(stop, State) -> {stop, normal, State}. @@ -147,38 +141,45 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ -check_single_file(CDB, FilterFun, FilterServer, SampleSize, BatchSize) -> +check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) -> + FN = leveled_cdb:cdb_filename(CDB), PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), R0 = lists:foldl(fun(KS, {ActSize, RplSize}) -> {{SQN, PK}, Size} = KS, Check = FilterFun(FilterServer, PK, SQN), - case Check of - true -> + case {Check, SQN > MaxSQN} of + {true, _} -> {ActSize + Size, RplSize}; - false -> + {false, true} -> + {ActSize + Size, RplSize}; + _ -> {ActSize, RplSize + Size} end end, {0, 0}, KeySizeList), {ActiveSize, ReplacedSize} = R0, - 100 * ActiveSize / (ActiveSize + ReplacedSize). + Score = 100 * ActiveSize / (ActiveSize + ReplacedSize), + io:format("Score for filename ~s is ~w~n", [FN, Score]), + Score. -scan_all_files(Manifest, FilterFun, FilterServer) -> - scan_all_files(Manifest, FilterFun, FilterServer, []). +scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) -> + scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN, []). -scan_all_files([], _FilterFun, _FilterServer, CandidateList) -> +scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) -> CandidateList; -scan_all_files([Entry|Tail], FilterFun, FilterServer, CandidateList) -> +scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) -> {LowSQN, FN, JournalP} = Entry, CpctPerc = check_single_file(JournalP, FilterFun, FilterServer, + MaxSQN, ?SAMPLE_SIZE, ?BATCH_SIZE), scan_all_files(Tail, FilterFun, FilterServer, + MaxSQN, CandidateList ++ [#candidate{low_sqn = LowSQN, filename = FN, @@ -274,27 +275,29 @@ print_compaction_run(BestRun, MaxRunLength) -> end, BestRun). -compact_files([], _CDBopts, _FilterFun, _FilterServer) -> +compact_files([], _CDBopts, _FilterFun, _FilterServer, _MaxSQN) -> {[], 0}; -compact_files(BestRun, CDBopts, FilterFun, FilterServer) -> +compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN) -> BatchesOfPositions = get_all_positions(BestRun, []), compact_files(BatchesOfPositions, CDBopts, null, FilterFun, FilterServer, + MaxSQN, [], true). -compact_files([], _CDBopts, null, _FilterFun, _FilterServer, +compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN, ManSlice0, PromptDelete0) -> {ManSlice0, PromptDelete0}; -compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, +compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN, ManSlice0, PromptDelete0) -> ManSlice1 = ManSlice0 ++ generate_manifest_entry(ActiveJournal0), {ManSlice1, PromptDelete0}; -compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer, +compact_files([Batch|T], CDBopts, ActiveJournal0, + FilterFun, FilterServer, MaxSQN, ManSlice0, PromptDelete0) -> {SrcJournal, PositionList} = Batch, KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal, @@ -302,7 +305,8 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer, key_value_check), R0 = filter_output(KVCs0, FilterFun, - FilterServer), + FilterServer, + MaxSQN), {KVCs1, PromptDelete1} = R0, PromptDelete2 = case {PromptDelete0, PromptDelete1} of {true, true} -> @@ -314,7 +318,7 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer, CDBopts, ActiveJournal0, ManSlice0), - compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, + compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN, ManSlice1, PromptDelete2). get_all_positions([], PositionBatches) -> @@ -341,16 +345,18 @@ split_positions_into_batches(Positions, Journal, Batches) -> Batches ++ [{Journal, ThisBatch}]). -filter_output(KVCs, FilterFun, FilterServer) -> +filter_output(KVCs, FilterFun, FilterServer, MaxSQN) -> lists:foldl(fun(KVC, {Acc, PromptDelete}) -> {{SQN, PK}, _V, CrcCheck} = KVC, KeyValid = FilterFun(FilterServer, PK, SQN), - case {KeyValid, CrcCheck} of - {true, true} -> + case {KeyValid, CrcCheck, SQN > MaxSQN} of + {true, true, _} -> {Acc ++ [KVC], PromptDelete}; - {false, _} -> + {false, true, true} -> + {Acc ++ [KVC], PromptDelete}; + {false, true, false} -> {Acc, PromptDelete}; - {_, false} -> + {_, false, _} -> io:format("Corrupted value found for " ++ " Key ~w at SQN ~w~n", [PK, SQN]), {Acc, false} @@ -415,7 +421,9 @@ simple_score_test() -> ?assertMatch(6.0, score_run(Run1, 4)), Run2 = [#candidate{compaction_perc = 75.0}], ?assertMatch(-15.0, score_run(Run2, 4)), - ?assertMatch(0.0, score_run([], 4)). + ?assertMatch(0.0, score_run([], 4)), + Run3 = [#candidate{compaction_perc = 100.0}], + ?assertMatch(-40.0, score_run(Run3, 4)). score_compare_test() -> Run1 = [#candidate{compaction_perc = 75.0}, @@ -514,15 +522,18 @@ check_single_file_test() -> _ -> false end end, - Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 8, 4), + Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4), ?assertMatch(37.5, Score1), LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end, - Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 8, 4), + Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4), ?assertMatch(100.0, Score2), - Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 8, 3), + Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3), ?assertMatch(37.5, Score3), + Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4), + ?assertMatch(75.0, Score4), ok = leveled_cdb:cdb_destroy(CDB). - + + compact_single_file_test() -> RP = "../test/journal", {ok, CDB} = fetch_testcdb(RP), @@ -543,7 +554,8 @@ compact_single_file_test() -> R1 = compact_files([Candidate], #cdb_options{file_path=CompactFP}, LedgerFun1, - LedgerSrv1), + LedgerSrv1, + 9), {ManSlice1, PromptDelete1} = R1, ?assertMatch(true, PromptDelete1), [{LowSQN, FN, PidR}] = ManSlice1, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 32cf340..2161730 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -170,12 +170,12 @@ ink_forceclose(Pid) -> ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity). -ink_compactjournal(Pid, Penciller, Timeout) -> +ink_compactjournal(Pid, Bookie, Timeout) -> CheckerInitiateFun = fun initiate_penciller_snapshot/1, CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3, gen_server:call(Pid, {compact, - Penciller, + Bookie, CheckerInitiateFun, CheckerFilterFun, Timeout}, @@ -818,13 +818,14 @@ manifest_printer(Manifest) -> Manifest). -initiate_penciller_snapshot(Penciller) -> - PclOpts = #penciller_options{start_snapshot = true, - source_penciller = Penciller, - requestor = self()}, - {ok, FilterServer} = leveled_penciller:pcl_start(PclOpts), - ok = leveled_penciller:pcl_loadsnapshot(FilterServer, []), - FilterServer. +initiate_penciller_snapshot(Bookie) -> + {ok, + {LedgerSnap, LedgerCache}, + _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined), + ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, + gb_trees:to_list(LedgerCache)), + MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), + {LedgerSnap, MaxSQN}. %%%============================================================================ %%% Test @@ -864,6 +865,7 @@ build_dummy_journal() -> clean_testdir(RootPath) -> clean_subdir(filepath(RootPath, journal_dir)), + clean_subdir(filepath(RootPath, journal_compact_dir)), clean_subdir(filepath(RootPath, manifest_dir)). clean_subdir(DirPath) -> @@ -1033,7 +1035,7 @@ compact_journal_test() -> ?assertMatch(2, length(ActualManifest)), ok = ink_compactjournal(Ink1, Checker, - fun(X) -> X end, + fun(X) -> {X, 55} end, fun(L, K, SQN) -> lists:member({SQN, K}, L) end, 5000), timer:sleep(1000), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 84745ad..fcb0657 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -862,14 +862,10 @@ fetch(Key, Manifest, Level, FetchFun) -> not_present, LevelManifest) of not_present -> - io:format("Key ~w out of range at level ~w with manifest ~w~n", - [Key, Level, LevelManifest]), fetch(Key, Manifest, Level + 1, FetchFun); FileToCheck -> case FetchFun(FileToCheck, Key) of not_present -> - io:format("Key ~w not found checking file at level ~w~n", - [Key, Level]), fetch(Key, Manifest, Level + 1, FetchFun); ObjectFound -> ObjectFound diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 5e73ac9..3173d05 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -12,6 +12,7 @@ all() -> [simple_put_fetch_head, journal_compaction, simple_snapshot]. + simple_put_fetch_head(_Config) -> RootPath = reset_filestructure(), StartOpts1 = #bookie_options{root_path=RootPath}, @@ -19,6 +20,7 @@ simple_put_fetch_head(_Config) -> {TestObject, TestSpec} = generate_testobject(), ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec), check_bookie_forobject(Bookie1, TestObject), + check_bookie_formissingobject(Bookie1, "Bucket1", "Key2"), ok = leveled_bookie:book_close(Bookie1), StartOpts2 = #bookie_options{root_path=RootPath, max_journalsize=3000000}, @@ -31,6 +33,7 @@ simple_put_fetch_head(_Config) -> ChkList1 = lists:sublist(lists:sort(ObjList1), 100), check_bookie_forlist(Bookie2, ChkList1), check_bookie_forobject(Bookie2, TestObject), + check_bookie_formissingobject(Bookie2, "Bucket1", "Key2"), ok = leveled_bookie:book_close(Bookie2), reset_filestructure(). @@ -88,7 +91,6 @@ check_bookie_forlist(Bookie, ChkList) -> R = leveled_bookie:book_riakget(Bookie, Obj#r_object.bucket, Obj#r_object.key), - io:format("Checking key ~s~n", [Obj#r_object.key]), R = {ok, Obj} end, ChkList). @@ -108,6 +110,10 @@ check_bookie_forobject(Bookie, TestObject) -> ok end. +check_bookie_formissingobject(Bookie, Bucket, Key) -> + not_found = leveled_bookie:book_riakget(Bookie, Bucket, Key), + not_found = leveled_bookie:book_riakhead(Bookie, Bucket, Key). + journal_compaction(_Config) -> RootPath = reset_filestructure(), StartOpts1 = #bookie_options{root_path=RootPath, @@ -115,23 +121,30 @@ journal_compaction(_Config) -> {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {TestObject, TestSpec} = generate_testobject(), ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec), - {ok, TestObject} = leveled_bookie:book_riakget(Bookie1, - TestObject#r_object.bucket, - TestObject#r_object.key), + check_bookie_forobject(Bookie1, TestObject), ObjList1 = generate_multiple_objects(5000, 2), lists:foreach(fun({_RN, Obj, Spc}) -> leveled_bookie:book_riakput(Bookie1, Obj, Spc) end, ObjList1), - ChkList1 = lists:sublist(lists:sort(ObjList1), 100), - lists:foreach(fun({_RN, Obj, _Spc}) -> - R = leveled_bookie:book_riakget(Bookie1, - Obj#r_object.bucket, - Obj#r_object.key), - R = {ok, Obj} end, - ChkList1), - {ok, TestObject} = leveled_bookie:book_riakget(Bookie1, - TestObject#r_object.bucket, - TestObject#r_object.key), + ChkList1 = lists:sublist(lists:sort(ObjList1), 1000), + check_bookie_forlist(Bookie1, ChkList1), + check_bookie_forobject(Bookie1, TestObject), + {B2, K2, V2, Spec2, MD} = {"Bucket1", + "Key1", + "Value1", + [], + {"MDK1", "MDV1"}}, + {TestObject2, TestSpec2} = generate_testobject(B2, K2, V2, Spec2, MD), + ok = leveled_bookie:book_riakput(Bookie1, TestObject2, TestSpec2), + ok = leveled_bookie:book_compactjournal(Bookie1, 30000), + check_bookie_forlist(Bookie1, ChkList1), + check_bookie_forobject(Bookie1, TestObject), + check_bookie_forobject(Bookie1, TestObject2), + timer:sleep(5000), % Allow for compaction to complete + io:format("Has journal completed?~n"), + check_bookie_forlist(Bookie1, ChkList1), + check_bookie_forobject(Bookie1, TestObject), + check_bookie_forobject(Bookie1, TestObject2), %% Now replace all the objects ObjList2 = generate_multiple_objects(5000, 2), lists:foreach(fun({_RN, Obj, Spc}) -> @@ -139,24 +152,12 @@ journal_compaction(_Config) -> ObjList2), ok = leveled_bookie:book_compactjournal(Bookie1, 30000), ChkList3 = lists:sublist(lists:sort(ObjList2), 500), - lists:foreach(fun({_RN, Obj, _Spc}) -> - R = leveled_bookie:book_riakget(Bookie1, - Obj#r_object.bucket, - Obj#r_object.key), - R = {ok, Obj} end, - ChkList3), + check_bookie_forlist(Bookie1, ChkList3), ok = leveled_bookie:book_close(Bookie1), % Restart {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), - {ok, TestObject} = leveled_bookie:book_riakget(Bookie2, - TestObject#r_object.bucket, - TestObject#r_object.key), - lists:foreach(fun({_RN, Obj, _Spc}) -> - R = leveled_bookie:book_riakget(Bookie2, - Obj#r_object.bucket, - Obj#r_object.key), - R = {ok, Obj} end, - ChkList3), + check_bookie_forobject(Bookie2, TestObject), + check_bookie_forlist(Bookie2, ChkList3), ok = leveled_bookie:book_close(Bookie2), reset_filestructure(). @@ -200,15 +201,19 @@ reset_filestructure() -> leveled_penciller:clean_testdir(RootPath ++ "/ledger"), RootPath. + generate_testobject() -> {B1, K1, V1, Spec1, MD} = {"Bucket1", "Key1", "Value1", [], {"MDK1", "MDV1"}}, - Content = #r_content{metadata=MD, value=V1}, - {#r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, - Spec1}. + generate_testobject(B1, K1, V1, Spec1, MD). + +generate_testobject(B, K, V, Spec, MD) -> + Content = #r_content{metadata=MD, value=V}, + {#r_object{bucket=B, key=K, contents=[Content], vclock=[{'a',1}]}, + Spec}. generate_multiple_smallobjects(Count, KeyNumber) -> generate_multiple_objects(Count, KeyNumber, [], crypto:rand_bytes(512)).