From f3a40e106db94ccc87275855dc5c5449c8726a24 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Mon, 5 Sep 2016 20:22:16 +0100 Subject: [PATCH] Inker improvements Resolve issue in CDB file when we have cached the index. Allow for Inker to find keys in the active journal --- src/leveled_cdb.erl | 13 ++++++----- src/leveled_inker.erl | 51 +++++++++++++++++++++++++++++++++---------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 3557609..88c8223 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -373,10 +373,10 @@ get(FileNameOrHandle, Key) -> get(FileNameOrHandle, Key, CRCCheck) -> get(FileNameOrHandle, Key, CRCCheck, no_cache). -get(FileName, Key, CRCCheck, Cache) when is_list(FileName), is_list(Key) -> - {ok,Handle} = file:open(FileName,[binary, raw, read]), - get(Handle,Key, CRCCheck, Cache); -get(Handle, Key, CRCCheck, Cache) when is_tuple(Handle), is_list(Key) -> +get(FileName, Key, CRCCheck, Cache) when is_list(FileName) -> + {ok, Handle} = file:open(FileName,[binary, raw, read]), + get(Handle, Key, CRCCheck, Cache); +get(Handle, Key, CRCCheck, Cache) when is_tuple(Handle) -> Hash = hash(Key), Index = hash_to_index(Hash), {HashTable, Count} = get_index(Handle, Index, Cache), @@ -401,7 +401,8 @@ get_index(Handle, Index, no_cache) -> % Get location of hashtable and number of entries in the hash read_next_2_integers(Handle); get_index(_Handle, Index, Cache) -> - lists:keyfind(Index, 1, Cache). + {_Pointer, Count} = lists:keyfind(Index, 1, Cache), + Count. %% Get a Key/Value pair from an active CDB file (with no hash table written) %% This requires a key dictionary to be passed in (mapping keys to positions) @@ -921,7 +922,7 @@ hash1(H, <>) -> hash_to_index(Hash) -> Hash band 255. -hash_to_slot(Hash,L) -> +hash_to_slot(Hash, L) -> (Hash bsr 8) rem L. %% Create a binary of the LengthKeyLengthValue, adding a CRC check diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index d19334b..b303f4e 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -118,6 +118,7 @@ manifest_sqn = 0 :: integer(), journal_sqn = 0 :: integer(), active_journaldb :: pid(), + active_journaldb_sqn :: integer(), removed_journaldbs = [] :: list(), root_path :: string()}). @@ -160,7 +161,7 @@ init([RootPath]) -> {ok, []} end, {Manifest, - ActiveJournal, + {ActiveJournal, LowActiveSQN}, JournalSQN, ManifestSQN} = build_manifest(ManifestFilenames, JournalFilenames, @@ -170,6 +171,7 @@ init([RootPath]) -> manifest_sqn = ManifestSQN, journal_sqn = JournalSQN, active_journaldb = ActiveJournal, + active_journaldb_sqn = LowActiveSQN, root_path = RootPath}}. @@ -190,7 +192,11 @@ handle_call({put, Key, Object, KeyChanges}, From, State) -> {reply, blocked, UpdState} end; handle_call({get, Key, SQN}, _From, State) -> - {reply, get_object(Key, SQN, State#state.manifest), State}; + {reply, get_object(Key, + SQN, + State#state.manifest, + State#state.active_journaldb, + State#state.active_journaldb_sqn), State}; handle_call(snapshot, _From , State) -> %% TODO: Not yet implemented registration of snapshot %% Should return manifest and register the snapshot @@ -248,9 +254,21 @@ roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) -> ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath), {NewManifest, NewManifestSQN}. -get_object(PrimaryKey, SQN, Manifest) -> - JournalP = find_in_manifest(SQN, Manifest), - leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}). +get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) -> + if + SQN < ActiveJournalSQN -> + JournalP = find_in_manifest(SQN, Manifest), + if + JournalP == error -> + io:format("Unable to find SQN~w in Manifest~w~n", + [SQN, Manifest]), + error; + true -> + leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}) + end; + true -> + leveled_cdb:cdb_get(ActiveJournal, {SQN, PrimaryKey}) + end. build_manifest(ManifestFilenames, @@ -342,11 +360,14 @@ build_manifest(ManifestFilenames, {HighSQN, _HighKey} = leveled_cdb:cdb_lastkey(TMPid), HighSQN end, - ActiveFN = filepath(RootPath, TopSQNInManifest + 1, new_journal), + LowActiveSQN = TopSQNInManifest + 1, + ActiveFN = filepath(RootPath, LowActiveSQN, new_journal), {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN), - {Manifest2, ActiveJournal, TopSQNInManifest, ManifestSQN}; + {Manifest2, + {ActiveJournal, LowActiveSQN}, + TopSQNInManifest, + ManifestSQN}; _ -> - {ActiveJournalSQN, Manifest3} = roll_pending_journals(lists:sort(OtherSQNs_pnd), Manifest2, @@ -356,7 +377,10 @@ build_manifest(ManifestFilenames, ActiveFN = filepath(RootPath, ActiveJournalSQN, new_journal), {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN), {HighestSQN, _HighestKey} = leveled_cdb:cdb_lastkey(ActiveJournal), - {Manifest3, ActiveJournal, HighestSQN, ManifestSQN} + {Manifest3, + {ActiveJournal, ActiveJournalSQN}, + HighestSQN, + ManifestSQN} end. close_allmanifest([], ActiveJournal) -> @@ -497,12 +521,13 @@ simple_buildmanifest_test() -> fun simple_manifest_reader/2, RootPath), io:format("Build manifest output is ~w~n", [Res]), - {Man, ActJournal, HighSQN, ManSQN} = Res, + {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res, ?assertMatch(HighSQN, 4), ?assertMatch(ManSQN, 1), ?assertMatch([{1, "nursery_1.cdb", _}], Man), {ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal), ?assertMatch(ActSQN, 4), + ?assertMatch(ActJournalSQN, 3), close_allmanifest(Man, ActJournal), clean_testdir(RootPath). @@ -528,12 +553,13 @@ another_buildmanifest_test() -> fun simple_manifest_reader/2, RootPath), io:format("Build manifest output is ~w~n", [Res]), - {Man, ActJournal, HighSQN, ManSQN} = Res, + {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res, ?assertMatch(HighSQN, 6), ?assertMatch(ManSQN, 1), ?assertMatch([{1, "nursery_1.cdb", _}, {3, "nursery_3.cdb", _}], Man), {ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal), ?assertMatch(ActSQN, 6), + ?assertMatch(ActJournalSQN, 5), close_allmanifest(Man, ActJournal), clean_testdir(RootPath). @@ -545,10 +571,11 @@ empty_buildmanifest_test() -> fun simple_manifest_reader/2, RootPath), io:format("Build manifest output is ~w~n", [Res]), - {Man, ActJournal, HighSQN, ManSQN} = Res, + {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res, ?assertMatch(Man, []), ?assertMatch(ManSQN, 0), ?assertMatch(HighSQN, 0), + ?assertMatch(ActJournalSQN, 1), empty = leveled_cdb:cdb_lastkey(ActJournal), FN = leveled_cdb:cdb_filename(ActJournal), %% The filename should be based on the next journal SQN (1) not 0