diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 2ead967..f6d85d7 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -87,6 +87,11 @@ handle_cast({compact, Penciller, Inker, _Timeout}, State) -> FilterFun = fun leveled_penciller:pcl_checksequencenumber/3, FilterServer = leveled_penciller:pcl_start(PclOpts), ok = leveled_penciller:pcl_loadsnapshot(FilterServer, []), + + CDBopts = State#state.cdb_options, + FP = CDBopts#cdb_options.file_path, + ok = filelib:ensure_dir(FP), + Candidates = scan_all_files(Manifest, FilterFun, FilterServer), @@ -94,7 +99,6 @@ handle_cast({compact, Penciller, Inker, _Timeout}, State) -> case score_run(BestRun, MaxRunLength) of Score when Score > 0 -> print_compaction_run(BestRun, MaxRunLength), - CDBopts = State#state.cdb_options, {ManifestSlice, PromptDelete} = compact_files(BestRun, CDBopts, @@ -274,10 +278,10 @@ compact_files(BestRun, CDBopts, FilterFun, FilterServer) -> [], true). -compact_files([], _CDBopts, _ActiveJournal0, _FilterFun, _FilterServer, +compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, ManSlice0, PromptDelete0) -> - %% Need to close the active file - {ManSlice0, PromptDelete0}; + ManSlice1 = ManSlice0 ++ generate_manifest_entry(ActiveJournal0), + {ManSlice1, PromptDelete0}; compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer, ManSlice0, PromptDelete0) -> {SrcJournal, PositionList} = Batch, @@ -306,13 +310,20 @@ get_all_positions([], PositionBatches) -> get_all_positions([HeadRef|RestOfBest], PositionBatches) -> SrcJournal = HeadRef#candidate.journal, Positions = leveled_cdb:cdb_getpositions(SrcJournal, all), - Batches = split_positions_into_batches(Positions, SrcJournal, []), + Batches = split_positions_into_batches(lists:sort(Positions), + SrcJournal, + []), get_all_positions(RestOfBest, PositionBatches ++ Batches). split_positions_into_batches([], _Journal, Batches) -> Batches; split_positions_into_batches(Positions, Journal, Batches) -> - {ThisBatch, Tail} = lists:split(?BATCH_SIZE, Positions), + {ThisBatch, Tail} = if + length(Positions) > ?BATCH_SIZE -> + lists:split(?BATCH_SIZE, Positions); + true -> + {Positions, []} + end, split_positions_into_batches(Tail, Journal, Batches ++ [{Journal, ThisBatch}]). @@ -337,32 +348,36 @@ filter_output(KVCs, FilterFun, FilterServer) -> KVCs). -write_values([KVC|Rest], CDBopts, ActiveJournal0, ManSlice0) -> +write_values([], _CDBopts, Journal0, ManSlice0) -> + {Journal0, ManSlice0}; +write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) -> {{SQN, PK}, V, _CrcCheck} = KVC, - {ok, ActiveJournal1} = case ActiveJournal0 of - null -> - FP = CDBopts#cdb_options.file_path, - FN = leveled_inker:filepath(FP, - SQN, - new_journal), - leveled_cdb:cdb_open_writer(FN, - CDBopts); - _ -> - {ok, ActiveJournal0} - end, - R = leveled_cdb:cdb_put(ActiveJournal1, {SQN, PK}, V), + {ok, Journal1} = case Journal0 of + null -> + FP = CDBopts#cdb_options.file_path, + FN = leveled_inker:filepath(FP, + SQN, + compact_journal), + leveled_cdb:cdb_open_writer(FN, + CDBopts); + _ -> + {ok, Journal0} + end, + R = leveled_cdb:cdb_put(Journal1, {SQN, PK}, V), case R of ok -> - write_values(Rest, CDBopts, ActiveJournal1, ManSlice0); + write_values(Rest, CDBopts, Journal1, ManSlice0); roll -> - {ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal1), - {ok, PidR} = leveled_cdb:cdb_open_reader(NewFN), - {StartSQN, _PK} = leveled_cdb:cdb_firstkey(PidR), - ManSlice1 = ManSlice0 ++ [{StartSQN, NewFN, PidR}], + ManSlice1 = ManSlice0 ++ generate_manifest_entry(Journal1), write_values(Rest, CDBopts, null, ManSlice1) end. +generate_manifest_entry(ActiveJournal) -> + {ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal), + {ok, PidR} = leveled_cdb:cdb_open_reader(NewFN), + {StartSQN, _PK} = leveled_cdb:cdb_firstkey(PidR), + [{StartSQN, NewFN, PidR}]. @@ -451,8 +466,7 @@ find_bestrun_test() -> #candidate{compaction_perc = 65.0}], assess_candidates(CList0, 6)). -check_single_file_test() -> - RP = "../test/journal", +fetch_testcdb(RP) -> FN1 = leveled_inker:filepath(RP, 1, new_journal), {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, #cdb_options{}), {K1, V1} = {{1, "Key1"}, term_to_binary("Value1")}, @@ -472,7 +486,11 @@ check_single_file_test() -> ok = leveled_cdb:cdb_put(CDB1, K7, V7), ok = leveled_cdb:cdb_put(CDB1, K8, V8), {ok, FN2} = leveled_cdb:cdb_complete(CDB1), - {ok, CDB2} = leveled_cdb:cdb_open_reader(FN2), + leveled_cdb:cdb_open_reader(FN2). + +check_single_file_test() -> + RP = "../test/journal", + {ok, CDB} = fetch_testcdb(RP), LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}], LedgerFun1 = fun(Srv, Key, ObjSQN) -> case lists:keyfind(ObjSQN, 1, Srv) of @@ -481,14 +499,46 @@ check_single_file_test() -> _ -> false end end, - Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 8, 4), + Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 8, 4), ?assertMatch(37.5, Score1), LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end, - Score2 = check_single_file(CDB2, LedgerFun2, LedgerSrv1, 8, 4), + Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 8, 4), ?assertMatch(100.0, Score2), - Score3 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 8, 3), + Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 8, 3), ?assertMatch(37.5, Score3), - ok = leveled_cdb:cdb_destroy(CDB2). + ok = leveled_cdb:cdb_destroy(CDB). +compact_single_file_test() -> + RP = "../test/journal", + {ok, CDB} = fetch_testcdb(RP), + Candidate = #candidate{journal = CDB, + low_sqn = 1, + filename = "test", + compaction_perc = 37.5}, + LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}], + LedgerFun1 = fun(Srv, Key, ObjSQN) -> + case lists:keyfind(ObjSQN, 1, Srv) of + {ObjSQN, Key} -> + true; + _ -> + false + end end, + CompactFP = leveled_inker:filepath(RP, journal_compact_dir), + ok = filelib:ensure_dir(CompactFP), + R1 = compact_files([Candidate], + #cdb_options{file_path=CompactFP}, + LedgerFun1, + LedgerSrv1), + {ManSlice1, PromptDelete1} = R1, + ?assertMatch(true, PromptDelete1), + [{LowSQN, FN, PidR}] = ManSlice1, + io:format("FN of ~s~n", [FN]), + ?assertMatch(2, LowSQN), + ?assertMatch(probably, leveled_cdb:cdb_keycheck(PidR, {8, "Key1"})), + ?assertMatch(missing, leveled_cdb:cdb_get(PidR, {7, "Key1"})), + ?assertMatch(missing, leveled_cdb:cdb_get(PidR, {1, "Key1"})), + {_RK1, RV1} = leveled_cdb:cdb_get(PidR, {2, "Key2"}), + ?assertMatch("Value2", binary_to_term(RV1)). + -endif. \ No newline at end of file diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 8cc8722..8142799 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -112,6 +112,7 @@ build_dummy_journal/0, simple_manifest_reader/2, clean_testdir/1, + filepath/2, filepath/3]). -include_lib("eunit/include/eunit.hrl"). @@ -659,11 +660,15 @@ filepath(RootPath, journal_compact_dir) -> filepath(RootPath, NewSQN, new_journal) -> filename:join(filepath(RootPath, journal_dir), + "nursery_" + ++ integer_to_list(NewSQN) + ++ "." ++ ?PENDING_FILEX); +filepath(CompactFilePath, NewSQN, compact_journal) -> + filename:join(CompactFilePath, "nursery_" ++ integer_to_list(NewSQN) ++ "." ++ ?PENDING_FILEX). - simple_manifest_reader(SQN, RootPath) -> ManifestPath = filepath(RootPath, manifest_dir), io:format("Opening manifest file at ~s with SQN ~w~n",