diff --git a/src/leveled_bst.erl b/src/leveled_bst.erl deleted file mode 100644 index 9253667..0000000 --- a/src/leveled_bst.erl +++ /dev/null @@ -1,350 +0,0 @@ -%% -%% This module provides functions for managing bst files - a modified version -%% of sst files, to be used in leveleddb. -%% bst files are broken into the following sections: -%% - Header (fixed width 32 bytes - containing pointers and metadata) -%% - Summaries (variable length) -%% - Blocks (variable length) -%% - Slots (variable length) -%% - Footer (variable length - contains slot index and helper metadata) -%% -%% The 64-byte header is made up of -%% - 1 byte version (major 5 bits, minor 3 bits) - default 0.1 -%% - 1 byte state bits (1 bit to indicate mutability, 1 for use of compression) -%% - 4 bytes summary length -%% - 4 bytes blocks length -%% - 4 bytes slots length -%% - 4 bytes footer position -%% - 4 bytes slot list length -%% - 4 bytes helper length -%% - 34 bytes spare for future options -%% - 4 bytes CRC (header) -%% -%% A key in the file is a tuple of {Key, Value/Metadata, Sequence #, State} -%% - Keys are themselves tuples, and all entries must be added to the bst -%% in key order -%% - Metadata can be null or some fast-access information that may be required -%% in preference to the full value (e.g. vector clocks, hashes). This could -%% be a value instead of Metadata should the file be used in an alternate -%% - Sequence numbers is the integer representing the order which the item -%% was added to the overall database -%% - State can be tomb (for tombstone), active or {timestamp, TS} -%% -%% The Blocks is a series of blocks of: -%% - 4 byte block length -%% - variable-length compressed list of 32 keys & values -%% - 4 byte CRC for block -%% There will be up to 4000 blocks in a single bst file -%% -%% The slots is a series of references -%% - 4 byte bloom-filter length -%% - 4 byte key-helper length -%% - a variable-length compressed bloom filter for all keys in slot (approx 3KB) -%% - 64 ordered variable-length key helpers pointing to first key in each -%% block (in slot) of the form Key Length, Key, Block Position -%% - 4 byte CRC for the slot -%% - ulitmately a slot covers 64 x 32 = 2048 keys -%% -%% The slot index in the footer is made up of up to 64 ordered keys and -%% pointers, with the key being a key at the start of each slot -%% - 1 byte value showing number of keys in slot index -%% - 64 x Key Length (4 byte), Key, Position (4 byte) indexes -%% - 4 bytes CRC for the index -%% -%% The format of the file is intended to support quick lookups, whilst -%% allowing for a new file to be written incrementally (so that all keys and -%% values need not be retained in memory) - perhaps n blocks at a time - - --module(leveled_bst). - --export([start_file/1, convert_header/1, append_slot/4]). - --include_lib("eunit/include/eunit.hrl"). - --define(WORD_SIZE, 4). --define(DWORD_SIZE, 8). --define(CURRENT_VERSION, {0,1}). --define(SLOT_COUNT, 64). --define(BLOCK_SIZE, 32). --define(SLOT_SIZE, 64). --define(FOOTERPOS_HEADERPOS, 2). - - --record(metadata, {version = ?CURRENT_VERSION :: tuple(), - mutable = false :: true | false, - compressed = true :: true | false, - slot_index, - open_slot :: integer(), - cache :: tuple(), - smallest_key :: tuple(), - largest_key :: tuple(), - smallest_sqn :: integer(), - largest_sqn :: integer() - }). - --record(object, {key :: tuple(), - value, - sequence_numb :: integer(), - state}). - -%% Start a bare file with an initial header and no further details -%% Return the {Handle, metadata record} -start_file(FileName) when is_list(FileName) -> - {ok, Handle} = file:open(FileName, [binary, raw, read, write]), - start_file(Handle); -start_file(Handle) -> - Header = create_header(initial), - {ok, _} = file:position(Handle, bof), - ok = file:write(Handle, Header), - {Version, {M, C}, _, _} = convert_header(Header), - FileMD = #metadata{version = Version, mutable = M, compressed = C, - slot_index = array:new(?SLOT_COUNT), open_slot = 0}, - {Handle, FileMD}. - - -create_header(initial) -> - {Major, Minor} = ?CURRENT_VERSION, - Version = <>, - State = <<0:6, 1:1, 1:1>>, % Mutable and compressed - Lengths = <<0:32, 0:32, 0:32>>, - Options = <<0:112>>, - H1 = <>, - CRC32 = erlang:crc32(H1), - <

>. - - -convert_header(Header) -> - <> = Header, - case erlang:crc32(H1) of - CRC32 -> - <> = H1, - case {Major, Minor} of - {0, 1} -> - convert_header_v01(H1); - _ -> - unknown_version - end; - _ -> - crc_mismatch - end. - -convert_header_v01(Header) -> - <<_:8, 0:6, Mutable:1, Comp:1, - FooterP:32/integer, SlotLng:32/integer, HlpLng:32/integer, - _/binary>> = Header, - case Mutable of - 1 -> M = true; - 0 -> M = false - end, - case Comp of - 1 -> C = true; - 0 -> C = false - end, - {{0, 1}, {M, C}, {FooterP, SlotLng, HlpLng}, none}. - - -%% Append a slot of blocks to the end file, and update the slot index in the -%% file metadata - -append_slot(Handle, SortedKVList, SlotCount, FileMD) -> - {ok, SlotPos} = file:position(Handle, eof), - {KeyList, BlockIndexBin, BlockBin} = add_blocks(SortedKVList), - ok = file:write(Handle, BlockBin), - [TopObject|_] = SortedKVList, - BloomBin = leveled_rice:create_bloom(KeyList), - SlotIndex = array:set(SlotCount, - {TopObject#object.key, BloomBin, BlockIndexBin, SlotPos}, - FileMD#metadata.slot_index), - {Handle, FileMD#metadata{slot_index=SlotIndex}}. - -append_slot_index(Handle, _FileMD=#metadata{slot_index=SlotIndex}) -> - {ok, FooterPos} = file:position(Handle, eof), - SlotBin1 = <>, - SlotBin2 = array:foldl(fun slot_folder_write/3, SlotBin1, SlotIndex), - CRC = erlang:crc32(SlotBin2), - SlotBin3 = <>, - ok = file:write(Handle, SlotBin3), - SlotLength = byte_size(SlotBin3), - Header = <>, - ok = file:pwrite(Handle, ?FOOTERPOS_HEADERPOS, Header). - -slot_folder_write(_Index, undefined, Bin) -> - Bin; -slot_folder_write(_Index, {ObjectKey, _, _, SlotPos}, Bin) -> - KeyBin = serialise_key(ObjectKey), - KeyLen = byte_size(KeyBin), - <>. - -slot_folder_read(<<>>, SlotIndex, SlotCount) -> - io:format("Slot index read with count=~w slots~n", [SlotCount]), - SlotIndex; -slot_folder_read(SlotIndexBin, SlotIndex, SlotCount) -> - <> = SlotIndexBin, - <> = Tail1, - slot_folder_read(Tail2, - array:set(SlotCount, {load_key(KeyBin), null, null, SlotPos}, SlotIndex), - SlotCount + 1). - -read_slot_index(SlotIndexBin) -> - <> = SlotIndexBin, - case erlang:crc32(SlotIndexBin2) of - CRC -> - <> = SlotIndexBin2, - CleanSlotIndex = array:new(SlotCount), - SlotIndex = slot_folder_read(SlotIndexBin3, CleanSlotIndex, 0), - {ok, SlotIndex}; - _ -> - {error, crc_wonky} - end. - -find_slot_index(Handle) -> - {ok, SlotIndexPtr} = file:pread(Handle, ?FOOTERPOS_HEADERPOS, ?DWORD_SIZE), - <> = SlotIndexPtr, - {ok, SlotIndexBin} = file:pread(Handle, FooterPos, SlotIndexLength), - SlotIndexBin. - - -read_blockindex(Handle, Position) -> - {ok, _FilePos} = file:position(Handle, Position), - {ok, <>} = file:read(Handle, 4), - io:format("block length is ~w~n", [BlockLength]), - {ok, BlockBin} = file:read(Handle, BlockLength), - CheckLessBlockLength = BlockLength - 4, - <> = BlockBin, - case erlang:crc32(Block) of - CRC -> - <> = Block, - <> = Tail, - {ok, BloomFilter, KeyHelper}; - _ -> - {error, crc_wonky} - end. - - -add_blocks(SortedKVList) -> - add_blocks(SortedKVList, [], [], [], 0). - -add_blocks([], KeyList, BlockIndex, BlockBinList, _) -> - {KeyList, serialise_blockindex(BlockIndex), list_to_binary(BlockBinList)}; -add_blocks(SortedKVList, KeyList, BlockIndex, BlockBinList, Position) -> - case length(SortedKVList) of - KeyCount when KeyCount >= ?BLOCK_SIZE -> - {TopBlock, Rest} = lists:split(?BLOCK_SIZE, SortedKVList); - KeyCount -> - {TopBlock, Rest} = lists:split(KeyCount, SortedKVList) - end, - [TopKey|_] = TopBlock, - TopBin = serialise_block(TopBlock), - add_blocks(Rest, add_to_keylist(KeyList, TopBlock), - [{TopKey, Position}|BlockIndex], - [TopBin|BlockBinList], Position + byte_size(TopBin)). - -add_to_keylist(KeyList, []) -> - KeyList; -add_to_keylist(KeyList, [TopKV|Rest]) -> - add_to_keylist([map_keyforbloom(TopKV)|KeyList], Rest). - -map_keyforbloom(_Object=#object{key=Key}) -> - Key. - - -serialise_blockindex(BlockIndex) -> - serialise_blockindex(BlockIndex, <<>>). - -serialise_blockindex([], BlockBin) -> - BlockBin; -serialise_blockindex([TopIndex|Rest], BlockBin) -> - {Key, BlockPos} = TopIndex, - KeyBin = serialise_key(Key), - KeyLength = byte_size(KeyBin), - serialise_blockindex(Rest, - <>). - -serialise_block(Block) -> - term_to_binary(Block). - -serialise_key(Key) -> - term_to_binary(Key). - -load_key(KeyBin) -> - binary_to_term(KeyBin). - - -%%%%%%%%%%%%%%%% -% T E S T -%%%%%%%%%%%%%%% - -create_sample_kv(Prefix, Counter) -> - Key = {o, "Bucket1", lists:concat([Prefix, Counter])}, - Object = #object{key=Key, value=null, - sequence_numb=random:uniform(1000000), state=active}, - Object. - -create_ordered_kvlist(KeyList, 0) -> - KeyList; -create_ordered_kvlist(KeyList, Length) -> - KV = create_sample_kv("Key", Length), - create_ordered_kvlist([KV|KeyList], Length - 1). - - -empty_header_test() -> - Header = create_header(initial), - ?assertMatch(32, byte_size(Header)), - <> = Header, - ?assertMatch({0, 1}, {Major, Minor}), - {Version, State, Lengths, Options} = convert_header(Header), - ?assertMatch({0, 1}, Version), - ?assertMatch({true, true}, State), - ?assertMatch({0, 0, 0}, Lengths), - ?assertMatch(none, Options). - -bad_header_test() -> - Header = create_header(initial), - <<_:1/binary, Rest/binary >> = Header, - HdrDetails1 = convert_header(<<0:5/integer, 2:3/integer, Rest/binary>>), - ?assertMatch(crc_mismatch, HdrDetails1), - <<_:1/binary, RestToCRC:27/binary, _:32/integer>> = Header, - NewHdr1 = <<0:5/integer, 2:3/integer, RestToCRC/binary>>, - CRC32 = erlang:crc32(NewHdr1), - NewHdr2 = <>, - ?assertMatch(unknown_version, convert_header(NewHdr2)). - -record_onstartfile_test() -> - {_, FileMD} = start_file("onstartfile.bst"), - ?assertMatch({0, 1}, FileMD#metadata.version), - ok = file:delete("onstartfile.bst"). - -append_initialblock_test() -> - {Handle, FileMD} = start_file("onstartfile.bst"), - KVList = create_ordered_kvlist([], 2048), - Key1 = {o, "Bucket1", "Key1"}, - [TopObj|_] = KVList, - ?assertMatch(Key1, TopObj#object.key), - {_, UpdFileMD} = append_slot(Handle, KVList, 0, FileMD), - {TopKey1, BloomBin, _, _} = array:get(0, UpdFileMD#metadata.slot_index), - io:format("top key of ~w~n", [TopKey1]), - ?assertMatch(Key1, TopKey1), - ?assertMatch(true, leveled_rice:check_key(Key1, BloomBin)), - ?assertMatch(false, leveled_rice:check_key("OtherKey", BloomBin)), - ok = file:delete("onstartfile.bst"). - -append_initialslotindex_test() -> - {Handle, FileMD} = start_file("onstartfile.bst"), - KVList = create_ordered_kvlist([], 2048), - {_, UpdFileMD} = append_slot(Handle, KVList, 0, FileMD), - append_slot_index(Handle, UpdFileMD), - SlotIndexBin = find_slot_index(Handle), - {ok, SlotIndex} = read_slot_index(SlotIndexBin), - io:format("slot index is ~w ~n", [SlotIndex]), - TopItem = array:get(0, SlotIndex), - io:format("top item in slot index is ~w~n", [TopItem]), - {ok, BloomFilter, KeyHelper} = read_blockindex(Handle, 32), - ?assertMatch(true, false), - ok = file:delete("onstartfile.bst"). - - - diff --git a/src/leveled_keymanager.erl b/src/leveled_concierge.erl similarity index 65% rename from src/leveled_keymanager.erl rename to src/leveled_concierge.erl index b08b3ae..4147a3a 100644 --- a/src/leveled_keymanager.erl +++ b/src/leveled_concierge.erl @@ -1,4 +1,4 @@ -%% The manager is responsible for controlling access to the store and +%% The concierge is responsible for controlling access to the store and %% maintaining both an in-memory view and a persisted state of all the sft %% files in use across the store. %% @@ -34,15 +34,18 @@ %% will call the manifets manager on a timeout to confirm that they are no %% longer in use (by any iterators). %% -%% If there is an iterator request, the manager will simply handoff a copy of -%% the manifest, and register the interest of the iterator at the manifest -%% sequence number at the time of the request. Iterators should de-register -%% themselves from the manager on completion. Iterators should be +%% If there is a iterator/snapshot request, the concierge will simply handoff a +%% copy of the manifest, and register the interest of the iterator at the +%% manifest sequence number at the time of the request. Iterators should +%% de-register themselves from the manager on completion. Iterators should be %% automatically release after a timeout period. A file can be deleted if %% there are no registered iterators from before the point the file was %% removed from the manifest. +%% + + --module(leveled_keymanager). +-module(leveled_concierge). %% -behaviour(gen_server). @@ -50,16 +53,19 @@ -include_lib("eunit/include/eunit.hrl"). --define(LEVEL_SCALEFACTOR, [0, 8, 64, 512, - 4096, 32768, 262144, infinity]). +-define(LEVEL_SCALEFACTOR, [{0, 0}, {1, 8}, {2, 64}, {3, 512}, + {4, 4096}, {5, 32768}, {6, 262144}, {7, infinity}]). -define(MAX_LEVELS, 8). -define(MAX_WORK_WAIT, 300). +-define(MANIFEST_FP, "manifest"). +-define(FILES_FP, "files"). -record(state, {level_fileref :: list(), ongoing_work :: list(), manifest_sqn :: integer(), registered_iterators :: list(), - unreferenced_files :: list()}). + unreferenced_files :: list(), + root_path :: string()}). %% Work out what the current work queue should be @@ -92,8 +98,8 @@ return_work(State, From) -> assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _LevelFileRef, _OngoingWork) -> WorkQ; assess_workqueue(WorkQ, LevelToAssess, LevelFileRef, OngoingWork)-> - MaxFiles = get_item(LevelToAssess + 1, ?LEVEL_SCALEFACTOR, 0), - FileCount = length(get_item(LevelToAssess + 1, LevelFileRef, [])), + MaxFiles = get_item(LevelToAssess, ?LEVEL_SCALEFACTOR, 0), + FileCount = length(get_item(LevelToAssess, LevelFileRef, [])), NewWQ = maybe_append_work(WorkQ, LevelToAssess, LevelFileRef, MaxFiles, FileCount, OngoingWork), assess_workqueue(NewWQ, LevelToAssess + 1, LevelFileRef, OngoingWork). @@ -111,8 +117,8 @@ maybe_append_work(WorkQ, Level, LevelFileRef, WorkQ; false -> lists:append(WorkQ, [{Level, - get_item(Level + 1, LevelFileRef, []), - get_item(Level + 2, LevelFileRef, [])}]) + get_item(Level, LevelFileRef, []), + get_item(Level + 1, LevelFileRef, [])}]) end; maybe_append_work(WorkQ, Level, _LevelFileRef, _MaxFiles, FileCount, _OngoingWork) -> @@ -121,10 +127,13 @@ maybe_append_work(WorkQ, Level, _LevelFileRef, WorkQ. -get_item(Index, List, Default) when Index > length(List) -> - Default; -get_item(Index, List, _Default) -> - lists:nth(Index, List). +get_item(Index, List, Default) -> + case lists:keysearch(Index, 1, List) of + {value, {Index, Value}} -> + Value; + false -> + Default + end. %% Request a manifest change @@ -135,15 +144,16 @@ get_item(Index, List, _Default) -> %% - Update the Manifest Sequence Number (msn) %% - Confirm this Pid has a current element of manifest work outstanding at %% that level -%% - Rename the manifest file created under the MergeID at the sink Level -%% to be the current manifest file (current..sink) -%% (Note than on startup if the highest msn in all the current. files for that -%% level is a sink file, it must be confirmed that th elevel above is at the -%% same or higher msn. If not the next lowest current..sink must be -%% chosen. This avoids inconsistency on crash between these steps - although -%% the inconsistency would have been survivable) -%% - Rename the manifest file created under the MergeID at the source levl -%% to the current manifest file (current..src) +%% - Rename the manifest file created under the MergeID (.) +%% at the sink Level to be the current manifest file (current_.) +%% -------- NOTE -------- +%% If there is a crash between these two points, the K/V data that has been +%% merged from the source level will now be in both the source and the sink +%% level. Therefore in store operations this potential duplication must be +%% handled. +%% -------- NOTE -------- +%% - Rename the manifest file created under the MergeID (.) +%% at the source level to the current manifest file (current_.) %% - Update the state of the LevelFileRef lists %% - Add the ClearedFiles to the list of files to be cleared (as a tuple with %% the new msn) @@ -153,38 +163,65 @@ commit_manifest_change(SrcLevel, NewSrcMan, NewSnkMan, ClearedFiles, MergeID, From, State) -> NewMSN = State#state.manifest_sqn + 1, OngoingWork = State#state.ongoing_work, + RootPath = State#state.root_path, SnkLevel = SrcLevel + 1, case {lists:keyfind(SrcLevel, 1, OngoingWork), lists:keyfind(SrcLevel + 1, 1, OngoingWork)} of {{SrcLevel, From, TS}, {SnkLevel, From, TS}} -> io:format("Merge ~s was a success in ~w microseconds", [MergeID, timer:diff_now(os:timestamp(), TS)]), - _OutstandingWork = lists:keydelete(SnkLevel, 1, + OutstandingWork = lists:keydelete(SnkLevel, 1, lists:keydelete(SrcLevel, 1, OngoingWork)), - rename_manifest_file(MergeID, sink, NewMSN, SnkLevel), - rename_manifest_file(MergeID, src, NewMSN, SrcLevel), - _NewLFR = update_levelfileref(NewSrcMan, + ok = rename_manifest_files(RootPath, MergeID, + NewMSN, SrcLevel, SnkLevel), + NewLFR = update_levelfileref(NewSrcMan, NewSnkMan, SrcLevel, State#state.level_fileref), - _UnreferencedFiles = update_deletions(ClearedFiles, + UnreferencedFiles = update_deletions(ClearedFiles, NewMSN, State#state.unreferenced_files), - ok; + io:format("Merge ~s has been commmitted at sequence number ~w~n", + [MergeID, NewMSN]), + {ok, State#state{ongoing_work=OutstandingWork, + manifest_sqn=NewMSN, + level_fileref=NewLFR, + unreferenced_files=UnreferencedFiles}}; _ -> - error + io:format("Merge commit ~s not matched to known work~n", + [MergeID]), + {error, State} end. -rename_manifest_file(_MergeID, _SrcOrSink, _NewMSN, _Level) -> +rename_manifest_files(RootPath, MergeID, NewMSN, SrcLevel, SnkLevel) -> + ManifestFP = RootPath ++ "/" ++ ?MANIFEST_FP ++ "/", + ok = file:rename(ManifestFP ++ MergeID + ++ "." ++ integer_to_list(SnkLevel), + ManifestFP ++ "current_" ++ integer_to_list(SnkLevel) + ++ "." ++ integer_to_list(NewMSN)), + ok = file:rename(ManifestFP ++ MergeID + ++ "." ++ integer_to_list(SrcLevel), + ManifestFP ++ "current_" ++ integer_to_list(SrcLevel) + ++ "." ++ integer_to_list(NewMSN)), ok. -update_levelfileref(_NewSrcMan, _NewSinkMan, _SrcLevel, CurrLFR) -> - CurrLFR. +update_levelfileref(NewSrcMan, NewSinkMan, SrcLevel, CurrLFR) -> + lists:keyreplace(SrcLevel + 1, + 1, + lists:keyreplace(SrcLevel, + 1, + CurrLFR, + {SrcLevel, NewSrcMan}), + {SrcLevel + 1, NewSinkMan}). -update_deletions(_ClearedFiles, _NewMSN, UnreferencedFiles) -> - UnreferencedFiles. +update_deletions([], _NewMSN, UnreferencedFiles) -> + UnreferencedFiles; +update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> + update_deletions(Tail, + MSN, + lists:append(UnreferencedFiles, [{ClearedFile, MSN}])). %%%============================================================================ %%% Test @@ -195,7 +232,7 @@ compaction_work_assessment_test() -> L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}], L1 = [{{o, "B1", "K1"}, {o, "B2", "K2"}, dummy_pid}, {{o, "B2", "K3"}, {o, "B4", "K4"}, dummy_pid}], - LevelFileRef = [L0, L1], + LevelFileRef = [{0, L0}, {1, L1}], OngoingWork1 = [], WorkQ1 = assess_workqueue([], 0, LevelFileRef, OngoingWork1), ?assertMatch(WorkQ1, [{0, L0, L1}]), @@ -210,11 +247,11 @@ compaction_work_assessment_test() -> {{o, "B9", "K0001"}, {o, "B9", "K9999"}, dummy_pid}, {{o, "BA", "K0001"}, {o, "BA", "K9999"}, dummy_pid}, {{o, "BB", "K0001"}, {o, "BB", "K9999"}, dummy_pid}]), - WorkQ3 = assess_workqueue([], 0, [[], L1Alt], OngoingWork1), + WorkQ3 = assess_workqueue([], 0, [{0, []}, {1, L1Alt}], OngoingWork1), ?assertMatch(WorkQ3, [{1, L1Alt, []}]), - WorkQ4 = assess_workqueue([], 0, [[], L1Alt], OngoingWork2), + WorkQ4 = assess_workqueue([], 0, [{0, []}, {1, L1Alt}], OngoingWork2), ?assertMatch(WorkQ4, [{1, L1Alt, []}]), OngoingWork3 = lists:append(OngoingWork2, [{1, dummy_pid, os:timestamp()}]), - WorkQ5 = assess_workqueue([], 0, [[], L1Alt], OngoingWork3), + WorkQ5 = assess_workqueue([], 0, [{0, []}, {1, L1Alt}], OngoingWork3), ?assertMatch(WorkQ5, []). diff --git a/src/leveled_worker.erl b/src/leveled_housekeeping.erl similarity index 99% rename from src/leveled_worker.erl rename to src/leveled_housekeeping.erl index 50b3faf..3d6b8c1 100644 --- a/src/leveled_worker.erl +++ b/src/leveled_housekeeping.erl @@ -2,7 +2,7 @@ %% level and cleaning out of old files across a level --module(leveled_worker). +-module(leveled_housekeeping). -export([merge_file/3, perform_merge/3]).