From 2a76eb364e731dbb122a024441c542f6d9df2803 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Mon, 5 Sep 2016 15:01:23 +0100 Subject: [PATCH] Inker - Initial Code An attempt to get a first inker that can build a ledger from a manifest as well as support simple get and put operations. Basic tests surround the building of manifests only at this stage - more work required for get and put. --- src/leveled_cdb.erl | 215 +++++++++++---- src/leveled_inker.erl | 555 +++++++++++++++++++++++++++++++++++++- src/leveled_penciller.erl | 41 +-- 3 files changed, 745 insertions(+), 66 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index e2c266f..3557609 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -16,9 +16,9 @@ %% %% This is to be used in eleveledb, and in this context: %% - Keys will be a Sequence Number -%% - Values will be a Checksum; Pointers (length * 3); Key; [Metadata]; [Value] -%% where the pointers can be used to extract just part of the value -%% (i.e. metadata only) +%% - Values will be a Checksum | Object | KeyAdditions +%% Where the KeyAdditions are all the Key changes required to be added to the +%% ledger to complete the changes (the addition of postings and tombstones). %% %% This module provides functions to create and query a CDB (constant database). %% A CDB implements a two-level hashtable which provides fast {key,value} @@ -58,7 +58,11 @@ cdb_open_reader/1, cdb_get/2, cdb_put/3, - cdb_close/1]). + cdb_lastkey/1, + cdb_filename/1, + cdb_keycheck/2, + cdb_close/1, + cdb_complete/1]). -include_lib("eunit/include/eunit.hrl"). @@ -70,8 +74,8 @@ -record(state, {hashtree, last_position :: integer(), - smallest_sqn :: integer(), - highest_sqn :: integer(), + last_key = empty, + hash_index = [] :: list(), filename :: string(), handle :: file:fd(), writer :: boolean}). @@ -108,6 +112,22 @@ cdb_put(Pid, Key, Value) -> cdb_close(Pid) -> gen_server:call(Pid, cdb_close, infinity). +cdb_complete(Pid) -> + gen_server:call(Pid, cdb_complete, infinity). + +%% Get the last key to be added to the file (which will have the highest +%% sequence number) +cdb_lastkey(Pid) -> + gen_server:call(Pid, cdb_lastkey, infinity). + +%% Get the filename of the database +cdb_filename(Pid) -> + gen_server:call(Pid, cdb_filename, infinity). + +%% Check to see if the key is probably present, will return either +%% probably or missing. Does not do a definitive check +cdb_keycheck(Pid, Key) -> + gen_server:call(Pid, {cdb_keycheck, Key}, infinity). %%%============================================================================ %%% gen_server callbacks @@ -118,29 +138,59 @@ init([]) -> handle_call({cdb_open_writer, Filename}, _From, State) -> io:format("Opening file for writing with filename ~s~n", [Filename]), - {LastPosition, HashTree} = open_active_file(Filename), + {LastPosition, HashTree, LastKey} = open_active_file(Filename), {ok, Handle} = file:open(Filename, [binary, raw, read, write, delayed_write]), {reply, ok, State#state{handle=Handle, last_position=LastPosition, + last_key=LastKey, filename=Filename, hashtree=HashTree, writer=true}}; handle_call({cdb_open_reader, Filename}, _From, State) -> io:format("Opening file for reading with filename ~s~n", [Filename]), {ok, Handle} = file:open(Filename, [binary, raw, read]), + Index = load_index(Handle), {reply, ok, State#state{handle=Handle, filename=Filename, - writer=false}}; + writer=false, + hash_index=Index}}; handle_call({cdb_get, Key}, _From, State) -> - case State#state.writer of - true -> + case {State#state.writer, State#state.hash_index} of + {true, _} -> {reply, get_mem(Key, State#state.handle, State#state.hashtree), State}; - false -> + {false, []} -> {reply, get(State#state.handle, Key), + State}; + {false, Cache} -> + {reply, + get_withcache(State#state.handle, Key, Cache), + State} + end; +handle_call({cdb_keycheck, Key}, _From, State) -> + case {State#state.writer, State#state.hash_index} of + {true, _} -> + {reply, + get_mem(Key, + State#state.handle, + State#state.hashtree, + loose_presence), + State}; + {false, []} -> + {reply, + get(State#state.handle, + Key, + loose_presence), + State}; + {false, Cache} -> + {reply, + get(State#state.handle, + Key, + loose_presence, + Cache), State} end; handle_call({cdb_put, Key, Value}, _From, State) -> @@ -151,10 +201,12 @@ handle_call({cdb_put, Key, Value}, _From, State) -> {State#state.last_position, State#state.hashtree}), case Result of roll -> + %% Key and value could not be written {reply, roll, State}; {UpdHandle, NewPosition, HashTree} -> {reply, ok, State#state{handle=UpdHandle, last_position=NewPosition, + last_key=Key, hashtree=HashTree}} end; false -> @@ -162,17 +214,31 @@ handle_call({cdb_put, Key, Value}, _From, State) -> {error, read_only}, State} end; +handle_call(cdb_lastkey, _From, State) -> + {reply, State#state.last_key, State}; +handle_call(cdb_filename, _From, State) -> + {reply, State#state.filename, State}; handle_call(cdb_close, _From, State) -> + ok = file:close(State#state.handle), + {stop, normal, ok, State}; +handle_call(cdb_complete, _From, State) -> case State#state.writer of true -> ok = close_file(State#state.handle, State#state.hashtree, - State#state.last_position); + State#state.last_position), + %% Rename file + NewName = filename:rootname(State#state.filename, ".pnd") + ++ ".cdb", + io:format("Renaming file from ~s to ~s~n", [State#state.filename, NewName]), + ok = file:rename(State#state.filename, NewName), + {stop, normal, {ok, NewName}, State}; false -> - ok = file:close(State#state.handle) - end, - {stop, normal, ok, State}. - + ok = file:close(State#state.handle), + {stop, normal, {ok, State#state.filename}, State} + end. + + handle_cast(_Msg, State) -> {noreply, State}. @@ -260,7 +326,7 @@ dump(FileName, CRCCheck) -> open_active_file(FileName) when is_list(FileName) -> {ok, Handle} = file:open(FileName, [binary, raw, read, write]), {ok, Position} = file:position(Handle, {bof, 256*?DWORD_SIZE}), - {LastPosition, HashTree} = scan_over_file(Handle, Position), + {LastPosition, HashTree, LastKey} = scan_over_file(Handle, Position), case file:position(Handle, eof) of {ok, LastPosition} -> ok = file:close(Handle); @@ -272,7 +338,7 @@ open_active_file(FileName) when is_list(FileName) -> ok = file:truncate(Handle), ok = file:close(Handle) end, - {LastPosition, HashTree}. + {LastPosition, HashTree, LastKey}. %% put(Handle, Key, Value, {LastPosition, HashDict}) -> {NewPosition, KeyDict} %% Append to an active file a new key/value pair returning an updated @@ -298,18 +364,22 @@ put(Handle, Key, Value, {LastPosition, HashTree}) -> %% get(FileName,Key) -> {key,value} %% Given a filename and a key, returns a key and value tuple. %% +get_withcache(Handle, Key, Cache) -> + get(Handle, Key, ?CRC_CHECK, Cache). + get(FileNameOrHandle, Key) -> get(FileNameOrHandle, Key, ?CRC_CHECK). -get(FileName, Key, CRCCheck) when is_list(FileName), is_list(Key) -> +get(FileNameOrHandle, Key, CRCCheck) -> + get(FileNameOrHandle, Key, CRCCheck, no_cache). + +get(FileName, Key, CRCCheck, Cache) when is_list(FileName), is_list(Key) -> {ok,Handle} = file:open(FileName,[binary, raw, read]), - get(Handle,Key, CRCCheck); -get(Handle, Key, CRCCheck) when is_tuple(Handle), is_list(Key) -> + get(Handle,Key, CRCCheck, Cache); +get(Handle, Key, CRCCheck, Cache) when is_tuple(Handle), is_list(Key) -> Hash = hash(Key), Index = hash_to_index(Hash), - {ok,_} = file:position(Handle, {bof, ?DWORD_SIZE * Index}), - % Get location of hashtable and number of entries in the hash - {HashTable, Count} = read_next_2_integers(Handle), + {HashTable, Count} = get_index(Handle, Index, Cache), % If the count is 0 for that index - key must be missing case Count of 0 -> @@ -326,14 +396,32 @@ get(Handle, Key, CRCCheck) when is_tuple(Handle), is_list(Key) -> search_hash_table(Handle, lists:append(L2, L1), Hash, Key, CRCCheck) end. +get_index(Handle, Index, no_cache) -> + {ok,_} = file:position(Handle, {bof, ?DWORD_SIZE * Index}), + % Get location of hashtable and number of entries in the hash + read_next_2_integers(Handle); +get_index(_Handle, Index, Cache) -> + lists:keyfind(Index, 1, Cache). + %% Get a Key/Value pair from an active CDB file (with no hash table written) %% This requires a key dictionary to be passed in (mapping keys to positions) %% Will return {Key, Value} or missing -get_mem(Key, Filename, HashTree) when is_list(Filename) -> +get_mem(Key, FNOrHandle, HashTree) -> + get_mem(Key, FNOrHandle, HashTree, ?CRC_CHECK). + +get_mem(Key, Filename, HashTree, CRCCheck) when is_list(Filename) -> {ok, Handle} = file:open(Filename, [binary, raw, read]), - get_mem(Key, Handle, HashTree); -get_mem(Key, Handle, HashTree) -> - extract_kvpair(Handle, get_hashtree(Key, HashTree), Key). + get_mem(Key, Handle, HashTree, CRCCheck); +get_mem(Key, Handle, HashTree, CRCCheck) -> + ListToCheck = get_hashtree(Key, HashTree), + case {CRCCheck, ListToCheck} of + {loose_presence, []} -> + missing; + {loose_presence, _L} -> + probably; + _ -> + extract_kvpair(Handle, ListToCheck, Key, CRCCheck) + end. %% Get the next key at a position in the file (or the first key if no position %% is passed). Will return both a key and the next position @@ -433,6 +521,15 @@ fold_keys(Handle, FoldFun, Acc0) -> %% Internal functions %%%%%%%%%%%%%%%%%%%% +load_index(Handle) -> + Index = lists:seq(0, 255), + lists:map(fun(X) -> + file:position(Handle, {bof, ?DWORD_SIZE * X}), + {HashTablePos, Count} = read_next_2_integers(Handle), + {X, {HashTablePos, Count}} end, + Index). + + %% Take an active file and write the hash details necessary to close that %% file and roll a new active file if requested. %% @@ -473,9 +570,6 @@ put_hashtree(Key, Position, HashTree) -> %% Function to extract a Key-Value pair given a file handle and a position %% Will confirm that the key matches and do a CRC check when requested -extract_kvpair(Handle, Positions, Key) -> - extract_kvpair(Handle, Positions, Key, ?CRC_CHECK). - extract_kvpair(_, [], _, _) -> missing; extract_kvpair(Handle, [Position|Rest], Key, Check) -> @@ -497,12 +591,12 @@ extract_kvpair(Handle, [Position|Rest], Key, Check) -> %% at that point return the position and the key dictionary scanned so far scan_over_file(Handle, Position) -> HashTree = array:new(256, {default, gb_trees:empty()}), - scan_over_file(Handle, Position, HashTree). + scan_over_file(Handle, Position, HashTree, empty). -scan_over_file(Handle, Position, HashTree) -> +scan_over_file(Handle, Position, HashTree, LastKey) -> case saferead_keyvalue(Handle) of false -> - {Position, HashTree}; + {Position, HashTree, LastKey}; {Key, ValueAsBin, KeyLength, ValueLength} -> case crccheck_value(ValueAsBin) of true -> @@ -510,14 +604,15 @@ scan_over_file(Handle, Position, HashTree) -> + ?DWORD_SIZE, scan_over_file(Handle, NewPosition, - put_hashtree(Key, Position, HashTree)); + put_hashtree(Key, Position, HashTree), + Key); false -> io:format("CRC check returned false on key of ~w ~n", [Key]), - {Position, HashTree} + {Position, HashTree, LastKey} end; eof -> - {Position, HashTree} + {Position, HashTree, LastKey} end. @@ -531,7 +626,6 @@ saferead_keyvalue(Handle) -> eof -> false; {KeyL, ValueL} -> - io:format("KeyL ~w ValueL ~w~n", [KeyL, ValueL]), case safe_read_next_term(Handle, KeyL) of {error, einval} -> false; @@ -540,7 +634,6 @@ saferead_keyvalue(Handle) -> false -> false; Key -> - io:format("Found Key of ~s~n", [Key]), case file:read(Handle, ValueL) of {error, einval} -> false; @@ -640,14 +733,25 @@ read_next_2_integers(Handle) -> %% Seach the hash table for the matching hash and key. Be prepared for %% multiple keys to have the same hash value. -search_hash_table(_Handle, [], _Hash, _Key, _CRCCHeck) -> +%% +%% There are three possible values of CRCCheck: +%% true - check the CRC before returning key & value +%% false - don't check the CRC before returning key & value +%% loose_presence - confirm that the hash of the key is present + +search_hash_table(_Handle, [], _Hash, _Key, _CRCCheck) -> missing; search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, CRCCheck) -> {ok, _} = file:position(Handle, Entry), {StoredHash, DataLoc} = read_next_2_integers(Handle), case StoredHash of Hash -> - KV = extract_kvpair(Handle, [DataLoc], Key, CRCCheck), + KV = case CRCCheck of + loose_presence -> + probably; + _ -> + extract_kvpair(Handle, [DataLoc], Key, CRCCheck) + end, case KV of missing -> search_hash_table(Handle, RestOfEntries, Hash, Key, CRCCheck); @@ -789,6 +893,13 @@ write_top_index_table(Handle, BasePos, List) -> lists:foldl(FnWriteIndex, BasePos, CompleteList), ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need). +%% To make this compatible with original Bernstein format this endian flip +%% and also the use of the standard hash function required. +%% +%% Hash function contains mysterious constants, some explanation here as to +%% what they are - +%% http://stackoverflow.com/ ++ +%% questions/10696223/reason-for-5381-number-in-djb-hash-function endian_flip(Int) -> <> = <>, @@ -962,12 +1073,24 @@ activewrite_singlewrite_test() -> InitialD1 = dict:store("0001", "Initial value", InitialD), ok = from_dict("../test/test_mem.cdb", InitialD1), io:format("New db file created ~n", []), - {LastPosition, KeyDict} = open_active_file("../test/test_mem.cdb"), + {LastPosition, KeyDict, _} = open_active_file("../test/test_mem.cdb"), io:format("File opened as new active file " "with LastPosition=~w ~n", [LastPosition]), - {_, _, UpdKeyDict} = put("../test/test_mem.cdb", Key, Value, {LastPosition, KeyDict}), + {_, _, UpdKeyDict} = put("../test/test_mem.cdb", + Key, Value, + {LastPosition, KeyDict}), io:format("New key and value added to active file ~n", []), - ?assertMatch({Key, Value}, get_mem(Key, "../test/test_mem.cdb", UpdKeyDict)), + ?assertMatch({Key, Value}, + get_mem(Key, "../test/test_mem.cdb", + UpdKeyDict)), + ?assertMatch(probably, + get_mem(Key, "../test/test_mem.cdb", + UpdKeyDict, + loose_presence)), + ?assertMatch(missing, + get_mem("not_present", "../test/test_mem.cdb", + UpdKeyDict, + loose_presence)), ok = file:delete("../test/test_mem.cdb"). search_hash_table_findinslot_test() -> @@ -992,6 +1115,8 @@ search_hash_table_findinslot_test() -> io:format("Slot 2 has Hash ~w Position ~w~n", [ReadH4, ReadP4]), ?assertMatch(0, ReadH4), ?assertMatch({"key1", "value1"}, get(Handle, Key1)), + ?assertMatch(probably, get(Handle, Key1, loose_presence)), + ?assertMatch(missing, get(Handle, "Key99", loose_presence)), {ok, _} = file:position(Handle, FirstHashPosition), FlipH3 = endian_flip(ReadH3), FlipP3 = endian_flip(ReadP3), @@ -1029,7 +1154,7 @@ getnextkey_inclemptyvalue_test() -> ok = file:delete("../test/hashtable2_test.cdb"). newactivefile_test() -> - {LastPosition, _} = open_active_file("../test/activefile_test.cdb"), + {LastPosition, _, _} = open_active_file("../test/activefile_test.cdb"), ?assertMatch(256 * ?DWORD_SIZE, LastPosition), Response = get_nextkey("../test/activefile_test.cdb"), ?assertMatch(nomorekeys, Response), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 7564ed4..d19334b 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -1,11 +1,560 @@ %% -------- Inker --------- -%% %% +%% The Inker is responsible for managing access and updates to the Journal. +%% +%% The Inker maintains a manifest of what files make up the Journal, and which +%% file is the current append-only nursery log to accept new PUTs into the +%% Journal. The Inker also marshals GET requests to the appropriate database +%% file within the Journal (routed by sequence number). The Inker is also +%% responsible for scheduling compaction work to be carried out by the Inker's +%% clerk. +%% +%% -------- Journal --------- +%% +%% The Journal is a series of files originally named as _nursery.cdb +%% where the sequence number is the first object sequence number (key) within +%% the given database file. The files will be named *.cdb at the point they +%% have been made immutable (through a rename operation). Prior to this, they +%% will originally start out as a *.pnd file. +%% +%% At some stage in the future compacted versions of old journal cdb files may +%% be produced. These files will be named -.cdb, and once +%% the manifest is updated the original _nursery.cdb (or +%% _.cdb) files they replace will be erased. +%% +%% The current Journal is made up of a set of files referenced in the manifest, +%% combined with a set of files of the form _nursery.[cdb|pnd] with +%% a higher Sequence Number compared to the files in the manifest. +%% +%% The Journal is ordered by sequence number from front to back both within +%% and across files. +%% +%% On startup the Inker should open the manifest with the highest sequence +%% number, and this will contain the list of filenames that make up the +%% non-recent part of the Journal. The Manifest is completed by opening these +%% files plus any other files with a higher sequence number. The file with +%% the highest sequence number is assumed to to be the active writer. Any file +%% with a lower sequence number and a *.pnd extension should be re-rolled into +%% a *.cdb file. +%% +%% -------- Objects --------- +%% +%% From the perspective of the Inker, objects to store are made up of: +%% - A Primary Key (as an Erlang term) +%% - A sequence number (assigned by the Inker) +%% - An object (an Erlang term) +%% - A set of Key Deltas associated with the change +%% +%% -------- Manifest --------- +%% +%% The Journal has a manifest which is the current record of which cdb files +%% are currently active in the Journal (i.e. following compaction). The +%% manifest holds this information through two lists - a list of files which +%% are definitely in the current manifest, and a list of files which have been +%% removed, but may still be present on disk. The use of two lists is to +%% avoid any circumsatnces where a compaction event has led to the deletion of +%% a Journal file with a higher sequence number than any in the remaining +%% manifest. +%% +%% A new manifest file is saved for every compaction event. The manifest files +%% are saved using the filename .man once saved. The ManifestSQN +%% is incremented once for every compaction event. +%% +%% -------- Compaction --------- +%% +%% Compaction is a process whereby an Inker's clerk will: +%% - Request a snapshot of the Ledger, as well as the lowest sequence number +%% that is currently registerd by another snapshot owner +%% - Picks a Journal database file at random (not including the current +%% nursery log) +%% - Performs a random walk on keys and sequence numbers in the chosen CDB +%% file to extract a subset of 100 key and sequence number combinations from +%% the database +%% - Looks up the current sequence number for those keys in the Ledger +%% - If more than % (default n=20) of the keys are now at a higher sequence +%% number, then the database file is a candidate for compaction. In this case +%% each of the next 8 files in sequence should be checked until all those 8 +%% files have been checked or one of the files has been found to be below the +%% threshold. +%% - If a set of below-the-threshold files is found, the files are re-written +%% without any superceded values +%%- The clerk should then request that the Inker commit the manifest change +%% +%% -------- Inker's Clerk --------- +%% %% -%% -------- Ledger --------- %% %% - -module(leveled_inker). +-behaviour(gen_server). + +-include("../include/leveled.hrl"). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + ink_start/1, + ink_put/4, + ink_get/3, + ink_snap/1, + build_dummy_journal/0, + simple_manifest_reader/2]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(MANIFEST_FP, "journal_manifest"). +-define(FILES_FP, "journal_files"). +-define(JOURNAL_FILEX, "cdb"). +-define(MANIFEST_FILEX, "man"). +-define(PENDING_FILEX, "pnd"). + + +-record(state, {manifest = [] :: list(), + manifest_sqn = 0 :: integer(), + journal_sqn = 0 :: integer(), + active_journaldb :: pid(), + removed_journaldbs = [] :: list(), + root_path :: string()}). + + +%%%============================================================================ +%%% API +%%%============================================================================ + +ink_start(RootDir) -> + gen_server:start(?MODULE, [RootDir], []). + +ink_put(Pid, PrimaryKey, Object, KeyChanges) -> + gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity). + +ink_get(Pid, PrimaryKey, SQN) -> + gen_server:call(Pid, {get, PrimaryKey, SQN}, infinity). + +ink_snap(Pid) -> + gen_server:call(Pid, snapshot, infinity). + +%%%============================================================================ +%%% gen_server callbacks +%%%============================================================================ + +init([RootPath]) -> + JournalFP = filepath(RootPath, journal_dir), + {ok, JournalFilenames} = case filelib:is_dir(JournalFP) of + true -> + file:list_dir(JournalFP); + false -> + filelib:ensure_dir(JournalFP), + {ok, []} + end, + ManifestFP = filepath(RootPath, manifest_dir), + {ok, ManifestFilenames} = case filelib:is_dir(ManifestFP) of + true -> + file:list_dir(ManifestFP); + false -> + filelib:ensure_dir(ManifestFP), + {ok, []} + end, + {Manifest, + ActiveJournal, + JournalSQN, + ManifestSQN} = build_manifest(ManifestFilenames, + JournalFilenames, + fun simple_manifest_reader/2, + RootPath), + {ok, #state{manifest = Manifest, + manifest_sqn = ManifestSQN, + journal_sqn = JournalSQN, + active_journaldb = ActiveJournal, + root_path = RootPath}}. + + +handle_call({put, Key, Object, KeyChanges}, From, State) -> + case put_object(Key, Object, KeyChanges, State) of + {ok, UpdState} -> + {reply, {ok, UpdState#state.journal_sqn}, UpdState}; + {rolling, UpdState} -> + gen_server:reply(From, {ok, UpdState#state.journal_sqn}), + {NewManifest, + NewManifestSQN} = roll_active_file(State#state.active_journaldb, + State#state.manifest, + State#state.manifest_sqn, + State#state.root_path), + {noreply, UpdState#state{manifest=NewManifest, + manifest_sqn=NewManifestSQN}}; + {blocked, UpdState} -> + {reply, blocked, UpdState} + end; +handle_call({get, Key, SQN}, _From, State) -> + {reply, get_object(Key, SQN, State#state.manifest), State}; +handle_call(snapshot, _From , State) -> + %% TODO: Not yet implemented registration of snapshot + %% Should return manifest and register the snapshot + {reply, State#state.manifest, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%%============================================================================ +%%% Internal functions +%%%============================================================================ + +put_object(PrimaryKey, Object, KeyChanges, State) -> + NewSQN = State#state.journal_sqn + 1, + Bin1 = term_to_binary({Object, KeyChanges}, [compressed]), + case leveled_cdb:cdb_put(State#state.active_journaldb, + {NewSQN, PrimaryKey}, + Bin1) of + ok -> + {ok, State#state{journal_sqn=NewSQN}}; + roll -> + FileName = filepath(State#state.root_path, NewSQN, new_journal), + {ok, NewJournalP} = leveled_cdb:cdb_open_writer(FileName), + case leveled_cdb:cdb_put(NewJournalP, + {NewSQN, PrimaryKey}, + Bin1) of + ok -> + {rolling, State#state{journal_sqn=NewSQN, + active_journaldb=NewJournalP}}; + roll -> + {blocked, State#state{journal_sqn=NewSQN, + active_journaldb=NewJournalP}} + end + end. + +roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) -> + {ok, NewFilename} = leveled_cdb:cdb_complete(OldActiveJournal), + {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename), + JournalRegex2 = "nursery_(?[0-9]+)\\." ++ ?JOURNAL_FILEX, + [JournalSQN] = sequencenumbers_fromfilenames([NewFilename], + JournalRegex2, + 'SQN'), + NewManifest = lists:append(Manifest, {JournalSQN, NewFilename, PidR}), + NewManifestSQN = ManifestSQN + 1, + ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath), + {NewManifest, NewManifestSQN}. + +get_object(PrimaryKey, SQN, Manifest) -> + JournalP = find_in_manifest(SQN, Manifest), + leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}). + + +build_manifest(ManifestFilenames, + JournalFilenames, + ManifestRdrFun, + RootPath) -> + %% Setup root paths + JournalFP = filepath(RootPath, journal_dir), + %% Find the manifest with a highest Manifest sequence number + %% Open it and read it to get the current Confirmed Manifest + ManifestRegex = "(?[0-9]+)\\." ++ ?MANIFEST_FILEX, + ValidManSQNs = sequencenumbers_fromfilenames(ManifestFilenames, + ManifestRegex, + 'MSQN'), + {JournalSQN1, + ConfirmedManifest, + Removed, + ManifestSQN} = case length(ValidManSQNs) of + 0 -> + {0, [], [], 0}; + _ -> + PersistedManSQN = lists:max(ValidManSQNs), + {J1, M1, R1} = ManifestRdrFun(PersistedManSQN, + RootPath), + {J1, M1, R1, PersistedManSQN} + end, + + %% Find any more recent immutable files that have a higher sequence number + %% - the immutable files have already been rolled, and so have a completed + %% hashtree lookup + JournalRegex1 = "nursery_(?[0-9]+)\\." ++ ?JOURNAL_FILEX, + UnremovedJournalFiles = lists:foldl(fun(FN, Acc) -> + case lists:member(FN, Removed) of + true -> + Acc; + false -> + Acc ++ [FN] + end end, + [], + JournalFilenames), + OtherSQNs_imm = sequencenumbers_fromfilenames(UnremovedJournalFiles, + JournalRegex1, + 'SQN'), + Manifest1 = lists:foldl(fun(X, Acc) -> + if + X > JournalSQN1 + -> + FN = "nursery_" ++ + integer_to_list(X) + ++ "." ++ + ?JOURNAL_FILEX, + Acc ++ [{X, FN}]; + true + -> Acc + end end, + ConfirmedManifest, + lists:sort(OtherSQNs_imm)), + + %% Enrich the manifest so it contains the Pid of any of the immutable + %% entries + io:format("Manifest1 is ~w~n", [Manifest1]), + Manifest2 = lists:map(fun({X, Y}) -> + FN = filename:join(JournalFP, Y), + {ok, Pid} = leveled_cdb:cdb_open_reader(FN), + {X, Y, Pid} end, + Manifest1), + + %% Find any more recent mutable files that have a higher sequence number + %% Roll any mutable files which do not have the highest sequence number + %% to create the hashtree and complete the header entries + JournalRegex2 = "nursery_(?[0-9]+)\\." ++ ?PENDING_FILEX, + OtherSQNs_pnd = sequencenumbers_fromfilenames(JournalFilenames, + JournalRegex2, + 'SQN'), + + case length(OtherSQNs_pnd) of + 0 -> + %% Need to create a new active writer, but also find the highest + %% SQN from within the confirmed manifest + TopSQNInManifest = + case length(Manifest2) of + 0 -> + %% Manifest is empty and no active writers + %% can be found so database is empty + 0; + _ -> + TM = lists:last(lists:keysort(1,Manifest2)), + {_SQN, _FN, TMPid} = TM, + {HighSQN, _HighKey} = leveled_cdb:cdb_lastkey(TMPid), + HighSQN + end, + ActiveFN = filepath(RootPath, TopSQNInManifest + 1, new_journal), + {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN), + {Manifest2, ActiveJournal, TopSQNInManifest, ManifestSQN}; + _ -> + + {ActiveJournalSQN, + Manifest3} = roll_pending_journals(lists:sort(OtherSQNs_pnd), + Manifest2, + RootPath), + %% Need to work out highest sequence number in tail file to feed + %% into opening of pending journal + ActiveFN = filepath(RootPath, ActiveJournalSQN, new_journal), + {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN), + {HighestSQN, _HighestKey} = leveled_cdb:cdb_lastkey(ActiveJournal), + {Manifest3, ActiveJournal, HighestSQN, ManifestSQN} + end. + +close_allmanifest([], ActiveJournal) -> + leveled_cdb:cdb_close(ActiveJournal); +close_allmanifest([H|ManifestT], ActiveJournal) -> + {_, _, Pid} = H, + leveled_cdb:cdb_close(Pid), + close_allmanifest(ManifestT, ActiveJournal). + + +roll_pending_journals([TopJournalSQN], Manifest, _RootPath) + when is_integer(TopJournalSQN) -> + {TopJournalSQN, Manifest}; +roll_pending_journals([JournalSQN|T], Manifest, RootPath) -> + Filename = filepath(RootPath, JournalSQN, new_journal), + PidW = leveled_cdb:cdb_open_writer(Filename), + {ok, NewFilename} = leveled_cdb:cdb_complete(PidW), + {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename), + roll_pending_journals(T, + lists:append(Manifest, + {JournalSQN, NewFilename, PidR}), + RootPath). + + + +sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> + lists:foldl(fun(FN, Acc) -> + case re:run(FN, + Regex, + [{capture, [IntName], list}]) of + nomatch -> + Acc; + {match, [Int]} when is_list(Int) -> + Acc ++ [list_to_integer(Int)]; + _ -> + Acc + end end, + [], + Filenames). + +find_in_manifest(_SQN, []) -> + error; +find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN -> + Pid; +find_in_manifest(SQN, [_Head|Tail]) -> + find_in_manifest(SQN, Tail). + +filepath(RootPath, journal_dir) -> + RootPath ++ "/" ++ ?FILES_FP ++ "/"; +filepath(RootPath, manifest_dir) -> + RootPath ++ "/" ++ ?MANIFEST_FP ++ "/". + + +filepath(RootPath, NewSQN, new_journal) -> + filename:join(filepath(RootPath, journal_dir), + "nursery_" + ++ integer_to_list(NewSQN) + ++ "." ++ ?PENDING_FILEX). + + +simple_manifest_reader(SQN, RootPath) -> + ManifestPath = filepath(RootPath, manifest_dir), + {ok, MBin} = file:read_file(filename:join(ManifestPath, + integer_to_list(SQN) + ++ ".man")), + binary_to_term(MBin). + + +simple_manifest_writer(Manifest, ManSQN, RootPath) -> + ManPath = filepath(RootPath, manifest_dir), + NewFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?MANIFEST_FILEX), + TmpFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?PENDING_FILEX), + MBin = term_to_binary(Manifest), + case file:is_file(NewFN) of + true -> + io:format("Error - trying to write manifest for" + ++ " ManifestSQN=~w which already exists~n", [ManSQN]), + error; + false -> + io:format("Writing new version of manifest for " + ++ " manifestSQN=~w~n", [ManSQN]), + ok = file:write_file(TmpFN, MBin), + ok = file:rename(TmpFN, NewFN), + ok + end. + + + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + +build_dummy_journal() -> + RootPath = "../test/inker", + JournalFP = filepath(RootPath, journal_dir), + ManifestFP = filepath(RootPath, manifest_dir), + ok = filelib:ensure_dir(RootPath), + ok = filelib:ensure_dir(JournalFP), + ok = filelib:ensure_dir(ManifestFP), + F1 = filename:join(JournalFP, "nursery_1.pnd"), + {ok, J1} = leveled_cdb:cdb_open_writer(F1), + {K1, V1} = {"Key1", "TestValue1"}, + {K2, V2} = {"Key2", "TestValue2"}, + ok = leveled_cdb:cdb_put(J1, {1, K1}, V1), + ok = leveled_cdb:cdb_put(J1, {2, K2}, V2), + {ok, _} = leveled_cdb:cdb_complete(J1), + F2 = filename:join(JournalFP, "nursery_3.pnd"), + {ok, J2} = leveled_cdb:cdb_open_writer(F2), + {K1, V3} = {"Key1", "TestValue3"}, + {K4, V4} = {"Key4", "TestValue4"}, + ok = leveled_cdb:cdb_put(J2, {3, K1}, V3), + ok = leveled_cdb:cdb_put(J2, {4, K4}, V4), + ok = leveled_cdb:cdb_close(J2), + Manifest = {2, [{1, "nursery_1.cdb"}], []}, + ManifestBin = term_to_binary(Manifest), + {ok, MF1} = file:open(filename:join(ManifestFP, "1.man"), + [binary, raw, read, write]), + ok = file:write(MF1, ManifestBin), + ok = file:close(MF1). + + +clean_testdir(RootPath) -> + clean_subdir(filepath(RootPath, journal_dir)), + clean_subdir(filepath(RootPath, manifest_dir)). + +clean_subdir(DirPath) -> + {ok, Files} = file:list_dir(DirPath), + lists:foreach(fun(FN) -> file:delete(filename:join(DirPath, FN)) end, + Files). + +simple_buildmanifest_test() -> + RootPath = "../test/inker", + build_dummy_journal(), + Res = build_manifest(["1.man"], + ["nursery_1.cdb", "nursery_3.pnd"], + fun simple_manifest_reader/2, + RootPath), + io:format("Build manifest output is ~w~n", [Res]), + {Man, ActJournal, HighSQN, ManSQN} = Res, + ?assertMatch(HighSQN, 4), + ?assertMatch(ManSQN, 1), + ?assertMatch([{1, "nursery_1.cdb", _}], Man), + {ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal), + ?assertMatch(ActSQN, 4), + close_allmanifest(Man, ActJournal), + clean_testdir(RootPath). + +another_buildmanifest_test() -> + %% There is a rolled jounral file which is not yet in the manifest + RootPath = "../test/inker", + build_dummy_journal(), + FN = filepath(RootPath, 3, new_journal), + {ok, FileToRoll} = leveled_cdb:cdb_open_writer(FN), + {ok, _} = leveled_cdb:cdb_complete(FileToRoll), + FN2 = filepath(RootPath, 5, new_journal), + {ok, NewActiveJN} = leveled_cdb:cdb_open_writer(FN2), + {K5, V5} = {"Key5", "TestValue5"}, + {K6, V6} = {"Key6", "TestValue6"}, + ok = leveled_cdb:cdb_put(NewActiveJN, {5, K5}, V5), + ok = leveled_cdb:cdb_put(NewActiveJN, {6, K6}, V6), + ok = leveled_cdb:cdb_close(NewActiveJN), + %% Test setup - now build manifest + Res = build_manifest(["1.man"], + ["nursery_1.cdb", + "nursery_3.cdb", + "nursery_5.pnd"], + fun simple_manifest_reader/2, + RootPath), + io:format("Build manifest output is ~w~n", [Res]), + {Man, ActJournal, HighSQN, ManSQN} = Res, + ?assertMatch(HighSQN, 6), + ?assertMatch(ManSQN, 1), + ?assertMatch([{1, "nursery_1.cdb", _}, {3, "nursery_3.cdb", _}], Man), + {ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal), + ?assertMatch(ActSQN, 6), + close_allmanifest(Man, ActJournal), + clean_testdir(RootPath). + + +empty_buildmanifest_test() -> + RootPath = "../test/inker/", + Res = build_manifest([], + [], + fun simple_manifest_reader/2, + RootPath), + io:format("Build manifest output is ~w~n", [Res]), + {Man, ActJournal, HighSQN, ManSQN} = Res, + ?assertMatch(Man, []), + ?assertMatch(ManSQN, 0), + ?assertMatch(HighSQN, 0), + empty = leveled_cdb:cdb_lastkey(ActJournal), + FN = leveled_cdb:cdb_filename(ActJournal), + %% The filename should be based on the next journal SQN (1) not 0 + ?assertMatch(FN, filepath(RootPath, 1, new_journal)), + close_allmanifest(Man, ActJournal), + clean_testdir(RootPath). + + +-endif. \ No newline at end of file diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index c84bfe7..6fe6eac 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -178,11 +178,8 @@ -define(MAX_WORK_WAIT, 300). -define(MANIFEST_FP, "ledger_manifest"). -define(FILES_FP, "ledger_files"). --define(SHUTDOWN_FP, "ledger_onshutdown"). -define(CURRENT_FILEX, "crr"). -define(PENDING_FILEX, "pnd"). --define(BACKUP_FILEX, "bak"). --define(ARCHIVE_FILEX, "arc"). -define(MEMTABLE, mem). -define(MAX_TABLESIZE, 32000). -define(PROMPT_WAIT_ONL0, 5). @@ -198,6 +195,7 @@ table_size = 0 :: integer(), clerk :: pid(), levelzero_pending = ?L0PEND_RESET :: tuple(), + levelzero_snapshot = [] :: list(), memtable, backlog = false :: boolean()}). @@ -471,7 +469,8 @@ push_to_memory(DumpList, State) -> 1, State#state.manifest, {0, [ManifestEntry]}), - levelzero_pending=?L0PEND_RESET}}; + levelzero_pending=?L0PEND_RESET, + levelzero_snapshot=[]}}; ?L0PEND_RESET -> {State#state.table_size, State} end, @@ -479,17 +478,16 @@ push_to_memory(DumpList, State) -> %% Prompt clerk to ask about work - do this for every push_mem ok = leveled_clerk:clerk_prompt(UpdState#state.clerk, penciller), - SW2 = os:timestamp(), MemoryInsertion = do_push_to_mem(DumpList, TableSize, - UpdState#state.memtable), - io:format("Push into memory timed at ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW2)]), + UpdState#state.memtable, + UpdState#state.levelzero_snapshot), case MemoryInsertion of - {twist, ApproxTableSize} -> - {ok, UpdState#state{table_size=ApproxTableSize}}; - {roll, ApproxTableSize} -> + {twist, ApproxTableSize, UpdSnapshot} -> + {ok, UpdState#state{table_size=ApproxTableSize, + levelzero_snapshot=UpdSnapshot}}; + {roll, ApproxTableSize, UpdSnapshot} -> L0 = get_item(0, UpdState#state.manifest, []), case {L0, manifest_locked(UpdState)} of {[], false} -> @@ -508,17 +506,20 @@ push_to_memory(DumpList, State) -> L0Pid, os:timestamp()}, table_size=ApproxTableSize, - manifest_sqn=MSN}}; + manifest_sqn=MSN, + levelzero_snapshot=UpdSnapshot}}; {[], true} -> {{pause, "L0 file write blocked by change at sqn=~w~n", [UpdState#state.manifest_sqn]}, - UpdState#state{table_size=ApproxTableSize}}; + UpdState#state{table_size=ApproxTableSize, + levelzero_snapshot=UpdSnapshot}}; _ -> {{pause, "L0 file write blocked by L0 file in manifest~n", []}, - UpdState#state{table_size=ApproxTableSize}} + UpdState#state{table_size=ApproxTableSize, + levelzero_snapshot=UpdSnapshot}} end end. @@ -556,20 +557,24 @@ fetch(Key, Manifest, Level, FetchFun) -> end end. -do_push_to_mem(DumpList, TableSize, MemTable) -> +do_push_to_mem(DumpList, TableSize, MemTable, Snapshot) -> + SW = os:timestamp(), + UpdSnapshot = lists:append(Snapshot, DumpList), ets:insert(MemTable, DumpList), + io:format("Push into memory timed at ~w microseconds~n", + [timer:now_diff(os:timestamp(), SW)]), case TableSize + length(DumpList) of ApproxTableSize when ApproxTableSize > ?MAX_TABLESIZE -> case ets:info(MemTable, size) of ActTableSize when ActTableSize > ?MAX_TABLESIZE -> - {roll, ActTableSize}; + {roll, ActTableSize, UpdSnapshot}; ActTableSize -> io:format("Table size is actually ~w~n", [ActTableSize]), - {twist, ActTableSize} + {twist, ActTableSize, UpdSnapshot} end; ApproxTableSize -> io:format("Table size is approximately ~w~n", [ApproxTableSize]), - {twist, ApproxTableSize} + {twist, ApproxTableSize, UpdSnapshot} end.