From d24b100aa6509e76016c207bc436d61679dd13df Mon Sep 17 00:00:00 2001 From: martinsumner Date: Tue, 27 Sep 2016 14:58:26 +0100 Subject: [PATCH] Initial work on Journal Compaction Largely untested work at this stage to allow for the Inker to request the Inker's clerk to perform a single round of compaction based on the best run of files it can find. --- include/leveled.hrl | 10 +- src/leveled_cdb.erl | 27 ++- src/leveled_iclerk.erl | 438 +++++++++++++++++++++++++++++++++++++---- src/leveled_inker.erl | 71 ++++++- src/leveled_sft.erl | 4 +- 5 files changed, 494 insertions(+), 56 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 232aed7..321ff7c 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -21,13 +21,14 @@ filename :: string()}). -record(cdb_options, - {max_size :: integer()}). + {max_size :: integer(), + file_path :: string()}). -record(inker_options, {cdb_max_size :: integer(), root_path :: string(), cdb_options :: #cdb_options{}, - start_snapshot = false :: boolean, + start_snapshot = false :: boolean(), source_inker :: pid(), requestor :: pid()}). @@ -44,6 +45,11 @@ metadata_extractor :: function(), indexspec_converter :: function()}). +-record(iclerk_options, + {inker :: pid(), + max_run_length :: integer(), + cdb_options :: #cdb_options{}}). + %% Temp location for records related to riak -record(r_content, { diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 40a4aaf..b1eb11a 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -60,11 +60,13 @@ cdb_getpositions/2, cdb_directfetch/3, cdb_lastkey/1, + cdb_firstkey/1, cdb_filename/1, cdb_keycheck/2, cdb_scan/4, cdb_close/1, - cdb_complete/1]). + cdb_complete/1, + cdb_destroy/1]). -include_lib("eunit/include/eunit.hrl"). @@ -133,6 +135,9 @@ cdb_close(Pid) -> cdb_complete(Pid) -> gen_server:call(Pid, cdb_complete, infinity). +cdb_destroy(Pid) -> + gen_server:cast(Pid, destroy). + %% cdb_scan returns {LastPosition, Acc}. 
Use LastPosition as StartPosiiton to %% continue from that point (calling function has to protect against) double %% counting. @@ -150,6 +155,9 @@ cdb_scan(Pid, FilterFun, InitAcc, StartPosition) -> cdb_lastkey(Pid) -> gen_server:call(Pid, cdb_lastkey, infinity). +cdb_firstkey(Pid) -> + gen_server:call(Pid, cdb_firstkey, infinity). + %% Get the filename of the database cdb_filename(Pid) -> gen_server:call(Pid, cdb_filename, infinity). @@ -254,6 +262,8 @@ handle_call({put_kv, Key, Value}, _From, State) -> end; handle_call(cdb_lastkey, _From, State) -> {reply, State#state.last_key, State}; +handle_call(cdb_firstkey, _From, State) -> + {reply, extract_key(State#state.handle, ?BASE_POSITION), State}; handle_call(cdb_filename, _From, State) -> {reply, State#state.filename, State}; handle_call({get_positions, SampleSize}, _From, State) -> @@ -339,7 +349,10 @@ handle_call(cdb_complete, _From, State) -> end. - +handle_cast(destroy, State) -> + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. @@ -1551,16 +1564,14 @@ find_lastkey_test() -> ok = cdb_put(P1, "Key1", "Value1"), ok = cdb_put(P1, "Key3", "Value3"), ok = cdb_put(P1, "Key2", "Value2"), - R1 = cdb_lastkey(P1), - ?assertMatch(R1, "Key2"), + ?assertMatch("Key2", cdb_lastkey(P1)), + ?assertMatch("Key1", cdb_firstkey(P1)), ok = cdb_close(P1), {ok, P2} = cdb_open_writer("../test/lastkey.pnd"), - R2 = cdb_lastkey(P2), - ?assertMatch(R2, "Key2"), + ?assertMatch("Key2", cdb_lastkey(P2)), {ok, F2} = cdb_complete(P2), {ok, P3} = cdb_open_reader(F2), - R3 = cdb_lastkey(P3), - ?assertMatch(R3, "Key2"), + ?assertMatch("Key2", cdb_lastkey(P3)), ok = cdb_close(P3), ok = file:delete("../test/lastkey.cdb"). 
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index fee2417..2ead967 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -12,35 +12,47 @@ handle_info/2, terminate/2, clerk_new/1, - clerk_compact/5, + clerk_compact/4, clerk_remove/2, clerk_stop/1, code_change/3]). -include_lib("eunit/include/eunit.hrl"). +-define(JOURNAL_FILEX, "cdb"). +-define(PENDING_FILEX, "pnd"). -define(SAMPLE_SIZE, 200). -define(BATCH_SIZE, 16). -define(BATCHES_TO_CHECK, 8). +%% How many consecutive files to compact in one run +-define(MAX_COMPACTION_RUN, 4). +%% Sliding scale to allow preference of longer runs up to maximum +-define(SINGLEFILE_COMPACTION_TARGET, 60.0). +-define(MAXRUN_COMPACTION_TARGET, 80.0). + +-record(state, {inker :: pid(), + max_run_length :: integer(), + cdb_options}). + +-record(candidate, {low_sqn :: integer(), + filename :: string(), + journal :: pid(), + compaction_perc :: float()}). --record(state, {owner :: pid(), - penciller_snapshot :: pid()}). %%%============================================================================ %%% API %%%============================================================================ -clerk_new(Owner) -> - {ok, Pid} = gen_server:start(?MODULE, [], []), - ok = gen_server:call(Pid, {register, Owner}, infinity), - {ok, Pid}. +clerk_new(InkerClerkOpts) -> + gen_server:start(?MODULE, [InkerClerkOpts], []). clerk_remove(Pid, Removals) -> gen_server:cast(Pid, {remove, Removals}), ok. -clerk_compact(Pid, Manifest, ManifestSQN, Penciller, Timeout) -> - gen_server:cast(Pid, {compact, Manifest, ManifestSQN, Penciller, Timeout}). +clerk_compact(Pid, Penciller, Inker, Timeout) -> + clerk_compact(Pid, Penciller, Inker, Timeout). clerk_stop(Pid) -> gen_server:cast(Pid, stop). @@ -49,21 +61,60 @@ clerk_stop(Pid) -> %%% gen_server callbacks %%%============================================================================ -init([]) -> - {ok, #state{}}. 
+init([IClerkOpts]) -> + case IClerkOpts#iclerk_options.max_run_length of + undefined -> + {ok, #state{max_run_length = ?MAX_COMPACTION_RUN, + inker = IClerkOpts#iclerk_options.inker, + cdb_options = IClerkOpts#iclerk_options.cdb_options}}; + MRL -> + {ok, #state{max_run_length = MRL, + inker = IClerkOpts#iclerk_options.inker, + cdb_options = IClerkOpts#iclerk_options.cdb_options}} + end. -handle_call({register, Owner}, _From, State) -> - {reply, ok, State#state{owner=Owner}}. +handle_call(_Msg, _From, State) -> + {reply, not_supprted, State}. -handle_cast({compact, Manifest, _ManifestSQN, Penciller, _Timeout}, State) -> +handle_cast({compact, Penciller, Inker, _Timeout}, State) -> + % Need to fetch manifest at start rather than have it be passed in + % Don't want to process a queued call waiting on an old manifest + Manifest = leveled_inker:ink_getmanifest(Inker), + MaxRunLength = State#state.max_run_length, PclOpts = #penciller_options{start_snapshot = true, source_penciller = Penciller, requestor = self()}, - PclSnap = leveled_penciller:pcl_start(PclOpts), - ok = leveled_penciller:pcl_loadsnapshot(PclSnap, []), - _CandidateList = scan_all_files(Manifest, PclSnap), - %% TODO - Lots - {noreply, State}; + FilterFun = fun leveled_penciller:pcl_checksequencenumber/3, + FilterServer = leveled_penciller:pcl_start(PclOpts), + ok = leveled_penciller:pcl_loadsnapshot(FilterServer, []), + Candidates = scan_all_files(Manifest, + FilterFun, + FilterServer), + BestRun = assess_candidates(Candidates, MaxRunLength), + case score_run(BestRun, MaxRunLength) of + Score when Score > 0 -> + print_compaction_run(BestRun, MaxRunLength), + CDBopts = State#state.cdb_options, + {ManifestSlice, + PromptDelete} = compact_files(BestRun, + CDBopts, + FilterFun, + FilterServer), + FilesToDelete = lists:map(fun(C) -> + {C#candidate.low_sqn, + C#candidate.filename, + C#candidate.journal} + end, + BestRun), + ok = leveled_inker:ink_updatemanifest(Inker, + ManifestSlice, + PromptDelete, + 
FilesToDelete), + {noreply, State}; + Score -> + io:format("No compaction run as highest score=~w~n", [Score]), + {noreply, State} + end; handle_cast({remove, _Removals}, State) -> {noreply, State}; handle_cast(stop, State) -> @@ -84,15 +135,13 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ -check_single_file(CDB, PclSnap, SampleSize, BatchSize) -> +check_single_file(CDB, FilterFun, FilterServer, SampleSize, BatchSize) -> PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), R0 = lists:foldl(fun(KS, {ActSize, RplSize}) -> - {{PK, SQN}, Size} = KS, - Chk = leveled_pcl:pcl_checksequencenumber(PclSnap, - PK, - SQN), - case Chk of + {{SQN, PK}, Size} = KS, + Check = FilterFun(FilterServer, PK, SQN), + case Check of true -> {ActSize + Size, RplSize}; false -> @@ -101,32 +150,345 @@ check_single_file(CDB, PclSnap, SampleSize, BatchSize) -> {0, 0}, KeySizeList), {ActiveSize, ReplacedSize} = R0, - 100 * (ActiveSize / (ActiveSize + ReplacedSize)). + 100 * ActiveSize / (ActiveSize + ReplacedSize). -scan_all_files(Manifest, Penciller) -> - scan_all_files(Manifest, Penciller, []). +scan_all_files(Manifest, FilterFun, FilterServer) -> + scan_all_files(Manifest, FilterFun, FilterServer, []). -scan_all_files([], _Penciller, CandidateList) -> +scan_all_files([], _FilterFun, _FilterServer, CandidateList) -> CandidateList; -scan_all_files([{LowSQN, FN, JournalP}|Tail], Penciller, CandidateList) -> - CompactPerc = check_single_file(JournalP, - Penciller, - ?SAMPLE_SIZE, - ?BATCH_SIZE), - scan_all_files(Tail, Penciller, CandidateList ++ - [{LowSQN, FN, JournalP, CompactPerc}]). 
+scan_all_files([Entry|Tail], FilterFun, FilterServer, CandidateList) -> + {LowSQN, FN, JournalP} = Entry, + CpctPerc = check_single_file(JournalP, + FilterFun, + FilterServer, + ?SAMPLE_SIZE, + ?BATCH_SIZE), + scan_all_files(Tail, + FilterFun, + FilterServer, + CandidateList ++ + [#candidate{low_sqn = LowSQN, + filename = FN, + journal = JournalP, + compaction_perc = CpctPerc}]). fetch_inbatches([], _BatchSize, _CDB, CheckedList) -> CheckedList; fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> - {Batch, Tail} = lists:split(BatchSize, PositionList), - KL_List = leveled_cdb:direct_fetch(CDB, Batch, key_size), + {Batch, Tail} = if + length(PositionList) >= BatchSize -> + lists:split(BatchSize, PositionList); + true -> + {PositionList, []} + end, + KL_List = leveled_cdb:cdb_directfetch(CDB, Batch, key_size), fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List). +assess_candidates(AllCandidates, MaxRunLength) -> + NaiveBestRun = assess_candidates(AllCandidates, MaxRunLength, [], []), + case length(AllCandidates) of + L when L > MaxRunLength, MaxRunLength > 1 -> + %% Assess with different offsets from the start + SqL = lists:seq(1, MaxRunLength - 1), + lists:foldl(fun(Counter, BestRun) -> + SubList = lists:nthtail(Counter, + AllCandidates), + assess_candidates(SubList, + MaxRunLength, + [], + BestRun) + end, + NaiveBestRun, + SqL); + _ -> + NaiveBestRun + end. + +assess_candidates([], _MaxRunLength, _CurrentRun0, BestAssessment) -> + BestAssessment; +assess_candidates([HeadC|Tail], MaxRunLength, CurrentRun0, BestAssessment) -> + CurrentRun1 = choose_best_assessment(CurrentRun0 ++ [HeadC], + [HeadC], + MaxRunLength), + assess_candidates(Tail, + MaxRunLength, + CurrentRun1, + choose_best_assessment(CurrentRun1, + BestAssessment, + MaxRunLength)). 
+choose_best_assessment(RunToAssess, BestRun, MaxRunLength) -> + case length(RunToAssess) of + LR1 when LR1 > MaxRunLength -> + BestRun; + _ -> + AssessScore = score_run(RunToAssess, MaxRunLength), + BestScore = score_run(BestRun, MaxRunLength), + if + AssessScore > BestScore -> + RunToAssess; + true -> + BestRun + end + end. + +score_run([], _MaxRunLength) -> + 0.0; +score_run(Run, MaxRunLength) -> + TargetIncr = case MaxRunLength of + 1 -> + 0.0; + MaxRunSize -> + (?MAXRUN_COMPACTION_TARGET + - ?SINGLEFILE_COMPACTION_TARGET) + / (MaxRunSize - 1) + end, + Target = ?SINGLEFILE_COMPACTION_TARGET + TargetIncr * (length(Run) - 1), + RunTotal = lists:foldl(fun(Cand, Acc) -> + Acc + Cand#candidate.compaction_perc end, + 0.0, + Run), + Target - RunTotal / length(Run). +print_compaction_run(BestRun, MaxRunLength) -> + io:format("Compaction to be performed on ~w files with score of ~w~n", + [length(BestRun), score_run(BestRun, MaxRunLength)]), + lists:foreach(fun(File) -> + io:format("Filename ~s is part of compaction run~n", + [File#candidate.filename]) + end, + BestRun). + +compact_files([], _CDBopts, _FilterFun, _FilterServer) -> + {[], 0}; +compact_files(BestRun, CDBopts, FilterFun, FilterServer) -> + BatchesOfPositions = get_all_positions(BestRun, []), + compact_files(BatchesOfPositions, + CDBopts, + null, + FilterFun, + FilterServer, + [], + true). 
+ +compact_files([], _CDBopts, _ActiveJournal0, _FilterFun, _FilterServer, + ManSlice0, PromptDelete0) -> + %% Need to close the active file + {ManSlice0, PromptDelete0}; +compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer, + ManSlice0, PromptDelete0) -> + {SrcJournal, PositionList} = Batch, + KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal, + PositionList, + key_value_check), + R0 = filter_output(KVCs0, + FilterFun, + FilterServer), + {KVCs1, PromptDelete1} = R0, + PromptDelete2 = case {PromptDelete0, PromptDelete1} of + {true, true} -> + true; + _ -> + false + end, + {ActiveJournal1, ManSlice1} = write_values(KVCs1, + CDBopts, + ActiveJournal0, + ManSlice0), + compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, + ManSlice1, PromptDelete2). + +get_all_positions([], PositionBatches) -> + PositionBatches; +get_all_positions([HeadRef|RestOfBest], PositionBatches) -> + SrcJournal = HeadRef#candidate.journal, + Positions = leveled_cdb:cdb_getpositions(SrcJournal, all), + Batches = split_positions_into_batches(Positions, SrcJournal, []), + get_all_positions(RestOfBest, PositionBatches ++ Batches). + +split_positions_into_batches([], _Journal, Batches) -> + Batches; +split_positions_into_batches(Positions, Journal, Batches) -> + {ThisBatch, Tail} = lists:split(?BATCH_SIZE, Positions), + split_positions_into_batches(Tail, + Journal, + Batches ++ [{Journal, ThisBatch}]). + + +filter_output(KVCs, FilterFun, FilterServer) -> + lists:foldl(fun(KVC, {Acc, PromptDelete}) -> + {{SQN, PK}, _V, CrcCheck} = KVC, + KeyValid = FilterFun(FilterServer, PK, SQN), + case {KeyValid, CrcCheck} of + {true, true} -> + {Acc ++ [KVC], PromptDelete}; + {false, _} -> + {Acc, PromptDelete}; + {_, false} -> + io:format("Corrupted value found for " ++ " + Key ~w at SQN ~w~n", [PK, SQN]), + {Acc, false} + end + end, + {[], true}, + KVCs). 
+ + +write_values([KVC|Rest], CDBopts, ActiveJournal0, ManSlice0) -> + {{SQN, PK}, V, _CrcCheck} = KVC, + {ok, ActiveJournal1} = case ActiveJournal0 of + null -> + FP = CDBopts#cdb_options.file_path, + FN = leveled_inker:filepath(FP, + SQN, + new_journal), + leveled_cdb:cdb_open_writer(FN, + CDBopts); + _ -> + {ok, ActiveJournal0} + end, + R = leveled_cdb:cdb_put(ActiveJournal1, {SQN, PK}, V), + case R of + ok -> + write_values(Rest, CDBopts, ActiveJournal1, ManSlice0); + roll -> + {ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal1), + {ok, PidR} = leveled_cdb:cdb_open_reader(NewFN), + {StartSQN, _PK} = leveled_cdb:cdb_firstkey(PidR), + ManSlice1 = ManSlice0 ++ [{StartSQN, NewFN, PidR}], + write_values(Rest, CDBopts, null, ManSlice1) + end. + + + + + + + + + %%%============================================================================ %%% Test %%%============================================================================ + + +-ifdef(TEST). + +simple_score_test() -> + Run1 = [#candidate{compaction_perc = 75.0}, + #candidate{compaction_perc = 75.0}, + #candidate{compaction_perc = 76.0}, + #candidate{compaction_perc = 70.0}], + ?assertMatch(6.0, score_run(Run1, 4)), + Run2 = [#candidate{compaction_perc = 75.0}], + ?assertMatch(-15.0, score_run(Run2, 4)), + ?assertMatch(0.0, score_run([], 4)). + +score_compare_test() -> + Run1 = [#candidate{compaction_perc = 75.0}, + #candidate{compaction_perc = 75.0}, + #candidate{compaction_perc = 76.0}, + #candidate{compaction_perc = 70.0}], + ?assertMatch(6.0, score_run(Run1, 4)), + Run2 = [#candidate{compaction_perc = 75.0}], + ?assertMatch(Run1, choose_best_assessment(Run1, Run2, 4)), + ?assertMatch(Run2, choose_best_assessment(Run1 ++ Run2, Run2, 4)). + +find_bestrun_test() -> +%% Tests dependent on these defaults +%% -define(MAX_COMPACTION_RUN, 4). +%% -define(SINGLEFILE_COMPACTION_TARGET, 60.0). +%% -define(MAXRUN_COMPACTION_TARGET, 80.0). 
+%% Tested first with blocks significant as no back-tracking + Block1 = [#candidate{compaction_perc = 75.0}, + #candidate{compaction_perc = 85.0}, + #candidate{compaction_perc = 62.0}, + #candidate{compaction_perc = 70.0}], + Block2 = [#candidate{compaction_perc = 58.0}, + #candidate{compaction_perc = 95.0}, + #candidate{compaction_perc = 95.0}, + #candidate{compaction_perc = 65.0}], + Block3 = [#candidate{compaction_perc = 90.0}, + #candidate{compaction_perc = 100.0}, + #candidate{compaction_perc = 100.0}, + #candidate{compaction_perc = 100.0}], + Block4 = [#candidate{compaction_perc = 75.0}, + #candidate{compaction_perc = 76.0}, + #candidate{compaction_perc = 76.0}, + #candidate{compaction_perc = 60.0}], + Block5 = [#candidate{compaction_perc = 80.0}, + #candidate{compaction_perc = 80.0}], + CList0 = Block1 ++ Block2 ++ Block3 ++ Block4 ++ Block5, + ?assertMatch(Block4, assess_candidates(CList0, 4, [], [])), + CList1 = CList0 ++ [#candidate{compaction_perc = 20.0}], + ?assertMatch([#candidate{compaction_perc = 20.0}], + assess_candidates(CList1, 4, [], [])), + CList2 = Block4 ++ Block3 ++ Block2 ++ Block1 ++ Block5, + ?assertMatch(Block4, assess_candidates(CList2, 4, [], [])), + CList3 = Block5 ++ Block1 ++ Block2 ++ Block3 ++ Block4, + ?assertMatch([#candidate{compaction_perc = 62.0}, + #candidate{compaction_perc = 70.0}, + #candidate{compaction_perc = 58.0}], + assess_candidates(CList3, 4, [], [])), + %% Now do some back-tracking to get a genuinely optimal solution without + %% needing to re-order + ?assertMatch([#candidate{compaction_perc = 62.0}, + #candidate{compaction_perc = 70.0}, + #candidate{compaction_perc = 58.0}], + assess_candidates(CList0, 4)), + ?assertMatch([#candidate{compaction_perc = 62.0}, + #candidate{compaction_perc = 70.0}, + #candidate{compaction_perc = 58.0}], + assess_candidates(CList0, 5)), + ?assertMatch([#candidate{compaction_perc = 62.0}, + #candidate{compaction_perc = 70.0}, + #candidate{compaction_perc = 58.0}, + 
#candidate{compaction_perc = 95.0}, + #candidate{compaction_perc = 95.0}, + #candidate{compaction_perc = 65.0}], + assess_candidates(CList0, 6)). + +check_single_file_test() -> + RP = "../test/journal", + FN1 = leveled_inker:filepath(RP, 1, new_journal), + {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, #cdb_options{}), + {K1, V1} = {{1, "Key1"}, term_to_binary("Value1")}, + {K2, V2} = {{2, "Key2"}, term_to_binary("Value2")}, + {K3, V3} = {{3, "Key3"}, term_to_binary("Value3")}, + {K4, V4} = {{4, "Key1"}, term_to_binary("Value4")}, + {K5, V5} = {{5, "Key1"}, term_to_binary("Value5")}, + {K6, V6} = {{6, "Key1"}, term_to_binary("Value6")}, + {K7, V7} = {{7, "Key1"}, term_to_binary("Value7")}, + {K8, V8} = {{8, "Key1"}, term_to_binary("Value8")}, + ok = leveled_cdb:cdb_put(CDB1, K1, V1), + ok = leveled_cdb:cdb_put(CDB1, K2, V2), + ok = leveled_cdb:cdb_put(CDB1, K3, V3), + ok = leveled_cdb:cdb_put(CDB1, K4, V4), + ok = leveled_cdb:cdb_put(CDB1, K5, V5), + ok = leveled_cdb:cdb_put(CDB1, K6, V6), + ok = leveled_cdb:cdb_put(CDB1, K7, V7), + ok = leveled_cdb:cdb_put(CDB1, K8, V8), + {ok, FN2} = leveled_cdb:cdb_complete(CDB1), + {ok, CDB2} = leveled_cdb:cdb_open_reader(FN2), + LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}], + LedgerFun1 = fun(Srv, Key, ObjSQN) -> + case lists:keyfind(ObjSQN, 1, Srv) of + {ObjSQN, Key} -> + true; + _ -> + false + end end, + Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 8, 4), + ?assertMatch(37.5, Score1), + LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end, + Score2 = check_single_file(CDB2, LedgerFun2, LedgerSrv1, 8, 4), + ?assertMatch(100.0, Score2), + Score3 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 8, 3), + ?assertMatch(37.5, Score3), + ok = leveled_cdb:cdb_destroy(CDB2). + + +-endif. 
\ No newline at end of file diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index b7a54a6..8cc8722 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -105,16 +105,20 @@ ink_loadpcl/4, ink_registersnapshot/2, ink_compactjournal/3, - ink_close/1, + ink_getmanifest/1, + ink_updatemanifest/4, ink_print_manifest/1, + ink_close/1, build_dummy_journal/0, simple_manifest_reader/2, - clean_testdir/1]). + clean_testdir/1, + filepath/3]). -include_lib("eunit/include/eunit.hrl"). -define(MANIFEST_FP, "journal_manifest"). -define(FILES_FP, "journal_files"). +-define(COMPACT_FP, "post_compact"). -define(JOURNAL_FILEX, "cdb"). -define(MANIFEST_FILEX, "man"). -define(PENDING_FILEX, "pnd"). @@ -126,7 +130,7 @@ journal_sqn = 0 :: integer(), active_journaldb :: pid(), active_journaldb_sqn :: integer(), - removed_journaldbs = [] :: list(), + pending_removals = [] :: list(), registered_snapshots = [] :: list(), root_path :: string(), cdb_options :: #cdb_options{}, @@ -161,6 +165,17 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> ink_compactjournal(Pid, Penciller, Timeout) -> gen_server:call(Pid, {compact_journal, Penciller, Timeout}, infinty). +ink_getmanifest(Pid) -> + gen_server:call(Pid, get_manifest, infinity). + +ink_updatemanifest(Pid, ManifestSnippet, PromptDeletion, DeletedFiles) -> + gen_server:call(Pid, + {update_manifest, + ManifestSnippet, + PromptDeletion, + DeletedFiles}, + infinity). + ink_print_manifest(Pid) -> gen_server:call(Pid, print_manifest, infinity). 
@@ -233,11 +248,45 @@ handle_call({register_snapshot, Requestor}, _From , State) -> {reply, {State#state.manifest, State#state.active_journaldb}, State#state{registered_snapshots=Rs}}; +handle_call(get_manifest, _From, State) -> + {reply, State#state.manifest, State}; +handle_call({update_manifest, + ManifestSnippet, + PromptDeletion, + DeletedFiles}, _From, State) -> + Man0 = lists:foldl(fun(ManEntry, AccMan) -> + Check = lists:member(ManEntry, DeletedFiles), + if + Check == false -> + lists:append(AccMan, ManEntry) + end + end, + [], + State#state.manifest), + Man1 = lists:foldl(fun(ManEntry, AccMan) -> + add_to_manifest(AccMan, ManEntry) end, + Man0, + ManifestSnippet), + NewManifestSQN = State#state.manifest_sqn + 1, + ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path), + PendingRemovals = case PromptDeletion of + true -> + State#state.pending_removals ++ + {NewManifestSQN, DeletedFiles}; + _ -> + State#state.pending_removals + end, + {reply, ok, State#state{manifest=Man1, + manifest_sqn=NewManifestSQN, + pending_removals=PendingRemovals}}; handle_call(print_manifest, _From, State) -> manifest_printer(State#state.manifest), {reply, ok, State}; handle_call({compact_journal, Penciller, Timeout}, _From, State) -> - leveled_iclerk:clerk_compact(Penciller, Timeout), + leveled_iclerk:clerk_compact(State#state.clerk, + self(), + Penciller, + Timeout), {reply, ok, State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. 
@@ -265,7 +314,6 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ start_from_file(InkerOpts) -> - {ok, Clerk} = leveled_iclerk:clerk_new(self()), RootPath = InkerOpts#inker_options.root_path, CDBopts = InkerOpts#inker_options.cdb_options, JournalFP = filepath(RootPath, journal_dir), @@ -284,6 +332,14 @@ start_from_file(InkerOpts) -> filelib:ensure_dir(ManifestFP), {ok, []} end, + + CompactFP = filepath(RootPath, journal_compact_dir), + filelib:ensure_dir(CompactFP), + IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP}, + IClerkOpts = #iclerk_options{inker = self(), + cdb_options=IClerkCDBOpts}, + {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), + {Manifest, {ActiveJournal, LowActiveSQN}, JournalSQN, @@ -597,8 +653,9 @@ find_in_manifest(SQN, [_Head|Tail]) -> filepath(RootPath, journal_dir) -> RootPath ++ "/" ++ ?FILES_FP ++ "/"; filepath(RootPath, manifest_dir) -> - RootPath ++ "/" ++ ?MANIFEST_FP ++ "/". - + RootPath ++ "/" ++ ?MANIFEST_FP ++ "/"; +filepath(RootPath, journal_compact_dir) -> + filepath(RootPath, journal_dir) ++ "/" ++ ?COMPACT_FP ++ "/". filepath(RootPath, NewSQN, new_journal) -> filename:join(filepath(RootPath, journal_dir), diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 793edc5..ac1bf68 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -1393,6 +1393,7 @@ findremainder(BitStr, Factor) -> %%%============================================================================ +-ifdef(TEST). generate_randomkeys({Count, StartSQN}) -> generate_randomkeys(Count, StartSQN, []); @@ -1760,4 +1761,5 @@ big_iterator_test() -> ?assertMatch(NumFoundKeys3, 4 * 128), ok = file:close(Handle), ok = file:delete(Filename). - \ No newline at end of file + +-endif. \ No newline at end of file