Inker improvements

Resolve issue in CDB file when we have cached the index.  Allow for
Inker to find keys in the active journal
This commit is contained in:
martinsumner 2016-09-05 20:22:16 +01:00
parent 2a76eb364e
commit f3a40e106d
2 changed files with 46 additions and 18 deletions

View file

@ -373,10 +373,10 @@ get(FileNameOrHandle, Key) ->
get(FileNameOrHandle, Key, CRCCheck) -> get(FileNameOrHandle, Key, CRCCheck) ->
get(FileNameOrHandle, Key, CRCCheck, no_cache). get(FileNameOrHandle, Key, CRCCheck, no_cache).
get(FileName, Key, CRCCheck, Cache) when is_list(FileName), is_list(Key) -> get(FileName, Key, CRCCheck, Cache) when is_list(FileName) ->
{ok,Handle} = file:open(FileName,[binary, raw, read]), {ok, Handle} = file:open(FileName,[binary, raw, read]),
get(Handle,Key, CRCCheck, Cache); get(Handle, Key, CRCCheck, Cache);
get(Handle, Key, CRCCheck, Cache) when is_tuple(Handle), is_list(Key) -> get(Handle, Key, CRCCheck, Cache) when is_tuple(Handle) ->
Hash = hash(Key), Hash = hash(Key),
Index = hash_to_index(Hash), Index = hash_to_index(Hash),
{HashTable, Count} = get_index(Handle, Index, Cache), {HashTable, Count} = get_index(Handle, Index, Cache),
@ -401,7 +401,8 @@ get_index(Handle, Index, no_cache) ->
% Get location of hashtable and number of entries in the hash % Get location of hashtable and number of entries in the hash
read_next_2_integers(Handle); read_next_2_integers(Handle);
get_index(_Handle, Index, Cache) -> get_index(_Handle, Index, Cache) ->
lists:keyfind(Index, 1, Cache). {_Pointer, Count} = lists:keyfind(Index, 1, Cache),
Count.
%% Get a Key/Value pair from an active CDB file (with no hash table written) %% Get a Key/Value pair from an active CDB file (with no hash table written)
%% This requires a key dictionary to be passed in (mapping keys to positions) %% This requires a key dictionary to be passed in (mapping keys to positions)
@ -921,7 +922,7 @@ hash1(H, <<B:8/integer, Rest/bytes>>) ->
hash_to_index(Hash) -> hash_to_index(Hash) ->
Hash band 255. Hash band 255.
hash_to_slot(Hash,L) -> hash_to_slot(Hash, L) ->
(Hash bsr 8) rem L. (Hash bsr 8) rem L.
%% Create a binary of the LengthKeyLengthValue, adding a CRC check %% Create a binary of the LengthKeyLengthValue, adding a CRC check

View file

@ -118,6 +118,7 @@
manifest_sqn = 0 :: integer(), manifest_sqn = 0 :: integer(),
journal_sqn = 0 :: integer(), journal_sqn = 0 :: integer(),
active_journaldb :: pid(), active_journaldb :: pid(),
active_journaldb_sqn :: integer(),
removed_journaldbs = [] :: list(), removed_journaldbs = [] :: list(),
root_path :: string()}). root_path :: string()}).
@ -160,7 +161,7 @@ init([RootPath]) ->
{ok, []} {ok, []}
end, end,
{Manifest, {Manifest,
ActiveJournal, {ActiveJournal, LowActiveSQN},
JournalSQN, JournalSQN,
ManifestSQN} = build_manifest(ManifestFilenames, ManifestSQN} = build_manifest(ManifestFilenames,
JournalFilenames, JournalFilenames,
@ -170,6 +171,7 @@ init([RootPath]) ->
manifest_sqn = ManifestSQN, manifest_sqn = ManifestSQN,
journal_sqn = JournalSQN, journal_sqn = JournalSQN,
active_journaldb = ActiveJournal, active_journaldb = ActiveJournal,
active_journaldb_sqn = LowActiveSQN,
root_path = RootPath}}. root_path = RootPath}}.
@ -190,7 +192,11 @@ handle_call({put, Key, Object, KeyChanges}, From, State) ->
{reply, blocked, UpdState} {reply, blocked, UpdState}
end; end;
handle_call({get, Key, SQN}, _From, State) -> handle_call({get, Key, SQN}, _From, State) ->
{reply, get_object(Key, SQN, State#state.manifest), State}; {reply, get_object(Key,
SQN,
State#state.manifest,
State#state.active_journaldb,
State#state.active_journaldb_sqn), State};
handle_call(snapshot, _From , State) -> handle_call(snapshot, _From , State) ->
%% TODO: Not yet implemented registration of snapshot %% TODO: Not yet implemented registration of snapshot
%% Should return manifest and register the snapshot %% Should return manifest and register the snapshot
@ -248,9 +254,21 @@ roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) ->
ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath), ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath),
{NewManifest, NewManifestSQN}. {NewManifest, NewManifestSQN}.
get_object(PrimaryKey, SQN, Manifest) -> get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) ->
if
SQN < ActiveJournalSQN ->
JournalP = find_in_manifest(SQN, Manifest), JournalP = find_in_manifest(SQN, Manifest),
leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}). if
JournalP == error ->
io:format("Unable to find SQN~w in Manifest~w~n",
[SQN, Manifest]),
error;
true ->
leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey})
end;
true ->
leveled_cdb:cdb_get(ActiveJournal, {SQN, PrimaryKey})
end.
build_manifest(ManifestFilenames, build_manifest(ManifestFilenames,
@ -342,11 +360,14 @@ build_manifest(ManifestFilenames,
{HighSQN, _HighKey} = leveled_cdb:cdb_lastkey(TMPid), {HighSQN, _HighKey} = leveled_cdb:cdb_lastkey(TMPid),
HighSQN HighSQN
end, end,
ActiveFN = filepath(RootPath, TopSQNInManifest + 1, new_journal), LowActiveSQN = TopSQNInManifest + 1,
ActiveFN = filepath(RootPath, LowActiveSQN, new_journal),
{ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN), {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN),
{Manifest2, ActiveJournal, TopSQNInManifest, ManifestSQN}; {Manifest2,
{ActiveJournal, LowActiveSQN},
TopSQNInManifest,
ManifestSQN};
_ -> _ ->
{ActiveJournalSQN, {ActiveJournalSQN,
Manifest3} = roll_pending_journals(lists:sort(OtherSQNs_pnd), Manifest3} = roll_pending_journals(lists:sort(OtherSQNs_pnd),
Manifest2, Manifest2,
@ -356,7 +377,10 @@ build_manifest(ManifestFilenames,
ActiveFN = filepath(RootPath, ActiveJournalSQN, new_journal), ActiveFN = filepath(RootPath, ActiveJournalSQN, new_journal),
{ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN), {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN),
{HighestSQN, _HighestKey} = leveled_cdb:cdb_lastkey(ActiveJournal), {HighestSQN, _HighestKey} = leveled_cdb:cdb_lastkey(ActiveJournal),
{Manifest3, ActiveJournal, HighestSQN, ManifestSQN} {Manifest3,
{ActiveJournal, ActiveJournalSQN},
HighestSQN,
ManifestSQN}
end. end.
close_allmanifest([], ActiveJournal) -> close_allmanifest([], ActiveJournal) ->
@ -497,12 +521,13 @@ simple_buildmanifest_test() ->
fun simple_manifest_reader/2, fun simple_manifest_reader/2,
RootPath), RootPath),
io:format("Build manifest output is ~w~n", [Res]), io:format("Build manifest output is ~w~n", [Res]),
{Man, ActJournal, HighSQN, ManSQN} = Res, {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res,
?assertMatch(HighSQN, 4), ?assertMatch(HighSQN, 4),
?assertMatch(ManSQN, 1), ?assertMatch(ManSQN, 1),
?assertMatch([{1, "nursery_1.cdb", _}], Man), ?assertMatch([{1, "nursery_1.cdb", _}], Man),
{ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal), {ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal),
?assertMatch(ActSQN, 4), ?assertMatch(ActSQN, 4),
?assertMatch(ActJournalSQN, 3),
close_allmanifest(Man, ActJournal), close_allmanifest(Man, ActJournal),
clean_testdir(RootPath). clean_testdir(RootPath).
@ -528,12 +553,13 @@ another_buildmanifest_test() ->
fun simple_manifest_reader/2, fun simple_manifest_reader/2,
RootPath), RootPath),
io:format("Build manifest output is ~w~n", [Res]), io:format("Build manifest output is ~w~n", [Res]),
{Man, ActJournal, HighSQN, ManSQN} = Res, {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res,
?assertMatch(HighSQN, 6), ?assertMatch(HighSQN, 6),
?assertMatch(ManSQN, 1), ?assertMatch(ManSQN, 1),
?assertMatch([{1, "nursery_1.cdb", _}, {3, "nursery_3.cdb", _}], Man), ?assertMatch([{1, "nursery_1.cdb", _}, {3, "nursery_3.cdb", _}], Man),
{ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal), {ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal),
?assertMatch(ActSQN, 6), ?assertMatch(ActSQN, 6),
?assertMatch(ActJournalSQN, 5),
close_allmanifest(Man, ActJournal), close_allmanifest(Man, ActJournal),
clean_testdir(RootPath). clean_testdir(RootPath).
@ -545,10 +571,11 @@ empty_buildmanifest_test() ->
fun simple_manifest_reader/2, fun simple_manifest_reader/2,
RootPath), RootPath),
io:format("Build manifest output is ~w~n", [Res]), io:format("Build manifest output is ~w~n", [Res]),
{Man, ActJournal, HighSQN, ManSQN} = Res, {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res,
?assertMatch(Man, []), ?assertMatch(Man, []),
?assertMatch(ManSQN, 0), ?assertMatch(ManSQN, 0),
?assertMatch(HighSQN, 0), ?assertMatch(HighSQN, 0),
?assertMatch(ActJournalSQN, 1),
empty = leveled_cdb:cdb_lastkey(ActJournal), empty = leveled_cdb:cdb_lastkey(ActJournal),
FN = leveled_cdb:cdb_filename(ActJournal), FN = leveled_cdb:cdb_filename(ActJournal),
%% The filename should be based on the next journal SQN (1) not 0 %% The filename should be based on the next journal SQN (1) not 0