From dc28388c76e5e0dcdbaf2c451b2299eb5864c9a8 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Thu, 29 Dec 2016 02:07:14 +0000 Subject: [PATCH] Removed SFT Now moved over to SST on this branch --- src/leveled_bookie.erl | 5 +- src/leveled_log.erl | 13 +- src/leveled_pclerk.erl | 75 +- src/leveled_penciller.erl | 286 +++--- src/leveled_sft.erl | 2024 ------------------------------------- src/leveled_skiplist.erl | 20 +- src/leveled_sst.erl | 304 +++++- 7 files changed, 481 insertions(+), 2246 deletions(-) delete mode 100644 src/leveled_sft.erl diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 90a0b2a..d3c3f1f 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1186,7 +1186,10 @@ hashtree_query_test() -> {hashtree_query, ?STD_TAG, false}), - ?assertMatch(KeyHashList, HTFolder2()), + L0 = length(KeyHashList), + HTR2 = HTFolder2(), + ?assertMatch(L0, length(HTR2)), + ?assertMatch(KeyHashList, HTR2), ok = book_close(Bookie2), reset_filestructure(). diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 8161409..8ab798f 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -19,7 +19,7 @@ -define(GET_LOGPOINT, 160000). -define(SST_LOGPOINT, 200000). -define(LOG_LEVEL, [info, warn, error, critical]). --define(SAMPLE_RATE, 15). +-define(SAMPLE_RATE, 16). -define(LOGBASE, dict:from_list([ @@ -96,7 +96,7 @@ {info, "Response to push_mem of ~w with " ++ "L0 pending ~w and merge backlog ~w"}}, {"P0019", - {info, "Rolling level zero to filename ~s"}}, + {info, "Rolling level zero to filename ~s at ledger sqn ~w"}}, {"P0020", {info, "Work at Level ~w to be scheduled for ~w with ~w " ++ "queue items outstanding at all levels"}}, @@ -238,11 +238,18 @@ {error, "False result returned from SST with filename ~s as " ++ "slot ~w has failed crc check"}}, {"SST03", - {info, "Opening SST file with filename ~s keys ~w and slots ~w"}}, + {info, "Opening SST file with filename ~s keys ~w slots ~w and" + ++ " max sqn ~w"}}, {"SST04", {info, "Exit called for reason ~w on filename ~s"}}, {"SST05", {warn, "Rename rogue filename ~s to ~s"}}, + {"SST06", + {info, "File ~s has been set for delete"}}, + {"SST07", + {info, "Exit called and now clearing ~s"}}, + {"SST08", + {info, "Completed creation of ~s at level ~w with max sqn ~w"}}, {"SFT01", {info, "Opened filename with name ~s"}}, diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index a0f64d9..2f29920 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -9,7 +9,7 @@ %% %% -------- COMMITTING MANIFEST CHANGES --------- %% -%% Once the Penciller has taken a manifest change, the SFT file owners which no +%% Once the Penciller has taken a manifest change, the SST file owners which no %% longer form part of the manifest will be marked for delete. By marking for %% deletion, the owners will poll to confirm when it is safe for them to be %% deleted. @@ -225,7 +225,7 @@ merge(WI) -> mark_for_delete([], _Penciller) -> ok; mark_for_delete([Head|Tail], Penciller) -> - ok = leveled_sft:sft_setfordelete(Head#manifest_entry.owner, Penciller), + ok = leveled_sst:sst_setfordelete(Head#manifest_entry.owner, Penciller), mark_for_delete(Tail, Penciller). @@ -268,13 +268,13 @@ select_filetomerge(SrcLevel, Manifest) -> -%% Assumption is that there is a single SFT from a higher level that needs -%% to be merged into multiple SFTs at a lower level. 
This should create an -%% entirely new set of SFTs, and the calling process can then update the +%% Assumption is that there is a single SST from a higher level that needs +%% to be merged into multiple SSTs at a lower level. This should create an +%% entirely new set of SSTs, and the calling process can then update the %% manifest. %% %% Once the FileToMerge has been emptied, the remainder of the candidate list -%% needs to be placed in a remainder SFT that may be of a sub-optimal (small) +%% needs to be placed in a remainder SST that may be of a sub-optimal (small) %% size. This stops the need to perpetually roll over the whole level if the %% level consists of already full files. Some smartness may be required when %% selecting the candidate list so that small files just outside the candidate @@ -293,18 +293,22 @@ perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) -> PointerList = lists:map(fun(P) -> {next, P#manifest_entry.owner, all} end, CandidateList), + MaxSQN = leveled_sst:sst_getmaxsequencenumber(SrcPid), do_merge([{next, SrcPid, all}], PointerList, LevelInfo, {Filepath, MSN}, + MaxSQN, 0, []). -do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, FileCounter, OutList) -> +do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, _MaxSQN, + FileCounter, OutList) -> leveled_log:log("PC011", [MSN, SrcLevel, FileCounter]), OutList; -do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> - FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft", +do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, MaxSQN, + FileCounter, OutList) -> + FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sst", [SrcLevel + 1, FileCounter])), leveled_log:log("PC012", [MSN, FileName]), TS1 = os:timestamp(), @@ -312,12 +316,13 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> KL1, KL2, IsB, - SrcLevel + 1), + SrcLevel + 1, + MaxSQN), case Reply of {{[], []}, null, _} -> leveled_log:log("PC013", [FileName]), leveled_log:log("PC014", [FileName]), - ok = leveled_sft:sft_clear(Pid), + ok = leveled_sst:sst_clear(Pid), OutList; {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} -> ExtMan = lists:append(OutList, @@ -327,7 +332,7 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> filename=FileName}]), leveled_log:log_timer("PC015", [], TS1), do_merge(KL1Rem, KL2Rem, - {SrcLevel, IsB}, {Filepath, MSN}, + {SrcLevel, IsB}, {Filepath, MSN}, MaxSQN, FileCounter + 1, ExtMan) end. 
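For orientation, the FileName construction in do_merge/7 above follows the convention <Filepath>_<TargetLevel>_<FileCounter>.sst. A minimal sketch with illustrative values (the Filepath shown is hypothetical):

    %% Hypothetical inputs, mirroring the io_lib:format/2 call in do_merge/7
    Filepath = "../test/ledger_files/nonzero_123",
    SrcLevel = 1,
    FileCounter = 0,
    FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sst",
                                            [SrcLevel + 1, FileCounter])),
    %% FileName =:= "../test/ledger_files/nonzero_123_2_0.sst"
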
@@ -377,7 +382,7 @@ find_randomkeys(FList, Count, Source) -> KV1 = lists:nth(random:uniform(length(Source)), Source), K1 = leveled_codec:strip_to_keyonly(KV1), P1 = choose_pid_toquery(FList, K1), - FoundKV = leveled_sft:sft_get(P1, K1), + FoundKV = leveled_sst:sst_get(P1, K1), Found = leveled_codec:strip_to_keyonly(FoundKV), io:format("success finding ~w in ~w~n", [K1, P1]), ?assertMatch(K1, Found), @@ -386,21 +391,31 @@ find_randomkeys(FList, Count, Source) -> merge_file_test() -> KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)), - {ok, PidL1_1, _} = leveled_sft:sft_new("../test/KL1_L1.sft", - KL1_L1, [], 1), + {ok, PidL1_1, _} = leveled_sst:sst_new("../test/KL1_L1.sst", + 1, + KL1_L1, + undefined), KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)), - {ok, PidL2_1, _} = leveled_sft:sft_new("../test/KL1_L2.sft", - KL1_L2, [], 2), + {ok, PidL2_1, _} = leveled_sst:sst_new("../test/KL1_L2.sst", + 2, + KL1_L2, + undefined), KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)), - {ok, PidL2_2, _} = leveled_sft:sft_new("../test/KL2_L2.sft", - KL2_L2, [], 2), + {ok, PidL2_2, _} = leveled_sst:sst_new("../test/KL2_L2.sst", + 2, + KL2_L2, + undefined), KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)), - {ok, PidL2_3, _} = leveled_sft:sft_new("../test/KL3_L2.sft", - KL3_L2, [], 2), + {ok, PidL2_3, _} = leveled_sst:sst_new("../test/KL3_L2.sst", + 2, + KL3_L2, + undefined), KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)), - {ok, PidL2_4, _} = leveled_sft:sft_new("../test/KL4_L2.sft", - KL4_L2, [], 2), - Result = perform_merge({PidL1_1, "../test/KL1_L1.sft"}, + {ok, PidL2_4, _} = leveled_sst:sst_new("../test/KL4_L2.sst", + 2, + KL4_L2, + undefined), + Result = perform_merge({PidL1_1, "../test/KL1_L1.sst"}, [#manifest_entry{owner=PidL2_1}, #manifest_entry{owner=PidL2_2}, #manifest_entry{owner=PidL2_3}, @@ -422,13 +437,13 @@ merge_file_test() -> ok = find_randomkeys(Result, 50, KL3_L2), io:format("Finding keys in KL4_L2~n"), ok = find_randomkeys(Result, 50, KL4_L2), - leveled_sft:sft_clear(PidL1_1), - leveled_sft:sft_clear(PidL2_1), - leveled_sft:sft_clear(PidL2_2), - leveled_sft:sft_clear(PidL2_3), - leveled_sft:sft_clear(PidL2_4), + leveled_sst:sst_clear(PidL1_1), + leveled_sst:sst_clear(PidL2_1), + leveled_sst:sst_clear(PidL2_2), + leveled_sst:sst_clear(PidL2_3), + leveled_sst:sst_clear(PidL2_4), lists:foreach(fun(ManEntry) -> - leveled_sft:sft_clear(ManEntry#manifest_entry.owner) end, + leveled_sst:sst_clear(ManEntry#manifest_entry.owner) end, Result). select_merge_candidates_test() -> diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 0de9b2b..7f36325 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -22,17 +22,17 @@ %% %% The Ledger is divided into many levels %% - L0: New keys are received from the Bookie and merged into a single -%% gb_tree, until that tree is the size of a SFT file, and it is then persisted -%% as a SFT file at this level. L0 SFT files can be larger than the normal +%% gb_tree, until that tree is the size of a SST file, and it is then persisted +%% as a SST file at this level. L0 SST files can be larger than the normal %% maximum size - so we don't have to consider problems of either having more %% than one L0 file (and handling what happens on a crash between writing the %% files when the second may have overlapping sequence numbers), or having a %% remainder with overlapping in sequence numbers in memory after the file is %% written. Once the persistence is completed, the L0 tree can be erased. 
-%% There can be only one SFT file at Level 0, so the work to merge that file
+%% There can be only one SST file at Level 0, so the work to merge that file
 %% to the lower level must be the highest priority, as otherwise writes to the
 %% ledger will stall when there is next a need to persist.
-%% - L1 TO L7: May contain multiple processes managing non-overlapping sft
+%% - L1 TO L7: May contain multiple processes managing non-overlapping SST
 %% files.  Compaction work should be scheduled if the number of files exceeds
 %% the target size of the level, where the target size is 8 ^ n.
 %%
@@ -67,14 +67,14 @@
 %% completed to merge the tree into the L0 tree.
 %%
 %% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the
-%% conversion of the current L0 tree into a SFT file, but not completed this
+%% conversion of the current L0 tree into an SST file, but not completed this
 %% change.  The Penciller in this case returns the push, and the Bookie should
 %% continue to grow the cache before trying again.
 %%
 %% ---------- FETCH ----------
 %%
 %% On request to fetch a key the Penciller should look first in the in-memory
-%% L0 tree, then look in the SFT files Level by Level (including level 0),
+%% L0 tree, then look in the SST files Level by Level (including level 0),
 %% consulting the Manifest to determine which file should be checked at each
 %% level.
 %%
@@ -82,16 +82,16 @@
 %%
 %% Iterators may request a snapshot of the database.  A snapshot is a cloned
 %% Penciller seeded not from disk, but by the in-memory L0 gb_tree and the
-%% in-memory manifest, allowing for direct reference for the SFT file processes.
+%% in-memory manifest, allowing for direct reference for the SST file processes.
 %%
 %% Clones formed to support snapshots are registered by the Penciller, so that
-%% SFT files valid at the point of the snapshot until either the iterator is
+%% SST files remain valid at the point of the snapshot until either the iterator is
 %% completed or has timed out.
 %%
 %% ---------- ON STARTUP ----------
 %%
 %% On Startup the Bookie will ask the Penciller to initiate the Ledger first.
-%% To initiate the Ledger the must consult the manifest, and then start a SFT
+%% To initiate the Ledger it must consult the manifest, and then start an SST
 %% management process for each file in the manifest.
 %%
 %% The penciller should then try to read any Level 0 file which has the
@@ -103,14 +103,14 @@
 %% ---------- ON SHUTDOWN ----------
 %%
 %% On a controlled shutdown the Penciller should attempt to write any in-memory
-%% ETS table to a L0 SFT file, assuming one is nto already pending.  If one is
+%% ETS table to an L0 SST file, assuming one is not already pending.  If one is
 %% already pending then the Penciller will not persist this part of the Ledger.
 %%
 %% ---------- FOLDER STRUCTURE ----------
 %%
 %% The following folders are used by the Penciller
 %% $ROOT/ledger/ledger_manifest/ - used for keeping manifest files
-%% $ROOT/ledger/ledger_files/ - containing individual SFT files
+%% $ROOT/ledger/ledger_files/ - containing individual SST files
 %%
 %% In larger stores there could be a large number of files in the ledger_file
 %% folder - perhaps o(1000).  It is assumed that modern file systems should
@@ -120,7 +120,7 @@
 %%
 %% The Penciller can have one and only one Clerk for performing compaction
 %% work. 
When the Clerk has requested and taken work, it should perform the -%5 compaction work starting the new SFT process to manage the new Ledger state +%5 compaction work starting the new SST process to manage the new Ledger state %% and then write a new manifest file that represents that state with using %% the next Manifest sequence number as the filename: %% - nonzero_.pnd @@ -130,14 +130,14 @@ %% %% On startup, the Penciller should look for the nonzero_*.crr file with the %% highest such manifest sequence number. This will be started as the -%% manifest, together with any _0_0.sft file found at that Manifest SQN. +%% manifest, together with any _0_0.sst file found at that Manifest SQN. %% Level zero files are not kept in the persisted manifest, and adding a L0 %% file does not advanced the Manifest SQN. %% %% The pace at which the store can accept updates will be dependent on the %% speed at which the Penciller's Clerk can merge files at lower levels plus %% the time it takes to merge from Level 0. As if a clerk has commenced -%% compaction work at a lower level and then immediately a L0 SFT file is +%% compaction work at a lower level and then immediately a L0 SST file is %% written the Penciller will need to wait for this compaction work to %% complete and the L0 file to be compacted before the ETS table can be %% allowed to again reach capacity @@ -145,7 +145,7 @@ %% The writing of L0 files do not require the involvement of the clerk. %% The L0 files are prompted directly by the penciller when the in-memory tree %% has reached capacity. This places the penciller in a levelzero_pending -%% state, and in this state it must return new pushes. Once the SFT file has +%% state, and in this state it must return new pushes. Once the SST file has %% been completed it will confirm completion to the penciller which can then %% revert the levelzero_pending state, add the file to the manifest and clear %% the current level zero in-memory view. @@ -399,10 +399,11 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, List -> List end, - SFTiter = initiate_rangequery_frommanifest(StartKey, + SSTiter = initiate_rangequery_frommanifest(StartKey, EndKey, State#state.manifest), - Acc = keyfolder({L0AsList, SFTiter}, + io:format("SSTiter on query ~w~n", [SSTiter]), + Acc = keyfolder({L0AsList, SSTiter}, {StartKey, EndKey}, {AccFun, InitAcc}, MaxKeys), @@ -456,7 +457,7 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap}) {true, Pid} -> UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), leveled_log:log("P0005", [FileName]), - ok = leveled_sft:sft_deleteconfirmed(Pid), + ok = leveled_sst:sst_deleteconfirmed(Pid), {noreply, State#state{unreferenced_files=UF1}}; _ -> {noreply, State} @@ -525,7 +526,7 @@ terminate(Reason, State) -> leveled_log:log("P0009", []); {false, [], _N} -> L0Pid = roll_memory(UpdState, true), - ok = leveled_sft:sft_close(L0Pid); + ok = leveled_sst:sst_close(L0Pid); StatusTuple -> leveled_log:log("P0010", [StatusTuple]) end, @@ -533,7 +534,7 @@ terminate(Reason, State) -> % Tidy shutdown of individual files ok = close_files(0, UpdState#state.manifest), lists:foreach(fun({_FN, Pid, _SN}) -> - ok = leveled_sft:sft_close(Pid) end, + ok = leveled_sst:sst_close(Pid) end, UpdState#state.unreferenced_files), leveled_log:log("P0011", []), ok. 
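The startup behaviour described in the comments above - open the nonzero_*.crr manifest with the highest manifest sequence number - can be sketched as follows; highest_manifest/1 is an illustrative helper, not a function in this module:

    %% Illustrative only: find the highest manifest SQN amongst nonzero_*.crr
    highest_manifest(ManifestDir) ->
        Files = filelib:wildcard("nonzero_*.crr", ManifestDir),
        SQNs = [list_to_integer(string:substr(FN, 9, length(FN) - 12))
                    || FN <- Files],
        case SQNs of
            [] -> no_manifest;
            _ -> lists:max(SQNs)
        end.
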
@@ -608,14 +609,14 @@ start_from_file(PCLopts) -> leveled_log:log("P0014", [MaxSQN]), %% Find any L0 files - L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sft", + L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sst", case filelib:is_file(L0FN) of true -> leveled_log:log("P0015", [L0FN]), {ok, L0Pid, - {L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN), - L0SQN = leveled_sft:sft_getmaxsequencenumber(L0Pid), + {L0StartKey, L0EndKey}} = leveled_sst:sst_open(L0FN), + L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid), ManifestEntry = #manifest_entry{start_key=L0StartKey, end_key=L0EndKey, owner=L0Pid, @@ -696,7 +697,7 @@ update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN}, %% to an immediate return as expected. With 32K keys in the TreeList it could %% take around 35-40ms. %% -%% To avoid blocking this gen_server, the SFT file can request each item of the +%% To avoid blocking this gen_server, the SST file can request each item of the %% cache one at a time. %% %% The Wait is set to false to use a cast when calling this in normal operation @@ -704,25 +705,22 @@ update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN}, roll_memory(State, false) -> FileName = levelzero_filename(State), - leveled_log:log("P0019", [FileName]), - Opts = #sft_options{wait=false, penciller=self()}, + leveled_log:log("P0019", [FileName, State#state.ledger_sqn]), PCL = self(), FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, - % FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end, - R = leveled_sft:sft_newfroml0cache(FileName, + R = leveled_sst:sst_newlevelzero(FileName, length(State#state.levelzero_cache), FetchFun, - Opts), + PCL, + State#state.ledger_sqn), {ok, Constructor, _} = R, Constructor; roll_memory(State, true) -> FileName = levelzero_filename(State), - Opts = #sft_options{wait=true}, FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end, - R = leveled_sft:sft_newfroml0cache(FileName, - length(State#state.levelzero_cache), - FetchFun, - Opts), + KVList = leveled_pmem:to_list(length(State#state.levelzero_cache), + FetchFun), + R = leveled_sst:sst_new(FileName, 0, KVList, State#state.ledger_sqn), {ok, Constructor, _} = R, Constructor. @@ -753,7 +751,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, none) -> L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache), case L0Check of {false, not_found} -> - fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3); + fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3); {true, KV} -> {KV, 0} end; @@ -762,7 +760,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> true -> fetch_mem(Key, Hash, Manifest, L0Cache, none); false -> - fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3) + fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3) end. fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> @@ -791,9 +789,9 @@ fetch(Key, Hash, Manifest, Level, FetchFun) -> end end. -timed_sft_get(PID, Key, Hash) -> +timed_sst_get(PID, Key, Hash) -> SW = os:timestamp(), - R = leveled_sft:sft_get(PID, Key, Hash), + R = leveled_sst:sst_get(PID, Key, Hash), T0 = timer:now_diff(os:timestamp(), SW), case {T0, R} of {T, R} when T < ?SLOW_FETCH -> @@ -880,7 +878,7 @@ close_files(?MAX_LEVELS - 1, _Manifest) -> close_files(Level, Manifest) -> LevelList = get_item(Level, Manifest, []), lists:foreach(fun(F) -> - ok = leveled_sft:sft_close(F#manifest_entry.owner) end, + ok = leveled_sst:sst_close(F#manifest_entry.owner) end, LevelList), close_files(Level + 1, Manifest). 
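Note that roll_memory/2 above now builds the level-zero file in two distinct ways: asynchronously via leveled_sst:sst_newlevelzero/5, passing a fetch function so the new SST process pulls the cache slot by slot rather than receiving one large message, and synchronously on shutdown by flattening the cache first. A condensed sketch of the two call shapes (LevelZeroCache is illustrative):

    %% Asynchronous (normal running): the closure lets the new SST process
    %% request each levelzero cache slot from the penciller one at a time
    PCL = self(),
    AsyncFetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
    %% Synchronous (shutdown): flatten the cache up-front and build directly
    SyncFetchFun = fun(Slot) -> lists:nth(Slot, LevelZeroCache) end,
    KVList = leveled_pmem:to_list(length(LevelZeroCache), SyncFetchFun),
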
@@ -897,8 +895,8 @@ open_all_filesinmanifest({Manifest, TopSQN}, Level) -> %5 replace them LvlR = lists:foldl(fun(F, {FL, FL_SQN}) -> FN = F#manifest_entry.filename, - {ok, P, _Keys} = leveled_sft:sft_open(FN), - F_SQN = leveled_sft:sft_getmaxsequencenumber(P), + {ok, P, _Keys} = leveled_sst:sst_open(FN), + F_SQN = leveled_sst:sst_getmaxsequencenumber(P), {lists:append(FL, [F#manifest_entry{owner = P}]), max(FL_SQN, F_SQN)} @@ -932,24 +930,24 @@ initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) -> C2 = leveled_codec:endkey_passed(EndKey, M#manifest_entry.start_key), not (C1 or C2) end, - lists:foldl(fun(L, AccL) -> - Level = get_item(L, Manifest, []), - FL = lists:foldl(fun(M, Acc) -> - case CompareFun(M) of - true -> - Acc ++ [{next_file, M}]; - false -> - Acc - end end, - [], - Level), - case FL of - [] -> AccL; - FL -> AccL ++ [{L, FL}] - end - end, - [], - lists:seq(0, ?MAX_LEVELS - 1)). + FoldFun = + fun(L, AccL) -> + Level = get_item(L, Manifest, []), + FL = lists:foldl(fun(M, Acc) -> + case CompareFun(M) of + true -> + Acc ++ [{next, M, StartKey}]; + false -> + Acc + end end, + [], + Level), + case FL of + [] -> AccL; + FL -> AccL ++ [{L, FL}] + end + end, + lists:foldl(FoldFun, [], lists:seq(0, ?MAX_LEVELS - 1)). %% Looks to find the best choice for the next key across the levels (other %% than in-memory table) @@ -960,22 +958,25 @@ find_nextkey(QueryArray, StartKey, EndKey) -> find_nextkey(QueryArray, 0, {null, null}, - {fun leveled_sft:sft_getkvrange/4, StartKey, EndKey, 1}). + StartKey, + EndKey, + 1). -find_nextkey(_QueryArray, LCnt, {null, null}, _QueryFunT) +find_nextkey(_QueryArray, LCnt, {null, null}, _StartKey, _EndKey, _Width) when LCnt > ?MAX_LEVELS -> % The array has been scanned wihtout finding a best key - must be % exhausted - respond to indicate no more keys to be found by the % iterator no_more_keys; -find_nextkey(QueryArray, LCnt, {BKL, BestKV}, _QueryFunT) +find_nextkey(QueryArray, LCnt, {BKL, BestKV}, _StartKey, _EndKey, _Width) when LCnt > ?MAX_LEVELS -> % All levels have been scanned, so need to remove the best result from % the array, and return that array along with the best key/sqn/status % combination {BKL, [BestKV|Tail]} = lists:keyfind(BKL, 1, QueryArray), {lists:keyreplace(BKL, 1, QueryArray, {BKL, Tail}), BestKV}; -find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) -> +find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, + StartKey, EndKey, Width) -> % Get the next key at this level {NextKey, RestOfKeys} = case lists:keyfind(LCnt, 1, QueryArray) of false -> @@ -989,39 +990,46 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) -> case {NextKey, BestKeyLevel, BestKV} of {null, BKL, BKV} -> % There is no key at this level - go to the next level - find_nextkey(QueryArray, LCnt + 1, {BKL, BKV}, QueryFunT); - {{next_file, ManifestEntry}, BKL, BKV} -> + find_nextkey(QueryArray, + LCnt + 1, + {BKL, BKV}, + StartKey, EndKey, Width); + {{next, ManifestEntry, _SK}, BKL, BKV} -> % The first key at this level is pointer to a file - need to query % the file to expand this level out before proceeding Owner = ManifestEntry#manifest_entry.owner, - {QueryFun, StartKey, EndKey, ScanSize} = QueryFunT, - QueryResult = QueryFun(Owner, StartKey, EndKey, ScanSize), - NewEntry = {LCnt, QueryResult ++ RestOfKeys}, + Pointer = {next, Owner, StartKey, EndKey}, + UpdList = leveled_sst:expand_list_by_pointer(Pointer, + RestOfKeys, + Width), + NewEntry = {LCnt, UpdList}, % Need to loop around at this level (LCnt) as we have not yet % 
examined a real key at this level find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry), LCnt, {BKL, BKV}, - QueryFunT); - {{next, SFTpid, NewStartKey}, BKL, BKV} -> + StartKey, EndKey, Width); + {{pointer, SSTPid, Slot, PSK, PEK}, BKL, BKV} -> % The first key at this level is pointer within a file - need to % query the file to expand this level out before proceeding - {QueryFun, _StartKey, EndKey, ScanSize} = QueryFunT, - QueryResult = QueryFun(SFTpid, NewStartKey, EndKey, ScanSize), - NewEntry = {LCnt, QueryResult ++ RestOfKeys}, + Pointer = {pointer, SSTPid, Slot, PSK, PEK}, + UpdList = leveled_sst:expand_list_by_pointer(Pointer, + RestOfKeys, + Width), + NewEntry = {LCnt, UpdList}, % Need to loop around at this level (LCnt) as we have not yet % examined a real key at this level find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry), LCnt, {BKL, BKV}, - QueryFunT); + StartKey, EndKey, Width); {{Key, Val}, null, null} -> % No best key set - so can assume that this key is the best key, % and check the lower levels find_nextkey(QueryArray, LCnt + 1, {LCnt, {Key, Val}}, - QueryFunT); + StartKey, EndKey, Width); {{Key, Val}, _BKL, {BestKey, _BestVal}} when Key < BestKey -> % There is a real key and a best key to compare, and the real key % at this level is before the best key, and so is now the new best @@ -1030,7 +1038,7 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) -> find_nextkey(QueryArray, LCnt + 1, {LCnt, {Key, Val}}, - QueryFunT); + StartKey, EndKey, Width); {{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey -> SQN = leveled_codec:strip_to_seqonly({Key, Val}), BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}), @@ -1041,7 +1049,7 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) -> find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry), LCnt + 1, {BKL, {BestKey, BestVal}}, - QueryFunT); + StartKey, EndKey, Width); SQN > BestSQN -> % There is a real key at the front of this level and it has % a higher SQN than the best key, so we should use this as @@ -1056,29 +1064,32 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) -> {BKL, BestTail}), LCnt + 1, {LCnt, {Key, Val}}, - QueryFunT) + StartKey, EndKey, Width) end; {_, BKL, BKV} -> % This is not the best key - find_nextkey(QueryArray, LCnt + 1, {BKL, BKV}, QueryFunT) + find_nextkey(QueryArray, + LCnt + 1, + {BKL, BKV}, + StartKey, EndKey, Width) end. -keyfolder(IMMiter, SFTiter, StartKey, EndKey, {AccFun, Acc}) -> - keyfolder({IMMiter, SFTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1). +keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) -> + keyfolder({IMMiter, SSTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1). 
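find_nextkey now widens pointers lazily through leveled_sst:expand_list_by_pointer/3, with the two pointer shapes handled above. A hedged sketch of the contract, as inferred from its use here (the exact return layout belongs to leveled_sst):

    %% A {next, ...} pointer stands for a whole file; a {pointer, ...} stands
    %% for a slot within a file.  Expansion returns up to Width slots' worth
    %% of real {K, V} entries, with any unexpanded tail left as pointers.
    FilePointer = {next, Owner, StartKey, EndKey},
    UpdList = leveled_sst:expand_list_by_pointer(FilePointer,
                                                    RestOfKeys,
                                                    Width),
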
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 -> Acc; -keyfolder({[], SFTiter}, KeyRange, {AccFun, Acc}, MaxKeys) -> +keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, MaxKeys) -> {StartKey, EndKey} = KeyRange, - case find_nextkey(SFTiter, StartKey, EndKey) of + case find_nextkey(SSTiter, StartKey, EndKey) of no_more_keys -> Acc; - {NxSFTiter, {SFTKey, SFTVal}} -> - Acc1 = AccFun(SFTKey, SFTVal, Acc), - keyfolder({[], NxSFTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1) + {NxSSTiter, {SSTKey, SSTVal}} -> + Acc1 = AccFun(SSTKey, SSTVal, Acc), + keyfolder({[], NxSSTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1) end; -keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange, +keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange, {AccFun, Acc}, MaxKeys) -> {StartKey, EndKey} = KeyRange, case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of @@ -1087,7 +1098,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange, % Normally everything is pre-filterd, but the IMM iterator can % be re-used and so may be behind the StartKey if the StartKey has % advanced from the previous use - keyfolder({NxIMMiterator, SFTiterator}, + keyfolder({NxIMMiterator, SSTiterator}, KeyRange, {AccFun, Acc}, MaxKeys); @@ -1095,44 +1106,44 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SFTiterator}, KeyRange, % There are no more keys in-range in the in-memory % iterator, so take action as if this iterator is empty % (see above) - keyfolder({[], SFTiterator}, + keyfolder({[], SSTiterator}, KeyRange, {AccFun, Acc}, MaxKeys); {false, false} -> - case find_nextkey(SFTiterator, StartKey, EndKey) of + case find_nextkey(SSTiterator, StartKey, EndKey) of no_more_keys -> % No more keys in range in the persisted store, so use the % in-memory KV as the next Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder({NxIMMiterator, SFTiterator}, + keyfolder({NxIMMiterator, SSTiterator}, KeyRange, {AccFun, Acc1}, MaxKeys - 1); - {NxSFTiterator, {SFTKey, SFTVal}} -> + {NxSSTiterator, {SSTKey, SSTVal}} -> % There is a next key, so need to know which is the % next key between the two (and handle two keys % with different sequence numbers). case leveled_codec:key_dominates({IMMKey, IMMVal}, - {SFTKey, - SFTVal}) of + {SSTKey, + SSTVal}) of left_hand_first -> Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder({NxIMMiterator, SFTiterator}, + keyfolder({NxIMMiterator, SSTiterator}, KeyRange, {AccFun, Acc1}, MaxKeys - 1); right_hand_first -> - Acc1 = AccFun(SFTKey, SFTVal, Acc), + Acc1 = AccFun(SSTKey, SSTVal, Acc), keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], - NxSFTiterator}, + NxSSTiterator}, KeyRange, {AccFun, Acc1}, MaxKeys - 1); left_hand_dominant -> Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder({NxIMMiterator, NxSFTiterator}, + keyfolder({NxIMMiterator, NxSSTiterator}, KeyRange, {AccFun, Acc1}, MaxKeys - 1) @@ -1286,6 +1297,27 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) -> -ifdef(TEST). + +generate_randomkeys({Count, StartSQN}) -> + generate_randomkeys(Count, StartSQN, []); +generate_randomkeys(Count) -> + generate_randomkeys(Count, 0, []). + +generate_randomkeys(0, _SQN, Acc) -> + lists:reverse(Acc); +generate_randomkeys(Count, SQN, Acc) -> + K = {o, + lists:concat(["Bucket", random:uniform(1024)]), + lists:concat(["Key", random:uniform(1024)]), + null}, + RandKey = {K, + {SQN, + {active, infinity}, + leveled_codec:magic_hash(K), + null}}, + generate_randomkeys(Count - 1, SQN + 1, [RandKey|Acc]). 
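The keyfolder clauses above interleave the in-memory iterator with the SST iterator, letting leveled_codec:key_dominates/2 arbitrate when both sides present a key. A self-contained toy version of that dominance rule, assuming simplified {Key, SQN} elements rather than real ledger values:

    %% Toy merge: on equal keys the higher SQN wins and both sides advance,
    %% mirroring the left_hand_dominant handling in keyfolder above
    merge([], R) -> R;
    merge(L, []) -> L;
    merge([{K1, S1} | T1] = L, [{K2, S2} | T2] = R) ->
        if
            K1 < K2 -> [{K1, S1} | merge(T1, R)];
            K1 > K2 -> [{K2, S2} | merge(L, T2)];
            S1 >= S2 -> [{K1, S1} | merge(T1, T2)];
            true -> [{K2, S2} | merge(T1, T2)]
        end.
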
+ + clean_testdir(RootPath) -> clean_subdir(filepath(RootPath, manifest)), clean_subdir(filepath(RootPath, files)). @@ -1332,8 +1364,8 @@ compaction_work_assessment_test() -> ?assertMatch([{1, Manifest3, 1}], WorkQ3). confirm_delete_test() -> - Filename = 'test.sft', - UnreferencedFiles = [{'other.sft', dummy_owner, 15}, + Filename = 'test.sst', + UnreferencedFiles = [{'other.sst', dummy_owner, 15}, {Filename, dummy_owner, 10}], RegisteredIterators1 = [{dummy_pid, 16}, {dummy_pid, 12}], R1 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators1), @@ -1376,20 +1408,20 @@ simple_server_test() -> Key1_Pre = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}}, Key1 = add_missing_hash(Key1_Pre), - KL1 = leveled_sft:generate_randomkeys({1000, 2}), + KL1 = generate_randomkeys({1000, 2}), Key2_Pre = {{o,"Bucket0002", "Key0002", null}, {1002, {active, infinity}, null}}, Key2 = add_missing_hash(Key2_Pre), - KL2 = leveled_sft:generate_randomkeys({900, 1003}), + KL2 = generate_randomkeys({900, 1003}), % Keep below the max table size by having 900 not 1000 Key3_Pre = {{o,"Bucket0003", "Key0003", null}, {2003, {active, infinity}, null}}, Key3 = add_missing_hash(Key3_Pre), - KL3 = leveled_sft:generate_randomkeys({1000, 2004}), + KL3 = generate_randomkeys({1000, 2004}), Key4_Pre = {{o,"Bucket0004", "Key0004", null}, {3004, {active, infinity}, null}}, Key4 = add_missing_hash(Key4_Pre), - KL4 = leveled_sft:generate_randomkeys({1000, 3005}), + KL4 = generate_randomkeys({1000, 3005}), ok = maybe_pause_push(PCL, [Key1]), ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), ok = maybe_pause_push(PCL, KL1), @@ -1464,7 +1496,7 @@ simple_server_test() -> Key1A_Pre = {{o,"Bucket0001", "Key0001", null}, {4005, {active, infinity}, null}}, Key1A = add_missing_hash(Key1A_Pre), - KL1A = leveled_sft:generate_randomkeys({2000, 4006}), + KL1A = generate_randomkeys({2000, 4006}), ok = maybe_pause_push(PCLr, [Key1A]), ok = maybe_pause_push(PCLr, KL1A), ?assertMatch(true, pcl_checksequencenumber(PclSnap, @@ -1528,17 +1560,16 @@ rangequery_manifest_test() -> end_key={o, "Bucket1", "K996", null}, filename="Z6"}}, Man = [{1, [E1, E2, E3]}, {2, [E4, E5, E6]}], - R1 = initiate_rangequery_frommanifest({o, "Bucket1", "K711", null}, - {o, "Bucket1", "K999", null}, - Man), - ?assertMatch([{1, [{next_file, E3}]}, - {2, [{next_file, E5}, {next_file, E6}]}], + SK1 = {o, "Bucket1", "K711", null}, + EK1 = {o, "Bucket1", "K999", null}, + R1 = initiate_rangequery_frommanifest(SK1, EK1, Man), + ?assertMatch([{1, [{next, E3, SK1}]}, + {2, [{next, E5, SK1}, {next, E6, SK1}]}], R1), - R2 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx1", "Fld8"}, null}, - {i, "Bucket1", {"Idx1", "Fld8"}, null}, - Man), - ?assertMatch([{1, [{next_file, E1}]}, {2, [{next_file, E5}]}], - R2), + SK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, + EK2 = {i, "Bucket1", {"Idx1", "Fld8"}, null}, + R2 = initiate_rangequery_frommanifest(SK2, EK2, Man), + ?assertMatch([{1, [{next, E1, SK2}]}, {2, [{next, E5, SK2}]}], R2), R3 = initiate_rangequery_frommanifest({i, "Bucket1", {"Idx0", "Fld8"}, null}, {i, "Bucket1", {"Idx0", "Fld9"}, null}, Man), @@ -1693,17 +1724,18 @@ foldwithimm_simple_test() -> {{o, "Bucket1", "Key6"}, 7}], AccB). 
create_file_test() -> - Filename = "../test/new_file.sft", + Filename = "../test/new_file.sst", ok = file:write_file(Filename, term_to_binary("hello")), - KVL = lists:usort(leveled_sft:generate_randomkeys(10000)), + KVL = lists:usort(generate_randomkeys(10000)), Tree = leveled_skiplist:from_list(KVL), FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, {ok, SP, - noreply} = leveled_sft:sft_newfroml0cache(Filename, + noreply} = leveled_sst:sst_newlevelzero(Filename, 1, FetchFun, - #sft_options{wait=false}), + undefined, + 10000), lists:foreach(fun(X) -> case checkready(SP) of timeout -> @@ -1716,9 +1748,9 @@ create_file_test() -> io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]), ?assertMatch({o, _, _, _}, StartKey), ?assertMatch({o, _, _, _}, EndKey), - ?assertMatch("../test/new_file.sft", SrcFN), - ok = leveled_sft:sft_clear(SP), - {ok, Bin} = file:read_file("../test/new_file.sft.discarded"), + ?assertMatch("../test/new_file.sst", SrcFN), + ok = leveled_sst:sst_clear(SP), + {ok, Bin} = file:read_file("../test/new_file.sst.discarded"), ?assertMatch("hello", binary_to_term(Bin)). commit_manifest_test() -> @@ -1735,14 +1767,14 @@ commit_manifest_test() -> ok = file:write_file(ManifestFP ++ "nonzero_1.pnd", term_to_binary("dummy data")), - L1_0 = [{1, [#manifest_entry{filename="1.sft"}]}], + L1_0 = [{1, [#manifest_entry{filename="1.sst"}]}], Resp_WI0 = Resp_WI#penciller_work{new_manifest=L1_0, unreferenced_files=[]}, {ok, State0} = commit_manifest_change(Resp_WI0, State), ?assertMatch(1, State0#state.manifest_sqn), ?assertMatch([], get_item(0, State0#state.manifest, [])), - L0Entry = [#manifest_entry{filename="0.sft"}], + L0Entry = [#manifest_entry{filename="0.sst"}], ManifestPlus = [{0, L0Entry}|State0#state.manifest], NxtSent_WI = #penciller_work{next_sqn=2, @@ -1756,7 +1788,7 @@ commit_manifest_test() -> ok = file:write_file(ManifestFP ++ "nonzero_2.pnd", term_to_binary("dummy data")), - L2_0 = [#manifest_entry{filename="2.sft"}], + L2_0 = [#manifest_entry{filename="2.sst"}], NxtResp_WI0 = NxtResp_WI#penciller_work{new_manifest=[{2, L2_0}], unreferenced_files=[]}, {ok, State2} = commit_manifest_change(NxtResp_WI0, State1), @@ -1777,7 +1809,7 @@ badmanifest_test() -> Key1_pre = {{o,"Bucket0001", "Key0001", null}, {1001, {active, infinity}, null}}, Key1 = add_missing_hash(Key1_pre), - KL1 = leveled_sft:generate_randomkeys({1000, 1}), + KL1 = generate_randomkeys({1000, 1}), ok = maybe_pause_push(PCL, KL1 ++ [Key1]), %% Added together, as split apart there will be a race between the close @@ -1798,7 +1830,7 @@ badmanifest_test() -> checkready(Pid) -> try - leveled_sft:sft_checkready(Pid) + leveled_sst:sst_checkready(Pid) catch exit:{timeout, _} -> timeout diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl deleted file mode 100644 index e736a47..0000000 --- a/src/leveled_sft.erl +++ /dev/null @@ -1,2024 +0,0 @@ -%% This module provides functions for managing sft files - a modified version -%% of sst files, to be used in leveleddb. 
-%%
-%% sft files are segment filtered tables in that they are guarded by a quick
-%% access filter that checks for the presence of a key by segment id, with the
-%% segment id being a hash in the range 0 - 1024 * 1024
-%%
-%% This filter has a dual purpose
-%% - a memory-efficient way of discovering non-presence with low false positive
-%% rate
-%% - to make searching for all keys by hashtree segment more efficient (a
-%% specific change to optimise behaviour for use with the incremental refresh
-%% of riak hashtrees)
-%%
-%% Not all keys are equal in sft files; keys are only expected in a specific
-%% series of formats
-%% - {Tag, Bucket, Key, SubKey|null} - Object Keys
-%% - {i, Bucket, {IndexName, IndexTerm}, Key} - Postings
-%% The {Bucket, Key} part of all types of keys are hashed for segment filters.
-%% For Postings the {Bucket, IndexName, IndexTerm} is also hashed.  This
-%% causes a false positive on lookup of a segment, but allows for the presence
-%% of specific index terms to be checked
-%%
-%% The objects stored are a tuple of {Key, SequenceNumber, State, Value}, where
-%% Key - as above
-%% SequenceNumber - monotonically increasing counter of addition to the nursery
-%% log
-%% State - {active|tomb, ExpiryTimestamp | infinity}
-%% Value - null (all postings) | [Object Metadata] (all object keys)
-%% Keys should be unique in files.  If more than one key is a candidate for
-%% the same file, the key with the highest sequence number should be chosen.
-%% If the file is at the basement level of a leveleddb database the objects
-%% with an ExpiryTimestamp in the past should not be written, but at all other
-%% levels keys should not be ignored because of a timestamp in the past.
-%% tomb objects are written for deletions, and these tombstones may have an
-%% ExpiryTimestamp which in effect is the time when the tombstone should be
-%% reaped.
-%%
-%% sft files are broken into the following sections:
-%% - Header (fixed width 56 bytes - containing pointers and metadata)
-%% - Blocks (variable length)
-%% - Slot Filter (variable length)
-%% - Slot Index (variable length)
-%% - Table Summary (variable length)
-%% Each section should contain at the footer of the section a 4-byte CRC which
-%% is to be checked only on the opening of the file
-%%
-%% The keys in the sft file are placed into the file in erlang term order.
-%% There will normally be 256 slots of keys.  The Slot Index is a gb_tree
-%% acting as a helper to find the right slot to check when searching for a key
-%% or range of keys.
-%% The Key in the Slot Index is the Key at the start of the Slot.
-%% The Value in the Slot Index is a record indicating:
-%% - The starting position of the Slot within the Blocks (relative to the
-%% starting position of the Blocks)
-%% - The (relative) starting position of the Slot Filter for this Slot
-%% - The number of blocks within the Slot
-%% - The length of each of the Blocks within the Slot
-%%
-%% When checking for a Key in the sft file, the key should be hashed to the
-%% segment, then the key should be looked-up in the Slot Index. 
The segment -%% ID can then be checked against the Slot Filter which will either return -%% not_present or [BlockIDs] -%% If a list of BlockIDs (normally of length 1) is returned the block should -%% be fetched using the starting position and length of the Block to find the -%% actual key (or not if the Slot Filter had returned a false positive) -%% -%% There will exist a Slot Filter for each entry in the Slot Index -%% The Slot Filter starts with some fixed length metadata -%% - 1 byte stating the expected number of keys in the block -%% - 1 byte stating the number of complete (i.e. containing the expected -%% number of keys) Blocks in the Slot -%% - 1 byte stating the number of keys in any incomplete Block (there can -%% only be 1 incomplete Block per Slot and it must be the last block) -%% - 3 bytes stating the largest segment ID in the Slot -%% - 1 byte stating the exponent used in the rice-encoding of the filter -%% The Filter itself is a rice-encoded list of Integers representing the -%% differences between the Segment IDs in the Slot with each entry being -%% appended by the minimal number of bits to represent the Block ID in which -%% an entry for that segment can be found. Where a segment exists more than -%% once then a 0 length will be used. -%% To use the filter code should roll over the filter incrementing the Segment -%% ID by each difference, and counting the keys by Block ID. This should -%% return one of: -%% mismatch - the final Segment Count didn't meet the largest Segment ID or -%% the per-block key counts don't add-up. There could have been a bit-flip, -%% so don't rely on the filter -%% no_match - everything added up but the counter never equalled the queried -%% Segment ID -%% {match, [BlockIDs]} - everything added up and the Segment may be -%% represented in the given blocks -%% -%% The makeup of a block -%% - A block is a list of 32 {Key, Value} pairs in Erlang term order -%% - The block is stored using standard compression in term_to_binary -%% May be improved by use of lz4 or schema-based binary_to_term -%% -%% The Table Summary may contain multiple summaries -%% The standard table summary contains: -%% - a count of keys by bucket and type of key (posting or object key) -%% - the total size of objects referred to by object keys -%% - the number of postings by index name -%% - the number of tombstones within the file -%% - the highest and lowest sequence number in the file -%% Summaries could be used for other summaries of table content in the future, -%% perhaps application-specific bloom filters - -%% The 56-byte header is made up of -%% - 1 byte version (major 5 bits, minor 3 bits) - default 0.1 -%% - 1 byte options (currently undefined) -%% - 1 byte Block Size - the expected number of keys in each block -%% - 1 byte Block Count - the expected number of blocks in each slot -%% - 2 byte Slot Count - the maximum number of slots in the file -%% - 6 bytes - spare -%% - 4 bytes - Blocks length -%% - 4 bytes - Slot Index length -%% - 4 bytes - Slot Filter length -%% - 4 bytes - Table summary length -%% - 24 bytes - spare -%% - 4 bytes - CRC32 -%% -%% The file body is written in the same order of events as the header (i.e. 
-%% Blocks first) -%% -%% Once open the file can be in the following states -%% - writing, the file is still being created -%% - available, the file may be read, but never again must be modified -%% - pending_deletion, the file can be closed and deleted once all outstanding -%% Snapshots have been started beyond a certain sequence number -%% -%% Level managers should only be aware of files in the available state. -%% Iterators may be aware of files in either available or pending_delete. -%% Level maintainers should control the file exclusively when in the writing -%% state, and send the event to trigger pending_delete with the a sequence -%% number equal to or higher than the number at the point it was no longer -%% active at any level. -%% -%% The format of the file is intended to support quick lookups, whilst -%% allowing for a new file to be written incrementally (so that all keys and -%% values need not be retained in memory) - perhaps n blocks at a time - - --module(leveled_sft). - --behaviour(gen_fsm). --include("include/leveled.hrl"). - --export([init/1, - handle_sync_event/4, - handle_event/3, - handle_info/3, - terminate/3, - code_change/4, - starting/2, - starting/3, - reader/3, - delete_pending/3, - delete_pending/2]). - --export([sft_new/4, - sft_newfroml0cache/4, - sft_open/1, - sft_get/2, - sft_get/3, - sft_getkvrange/4, - sft_close/1, - sft_clear/1, - sft_checkready/1, - sft_setfordelete/2, - sft_deleteconfirmed/1, - sft_getmaxsequencenumber/1]). - --export([generate_randomkeys/1]). - --include_lib("eunit/include/eunit.hrl"). - - --define(WORD_SIZE, 4). --define(DWORD_SIZE, 8). --define(CURRENT_VERSION, {0,1}). --define(SLOT_COUNT, 256). --define(SLOT_GROUPWRITE_COUNT, 16). --define(BLOCK_SIZE, 32). --define(BLOCK_COUNT, 4). --define(FOOTERPOS_HEADERPOS, 2). --define(MAX_SEG_HASH, 1048576). --define(DIVISOR_BITS, 13). --define(DIVISOR, 8092). --define(COMPRESSION_LEVEL, 1). --define(HEADER_LEN, 56). --define(ITERATOR_SCANWIDTH, 1). --define(MERGE_SCANWIDTH, 32). --define(BLOOM_WIDTH, 48). --define(DELETE_TIMEOUT, 10000). --define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE). --define(DISCARD_EXT, ".discarded"). --define(WRITE_OPS, [binary, raw, read, write, delayed_write]). --define(READ_OPS, [binary, raw, read]). - --record(state, {version = ?CURRENT_VERSION :: tuple(), - slot_index :: list(), - next_position :: integer(), - smallest_sqn :: integer(), - highest_sqn :: integer(), - smallest_key :: string(), - highest_key :: string(), - slots_pointer :: integer(), - index_pointer :: integer(), - filter_pointer :: integer(), - summ_pointer :: integer(), - summ_length :: integer(), - filename = "not set" :: string(), - handle :: file:fd(), - background_complete = false :: boolean(), - oversized_file = false :: boolean(), - penciller :: pid(), - bloom}). - -%% Helper object when writing a file to keep track of various accumulators --record(writer, {slot_index = [] :: list(), - slot_binary = <<>> :: binary(), - bloom = leveled_tinybloom:empty(?BLOOM_WIDTH), - min_sqn = infinity :: integer()|infinity, - max_sqn = 0 :: integer(), - last_key = {last, null}}). 
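The slot filter described in the comments above rice-encodes each segment-ID delta as a unary quotient plus a ?DIVISOR_BITS-bit remainder against ?DIVISOR. A hedged sketch of that scheme (the module's exact bit layout, including the appended block-ID bits, is more involved):

    %% e.g. Delta = 20000, Divisor = 8092:
    %%   quotient 2 -> unary 110, remainder 3816 -> 13 bits
    rice_encode(Delta, Divisor, DivisorBits) ->
        Q = Delta div Divisor,
        R = Delta rem Divisor,
        Unary = << <<1:1>> || _ <- lists:seq(1, Q) >>,
        <<Unary/bitstring, 0:1, R:DivisorBits>>.
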
- -%%%============================================================================ -%%% API -%%%============================================================================ - - -sft_new(Filename, KL1, KL2, LevelInfo) -> - LevelR = case is_integer(LevelInfo) of - true -> - #level{level=LevelInfo}; - _ -> - if - is_record(LevelInfo, level) -> - LevelInfo - end - end, - {ok, Pid} = gen_fsm:start(?MODULE, [], []), - Reply = gen_fsm:sync_send_event(Pid, - {sft_new, Filename, KL1, KL2, LevelR}, - infinity), - {ok, Pid, Reply}. - -sft_newfroml0cache(Filename, Slots, FetchFun, Options) -> - {ok, Pid} = gen_fsm:start(?MODULE, [], []), - case Options#sft_options.wait of - true -> - KL1 = leveled_pmem:to_list(Slots, FetchFun), - Reply = gen_fsm:sync_send_event(Pid, - {sft_new, - Filename, - KL1, - [], - #level{level=0}}, - infinity), - {ok, Pid, Reply}; - false -> - gen_fsm:send_event(Pid, - {sft_newfroml0cache, - Filename, - Slots, - FetchFun, - Options#sft_options.penciller}), - {ok, Pid, noreply} - end. - -sft_open(Filename) -> - {ok, Pid} = gen_fsm:start(?MODULE, [], []), - case gen_fsm:sync_send_event(Pid, {sft_open, Filename}, infinity) of - {ok, {SK, EK}} -> - {ok, Pid, {SK, EK}} - end. - -sft_setfordelete(Pid, Penciller) -> - gen_fsm:sync_send_event(Pid, {set_for_delete, Penciller}, infinity). - -sft_get(Pid, Key, Hash) -> - gen_fsm:sync_send_event(Pid, {get_kv, Key, Hash}, infinity). - -sft_get(Pid, Key) -> - sft_get(Pid, Key, leveled_codec:magic_hash(Key)). - -sft_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> - gen_fsm:sync_send_event(Pid, - {get_kvrange, StartKey, EndKey, ScanWidth}, - infinity). - -sft_clear(Pid) -> - gen_fsm:sync_send_event(Pid, {set_for_delete, false}, infinity), - gen_fsm:sync_send_event(Pid, close, 1000). - -sft_close(Pid) -> - gen_fsm:sync_send_event(Pid, close, 1000). - -sft_deleteconfirmed(Pid) -> - gen_fsm:send_event(Pid, close). - -sft_checkready(Pid) -> - gen_fsm:sync_send_event(Pid, background_complete, 20). - -sft_getmaxsequencenumber(Pid) -> - gen_fsm:sync_send_event(Pid, get_maxsqn, infinity). - - - -%%%============================================================================ -%%% gen_server callbacks -%%%============================================================================ - -init([]) -> - {ok, starting, #state{}}. - -starting({sft_new, Filename, KL1, [], _LevelR=#level{level=L}}, _From, _State) - when L == 0 -> - {ok, State} = create_levelzero(KL1, Filename), - {reply, - {{[], []}, State#state.smallest_key, State#state.highest_key}, - reader, - State}; -starting({sft_new, Filename, KL1, KL2, LevelR}, _From, _State) -> - case create_file(Filename) of - {Handle, FileMD} -> - {ReadHandle, UpdFileMD, KeyRemainders} = complete_file(Handle, - FileMD, - KL1, KL2, - LevelR), - {reply, - {KeyRemainders, - UpdFileMD#state.smallest_key, - UpdFileMD#state.highest_key}, - reader, - UpdFileMD#state{handle=ReadHandle, filename=Filename}} - end; -starting({sft_open, Filename}, _From, _State) -> - {_Handle, FileMD} = open_file(#state{filename=Filename}), - leveled_log:log("SFT01", [Filename]), - {reply, - {ok, {FileMD#state.smallest_key, FileMD#state.highest_key}}, - reader, - FileMD}. 
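Note the two modes of sft_newfroml0cache/4 above, switched on #sft_options.wait. A usage sketch (the filename, Slots and FetchFun values are illustrative):

    %% wait=true (shutdown path): blocks until the L0 file is complete
    {ok, _Pid1, _Reply} =
        leveled_sft:sft_newfroml0cache("ledger_files/0_0.sft",
                                        Slots,
                                        FetchFun,
                                        #sft_options{wait=true}),
    %% wait=false (normal path): returns {ok, Pid, noreply} at once; the
    %% penciller is later notified via pcl_confirml0complete
    {ok, _Pid2, noreply} =
        leveled_sft:sft_newfroml0cache("ledger_files/0_0.sft",
                                        Slots,
                                        FetchFun,
                                        #sft_options{wait=false,
                                                        penciller=self()}),
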
- -starting({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) -> - SW = os:timestamp(), - Inp1 = leveled_pmem:to_list(Slots, FetchFun), - {ok, State} = create_levelzero(Inp1, Filename), - leveled_log:log_timer("SFT03", [Filename], SW), - case PCL of - undefined -> - {next_state, reader, State}; - _ -> - leveled_penciller:pcl_confirml0complete(PCL, - State#state.filename, - State#state.smallest_key, - State#state.highest_key), - {next_state, reader, State} - end. - - -reader({get_kv, Key, Hash}, _From, State) -> - Reply = - case leveled_tinybloom:check({hash, Hash}, State#state.bloom) of - false -> - not_present; - true -> - fetch_keyvalue(State#state.handle, State, Key) - end, - {reply, Reply, reader, State}; -reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> - Reply = pointer_append_queryresults(fetch_range_kv(State#state.handle, - State, - StartKey, - EndKey, - ScanWidth), - self()), - {reply, Reply, reader, State}; -reader(get_maxsqn, _From, State) -> - {reply, State#state.highest_sqn, reader, State}; -reader({set_for_delete, Penciller}, _From, State) -> - leveled_log:log("SFT02", [State#state.filename]), - {reply, - ok, - delete_pending, - State#state{penciller=Penciller}, - ?DELETE_TIMEOUT}; -reader(background_complete, _From, State) -> - if - State#state.background_complete == true -> - {reply, - {ok, - State#state.filename, - State#state.smallest_key, - State#state.highest_key}, - reader, - State} - end; -reader(close, _From, State) -> - ok = file:close(State#state.handle), - {stop, normal, ok, State}. - -delete_pending({get_kv, Key, Hash}, _From, State) -> - Reply = - case leveled_tinybloom:check({hash, Hash}, State#state.bloom) of - false -> - not_present; - true -> - fetch_keyvalue(State#state.handle, State, Key) - end, - {reply, Reply, delete_pending, State, ?DELETE_TIMEOUT}; -delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> - Reply = pointer_append_queryresults(fetch_range_kv(State#state.handle, - State, - StartKey, - EndKey, - ScanWidth), - self()), - {reply, Reply, delete_pending, State, ?DELETE_TIMEOUT}; -delete_pending(close, _From, State) -> - leveled_log:log("SFT06", [State#state.filename]), - ok = file:close(State#state.handle), - ok = file:delete(State#state.filename), - {stop, normal, ok, State}. - -delete_pending(timeout, State) -> - leveled_log:log("SFT05", [timeout, State#state.filename]), - ok = leveled_penciller:pcl_confirmdelete(State#state.penciller, - State#state.filename), - {next_state, delete_pending, State, ?DELETE_TIMEOUT}; -delete_pending(close, State) -> - leveled_log:log("SFT06", [State#state.filename]), - ok = file:close(State#state.handle), - ok = file:delete(State#state.filename), - {stop, normal, State}. - -handle_sync_event(_Msg, _From, StateName, State) -> - {reply, undefined, StateName, State}. - -handle_event(_Msg, StateName, State) -> - {next_state, StateName, State}. - -handle_info(_Msg, StateName, State) -> - {next_state, StateName, State}. - -terminate(Reason, _StateName, State) -> - leveled_log:log("SFT05", [Reason, State#state.filename]). - -code_change(_OldVsn, StateName, State, _Extra) -> - {ok, StateName, State}. 
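Before the internal functions below, it may help to see how the fixed 56-byte header laid out in the module comments is read back: the four section lengths sit at byte offset 12, after the version, options and settings bytes. A minimal sketch (FileName illustrative), matching the pread in open_file/1 below:

    {ok, Handle} = file:open(FileName, [binary, raw, read]),
    {ok, HeaderLengths} = file:pread(Handle, 12, 16),
    <<Blen:32/integer,
        Ilen:32/integer,
        Flen:32/integer,
        Slen:32/integer>> = HeaderLengths,
    %% The Blocks/Index/Filter/Summary sections then start at ?HEADER_LEN,
    %% ?HEADER_LEN + Blen, ?HEADER_LEN + Blen + Ilen, and so on
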
- - - -%%%============================================================================ -%%% Internal functions -%%%============================================================================ - - -create_levelzero(ListForFile, Filename) -> - {TmpFilename, PrmFilename} = generate_filenames(Filename), - {Handle, FileMD} = create_file(TmpFilename), - InputSize = length(ListForFile), - leveled_log:log("SFT07", [InputSize]), - Rename = {true, TmpFilename, PrmFilename}, - {ReadHandle, - UpdFileMD, - {[], []}} = complete_file(Handle, FileMD, - ListForFile, [], - #level{level=0}, Rename), - {ok, - UpdFileMD#state{handle=ReadHandle, - filename=PrmFilename, - background_complete=true, - oversized_file=InputSize>?MAX_KEYS}}. - - -generate_filenames(RootFilename) -> - Ext = filename:extension(RootFilename), - Components = filename:split(RootFilename), - case Ext of - [] -> - {filename:join(Components) ++ ".pnd", - filename:join(Components) ++ ".sft"}; - Ext -> - %% This seems unnecessarily hard - DN = filename:dirname(RootFilename), - FP = lists:last(Components), - FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)), - {DN ++ "/" ++ FP_NOEXT ++ "pnd", DN ++ "/" ++ FP_NOEXT ++ "sft"} - end. - - -%% Start a bare file with an initial header and no further details -%% Return the {Handle, metadata record} -create_file(FileName) when is_list(FileName) -> - leveled_log:log("SFT01", [FileName]), - ok = filelib:ensure_dir(FileName), - {ok, Handle} = file:open(FileName, ?WRITE_OPS), - Header = create_header(initial), - {ok, _} = file:position(Handle, bof), - ok = file:write(Handle, Header), - {ok, StartPos} = file:position(Handle, cur), - FileMD = #state{next_position=StartPos, filename=FileName}, - {Handle, FileMD}. - - -create_header(initial) -> - {Major, Minor} = ?CURRENT_VERSION, - Version = <>, - %% Not thought of any options - options are ignored - Options = <<0:8>>, - %% Settings are currently ignored - {BlSize, BlCount, SlCount} = {?BLOCK_COUNT, ?BLOCK_SIZE, ?SLOT_COUNT}, - Settings = <>, - {SpareO, SpareL} = {<<0:48>>, <<0:192>>}, - Lengths = <<0:32, 0:32, 0:32, 0:32>>, - H1 = <>, - CRC32 = erlang:crc32(H1), - <
<H1/binary, CRC32:32/integer>
>. - -%% Open a file returning a handle and metadata which can be used in fetch and -%% iterator requests -%% The handle should be read-only as these are immutable files, a file cannot -%% be opened for writing keys, it can only be created to write keys - -open_file(FileMD) -> - Filename = FileMD#state.filename, - {ok, Handle} = file:open(Filename, [binary, raw, read]), - {ok, HeaderLengths} = file:pread(Handle, 12, 16), - <> = HeaderLengths, - {ok, <>} = - file:pread(Handle, ?HEADER_LEN + Blen + Ilen + Flen, Slen), - {{LowSQN, HighSQN}, {LowKey, HighKey}, Bloom} = - case erlang:crc32(SummaryBin) of - SummaryCRC -> - binary_to_term(SummaryBin) - end, - {ok, SlotIndexBin} = file:pread(Handle, ?HEADER_LEN + Blen, Ilen), - SlotIndex = binary_to_term(SlotIndexBin), - {Handle, FileMD#state{slot_index=SlotIndex, - smallest_sqn=LowSQN, - highest_sqn=HighSQN, - smallest_key=LowKey, - highest_key=HighKey, - slots_pointer=?HEADER_LEN, - index_pointer=?HEADER_LEN + Blen, - filter_pointer=?HEADER_LEN + Blen + Ilen, - summ_pointer=?HEADER_LEN + Blen + Ilen + Flen, - summ_length=Slen, - handle=Handle, - bloom=Bloom}}. - -%% Take a file handle with a previously created header and complete it based on -%% the two key lists KL1 and KL2 -complete_file(Handle, FileMD, KL1, KL2, LevelR) -> - complete_file(Handle, FileMD, KL1, KL2, LevelR, false). - -complete_file(Handle, FileMD, KL1, KL2, LevelR, Rename) -> - {ok, KeyRemainders} = write_keys(Handle, - maybe_expand_pointer(KL1), - maybe_expand_pointer(KL2), - LevelR, - fun sftwrite_function/2, - #writer{}), - {ReadHandle, UpdFileMD} = case Rename of - false -> - open_file(FileMD); - {true, OldName, NewName} -> - ok = rename_file(OldName, NewName), - open_file(FileMD#state{filename=NewName}) - end, - {ReadHandle, UpdFileMD, KeyRemainders}. - -rename_file(OldName, NewName) -> - leveled_log:log("SFT08", [OldName, NewName]), - case filelib:is_file(NewName) of - true -> - leveled_log:log("SFT09", [NewName]), - AltName = filename:join(filename:dirname(NewName), - filename:basename(NewName)) - ++ ?DISCARD_EXT, - leveled_log:log("SFT10", [NewName, AltName]), - ok = file:rename(NewName, AltName); - false -> - ok - end, - file:rename(OldName, NewName). - - -%% Fetch a Key and Value from a file, returns -%% {value, KV} or not_present -%% The key must be pre-checked to ensure it is in the valid range for the file -%% A key out of range may fail - -fetch_keyvalue(Handle, FileMD, Key) -> - case get_nearestkey(FileMD#state.slot_index, Key) of - not_found -> - not_present; - {_NearestKey, {FilterLen, PointerF}, {LengthList, PointerB}} -> - FilterPointer = PointerF + FileMD#state.filter_pointer, - {ok, SegFilter} = file:pread(Handle, - FilterPointer, - FilterLen), - SegID = hash_for_segmentid({keyonly, Key}), - case check_for_segments(SegFilter, [SegID], true) of - {maybe_present, BlockList} -> - BlockPointer = PointerB + FileMD#state.slots_pointer, - fetch_keyvalue_fromblock(BlockList, - Key, - LengthList, - Handle, - BlockPointer); - not_present -> - not_present; - error_so_maybe_present -> - BlockPointer = PointerB + FileMD#state.slots_pointer, - fetch_keyvalue_fromblock(lists:seq(0,length(LengthList)), - Key, - LengthList, - Handle, - BlockPointer) - end - end. - -%% Fetches a range of keys returning a list of {Key, SeqN} tuples -fetch_range_keysonly(Handle, FileMD, StartKey, EndKey) -> - fetch_range(Handle, FileMD, StartKey, EndKey, fun acc_list_keysonly/2). 
- -fetch_range_keysonly(Handle, FileMD, StartKey, EndKey, ScanWidth) -> - fetch_range(Handle, FileMD, StartKey, EndKey, fun acc_list_keysonly/2, - ScanWidth). - -%% Fetches a range of keys returning the full tuple, including value -fetch_range_kv(Handle, FileMD, StartKey, EndKey, ScanWidth) -> - fetch_range(Handle, FileMD, StartKey, EndKey, fun acc_list_kv/2, - ScanWidth). - -acc_list_keysonly(null, empty) -> - []; -acc_list_keysonly(null, RList) -> - RList; -acc_list_keysonly(R, RList) when is_list(R) -> - lists:foldl(fun acc_list_keysonly/2, RList, R); -acc_list_keysonly(R, RList) -> - lists:append(RList, [leveled_codec:strip_to_keyseqstatusonly(R)]). - -acc_list_kv(null, empty) -> - []; -acc_list_kv(null, RList) -> - RList; -acc_list_kv(R, RList) when is_list(R) -> - RList ++ R; -acc_list_kv(R, RList) -> - lists:append(RList, [R]). - -%% Iterate keys, returning a batch of keys & values in a range -%% - the iterator can have a ScanWidth which is how many slots should be -%% scanned by the iterator before returning a result -%% - batches can be ended with a pointer to indicate there are potentially -%% further values in the range -%% - a list of functions can be provided, which should either return true -%% or false, and these can be used to filter the results from the query, -%% for example to ignore keys above a certain sequence number, to ignore -%% keys not matching a certain regular expression, or to ignore keys not -%% a member of a particular partition -%% - An Accumulator and an Accumulator function can be passed. The function -%% needs to handle being passed (KV, Acc) to add the current result to the -%% Accumulator. The functional should handle KV=null, Acc=empty to initiate -%% the accumulator, and KV=null to leave the Accumulator unchanged. -%% Flexibility with accumulators is such that keys-only can be returned rather -%% than keys and values, or other entirely different accumulators can be -%% used - e.g. counters, hash-lists to build bloom filters etc - -fetch_range(Handle, FileMD, StartKey, EndKey, AccFun) -> - fetch_range(Handle, FileMD, StartKey, EndKey, AccFun, ?ITERATOR_SCANWIDTH). - -fetch_range(Handle, FileMD, StartKey, EndKey, AccFun, ScanWidth) -> - fetch_range(Handle, FileMD, StartKey, EndKey, AccFun, ScanWidth, empty). - -fetch_range(_Handle, _FileMD, StartKey, _EndKey, _AccFun, 0, Acc) -> - {partial, Acc, StartKey}; -fetch_range(Handle, FileMD, StartKey, EndKey, AccFun, ScanWidth, Acc) -> - %% get_nearestkey gets the last key in the index <= StartKey, or the next - %% key along if {next, StartKey} is passed - case get_nearestkey(FileMD#state.slot_index, StartKey) of - {NearestKey, _Filter, {LengthList, PointerB}} -> - fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, - AccFun, ScanWidth, - LengthList, - 0, - PointerB + FileMD#state.slots_pointer, - AccFun(null, Acc)); - not_found -> - {complete, AccFun(null, Acc)} - end. - -fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, - AccFun, ScanWidth, - LengthList, - BlockNumber, - _Pointer, - Acc) - when length(LengthList) == BlockNumber -> - %% Reached the end of the slot. 
Move the start key on one to scan a new slot - fetch_range(Handle, FileMD, {next, NearestKey}, EndKey, - AccFun, ScanWidth - 1, - Acc); -fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, - AccFun, ScanWidth, - LengthList, - BlockNumber, - Pointer, - Acc) -> - Block = fetch_block(Handle, LengthList, BlockNumber, Pointer), - Results = - case maybe_scan_entire_block(Block, StartKey, EndKey) of - true -> - {partial, AccFun(Block, Acc), StartKey}; - false -> - scan_block(Block, StartKey, EndKey, AccFun, Acc) - end, - case Results of - {partial, Acc1, StartKey} -> - %% Move on to the next block - fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, - AccFun, ScanWidth, - LengthList, - BlockNumber + 1, - Pointer, - Acc1); - {complete, Acc1} -> - {complete, Acc1} - end. - -scan_block([], StartKey, _EndKey, _AccFun, Acc) -> - {partial, Acc, StartKey}; -scan_block([HeadKV|T], StartKey, EndKey, AccFun, Acc) -> - K = leveled_codec:strip_to_keyonly(HeadKV), - case {StartKey > K, leveled_codec:endkey_passed(EndKey, K)} of - {true, _} when StartKey /= all -> - scan_block(T, StartKey, EndKey, AccFun, Acc); - {_, true} when EndKey /= all -> - {complete, Acc}; - _ -> - scan_block(T, StartKey, EndKey, AccFun, AccFun(HeadKV, Acc)) - end. - - -maybe_scan_entire_block([], _, _) -> - true; -maybe_scan_entire_block(_Block, all, all) -> - true; -maybe_scan_entire_block(Block, StartKey, all) -> - [FirstKey|_Tail] = Block, - leveled_codec:strip_to_keyonly(FirstKey) > StartKey; -maybe_scan_entire_block(Block, StartKey, EndKey) -> - [FirstKey|_Tail] = Block, - LastKey = leveled_codec:strip_to_keyonly(lists:last(Block)), - FromStart = leveled_codec:strip_to_keyonly(FirstKey) > StartKey, - ToEnd = leveled_codec:endkey_passed(EndKey, LastKey), - case {FromStart, ToEnd} of - {true, false} -> - true; - _ -> - false - end. - -fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) -> - not_present; -fetch_keyvalue_fromblock([BlockNmb|T], Key, LengthList, Handle, StartOfSlot) -> - BlockToCheck = fetch_block(Handle, LengthList, BlockNmb, StartOfSlot), - Result = lists:keyfind(Key, 1, BlockToCheck), - case Result of - false -> - fetch_keyvalue_fromblock(T, Key, LengthList, Handle, StartOfSlot); - KV -> - KV - end. - -fetch_block(Handle, LengthList, BlockNmb, StartOfSlot) -> - Start = lists:sum(lists:sublist(LengthList, BlockNmb)), - Length = lists:nth(BlockNmb + 1, LengthList), - {ok, BlockToCheckBin} = file:pread(Handle, Start + StartOfSlot, Length), - binary_to_term(BlockToCheckBin). - -%% Need to deal with either Key or {next, Key} -get_nearestkey([H|_Tail], all) -> - H; -get_nearestkey(KVList, Key) -> - case Key of - {next, K} -> - get_nextkeyaftermatch(KVList, K, not_found); - _ -> - get_firstkeytomatch(KVList, Key, not_found) - end. - -get_firstkeytomatch([], _KeyToFind, PrevV) -> - PrevV; -get_firstkeytomatch([{K, FilterInfo, SlotInfo}|_T], KeyToFind, PrevV) - when K > KeyToFind -> - case PrevV of - not_found -> - {K, FilterInfo, SlotInfo}; - _ -> - PrevV - end; -get_firstkeytomatch([{K, FilterInfo, SlotInfo}|T], KeyToFind, _PrevV) -> - get_firstkeytomatch(T, KeyToFind, {K, FilterInfo, SlotInfo}). - -get_nextkeyaftermatch([], _KeyToFind, _PrevV) -> - not_found; -get_nextkeyaftermatch([{K, FilterInfo, SlotInfo}|T], KeyToFind, PrevV) - when K >= KeyToFind -> - case PrevV of - not_found -> - get_nextkeyaftermatch(T, KeyToFind, next); - next -> - {K, FilterInfo, SlotInfo} - end; -get_nextkeyaftermatch([_KTuple|T], KeyToFind, PrevV) -> - get_nextkeyaftermatch(T, KeyToFind, PrevV). 
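For reference, the accumulator contract described above (initiate on {null, empty}, pass through on a null KV, and accept either a whole block or a single entry) can be satisfied by something as small as a counter. A hypothetical sketch, not part of the removed module:

    %% Hypothetical accumulator: counts keys in range instead of listing them.
    count_fun(null, empty) -> 0;                        % initiate the accumulator
    count_fun(null, Count) -> Count;                    % leave unchanged
    count_fun(Block, Count) when is_list(Block) ->
        Count + length(Block);                          % whole block accepted
    count_fun(_KV, Count) -> Count + 1.                 % single key from scan_block

    %% e.g. {complete, N} = fetch_range(Handle, FileMD, all, all, fun count_fun/2)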
-
-
-%% Take a file handle at the start position (after creating the header) and then
-%% write the Key lists to the file slot by slot.
-%%
-%% Slots are created then written in bulk to improve I/O efficiency. Slots will
-%% be written in groups
-
-write_keys(Handle, KL1, KL2, LevelR, WriteFun, WriteState) ->
-    write_keys(Handle, KL1, KL2, LevelR, WriteFun, WriteState, {0, 0, []}).
-
-write_keys(Handle, KL1, KL2, LevelR, WriteFun, WState,
-                {SlotC, SlotT, SlotLists})
-                    when SlotC =:= ?SLOT_GROUPWRITE_COUNT ->
-    WState0 = lists:foldl(fun finalise_slot/2, WState, SlotLists),
-    Handle0 = WriteFun(slots, {Handle, WState0#writer.slot_binary}),
-    case maxslots_bylevel(SlotT, LevelR#level.level) of
-        reached ->
-            {complete_keywrite(Handle0, WState0, WriteFun), {KL1, KL2}};
-        continue ->
-            write_keys(Handle0, KL1, KL2, LevelR, WriteFun,
-                        WState0#writer{slot_binary = <<>>}, {0, SlotT, []})
-    end;
-write_keys(Handle, KL1, KL2, LevelR, WriteFun, WState,
-                {SlotC, SlotT, SlotLists}) ->
-    {Status, BlockKeyLists} = create_slot(KL1, KL2, LevelR),
-    case Status of
-        S when S == complete; S == partial ->
-            WState0 =
-                case BlockKeyLists of
-                    [[]] ->
-                        WState;
-                    _ ->
-                        lists:foldl(fun finalise_slot/2,
-                                    WState,
-                                    SlotLists ++ [BlockKeyLists])
-                end,
-            Handle0 = WriteFun(slots, {Handle, WState0#writer.slot_binary}),
-            {complete_keywrite(Handle0, WState0, WriteFun), {[], []}};
-        {full, KL1Rem, KL2Rem} ->
-            write_keys(Handle, KL1Rem, KL2Rem, LevelR, WriteFun, WState,
-                        {SlotC + 1, SlotT + 1, SlotLists ++ [BlockKeyLists]})
-    end.
-
-
-complete_keywrite(Handle, WriteState, WriteFun) ->
-    FirstKey =
-        case length(WriteState#writer.slot_index) of
-            0 ->
-                null;
-            _ ->
-                element(1, lists:nth(1, WriteState#writer.slot_index))
-        end,
-    ConvSlotIndex = convert_slotindex(WriteState#writer.slot_index),
-    WriteFun(finalise, {Handle,
-                        ConvSlotIndex,
-                        {{WriteState#writer.min_sqn, WriteState#writer.max_sqn},
-                            {FirstKey, WriteState#writer.last_key},
-                            WriteState#writer.bloom}}).
-
-%% Take a slot index, and remove the SegFilters replacing with pointers
-%% Return a tuple of the accumulated slot filters, and a pointer-based
-%% slot-index
-
-convert_slotindex(SlotIndex) ->
-    SlotFun = fun({LowKey, SegFilter, LengthList},
-                    {FilterAcc, SlotIndexAcc, PointerF, PointerB}) ->
-                    FilterOut = serialise_segment_filter(SegFilter),
-                    FilterLen = byte_size(FilterOut),
-                    {<<FilterAcc/binary, FilterOut/binary>>,
-                        lists:append(SlotIndexAcc, [{LowKey,
-                                                        {FilterLen, PointerF},
-                                                        {LengthList, PointerB}}]),
-                        PointerF + FilterLen,
-                        PointerB + lists:sum(LengthList)} end,
-    {SlotFilters, PointerIndex, _FLength, _BLength} =
-        lists:foldl(SlotFun, {<<>>, [], 0, 0}, SlotIndex),
-    {SlotFilters, PointerIndex}.
-
-sftwrite_function(slots, {Handle, SerialisedSlots}) ->
-    ok = file:write(Handle, SerialisedSlots),
-    Handle;
-sftwrite_function(finalise,
-                    {Handle,
-                        {SlotFilters, PointerIndex},
-                        {SNExtremes, KeyExtremes, Bloom}}) ->
-    {ok, Position} = file:position(Handle, cur),
-
-    BlocksLength = Position - ?HEADER_LEN,
-    Index = term_to_binary(PointerIndex),
-    IndexLength = byte_size(Index),
-    FilterLength = byte_size(SlotFilters),
-    Summary = term_to_binary({SNExtremes, KeyExtremes, Bloom}),
-    SummaryCRC = erlang:crc32(Summary),
-    SummaryLength = byte_size(Summary) + 4,
-    %% Write Index, Filter and Summary
-    ok = file:write(Handle, <<Index/binary,
-                                SlotFilters/binary,
-                                SummaryCRC:32/integer,
-                                Summary/binary>>),
-    %% Write Lengths into header
-    ok = file:pwrite(Handle, 12, <<BlocksLength:32/integer,
-                                    IndexLength:32/integer,
-                                    FilterLength:32/integer,
-                                    SummaryLength:32/integer>>),
-    {ok, _Position} = file:position(Handle, bof),
-    ok = file:advise(Handle,
-                        BlocksLength + IndexLength,
-                        FilterLength,
-                        will_need),
-    file:close(Handle).
-
-%% Level 0 files are of variable (infinite) size to avoid issues with having
-%% any remainders when flushing from memory
-maxslots_bylevel(_SlotTotal, 0) ->
-    continue;
-maxslots_bylevel(SlotTotal, _Level) ->
-    case SlotTotal of
-        ?SLOT_COUNT ->
-            reached;
-        X when X < ?SLOT_COUNT ->
-            continue
-    end.
-
-
-
-%% Take two potentially overlapping lists of keys and produce a block-size
-%% list of keys in the correct order.  Outputs:
-%% - Status of
-%% - - all_complete (no more keys and block is complete)
-%% - - partial (no more keys and block is not complete)
-%% - - {block_full, Rem1, Rem2} the block is complete but there is a remainder
-%% of keys
-
-create_block(KeyList1, KeyList2, LevelR) ->
-    create_block(KeyList1, KeyList2, LevelR, []).
-
-
-create_block([], [], _LevelR, BlockKeyList)
-                                    when length(BlockKeyList)==?BLOCK_SIZE ->
-    {all_complete, lists:reverse(BlockKeyList)};
-create_block([], [], _LevelR, BlockKeyList) ->
-    {partial, lists:reverse(BlockKeyList)};
-create_block(KeyList1, KeyList2, _LevelR, BlockKeyList)
-                                    when length(BlockKeyList)==?BLOCK_SIZE ->
-    {{block_full, KeyList1, KeyList2}, lists:reverse(BlockKeyList)};
-create_block(KeyList1, KeyList2, LevelR, BlockKeyList) ->
-    case key_dominates(KeyList1, KeyList2,
-                        {LevelR#level.is_basement, LevelR#level.timestamp}) of
-        {{next_key, TopKey}, Rem1, Rem2} ->
-            create_block(Rem1, Rem2, LevelR, [TopKey|BlockKeyList]);
-        {skipped_key, Rem1, Rem2} ->
-            create_block(Rem1, Rem2, LevelR, BlockKeyList)
-    end.
-
-%% create_slot should simply output a list of BlockKeyLists no bigger than
-%% the BlockCount, then the status (with key remainders if not complete)
-
-create_slot(KL1, KL2, LevelR) ->
-    create_slot(KL1, KL2, LevelR, ?BLOCK_COUNT, []).
-
-create_slot(KL1, KL2, LevelR, BlockCount, BlockKeyLists) ->
-    {Status, KeyList} = create_block(KL1, KL2, LevelR),
-    case {Status, BlockCount - 1} of
-        {partial, _N} ->
-            {partial, BlockKeyLists ++ [KeyList]};
-        {all_complete, 0} ->
-            {complete, BlockKeyLists ++ [KeyList]};
-        {all_complete, _N} ->
-            % From the perspective of the slot it is partially complete
-            {partial, BlockKeyLists ++ [KeyList]};
-        {{block_full, KL1Rem, KL2Rem}, 0} ->
-            {{full, KL1Rem, KL2Rem}, BlockKeyLists ++ [KeyList]};
-        {{block_full, KL1Rem, KL2Rem}, N} ->
-            create_slot(KL1Rem, KL2Rem, LevelR, N, BlockKeyLists ++ [KeyList])
-    end.
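A simplified sketch of how a caller consumes these statuses (write_keys above does this with the I/O interleaved; build_slots here is a hypothetical name and collects slots in memory only):

    build_slots(KL1, KL2, LevelR, SlotAcc) ->
        case create_slot(KL1, KL2, LevelR) of
            {{full, KL1Rem, KL2Rem}, BlockKeyLists} ->
                %% Slot filled but keys remain, so continue with the remainders
                build_slots(KL1Rem, KL2Rem, LevelR, SlotAcc ++ [BlockKeyLists]);
            {Status, BlockKeyLists} when Status == complete; Status == partial ->
                %% Keys exhausted; the final slot may be sub-optimally small
                SlotAcc ++ [BlockKeyLists]
        end.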
-
-
-
-%% Fold over the List of BlockKeys updating the writer record
-finalise_slot(BlockKeyLists, WriteState) ->
-    BlockFolder =
-        fun(KV, {AccMinSQN, AccMaxSQN, Bloom, SegmentIDList}) ->
-            {SQN, Hash} = leveled_codec:strip_to_seqnhashonly(KV),
-            {min(AccMinSQN, SQN),
-                max(AccMaxSQN, SQN),
-                leveled_tinybloom:enter({hash, Hash}, Bloom),
-                [hash_for_segmentid(KV)|SegmentIDList]}
-        end,
-    SlotFolder =
-        fun(BlockKeyList,
-                {MinSQN, MaxSQN, Bloom, SegLists, KVBinary, Lengths}) ->
-            {BlockMinSQN, BlockMaxSQN, UpdBloom, Segs} =
-                lists:foldr(BlockFolder,
-                            {infinity, 0, Bloom, []},
-                            BlockKeyList),
-            SerialisedBlock = serialise_block(BlockKeyList),
-            {min(MinSQN, BlockMinSQN),
-                max(MaxSQN, BlockMaxSQN),
-                UpdBloom,
-                SegLists ++ [Segs],
-                <<KVBinary/binary, SerialisedBlock/binary>>,
-                Lengths ++ [byte_size(SerialisedBlock)]}
-        end,
-
-    {SlotMinSQN,
-        SlotMaxSQN,
-        SlotUpdBloom,
-        SlotSegLists,
-        SlotBinary,
-        BlockLengths} =
-            lists:foldl(SlotFolder,
-                        {WriteState#writer.min_sqn,
-                            WriteState#writer.max_sqn,
-                            WriteState#writer.bloom,
-                            [],
-                            WriteState#writer.slot_binary,
-                            []},
-                        BlockKeyLists),
-
-    FirstSlotKey = leveled_codec:strip_to_keyonly(lists:nth(1,
-                                                    lists:nth(1,
-                                                        BlockKeyLists))),
-    LastSlotKV = lists:last(lists:last(BlockKeyLists)),
-    SegFilter = generate_segment_filter(SlotSegLists),
-    UpdSlotIndex = lists:append(WriteState#writer.slot_index,
-                                [{FirstSlotKey, SegFilter, BlockLengths}]),
-
-    #writer{slot_index = UpdSlotIndex,
-            slot_binary = SlotBinary,
-            bloom = SlotUpdBloom,
-            min_sqn = SlotMinSQN,
-            max_sqn = SlotMaxSQN,
-            last_key = leveled_codec:strip_to_keyonly(LastSlotKV)}.
-
-
-serialise_block(BlockKeyList) ->
-    term_to_binary(BlockKeyList, [{compressed, ?COMPRESSION_LEVEL}]).
-
-
-%% Compare the keys at the head of the list, and either skip that "best" key or
-%% identify as the next key.
-%%
-%% The logic needs to change if the file is in the basement level, as keys with
-%% expired timestamps need not be written at this level
-%%
-%% The best key is considered to be the lowest key in erlang term order. If
-%% there are matching keys then the highest sequence number must be chosen and
-%% any lower sequence numbers should be compacted out of existence
-
-
-key_dominates(KL1, KL2, Level) ->
-    key_dominates_expanded(maybe_expand_pointer(KL1),
-                            maybe_expand_pointer(KL2),
-                            Level).
-
-key_dominates_expanded([H1|T1], [], Level) ->
-    case leveled_codec:maybe_reap_expiredkey(H1, Level) of
-        true ->
-            {skipped_key, maybe_expand_pointer(T1), []};
-        false ->
-            {{next_key, H1}, maybe_expand_pointer(T1), []}
-    end;
-key_dominates_expanded([], [H2|T2], Level) ->
-    case leveled_codec:maybe_reap_expiredkey(H2, Level) of
-        true ->
-            {skipped_key, [], maybe_expand_pointer(T2)};
-        false ->
-            {{next_key, H2}, [], maybe_expand_pointer(T2)}
-    end;
-key_dominates_expanded([H1|T1], [H2|T2], Level) ->
-    case leveled_codec:key_dominates(H1, H2) of
-        left_hand_first ->
-            case leveled_codec:maybe_reap_expiredkey(H1, Level) of
-                true ->
-                    {skipped_key, maybe_expand_pointer(T1), [H2|T2]};
-                false ->
-                    {{next_key, H1}, maybe_expand_pointer(T1), [H2|T2]}
-            end;
-        right_hand_first ->
-            case leveled_codec:maybe_reap_expiredkey(H2, Level) of
-                true ->
-                    {skipped_key, [H1|T1], maybe_expand_pointer(T2)};
-                false ->
-                    {{next_key, H2}, [H1|T1], maybe_expand_pointer(T2)}
-            end;
-        left_hand_dominant ->
-            {skipped_key, [H1|T1], maybe_expand_pointer(T2)};
-        right_hand_dominant ->
-            {skipped_key, maybe_expand_pointer(T1), [H2|T2]}
-    end.
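To make the dominance rule concrete, a small sketch (not one of the module's own tests) of two competing updates to the same key passed through key_dominates/3 above:

    %% Sketch: the same key in both lists; SQN 5 beats SQN 4, so the stale
    %% right-hand head is skipped rather than emitted.
    dominance_sketch() ->
        KVNew = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}},
        KVOld = {{o, "Bucket", "Key1", null}, {4, {active, infinity}, 0, []}},
        {skipped_key, [KVNew], []} =
            key_dominates([KVNew], [KVOld], {undefined, 1}).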
- - -%% When a list is provided it may include a pointer to gain another batch of -%% entries from the same file, or a new batch of entries from another file -%% -%% This resultant list should include the Tail of any pointers added at the -%% end of the list - -maybe_expand_pointer([]) -> - []; -maybe_expand_pointer([H|Tail]) -> - case H of - {next, SFTPid, StartKey} -> - %% io:format("Scanning further on PID ~w ~w~n", [SFTPid, StartKey]), - SW = os:timestamp(), - Acc = sft_getkvrange(SFTPid, StartKey, all, ?MERGE_SCANWIDTH), - leveled_log:log_timer("SFT14", [SFTPid], SW), - lists:append(Acc, Tail); - _ -> - [H|Tail] - end. - - -pointer_append_queryresults(Results, QueryPid) -> - case Results of - {complete, Acc} -> - Acc; - {partial, Acc, StartKey} -> - lists:append(Acc, [{next, QueryPid, StartKey}]) - end. - - -%% The Segment filter is a compressed filter representing the keys in a -%% given slot. The filter is delta-compressed list of integers using rice -%% encoding extended by the reference to each integer having an extra two bits -%% to indicate the block - there are four blocks in each slot. -%% -%% So each delta is represented as -%% - variable length exponent ending in 0, -%% with 0 representing the exponent of 0, -%% 10 -> 2 ^ 13, -%% 110 -> 2^14, -%% 1110 -> 2^15 etc -%% - 13-bit fixed length remainder -%% - 2-bit block number -%% This gives about 2-bytes per key, with a 1:8000 (approx) false positive -%% ratio (when checking the key by hashing to the segment ID) -%% -%% Before the delta list are three 20-bit integers representing the highest -%% integer in each block. Plus two bytes to indicate how many hashes -%% there are in the slot -%% -%% To check for the presence of a segment in a slot, roll over the deltas -%% keeping a running total overall and the current highest segment ID seen -%% per block. Roll all the way through even if matches are found or passed -%% over to confirm that the totals match the expected value (hence creating -%% a natural checksum) -%% -%% The end-result is a 260-byte check for the presence of a key in a slot -%% returning the block in which the segment can be found, which may also be -%% used directly for checking for the presence of segments. -%% -%% This is more space efficient than the equivalent bloom filter and avoids -%% the calculation of many hash functions. - -generate_segment_filter([SegL1]) -> - generate_segment_filter({SegL1, [], [], []}); -generate_segment_filter([SegL1, SegL2]) -> - generate_segment_filter({SegL1, SegL2, [], []}); -generate_segment_filter([SegL1, SegL2, SegL3]) -> - generate_segment_filter({SegL1, SegL2, SegL3, []}); -generate_segment_filter([SegL1, SegL2, SegL3, SegL4]) -> - generate_segment_filter({SegL1, SegL2, SegL3, SegL4}); -generate_segment_filter(SegLists) -> - generate_segment_filter(merge_seglists(SegLists), - [], - [{0, 0}, {0, 1}, {0, 2}, {0, 3}]). - -%% to generate the segment filter needs a sorted list of {Delta, Block} pairs -%% as DeltaList and a list of {TopHash, Block} pairs as TopHashes - -generate_segment_filter([], DeltaList, TopHashes) -> - {lists:reverse(DeltaList), TopHashes}; -generate_segment_filter([NextSeg|SegTail], DeltaList, TopHashes) -> - {TopHash, _} = lists:max(TopHashes), - {NextSegHash, NextSegBlock} = NextSeg, - DeltaList2 = [{NextSegHash - TopHash, NextSegBlock}|DeltaList], - TopHashes2 = lists:keyreplace(NextSegBlock, 2, TopHashes, - {NextSegHash, NextSegBlock}), - generate_segment_filter(SegTail, DeltaList2, TopHashes2). 
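A worked example of the encoding just described, using the 8092 divisor implied by the 9800 -> 1708 remainder in merge_seglists_test further down (13 remainder bits): a delta of 9800 falling in block 2 encodes as a unary exponent of 1, a 13-bit remainder of 1708, and a 2-bit block ID.

    %% 9800 div 8092 = 1    -> exponent bits <<1:1, 0:1>> (unary, 0-terminated)
    %% 9800 rem 8092 = 1708 -> <<1708:13>>
    %% block 2              -> <<2:2>>
    %% i.e. exactly the <<2:2, 1708:13, 2:2>> fragment asserted in the test:
    true = (<<1:1, 0:1, 1708:13, 2:2>> =:= <<2:2, 1708:13, 2:2>>).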
-
-
-serialise_segment_filter({DeltaList, TopHashes}) ->
-    TopHashesBin = lists:foldl(fun({X, _}, Acc) ->
-                                    <<Acc/bitstring, X:20>> end,
-                                <<>>, TopHashes),
-    Length = length(DeltaList),
-    HeaderBin = <<TopHashesBin/bitstring, Length:16/integer>>,
-    {Divisor, Factor} = {?DIVISOR, ?DIVISOR_BITS},
-    F = fun({Delta, Block}, Acc) ->
-            Exponent = buildexponent(Delta div Divisor),
-            Remainder = Delta rem Divisor,
-            Block2Bit = Block,
-            <<Acc/bitstring,
-                Exponent/bitstring,
-                Remainder:Factor/integer,
-                Block2Bit:2/integer>> end,
-    pad_binary(lists:foldl(F, HeaderBin, DeltaList)).
-
-
-pad_binary(BitString) ->
-    Pad = 8 - bit_size(BitString) rem 8,
-    case Pad of
-        8 -> BitString;
-        _ -> <<BitString/bitstring, 0:Pad/integer>>
-    end.
-
-buildexponent(Exponent) ->
-    buildexponent(Exponent, <<0:1>>).
-
-buildexponent(0, OutputBits) ->
-    OutputBits;
-buildexponent(Exponent, OutputBits) ->
-    buildexponent(Exponent - 1, <<1:1, OutputBits/bitstring>>).
-
-merge_seglists({SegList1, SegList2, SegList3, SegList4}) ->
-    Stage1 = lists:foldl(fun(X, Acc) -> [{X, 0}|Acc] end, [], SegList1),
-    Stage2 = lists:foldl(fun(X, Acc) -> [{X, 1}|Acc] end, Stage1, SegList2),
-    Stage3 = lists:foldl(fun(X, Acc) -> [{X, 2}|Acc] end, Stage2, SegList3),
-    Stage4 = lists:foldl(fun(X, Acc) -> [{X, 3}|Acc] end, Stage3, SegList4),
-    lists:sort(Stage4).
-
-hash_for_segmentid(KV) ->
-    erlang:phash2(leveled_codec:strip_to_keyonly(KV), ?MAX_SEG_HASH).
-
-
-%% Check for a given list of segments in the filter, returning in normal
-%% operations a TupleList of {SegmentID, [ListOfBlocks]} where the ListOfBlocks
-%% are the block IDs which contain keys in that given segment
-%%
-%% If there is a failure - perhaps due to a bit flip of some sort - an error
-%% will be returned (error_so_maybe_present) and all blocks should be checked
-%% as the filter cannot be relied upon
-
-check_for_segments(SegFilter, SegmentList, CRCCheck) ->
-    case CRCCheck of
-        true ->
-            <<T0:20/integer,
-                T1:20/integer,
-                T2:20/integer,
-                T3:20/integer,
-                Count:16/integer,
-                SegRem/bitstring>> = SegFilter,
-            CheckSum = [T0, T1, T2, T3],
-            case safecheck_for_segments(SegRem, SegmentList,
-                                            [0, 0, 0, 0],
-                                            0, Count, []) of
-                {error_so_maybe_present, Reason} ->
-                    leveled_log:log("SFT11", [Reason]),
-                    error_so_maybe_present;
-                {OutputCheck, BlockList} when OutputCheck == CheckSum,
-                                                BlockList == [] ->
-                    not_present;
-                {OutputCheck, BlockList} when OutputCheck == CheckSum ->
-                    {maybe_present, BlockList};
-                {OutputCheck, _} ->
-                    leveled_log:log("SFT12", [OutputCheck, CheckSum]),
-                    error_so_maybe_present
-            end;
-        false ->
-            <<_:80/bitstring, Count:16/integer, SegRem/bitstring>> = SegFilter,
-            case quickcheck_for_segments(SegRem, SegmentList,
-                                            lists:max(SegmentList),
-                                            0, Count, []) of
-                {error_so_maybe_present, Reason} ->
-                    leveled_log:log("SFT13", [Reason]),
-                    error_so_maybe_present;
-                BlockList when BlockList == [] ->
-                    not_present;
-                BlockList ->
-                    {maybe_present, BlockList}
-            end
-    end.
-
-
-safecheck_for_segments(_, _, TopHashes, _, 0, BlockList) ->
-    {TopHashes, BlockList};
-safecheck_for_segments(Filter, SegmentList, TopHs, Acc, Count, BlockList) ->
-    case findexponent(Filter) of
-        {ok, Exp, FilterRem1} ->
-            case findremainder(FilterRem1, ?DIVISOR_BITS) of
-                {ok, Remainder, BlockID, FilterRem2} ->
-                    {NextHash, BlockList2} = checkhash_forsegments(Acc,
-                                                                    Exp,
-                                                                    Remainder,
-                                                                    SegmentList,
-                                                                    BlockList,
-                                                                    BlockID),
-                    TopHashes2 = setnth(BlockID, TopHs, NextHash),
-                    safecheck_for_segments(FilterRem2, SegmentList,
-                                            TopHashes2,
-                                            NextHash, Count - 1,
-                                            BlockList2);
-                error ->
-                    {error_so_maybe_present, "Remainder Check"}
-            end;
-        error ->
-            {error_so_maybe_present, "Exponent Check"}
-    end.
-
-quickcheck_for_segments(_, _, _, _, 0, BlockList) ->
-    BlockList;
-quickcheck_for_segments(Filter, SegmentList, MaxSeg, Acc, Count, BlockList) ->
-    case findexponent(Filter) of
-        {ok, Exp, FilterRem1} ->
-            case findremainder(FilterRem1, ?DIVISOR_BITS) of
-                {ok, Remainder, BlockID, FilterRem2} ->
-                    {NextHash, BlockList2} = checkhash_forsegments(Acc,
-                                                                    Exp,
-                                                                    Remainder,
-                                                                    SegmentList,
-                                                                    BlockList,
-                                                                    BlockID),
-                    case NextHash > MaxSeg of
-                        true ->
-                            BlockList2;
-                        false ->
-                            quickcheck_for_segments(FilterRem2, SegmentList,
-                                                    MaxSeg,
-                                                    NextHash, Count - 1,
-                                                    BlockList2)
-                    end;
-                error ->
-                    {error_so_maybe_present, "Remainder Check"}
-            end;
-        error ->
-            {error_so_maybe_present, "Exponent Check"}
-    end.
-
-
-checkhash_forsegments(Acc, Exp, Remainder, SegmentList, BlockList, BlockID) ->
-    NextHash = Acc + ?DIVISOR * Exp + Remainder,
-    case lists:member(NextHash, SegmentList) of
-        true ->
-            {NextHash, [BlockID|BlockList]};
-        false ->
-            {NextHash, BlockList}
-    end.
-
-
-setnth(0, [_|Rest], New) -> [New|Rest];
-setnth(I, [E|Rest], New) -> [E|setnth(I-1, Rest, New)].
-
-
-findexponent(BitStr) ->
-    findexponent(BitStr, 0).
-
-findexponent(<<>>, _) ->
-    error;
-findexponent(<<H:1/integer, T/bitstring>>, Acc) ->
-    case H of
-        1 -> findexponent(T, Acc + 1);
-        0 -> {ok, Acc, T}
-    end.
-
-
-findremainder(BitStr, Factor) ->
-    case BitStr of
-        <<Remainder:Factor/integer, BlockID:2/integer, Tail/bitstring>> ->
-            {ok, Remainder, BlockID, Tail};
-        _ ->
-            error
-    end.
-
-
-
-%%%============================================================================
-%%% Test
-%%%============================================================================
-
-
--ifdef(TEST).
-
-generate_randomkeys({Count, StartSQN}) ->
-    generate_randomkeys(Count, StartSQN, []);
-generate_randomkeys(Count) ->
-    generate_randomkeys(Count, 0, []).
-
-generate_randomkeys(0, _SQN, Acc) ->
-    lists:reverse(Acc);
-generate_randomkeys(Count, SQN, Acc) ->
-    K = {o,
-            lists:concat(["Bucket", random:uniform(1024)]),
-            lists:concat(["Key", random:uniform(1024)]),
-            null},
-    RandKey = {K,
-                {SQN,
-                    {active, infinity},
-                    leveled_codec:magic_hash(K),
-                    null}},
-    generate_randomkeys(Count - 1, SQN + 1, [RandKey|Acc]).
-
-generate_sequentialkeys(Count, Start) ->
-    generate_sequentialkeys(Count + Start, Start, []).
-
-generate_sequentialkeys(Target, Incr, Acc) when Incr =:= Target ->
-    Acc;
-generate_sequentialkeys(Target, Incr, Acc) ->
-    KeyStr = string:right(integer_to_list(Incr), 8, $0),
-    K = {o, "BucketSeq", lists:concat(["Key", KeyStr]), null},
-    NextKey = {K,
-                {5,
-                    {active, infinity},
-                    leveled_codec:magic_hash(K),
-                    null}},
-    generate_sequentialkeys(Target, Incr + 1, [NextKey|Acc]).
-
-simple_create_block_test() ->
-    KeyList1 = [{{o, "Bucket1", "Key1", null},
-                    {1, {active, infinity}, no_lookup, null}},
-                {{o, "Bucket1", "Key3", null},
-                    {2, {active, infinity}, no_lookup, null}}],
-    KeyList2 = [{{o, "Bucket1", "Key2", null},
-                    {3, {active, infinity}, no_lookup, null}}],
-    {Status, BlockKeyList} = create_block(KeyList1,
-                                            KeyList2,
-                                            #level{level=1}),
-    ?assertMatch(partial, Status),
-    [H1|T1] = BlockKeyList,
-    ?assertMatch({{o, "Bucket1", "Key1", null},
-                    {1, {active, infinity}, no_lookup, null}}, H1),
-    [H2|T2] = T1,
-    ?assertMatch({{o, "Bucket1", "Key2", null},
-                    {3, {active, infinity}, no_lookup, null}}, H2),
-    ?assertMatch([{{o, "Bucket1", "Key3", null},
-                    {2, {active, infinity}, no_lookup, null}}], T2).
- -dominate_create_block_test() -> - KeyList1 = [{{o, "Bucket1", "Key1", null}, - {1, {active, infinity}, no_lookup, null}}, - {{o, "Bucket1", "Key2", null}, - {2, {active, infinity}, no_lookup, null}}], - KeyList2 = [{{o, "Bucket1", "Key2", null}, - {3, {tomb, infinity}, no_lookup, null}}], - {Status, BlockKeyList} = create_block(KeyList1, - KeyList2, - #level{level=1}), - ?assertMatch(partial, Status), - [K1, K2] = BlockKeyList, - ?assertMatch(K1, lists:nth(1, KeyList1)), - ?assertMatch(K2, lists:nth(1, KeyList2)). - -sample_keylist() -> - KeyList1 = - [{{o, "Bucket1", "Key1", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key3", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key5", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key7", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key9", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket2", "Key1", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket2", "Key3", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket2", "Key5", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket2", "Key7", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket2", "Key9", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket3", "Key1", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket3", "Key3", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket3", "Key5", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket3", "Key7", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket3", "Key9", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, 0, null}}], - KeyList2 = - [{{o, "Bucket1", "Key2", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key4", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key6", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key8", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key9a", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key9c", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key9d", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket2", "Key2", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket2", "Key4", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket2", "Key6", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket2", "Key8", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket3", "Key2", null}, {1, {active, infinity}, 0, null}}, - {{o, "Bucket3", "Key4", null}, {3, {active, infinity}, 0, null}}, - {{o, "Bucket3", "Key6", null}, {2, {active, infinity}, 0, null}}, - {{o, "Bucket3", "Key8", null}, {1, {active, infinity}, 0, null}}], - {KeyList1, KeyList2}. 
-
-alternating_create_block_test() ->
-    {KeyList1, KeyList2} = sample_keylist(),
-    {Status, BlockKeyList} = create_block(KeyList1,
-                                            KeyList2,
-                                            #level{level=1}),
-    BlockSize = length(BlockKeyList),
-    ?assertMatch(BlockSize, 32),
-    ?assertMatch(all_complete, Status),
-    K1 = lists:nth(1, BlockKeyList),
-    ?assertMatch(K1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, 0, null}}),
-    K11 = lists:nth(11, BlockKeyList),
-    ?assertMatch(K11, {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, 0, null}}),
-    K32 = lists:nth(32, BlockKeyList),
-    ?assertMatch(K32, {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, 0, null}}),
-    HKey = {{o, "Bucket1", "Key0", null}, {1, {active, infinity}, 0, null}},
-    {Status2, _} = create_block([HKey|KeyList1], KeyList2, #level{level=1}),
-    ?assertMatch(block_full, element(1, Status2)).
-
-
-merge_seglists_test() ->
-    SegList1 = [0, 100, 200],
-    SegList2 = [50, 200],
-    SegList3 = [75, 10000],
-    SegList4 = [],
-    MergedList = merge_seglists({SegList1, SegList2,
-                                    SegList3, SegList4}),
-    ?assertMatch(MergedList, [{0, 0}, {50, 1}, {75, 2}, {100, 0},
-                                {200, 0}, {200,1}, {10000,2}]),
-    SegTerm = generate_segment_filter({SegList1, SegList2,
-                                        SegList3, SegList4}),
-    ?assertMatch(SegTerm, {[{0, 0}, {50, 1}, {25, 2}, {25, 0},
-                            {100, 0}, {0, 1}, {9800, 2}],
-                            [{200, 0}, {200, 1}, {10000, 2},{0, 3}]}),
-    SegBin = serialise_segment_filter(SegTerm),
-    ExpectedTopHashes = <<200:20, 200:20, 10000:20, 0:20>>,
-    ExpectedDeltas = <<0:1, 0:13, 0:2,
-                        0:1, 50:13, 1:2,
-                        0:1, 25:13, 2:2,
-                        0:1, 25:13, 0:2,
-                        0:1, 100:13, 0:2,
-                        0:1, 0:13, 1:2,
-                        2:2, 1708:13, 2:2>>,
-    ExpectedResult = <<ExpectedTopHashes/bitstring,
-                        7:16/integer,
-                        ExpectedDeltas/bitstring,
-                        0:7/integer>>,
-    ?assertMatch(SegBin, ExpectedResult),
-    R1 = check_for_segments(SegBin, [100], true),
-    ?assertMatch(R1,{maybe_present, [0]}),
-    R2 = check_for_segments(SegBin, [900], true),
-    ?assertMatch(R2, not_present),
-    R3 = check_for_segments(SegBin, [200], true),
-    ?assertMatch(R3, {maybe_present, [1,0]}),
-    R4 = check_for_segments(SegBin, [0,900], true),
-    ?assertMatch(R4, {maybe_present, [0]}),
-    R5 = check_for_segments(SegBin, [100], false),
-    ?assertMatch(R5, {maybe_present, [0]}),
-    R6 = check_for_segments(SegBin, [900], false),
-    ?assertMatch(R6, not_present),
-    R7 = check_for_segments(SegBin, [200], false),
-    ?assertMatch(R7, {maybe_present, [1,0]}),
-    R8 = check_for_segments(SegBin, [0,900], false),
-    ?assertMatch(R8, {maybe_present, [0]}),
-    R9 = check_for_segments(SegBin, [1024*1024 - 1], false),
-    ?assertMatch(R9, not_present),
-    io:format("Try corrupted bloom filter with flipped bit in " ++
-                "penultimate delta~n"),
-    ExpectedDeltasFlippedBit = <<0:1, 0:13, 0:2,
-                                    0:1, 50:13, 1:2,
-                                    0:1, 25:13, 2:2,
-                                    0:1, 25:13, 0:2,
-                                    0:1, 100:13, 0:2,
-                                    0:1, 0:13, 1:2,
-                                    2:2, 1709:13, 2:2>>,
-    SegBin1 = <<ExpectedTopHashes/bitstring,
-                7:16/integer,
-                ExpectedDeltasFlippedBit/bitstring,
-                0:7/integer>>,
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin1, [900], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin1, [200], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin1, [0,900], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin1, [1024*1024 - 1], true)),
-    % This match is before the flipped bit, so still works without CRC check
-    ?assertMatch({maybe_present, [0]},
-                    check_for_segments(SegBin1, [0,900], false)),
-    io:format("Try corrupted bloom filter with flipped bit in " ++
-                "final block's top hash~n"),
-    ExpectedTopHashesFlippedBit = <<200:20, 200:20, 10000:20, 1:20>>,
-    SegBin2 = <<ExpectedTopHashesFlippedBit/bitstring,
-                7:16/integer,
-                ExpectedDeltas/bitstring,
-                0:7/integer>>,
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin2, [900], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin2, [200], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin2, [0,900], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin2, [1024*1024 - 1], true)),
-    % This match is before the flipped bit, so still works without CRC check
-    ?assertMatch({maybe_present, [0]},
-                    check_for_segments(SegBin2, [0,900], false)),
-
-    ExpectedDeltasAll1s = <<4294967295:32/integer>>,
-    SegBin3 = <<ExpectedTopHashes/bitstring,
-                7:16/integer,
-                ExpectedDeltasAll1s/bitstring,
-                0:7/integer>>,
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin3, [900], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin3, [200], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin3, [0,900], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin3, [1024*1024 - 1], true)),
-    % This is so badly mangled, the error gets detected even without CRC
-    % checking being enforced
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin3, [900], false)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin3, [200], false)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin3, [0,900], false)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin3, [1024*1024 - 1], false)),
-
-    ExpectedDeltasNearlyAll1s = <<4294967287:32/integer>>,
-    SegBin4 = <<ExpectedTopHashes/bitstring,
-                7:16/integer,
-                ExpectedDeltasNearlyAll1s/bitstring,
-                0:7/integer>>,
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin4, [900], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin4, [200], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin4, [0,900], true)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin4, [1024*1024 - 1], true)),
-    % This is so badly mangled, the error gets detected even without CRC
-    % checking being enforced
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin4, [900], false)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin4, [200], false)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin4, [0,900], false)),
-    ?assertMatch(error_so_maybe_present,
-                    check_for_segments(SegBin4, [1024*1024 - 1], false)).
-
-createslot_stage1_test() ->
-    {KeyList1, KeyList2} = sample_keylist(),
-    {Status, BlockKeyLists} = create_slot(KeyList1, KeyList2, #level{level=1}),
-    WState = finalise_slot(BlockKeyLists, #writer{}),
-
-    ?assertMatch({o, "Bucket4", "Key1", null}, WState#writer.last_key),
-    ?assertMatch(partial, Status),
-
-    %% Writer state has the SlotIndex which includes the segment filter
-    SegFilter = element(2, lists:nth(1, WState#writer.slot_index)),
-
-    R0 = check_for_segments(serialise_segment_filter(SegFilter),
-            [hash_for_segmentid({keyonly, {o, "Bucket1", "Key1", null}})],
-            true),
-    ?assertMatch({maybe_present, [0]}, R0),
-    R1 = check_for_segments(serialise_segment_filter(SegFilter),
-            [hash_for_segmentid({keyonly, {o, "Bucket1", "Key99", null}})],
-            true),
-    ?assertMatch(not_present, R1),
-    ?assertMatch(1, WState#writer.min_sqn),
-    ?assertMatch(3, WState#writer.max_sqn).
-
-
-createslot_stage2_test() ->
-    {Status, BlockKeyLists} = create_slot(lists:sort(generate_randomkeys(100)),
-                                            lists:sort(generate_randomkeys(100)),
-                                            #level{level=1}),
-    WState = finalise_slot(BlockKeyLists, #writer{}),
-    LengthList = element(3, lists:nth(1, WState#writer.slot_index)),
-
-    ?assertMatch(full, element(1, Status)),
-    Sum1 = lists:sum(LengthList),
-    Sum2 = byte_size(WState#writer.slot_binary),
-    ?assertMatch(Sum1, Sum2).
- - -createslot_stage3_test() -> - {Status, BlockKeyLists} = create_slot(lists:sort(generate_sequentialkeys(100, 1)), - lists:sort(generate_sequentialkeys(100, 101)), - #level{level=1}), - WState = finalise_slot(BlockKeyLists, #writer{}), - {FirstKey, SegFilter, LengthList} = lists:nth(1, WState#writer.slot_index), - - ?assertMatch(full, element(1, Status)), - Sum1 = lists:sum(LengthList), - Sum2 = byte_size(WState#writer.slot_binary), - ?assertMatch(Sum1, Sum2), - ?assertMatch({o, "BucketSeq", "Key00000001", null}, FirstKey), - ?assertMatch({o, "BucketSeq", "Key00000128", null}, WState#writer.last_key), - ?assertMatch([], element(2, Status)), - Rem = length(element(3, Status)), - ?assertMatch(Rem, 72), - R0 = check_for_segments(serialise_segment_filter(SegFilter), - [hash_for_segmentid({keyonly, - {o, "BucketSeq", "Key00000100", null}})], - true), - ?assertMatch({maybe_present, [3]}, R0), - R1 = check_for_segments(serialise_segment_filter(SegFilter), - [hash_for_segmentid({keyonly, - {o, "Bucket1", "Key99", null}})], - true), - ?assertMatch(not_present, R1), - R2 = check_for_segments(serialise_segment_filter(SegFilter), - [hash_for_segmentid({keyonly, - {o, "BucketSeq", "Key00000040", null}})], - true), - ?assertMatch({maybe_present, [1]}, R2), - R3 = check_for_segments(serialise_segment_filter(SegFilter), - [hash_for_segmentid({keyonly, - {o, "BucketSeq", "Key00000004", null}})], - true), - ?assertMatch({maybe_present, [0]}, R3). - - -initial_create_header_test() -> - Output = create_header(initial), - ?assertMatch(?HEADER_LEN, byte_size(Output)). - -initial_create_file_test() -> - Filename = "../test/test1.sft", - {KL1, KL2} = sample_keylist(), - {Handle, FileMD} = create_file(Filename), - {UpdHandle, UpdFileMD, {[], []}} = complete_file(Handle, FileMD, - KL1, KL2, - #level{level=1}), - - io:format("Slot Index of UpdFileMD ~w~n", [UpdFileMD#state.slot_index]), - Result1 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key8", null}), - ?assertMatch({{o, "Bucket1", "Key8", null}, - {1, {active, infinity}, 0, null}}, Result1), - Result2 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key88", null}), - ?assertMatch(not_present, Result2), - ok = file:close(UpdHandle), - ok = file:delete(Filename). - -big_create_file_test() -> - Filename = "../test/bigtest1.sft", - {KL1, KL2} = {lists:sort(generate_randomkeys(2000)), - lists:sort(generate_randomkeys(40000))}, - {InitHandle, InitFileMD} = create_file(Filename), - {Handle, FileMD, {_KL1Rem, _KL2Rem}} = complete_file(InitHandle, - InitFileMD, - KL1, KL2, - #level{level=1}), - [{K1, {Sq1, St1, MH1, V1}}|_] = KL1, - [{K2, {Sq2, St2, MH2, V2}}|_] = KL2, - Result1 = fetch_keyvalue(Handle, FileMD, K1), - Result2 = fetch_keyvalue(Handle, FileMD, K2), - ?assertMatch({K1, {Sq1, St1, MH1, V1}}, Result1), - ?assertMatch({K2, {Sq2, St2, MH2, V2}}, Result2), - SubList = lists:sublist(KL2, 1000), - lists:foreach(fun(KV) -> - {Kn, _} = KV, - Rn = fetch_keyvalue(Handle, FileMD, Kn), - ?assertMatch({Kn, _}, Rn) - end, - SubList), - Result3 = fetch_keyvalue(Handle, - FileMD, - {o, "Bucket1024", "Key1024Alt", null}), - ?assertMatch(Result3, not_present), - ok = file:close(Handle), - ok = file:delete(Filename). 
- -initial_iterator_test() -> - Filename = "../test/test2.sft", - {KL1, KL2} = sample_keylist(), - {Handle, FileMD} = create_file(Filename), - {UpdHandle, UpdFileMD, {[], []}} = complete_file(Handle, FileMD, - KL1, KL2, - #level{level=1}), - Result1 = fetch_range_keysonly(UpdHandle, UpdFileMD, - {o, "Bucket1", "Key8", null}, - {o, "Bucket1", "Key9d", null}), - io:format("Result returned of ~w~n", [Result1]), - ?assertMatch({complete, - [{{o, "Bucket1", "Key8", null}, 1, {active, infinity}}, - {{o, "Bucket1", "Key9", null}, 1, {active, infinity}}, - {{o, "Bucket1", "Key9a", null}, 1, {active, infinity}}, - {{o, "Bucket1", "Key9b", null}, 1, {active, infinity}}, - {{o, "Bucket1", "Key9c", null}, 1, {active, infinity}}, - {{o, "Bucket1", "Key9d", null}, 1, {active, infinity}} - ]}, - Result1), - Result2 = fetch_range_keysonly(UpdHandle, UpdFileMD, - {o, "Bucket1", "Key8", null}, - {o, "Bucket1", "Key9b", null}), - ?assertMatch({complete, - [{{o, "Bucket1", "Key8", null}, 1, {active, infinity}}, - {{o, "Bucket1", "Key9", null}, 1, {active, infinity}}, - {{o, "Bucket1", "Key9a", null}, 1, {active, infinity}}, - {{o, "Bucket1", "Key9b", null}, 1, {active, infinity}} - ]}, - Result2), - Result3 = fetch_range_keysonly(UpdHandle, UpdFileMD, - {o, "Bucket3", "Key4", null}, - all), - {partial, RL3, _} = Result3, - ?assertMatch([{{o, "Bucket3", "Key4", null}, 3, {active, infinity}}, - {{o, "Bucket3", "Key5", null}, 1, {active, infinity}}, - {{o, "Bucket3", "Key6", null}, 2, {active, infinity}}, - {{o, "Bucket3", "Key7", null}, 1, {active, infinity}}, - {{o, "Bucket3", "Key8", null}, 1, {active, infinity}}, - {{o, "Bucket3", "Key9", null}, 1, {active, infinity}}, - {{o, "Bucket4", "Key1", null}, 1, {active, infinity}}], - RL3), - ok = file:close(UpdHandle), - ok = file:delete(Filename). 
-
-key_dominates_test() ->
-    KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}},
-    KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, 0, []}},
-    KV3 = {{o, "Bucket", "Key2", null}, {3, {active, infinity}, 0, []}},
-    KV4 = {{o, "Bucket", "Key4", null}, {7, {active, infinity}, 0, []}},
-    KV5 = {{o, "Bucket", "Key1", null}, {4, {active, infinity}, 0, []}},
-    KV6 = {{o, "Bucket", "Key1", null}, {99, {tomb, 999}, 0, []}},
-    KV7 = {{o, "Bucket", "Key1", null}, {99, tomb, 0, []}},
-    KL1 = [KV1, KV2],
-    KL2 = [KV3, KV4],
-    ?assertMatch({{next_key, KV1}, [KV2], KL2},
-                    key_dominates(KL1, KL2, {undefined, 1})),
-    ?assertMatch({{next_key, KV1}, KL2, [KV2]},
-                    key_dominates(KL2, KL1, {undefined, 1})),
-    ?assertMatch({skipped_key, KL2, KL1},
-                    key_dominates([KV5|KL2], KL1, {undefined, 1})),
-    ?assertMatch({{next_key, KV1}, [KV2], []},
-                    key_dominates(KL1, [], {undefined, 1})),
-    ?assertMatch({skipped_key, [KV6|KL2], [KV2]},
-                    key_dominates([KV6|KL2], KL1, {undefined, 1})),
-    ?assertMatch({{next_key, KV6}, KL2, [KV2]},
-                    key_dominates([KV6|KL2], [KV2], {undefined, 1})),
-    ?assertMatch({skipped_key, [KV6|KL2], [KV2]},
-                    key_dominates([KV6|KL2], KL1, {true, 1})),
-    ?assertMatch({skipped_key, [KV6|KL2], [KV2]},
-                    key_dominates([KV6|KL2], KL1, {true, 1000})),
-    ?assertMatch({{next_key, KV6}, KL2, [KV2]},
-                    key_dominates([KV6|KL2], [KV2], {true, 1})),
-    ?assertMatch({skipped_key, KL2, [KV2]},
-                    key_dominates([KV6|KL2], [KV2], {true, 1000})),
-    ?assertMatch({skipped_key, [], []},
-                    key_dominates([KV6], [], {true, 1000})),
-    ?assertMatch({skipped_key, [], []},
-                    key_dominates([], [KV6], {true, 1000})),
-    ?assertMatch({{next_key, KV6}, [], []},
-                    key_dominates([KV6], [], {true, 1})),
-    ?assertMatch({{next_key, KV6}, [], []},
-                    key_dominates([], [KV6], {true, 1})),
-    ?assertMatch({skipped_key, [], []},
-                    key_dominates([KV7], [], {true, 1})),
-    ?assertMatch({skipped_key, [], []},
-                    key_dominates([], [KV7], {true, 1})),
-    ?assertMatch({skipped_key, [KV7|KL2], [KV2]},
-                    key_dominates([KV7|KL2], KL1, {undefined, 1})),
-    ?assertMatch({{next_key, KV7}, KL2, [KV2]},
-                    key_dominates([KV7|KL2], [KV2], {undefined, 1})),
-    ?assertMatch({skipped_key, [KV7|KL2], [KV2]},
-                    key_dominates([KV7|KL2], KL1, {true, 1})),
-    ?assertMatch({skipped_key, KL2, [KV2]},
-                    key_dominates([KV7|KL2], [KV2], {true, 1})).
-
-
-corrupted_sft_test() ->
-    Filename = "../test/bigcorrupttest1.sft",
-    {KL1, KL2} = {lists:ukeysort(1, generate_randomkeys(2000)), []},
-    {InitHandle, InitFileMD} = create_file(Filename),
-    {Handle, _FileMD, _Rems} = complete_file(InitHandle,
-                                                InitFileMD,
-                                                KL1, KL2,
-                                                #level{level=1}),
-    {ok, Lengths} = file:pread(Handle, 12, 12),
-    <<BlocksLength:32/integer,
-        IndexLength:32/integer,
-        FilterLength:32/integer>> = Lengths,
-    ok = file:close(Handle),
-
-    {ok, Corrupter} = file:open(Filename, [binary, raw, read, write]),
-    lists:foreach(fun(X) ->
-                        case X * 5 of
-                            Y when Y < FilterLength ->
-                                Position = ?HEADER_LEN + X * 5
-                                            + BlocksLength + IndexLength,
-                                file:pwrite(Corrupter,
-                                            Position,
-                                            <<0:8/integer>>)
-                        end
-                        end,
-                    lists:seq(1, 100)),
-    ok = file:close(Corrupter),
-
-    {ok, SFTr, _KeyExtremes} = sft_open(Filename),
-    lists:foreach(fun({K, V}) ->
-                        ?assertMatch({K, V}, sft_get(SFTr, K))
-                        end,
-                    KL1),
-    ok = sft_clear(SFTr).
- -big_iterator_test() -> - Filename = "../test/bigtest1.sft", - {KL1, KL2} = {lists:sort(generate_randomkeys(10000)), []}, - {InitHandle, InitFileMD} = create_file(Filename), - {Handle, FileMD, {KL1Rem, KL2Rem}} = complete_file(InitHandle, InitFileMD, - KL1, KL2, - #level{level=1}), - io:format("Remainder lengths are ~w and ~w ~n", [length(KL1Rem), - length(KL2Rem)]), - {complete, - Result1} = fetch_range_keysonly(Handle, - FileMD, - {o, "Bucket0000", "Key0000", null}, - {o, "Bucket9999", "Key9999", null}, - 256), - NumAddedKeys = 10000 - length(KL1Rem), - ?assertMatch(NumAddedKeys, length(Result1)), - {partial, - Result2, - _} = fetch_range_keysonly(Handle, - FileMD, - {o, "Bucket0000", "Key0000", null}, - {o, "Bucket9999", "Key9999", null}, - 32), - ?assertMatch(32 * 128, length(Result2)), - {partial, - Result3, - _} = fetch_range_keysonly(Handle, - FileMD, - {o, "Bucket0000", "Key0000", null}, - {o, "Bucket9999", "Key9999", null}, - 4), - ?assertMatch(4 * 128, length(Result3)), - ok = file:close(Handle), - ok = file:delete(Filename). - -hashclash_test() -> - Filename = "../test/hashclash.sft", - Key1 = {o, "Bucket", "Key838068", null}, - Key99 = {o, "Bucket", "Key898982", null}, - KeyNF = {o, "Bucket", "Key539122", null}, - ?assertMatch(4, hash_for_segmentid({keyonly, Key1})), - ?assertMatch(4, hash_for_segmentid({keyonly, Key99})), - ?assertMatch(4, hash_for_segmentid({keyonly, KeyNF})), - KeyList = lists:foldl(fun(X, Acc) -> - Key = {o, - "Bucket", - "Key8400" ++ integer_to_list(X), - null}, - Value = {X, - {active, infinity}, - leveled_codec:magic_hash(Key), - null}, - Acc ++ [{Key, Value}] end, - [], - lists:seq(10,98)), - KeyListToUse = [{Key1, - {1, - {active, infinity}, - leveled_codec:magic_hash(Key1), - null}}|KeyList] - ++ [{Key99, - {99, - {active, infinity}, - leveled_codec:magic_hash(Key99), - null}}], - {InitHandle, InitFileMD} = create_file(Filename), - {Handle, _FileMD, _Rem} = complete_file(InitHandle, InitFileMD, - KeyListToUse, [], - #level{level=1}), - ok = file:close(Handle), - {ok, SFTr, _KeyExtremes} = sft_open(Filename), - ?assertMatch({Key1, - {1, {active, infinity}, _, null}}, - sft_get(SFTr, Key1)), - ?assertMatch({Key99, - {99, {active, infinity}, _, null}}, - sft_get(SFTr, Key99)), - ?assertMatch(not_present, - sft_get(SFTr, KeyNF)), - - ok = sft_clear(SFTr). - -filename_test() -> - FN1 = "../tmp/filename", - FN2 = "../tmp/filename.pnd", - FN3 = "../tmp/subdir/file_name.pend", - ?assertMatch({"../tmp/filename.pnd", "../tmp/filename.sft"}, - generate_filenames(FN1)), - ?assertMatch({"../tmp/filename.pnd", "../tmp/filename.sft"}, - generate_filenames(FN2)), - ?assertMatch({"../tmp/subdir/file_name.pnd", - "../tmp/subdir/file_name.sft"}, - generate_filenames(FN3)). - -empty_file_test() -> - {ok, Pid, _Reply} = sft_new("../test/emptyfile.pnd", [], [], 1), - ?assertMatch(not_present, sft_get(Pid, "Key1")), - ?assertMatch([], sft_getkvrange(Pid, all, all, 16)), - ok = sft_clear(Pid). - - -nonsense_coverage_test() -> - {ok, Pid} = gen_fsm:start(?MODULE, [], []), - undefined = gen_fsm:sync_send_all_state_event(Pid, nonsense), - ok = gen_fsm:send_all_state_event(Pid, nonsense), - ?assertMatch({next_state, reader, #state{}}, handle_info(nonsense, - reader, - #state{})), - ?assertMatch({ok, reader, #state{}}, code_change(nonsense, - reader, - #state{}, - nonsense)). - --endif. 
\ No newline at end of file diff --git a/src/leveled_skiplist.erl b/src/leveled_skiplist.erl index d03f0c1..5829448 100644 --- a/src/leveled_skiplist.erl +++ b/src/leveled_skiplist.erl @@ -29,6 +29,7 @@ lookup/2, lookup/3, key_above/2, + key_above_notequals/2, empty/0, empty/1, size/1 @@ -123,8 +124,15 @@ to_range(SkipList, Start, End) -> to_list(SkipList) -> to_list(element(2, SkipList), ?LIST_HEIGHT). +%% If a mark is found that matches the key, will return that mark key_above(SkipList, Key) -> - key_above(element(2, SkipList), Key, ?LIST_HEIGHT). + TestFun = fun(Mark, K) -> Mark >= K end, + key_above(element(2, SkipList), Key, ?LIST_HEIGHT, TestFun). + +%% If a mark is found that matches the key, will return the next mark +key_above_notequals(SkipList, Key) -> + TestFun = fun(Mark, K) -> Mark > K end, + key_above(element(2, SkipList), Key, ?LIST_HEIGHT, TestFun). empty() -> empty(false). @@ -321,11 +329,11 @@ sublist_above(SkipList, StartKey, Level, StartIncl) -> sublist_above(SL, StartKey, Level - 1, StartIncl) end. -key_above(SkipList, Key, 0) -> +key_above(SkipList, Key, 0, TestFun) -> FindFun = fun({Mark, V}, Found) -> case Found of false -> - case Key =< Mark of + case TestFun(Mark, Key) of true -> {Mark, V}; false -> @@ -336,13 +344,13 @@ key_above(SkipList, Key, 0) -> end end, lists:foldl(FindFun, false, SkipList); -key_above(SkipList, Key, Level) -> +key_above(SkipList, Key, Level, TestFun) -> FindFun = fun({Mark, SL}, Found) -> case Found of false -> - case Key =< Mark of + case TestFun(Mark, Key) of true -> - key_above(SL, Key, Level - 1); + key_above(SL, Key, Level - 1, TestFun); false -> false end; diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 877e42c..ef699d6 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -65,6 +65,7 @@ -define(LEVEL_BLOOM_SLOTS, [{0, 64}, {1, 48}, {default, 32}]). -define(MERGE_SCANWIDTH, 16). -define(DISCARD_EXT, ".discarded"). +-define(DELETE_TIMEOUT, 10000). -include_lib("eunit/include/eunit.hrl"). @@ -74,21 +75,28 @@ handle_info/3, terminate/3, code_change/4, + starting/2, starting/3, - reader/3]). + reader/3, + delete_pending/2, + delete_pending/3]). --export([sst_new/3, - sst_new/5, - sst_newlevelzero/4, +-export([sst_new/4, + sst_new/6, + sst_newlevelzero/5, sst_open/1, sst_get/2, sst_get/3, sst_getkvrange/4, sst_getslots/2, + sst_getmaxsequencenumber/1, + sst_setfordelete/2, + sst_clear/1, + sst_checkready/1, + sst_deleteconfirmed/1, sst_close/1]). --export([generate_randomkeys/1]). - +-export([expand_list_by_pointer/3]). -record(slot_index_value, {slot_id :: integer(), @@ -100,12 +108,14 @@ last_key :: tuple(), index :: list(), % leveled_skiplist bloom :: tuple(), % leveled_tinybloom - size :: integer()}). + size :: integer(), + max_sqn :: integer()}). -record(state, {summary, handle :: file:fd(), sst_timings :: tuple(), slot_lengths :: list(), + penciller :: pid(), filename, cache}). @@ -121,33 +131,42 @@ sst_open(Filename) -> {ok, Pid, {SK, EK}} end. -sst_new(Filename, Level, KVList) -> +sst_new(Filename, Level, KVList, MaxSQN) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), case gen_fsm:sync_send_event(Pid, - {sst_new, Filename, Level, KVList}, + {sst_new, + Filename, + Level, + KVList, + MaxSQN}, infinity) of {ok, {SK, EK}} -> {ok, Pid, {SK, EK}} end. 
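The distinction between the two leveled_skiplist lookups introduced above is that key_above/2 may return a mark equal to the queried key, while key_above_notequals/2 always returns a strictly greater one. A sketch with hypothetical atom keys (k1 < k2 in term order), not part of the patch:

    key_above_sketch() ->
        SL = leveled_skiplist:enter(k2, v2,
                leveled_skiplist:enter(k1, v1, leveled_skiplist:empty())),
        {k1, v1} = leveled_skiplist:key_above(SL, k1),           % mark may equal key
        {k2, v2} = leveled_skiplist:key_above_notequals(SL, k1). % strictly above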
-sst_new(Filename, KL1, KL2, IsBasement, Level) -> +sst_new(Filename, KL1, KL2, IsBasement, Level, MaxSQN) -> {{Rem1, Rem2}, MergedList} = merge_lists(KL1, KL2, {IsBasement, Level}), {ok, Pid} = gen_fsm:start(?MODULE, [], []), case gen_fsm:sync_send_event(Pid, - {sst_new, Filename, Level, MergedList}, + {sst_new, + Filename, + Level, + MergedList, + MaxSQN}, infinity) of {ok, {SK, EK}} -> {ok, Pid, {{Rem1, Rem2}, SK, EK}} end. -sst_newlevelzero(Filename, Slots, FetchFun, Penciller) -> +sst_newlevelzero(Filename, Slots, FetchFun, Penciller, MaxSQN) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), gen_fsm:send_event(Pid, {sst_newlevelzero, Filename, Slots, FetchFun, - Penciller}), + Penciller, + MaxSQN}), {ok, Pid, noreply}. sst_get(Pid, LedgerKey) -> @@ -164,6 +183,24 @@ sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> sst_getslots(Pid, SlotList) -> gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity). +sst_getmaxsequencenumber(Pid) -> + gen_fsm:sync_send_event(Pid, get_maxsequencenumber, infinity). + +sst_setfordelete(Pid, Penciller) -> + gen_fsm:sync_send_event(Pid, {set_for_delete, Penciller}, infinity). + +sst_clear(Pid) -> + gen_fsm:sync_send_event(Pid, {set_for_delete, false}, infinity), + gen_fsm:sync_send_event(Pid, close, 1000). + +sst_deleteconfirmed(Pid) -> + gen_fsm:send_event(Pid, close). + +sst_checkready(Pid) -> + %% Only used in test + gen_fsm:sync_send_event(Pid, background_complete, 100). + + sst_close(Pid) -> gen_fsm:sync_send_event(Pid, close, 2000). @@ -186,31 +223,48 @@ starting({sst_open, Filename}, _From, State) -> {ok, {Summary#summary.first_key, Summary#summary.last_key}}, reader, UpdState}; -starting({sst_new, Filename, Level, KVList}, _From, State) -> +starting({sst_new, Filename, Level, KVList, MaxSQN}, _From, State) -> {FirstKey, L, SlotIndex, AllHashes, SlotsBin} = build_all_slots(KVList), - SummaryBin = build_table_summary(SlotIndex, AllHashes, Level, FirstKey, L), + SummaryBin = build_table_summary(SlotIndex, + AllHashes, + Level, + FirstKey, + L, + MaxSQN), ActualFilename = write_file(Filename, SummaryBin, SlotsBin), - UpdState = read_file(ActualFilename, - State#state{filename=ActualFilename}), + UpdState = read_file(ActualFilename, State), Summary = UpdState#state.summary, + leveled_log:log("SST08", [ActualFilename, Level, Summary#summary.max_sqn]), {reply, {ok, {Summary#summary.first_key, Summary#summary.last_key}}, reader, UpdState}. -starting({sst_newlevelzero, Filename, Slots, FetchFun, Penciller}, State) -> +starting({sst_newlevelzero, Filename, Slots, FetchFun, Penciller, MaxSQN}, + State) -> KVList = leveled_pmem:to_list(Slots, FetchFun), {FirstKey, L, SlotIndex, AllHashes, SlotsBin} = build_all_slots(KVList), - SummaryBin = build_table_summary(SlotIndex, AllHashes, 0, FirstKey, L), + SummaryBin = build_table_summary(SlotIndex, + AllHashes, + 0, + FirstKey, + L, + MaxSQN), ActualFilename = write_file(Filename, SummaryBin, SlotsBin), - UpdState = read_file(ActualFilename, - State#state{filename=ActualFilename}), + UpdState = read_file(ActualFilename, State), Summary = UpdState#state.summary, - leveled_penciller:pcl_confirml0complete(Penciller, - UpdState#state.filename, - Summary#summary.first_key, - Summary#summary.last_key), - {next_state, reader, UpdState}. 
+ leveled_log:log("SST08", [ActualFilename, 0, Summary#summary.max_sqn]), + case Penciller of + undefined -> + {next_state, reader, UpdState}; + _ -> + leveled_penciller:pcl_confirml0complete(Penciller, + UpdState#state.filename, + Summary#summary.first_key, + Summary#summary.last_key), + {next_state, reader, UpdState} + end. + reader({get_kv, LedgerKey, Hash}, _From, State) -> SW = os:timestamp(), @@ -240,13 +294,70 @@ reader({get_slots, SlotList}, _From, State) -> fun({SlotBin, SK, EK}, Acc) -> Acc ++ trim_slot(SlotBin, SK, EK) end, {reply, lists:foldl(FoldFun, [], SlotBins), reader, State}; +reader(get_maxsequencenumber, _From, State) -> + Summary = State#state.summary, + {reply, Summary#summary.max_sqn, reader, State}; reader(print_timings, _From, State) -> io:format(user, "Timings of ~w~n", [State#state.sst_timings]), {reply, ok, reader, State#state{sst_timings = undefined}}; +reader({set_for_delete, Penciller}, _From, State) -> + leveled_log:log("SST06", [State#state.filename]), + {reply, + ok, + delete_pending, + State#state{penciller=Penciller}, + ?DELETE_TIMEOUT}; +reader(background_complete, _From, State) -> + Summary = State#state.summary, + {reply, + {ok, + State#state.filename, + Summary#summary.first_key, + Summary#summary.last_key}, + reader, + State}; reader(close, _From, State) -> ok = file:close(State#state.handle), {stop, normal, ok, State}. + +delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> + {Result, Stage, SlotID} = fetch(LedgerKey, Hash, State), + case {Result, Stage} of + {not_present, slot_crc_wonky} -> + leveled_log:log("SST02", [State#state.filename, SlotID]), + {reply, Result, reader, State, ?DELETE_TIMEOUT}; + {not_present, _} -> + {reply, Result, reader, State, ?DELETE_TIMEOUT}; + {KV, slot_lookup_hit} -> + UpdCache = array:set(SlotID, KV, State#state.cache), + UpdState = State#state{cache = UpdCache}, + {reply, Result, reader, UpdState, ?DELETE_TIMEOUT}; + _ -> + {reply, Result, reader, State, ?DELETE_TIMEOUT} + end; +delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> + {reply, + fetch_range(StartKey, EndKey, ScanWidth, State), + reader, + State, + ?DELETE_TIMEOUT}; +delete_pending(close, _From, State) -> + leveled_log:log("SST07", [State#state.filename]), + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), + {stop, normal, ok, State}. + +delete_pending(timeout, State) -> + ok = leveled_penciller:pcl_confirmdelete(State#state.penciller, + State#state.filename), + {next_state, delete_pending, State, ?DELETE_TIMEOUT}; +delete_pending(close, State) -> + leveled_log:log("SST07", [State#state.filename]), + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), + {stop, normal, State}. + handle_sync_event(_Msg, _From, StateName, State) -> {reply, undefined, StateName, State}. @@ -413,10 +524,14 @@ read_file(Filename, State) -> SlotCount = length(SlotLengths), SkipL = leveled_skiplist:from_sortedlist(Summary#summary.index), UpdSummary = Summary#summary{index = SkipL}, - leveled_log:log("SST03", [Filename, Summary#summary.size, SlotCount]), + leveled_log:log("SST03", [Filename, + Summary#summary.size, + SlotCount, + Summary#summary.max_sqn]), State#state{summary = UpdSummary, slot_lengths = SlotLengths, handle = Handle, + filename = Filename, cache = array:new({size, SlotCount + 1})}. open_reader(Filename) -> @@ -426,7 +541,7 @@ open_reader(Filename) -> {ok, SummaryBin} = file:pread(Handle, SlotsLength + 8, SummaryLength), {Handle, SummaryBin}. 
-build_table_summary(SlotIndex, AllHashes, Level, FirstKey, L) ->
+build_table_summary(SlotIndex, AllHashes, Level, FirstKey, L, MaxSQN) ->
 BloomSlots =
 case lists:keyfind(Level, 1, ?LEVEL_BLOOM_SLOTS) of
 {Level, N} ->
@@ -442,7 +557,8 @@ build_table_summary(SlotIndex, AllHashes, Level, FirstKey, L) ->
 last_key = LastKey,
 size = L,
 index = lists:reverse(SlotIndex),
- bloom = Bloom},
+ bloom = Bloom,
+ max_sqn = MaxSQN},
 SummBin = term_to_binary(Summary, [{compressed, ?COMPRESSION_LEVEL}]),
 SummCRC = erlang:crc32(SummBin),
 <<SummCRC:32/integer, SummBin/binary>>.
@@ -546,8 +662,13 @@ lookup_slots_int(StartKey, EndKey, SkipList) ->
 EndKey ->
 {L0, true, false};
 _ ->
- LTail = leveled_skiplist:key_above(SkipList, EndKey),
- {L0 ++ [LTail], true, true}
+ LTail = leveled_skiplist:key_above_notequals(SkipList, LastKey),
+ case LTail of
+ false ->
+ {L0, true, false};
+ _ ->
+ {L0 ++ [LTail], true, true}
+ end
 end.
 
 
@@ -751,13 +872,29 @@ key_dominates_expanded([H1|T1], [H2|T2], Level) ->
 
 maybe_expand_pointer([]) ->
 [];
-maybe_expand_pointer([{pointer, SFTPid, Slot, StartKey, all}|Tail]) ->
+maybe_expand_pointer([{pointer, SSTPid, Slot, StartKey, all}|Tail]) ->
+ expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, all},
+ Tail,
+ ?MERGE_SCANWIDTH);
+maybe_expand_pointer([{next, SSTPid, StartKey}|Tail]) ->
+ expand_list_by_pointer({next, SSTPid, StartKey, all},
+ Tail,
+ ?MERGE_SCANWIDTH);
+maybe_expand_pointer(List) ->
+ List.
+
+
+expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, 1) ->
+ AccPointers = [{pointer, Slot, StartKey, EndKey}],
+ ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers),
+ lists:append(ExpPointers, Tail);
+expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, all}, Tail, Width) ->
 FoldFun =
 fun(X, {Pointers, Remainder}) ->
 case length(Pointers) of
- L when L < ?MERGE_SCANWIDTH ->
+ L when L < Width ->
 case X of
- {pointer, SFTPid, S, SK, EK} ->
+ {pointer, SSTPid, S, SK, EK} ->
 {Pointers ++ [{pointer, S, SK, EK}], Remainder};
 _ ->
 {Pointers, Remainder ++ [X]}
@@ -768,16 +905,11 @@ maybe_expand_pointer([{pointer, SFTPid, Slot, StartKey, all}|Tail]) ->
 end,
 InitAcc = {[{pointer, Slot, StartKey, all}], []},
 {AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail),
- SW = os:timestamp(),
- ExpPointers = sst_getslots(SFTPid, AccPointers),
- leveled_log:log_timer("SFT14", [SFTPid], SW),
+ ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers),
 lists:append(ExpPointers, AccTail);
-maybe_expand_pointer([{next, SFTPid, StartKey}|Tail]) ->
- ExpPointer = sst_getkvrange(SFTPid, StartKey, all, ?MERGE_SCANWIDTH),
- maybe_expand_pointer(ExpPointer ++ Tail);
-maybe_expand_pointer(List) ->
- List.
-
+expand_list_by_pointer({next, SSTPid, StartKey, EndKey}, Tail, Width) ->
+ ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width),
+ ExpPointer ++ Tail.
 
 
@@ -787,13 +919,6 @@ maybe_expand_pointer(List) ->
 
 
 
 
 -ifdef(TEST).
 
-generate_randomkeys({Count, StartSQN}) ->
- BucketNumber = random:uniform(1024),
- generate_randomkeys(Count, StartSQN, [], BucketNumber, BucketNumber);
-generate_randomkeys(Count) ->
- BucketNumber = random:uniform(1024),
- generate_randomkeys(Count, 0, [], BucketNumber, BucketNumber).
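
build_table_summary/6 now persists max_sqn alongside the bloom and slot index, and frames the compressed summary with a leading CRC so that corruption is caught when the file is reopened. The inverse is a four-byte header match plus a checksum comparison; read_table_summary/1 plays this role in the module itself, but a standalone sketch of the framing looks like:

    %% Sketch only - unpack and verify the <<CRC:32, SummaryBin>> framing
    parse_table_summary(<<SummCRC:32/integer, SummBin/binary>>) ->
        case erlang:crc32(SummBin) of
            SummCRC -> binary_to_term(SummBin);  % the #summary{} record
            _ -> {error, crc_wonky}
        end.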
-
 generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
 generate_randomkeys(Seqn,
 Count,
@@ -834,8 +959,8 @@ merge_test() ->
 KVL2 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 20)),
 KVL3 = lists:ukeymerge(1, KVL1, KVL2),
 SW0 = os:timestamp(),
- {ok, P1, {FK1, LK1}} = sst_new("../test/level1_src", 1, KVL1),
- {ok, P2, {FK2, LK2}} = sst_new("../test/level2_src", 2, KVL2),
+ {ok, P1, {FK1, LK1}} = sst_new("../test/level1_src", 1, KVL1, 6000),
+ {ok, P2, {FK2, LK2}} = sst_new("../test/level2_src", 2, KVL2, 3000),
 ExpFK1 = element(1, lists:nth(1, KVL1)),
 ExpLK1 = element(1, lists:last(KVL1)),
 ExpFK2 = element(1, lists:nth(1, KVL2)),
@@ -850,7 +975,8 @@ merge_test() ->
 ML1,
 ML2,
 false,
- 2),
+ 2,
+ N * 2),
 ?assertMatch([], Rem1),
 ?assertMatch([], Rem2),
 ?assertMatch(true, FK3 == min(FK1, FK2)),
@@ -915,7 +1041,8 @@ simple_slotbinsummary_test() ->
 AllHashes,
 2,
 FirstKey,
- length(KVList1)),
+ length(KVList1),
+ undefined),
 Summary = read_table_summary(SummaryBin),
 SummaryIndex = leveled_skiplist:from_sortedlist(Summary#summary.index),
 FetchFun =
@@ -945,7 +1072,10 @@ simple_persisted_test() ->
 KVList1 = lists:ukeysort(1, KVList0),
 [{FirstKey, _FV}|_Rest] = KVList1,
 {LastKey, _LV} = lists:last(KVList1),
- {ok, Pid, {FirstKey, LastKey}} = sst_new(Filename, 1, KVList1),
+ {ok, Pid, {FirstKey, LastKey}} = sst_new(Filename,
+ 1,
+ KVList1,
+ length(KVList1)),
 SW1 = os:timestamp(),
 lists:foreach(fun({K, V}) ->
 ?assertMatch({K, V}, sst_get(Pid, K)),
@@ -1014,7 +1144,71 @@ simple_persisted_test() ->
 ?assertMatch(SubKVListA1L, length(FetchedListB2)),
 ?assertMatch(SubKVListA1, FetchedListB2),
 
+ FetchListB3 = sst_getkvrange(Pid,
+ Eight000Key,
+ {o, null, null, null},
+ 4),
+ FetchedListB3 = lists:foldl(FoldFun, [], FetchListB3),
+ SubKVListA3 = lists:nthtail(800 - 1, KVList1),
+ SubKVListA3L = length(SubKVListA3),
+ io:format("Length expected ~w~n", [SubKVListA3L]),
+ ?assertMatch(SubKVListA3L, length(FetchedListB3)),
+ ?assertMatch(SubKVListA3, FetchedListB3),
+
 ok = sst_close(Pid),
 ok = file:delete(Filename ++ ".sst").
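
The amended tests pass a MaxSQN into sst_new but never read it back. A possible companion assertion, using only helpers already present in this test suite (the filename and test name are illustrative):

    maxsqn_roundtrip_test() ->
        KVList = lists:ukeysort(1, generate_randomkeys(1, 100, 1, 4)),
        {ok, Pid, _} = sst_new("../test/maxsqn_check", 1, KVList, 100),
        %% the value returned comes from the summary read back off disk
        ?assertMatch(100, sst_getmaxsequencenumber(Pid)),
        ok = sst_close(Pid),
        ok = file:delete("../test/maxsqn_check.sst").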
+key_dominates_test() -> + KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}}, + KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, 0, []}}, + KV3 = {{o, "Bucket", "Key2", null}, {3, {active, infinity}, 0, []}}, + KV4 = {{o, "Bucket", "Key4", null}, {7, {active, infinity}, 0, []}}, + KV5 = {{o, "Bucket", "Key1", null}, {4, {active, infinity}, 0, []}}, + KV6 = {{o, "Bucket", "Key1", null}, {99, {tomb, 999}, 0, []}}, + KV7 = {{o, "Bucket", "Key1", null}, {99, tomb, 0, []}}, + KL1 = [KV1, KV2], + KL2 = [KV3, KV4], + ?assertMatch({{next_key, KV1}, [KV2], KL2}, + key_dominates(KL1, KL2, {undefined, 1})), + ?assertMatch({{next_key, KV1}, KL2, [KV2]}, + key_dominates(KL2, KL1, {undefined, 1})), + ?assertMatch({skipped_key, KL2, KL1}, + key_dominates([KV5|KL2], KL1, {undefined, 1})), + ?assertMatch({{next_key, KV1}, [KV2], []}, + key_dominates(KL1, [], {undefined, 1})), + ?assertMatch({skipped_key, [KV6|KL2], [KV2]}, + key_dominates([KV6|KL2], KL1, {undefined, 1})), + ?assertMatch({{next_key, KV6}, KL2, [KV2]}, + key_dominates([KV6|KL2], [KV2], {undefined, 1})), + ?assertMatch({skipped_key, [KV6|KL2], [KV2]}, + key_dominates([KV6|KL2], KL1, {true, 1})), + ?assertMatch({skipped_key, [KV6|KL2], [KV2]}, + key_dominates([KV6|KL2], KL1, {true, 1000})), + ?assertMatch({{next_key, KV6}, KL2, [KV2]}, + key_dominates([KV6|KL2], [KV2], {true, 1})), + ?assertMatch({skipped_key, KL2, [KV2]}, + key_dominates([KV6|KL2], [KV2], {true, 1000})), + ?assertMatch({skipped_key, [], []}, + key_dominates([KV6], [], {true, 1000})), + ?assertMatch({skipped_key, [], []}, + key_dominates([], [KV6], {true, 1000})), + ?assertMatch({{next_key, KV6}, [], []}, + key_dominates([KV6], [], {true, 1})), + ?assertMatch({{next_key, KV6}, [], []}, + key_dominates([], [KV6], {true, 1})), + ?assertMatch({skipped_key, [], []}, + key_dominates([KV7], [], {true, 1})), + ?assertMatch({skipped_key, [], []}, + key_dominates([], [KV7], {true, 1})), + ?assertMatch({skipped_key, [KV7|KL2], [KV2]}, + key_dominates([KV7|KL2], KL1, {undefined, 1})), + ?assertMatch({{next_key, KV7}, KL2, [KV2]}, + key_dominates([KV7|KL2], [KV2], {undefined, 1})), + ?assertMatch({skipped_key, [KV7|KL2], [KV2]}, + key_dominates([KV7|KL2], KL1, {true, 1})), + ?assertMatch({skipped_key, KL2, [KV2]}, + key_dominates([KV7|KL2], [KV2], {true, 1})). + + + -endif. \ No newline at end of file
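
The assertions above pin down the merge rule exercised through key_dominates/3: on a key clash the higher sequence number wins; a bare tomb is reaped whenever the merge target is the basement; and a {tomb, TS} is only reaped once TS is at or below the merge's now-time. Restated as a standalone sketch (reap_sketch/2 is illustrative, not module code):

    reap_sketch({_K, {_SQN, tomb, _MH, _MD}}, {true, _Now}) ->
        skip;                     % bare tombstone reaching the basement
    reap_sketch({_K, {_SQN, {tomb, TS}, _MH, _MD}}, {true, Now})
            when TS =< Now ->
        skip;                     % tombstone past its expiry
    reap_sketch(KV, _LevelInfo) ->
        {keep, KV}.               % live value, or tombstone still needed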