Inker - Initial Code
An attempt to get a first inker that can build a ledger from a manifest as well as support simple get and put operations. Basic tests surround the building of manifests only at this stage - more work required for get and put.
This commit is contained in:
parent
ce0c55a2ec
commit
2a76eb364e
3 changed files with 745 additions and 66 deletions
|
@ -16,9 +16,9 @@
|
||||||
%%
|
%%
|
||||||
%% This is to be used in eleveledb, and in this context:
|
%% This is to be used in eleveledb, and in this context:
|
||||||
%% - Keys will be a Sequence Number
|
%% - Keys will be a Sequence Number
|
||||||
%% - Values will be a Checksum; Pointers (length * 3); Key; [Metadata]; [Value]
|
%% - Values will be a Checksum | Object | KeyAdditions
|
||||||
%% where the pointers can be used to extract just part of the value
|
%% Where the KeyAdditions are all the Key changes required to be added to the
|
||||||
%% (i.e. metadata only)
|
%% ledger to complete the changes (the addition of postings and tombstones).
|
||||||
%%
|
%%
|
||||||
%% This module provides functions to create and query a CDB (constant database).
|
%% This module provides functions to create and query a CDB (constant database).
|
||||||
%% A CDB implements a two-level hashtable which provides fast {key,value}
|
%% A CDB implements a two-level hashtable which provides fast {key,value}
|
||||||
|
@ -58,7 +58,11 @@
|
||||||
cdb_open_reader/1,
|
cdb_open_reader/1,
|
||||||
cdb_get/2,
|
cdb_get/2,
|
||||||
cdb_put/3,
|
cdb_put/3,
|
||||||
cdb_close/1]).
|
cdb_lastkey/1,
|
||||||
|
cdb_filename/1,
|
||||||
|
cdb_keycheck/2,
|
||||||
|
cdb_close/1,
|
||||||
|
cdb_complete/1]).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
@ -70,8 +74,8 @@
|
||||||
|
|
||||||
-record(state, {hashtree,
|
-record(state, {hashtree,
|
||||||
last_position :: integer(),
|
last_position :: integer(),
|
||||||
smallest_sqn :: integer(),
|
last_key = empty,
|
||||||
highest_sqn :: integer(),
|
hash_index = [] :: list(),
|
||||||
filename :: string(),
|
filename :: string(),
|
||||||
handle :: file:fd(),
|
handle :: file:fd(),
|
||||||
writer :: boolean}).
|
writer :: boolean}).
|
||||||
|
@ -108,6 +112,22 @@ cdb_put(Pid, Key, Value) ->
|
||||||
cdb_close(Pid) ->
|
cdb_close(Pid) ->
|
||||||
gen_server:call(Pid, cdb_close, infinity).
|
gen_server:call(Pid, cdb_close, infinity).
|
||||||
|
|
||||||
|
cdb_complete(Pid) ->
|
||||||
|
gen_server:call(Pid, cdb_complete, infinity).
|
||||||
|
|
||||||
|
%% Get the last key to be added to the file (which will have the highest
|
||||||
|
%% sequence number)
|
||||||
|
cdb_lastkey(Pid) ->
|
||||||
|
gen_server:call(Pid, cdb_lastkey, infinity).
|
||||||
|
|
||||||
|
%% Get the filename of the database
|
||||||
|
cdb_filename(Pid) ->
|
||||||
|
gen_server:call(Pid, cdb_filename, infinity).
|
||||||
|
|
||||||
|
%% Check to see if the key is probably present, will return either
|
||||||
|
%% probably or missing. Does not do a definitive check
|
||||||
|
cdb_keycheck(Pid, Key) ->
|
||||||
|
gen_server:call(Pid, {cdb_keycheck, Key}, infinity).
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
|
@ -118,29 +138,59 @@ init([]) ->
|
||||||
|
|
||||||
handle_call({cdb_open_writer, Filename}, _From, State) ->
|
handle_call({cdb_open_writer, Filename}, _From, State) ->
|
||||||
io:format("Opening file for writing with filename ~s~n", [Filename]),
|
io:format("Opening file for writing with filename ~s~n", [Filename]),
|
||||||
{LastPosition, HashTree} = open_active_file(Filename),
|
{LastPosition, HashTree, LastKey} = open_active_file(Filename),
|
||||||
{ok, Handle} = file:open(Filename, [binary, raw, read,
|
{ok, Handle} = file:open(Filename, [binary, raw, read,
|
||||||
write, delayed_write]),
|
write, delayed_write]),
|
||||||
{reply, ok, State#state{handle=Handle,
|
{reply, ok, State#state{handle=Handle,
|
||||||
last_position=LastPosition,
|
last_position=LastPosition,
|
||||||
|
last_key=LastKey,
|
||||||
filename=Filename,
|
filename=Filename,
|
||||||
hashtree=HashTree,
|
hashtree=HashTree,
|
||||||
writer=true}};
|
writer=true}};
|
||||||
handle_call({cdb_open_reader, Filename}, _From, State) ->
|
handle_call({cdb_open_reader, Filename}, _From, State) ->
|
||||||
io:format("Opening file for reading with filename ~s~n", [Filename]),
|
io:format("Opening file for reading with filename ~s~n", [Filename]),
|
||||||
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
||||||
|
Index = load_index(Handle),
|
||||||
{reply, ok, State#state{handle=Handle,
|
{reply, ok, State#state{handle=Handle,
|
||||||
filename=Filename,
|
filename=Filename,
|
||||||
writer=false}};
|
writer=false,
|
||||||
|
hash_index=Index}};
|
||||||
handle_call({cdb_get, Key}, _From, State) ->
|
handle_call({cdb_get, Key}, _From, State) ->
|
||||||
case State#state.writer of
|
case {State#state.writer, State#state.hash_index} of
|
||||||
true ->
|
{true, _} ->
|
||||||
{reply,
|
{reply,
|
||||||
get_mem(Key, State#state.handle, State#state.hashtree),
|
get_mem(Key, State#state.handle, State#state.hashtree),
|
||||||
State};
|
State};
|
||||||
false ->
|
{false, []} ->
|
||||||
{reply,
|
{reply,
|
||||||
get(State#state.handle, Key),
|
get(State#state.handle, Key),
|
||||||
|
State};
|
||||||
|
{false, Cache} ->
|
||||||
|
{reply,
|
||||||
|
get_withcache(State#state.handle, Key, Cache),
|
||||||
|
State}
|
||||||
|
end;
|
||||||
|
handle_call({cdb_keycheck, Key}, _From, State) ->
|
||||||
|
case {State#state.writer, State#state.hash_index} of
|
||||||
|
{true, _} ->
|
||||||
|
{reply,
|
||||||
|
get_mem(Key,
|
||||||
|
State#state.handle,
|
||||||
|
State#state.hashtree,
|
||||||
|
loose_presence),
|
||||||
|
State};
|
||||||
|
{false, []} ->
|
||||||
|
{reply,
|
||||||
|
get(State#state.handle,
|
||||||
|
Key,
|
||||||
|
loose_presence),
|
||||||
|
State};
|
||||||
|
{false, Cache} ->
|
||||||
|
{reply,
|
||||||
|
get(State#state.handle,
|
||||||
|
Key,
|
||||||
|
loose_presence,
|
||||||
|
Cache),
|
||||||
State}
|
State}
|
||||||
end;
|
end;
|
||||||
handle_call({cdb_put, Key, Value}, _From, State) ->
|
handle_call({cdb_put, Key, Value}, _From, State) ->
|
||||||
|
@ -151,10 +201,12 @@ handle_call({cdb_put, Key, Value}, _From, State) ->
|
||||||
{State#state.last_position, State#state.hashtree}),
|
{State#state.last_position, State#state.hashtree}),
|
||||||
case Result of
|
case Result of
|
||||||
roll ->
|
roll ->
|
||||||
|
%% Key and value could not be written
|
||||||
{reply, roll, State};
|
{reply, roll, State};
|
||||||
{UpdHandle, NewPosition, HashTree} ->
|
{UpdHandle, NewPosition, HashTree} ->
|
||||||
{reply, ok, State#state{handle=UpdHandle,
|
{reply, ok, State#state{handle=UpdHandle,
|
||||||
last_position=NewPosition,
|
last_position=NewPosition,
|
||||||
|
last_key=Key,
|
||||||
hashtree=HashTree}}
|
hashtree=HashTree}}
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
|
@ -162,17 +214,31 @@ handle_call({cdb_put, Key, Value}, _From, State) ->
|
||||||
{error, read_only},
|
{error, read_only},
|
||||||
State}
|
State}
|
||||||
end;
|
end;
|
||||||
|
handle_call(cdb_lastkey, _From, State) ->
|
||||||
|
{reply, State#state.last_key, State};
|
||||||
|
handle_call(cdb_filename, _From, State) ->
|
||||||
|
{reply, State#state.filename, State};
|
||||||
handle_call(cdb_close, _From, State) ->
|
handle_call(cdb_close, _From, State) ->
|
||||||
|
ok = file:close(State#state.handle),
|
||||||
|
{stop, normal, ok, State};
|
||||||
|
handle_call(cdb_complete, _From, State) ->
|
||||||
case State#state.writer of
|
case State#state.writer of
|
||||||
true ->
|
true ->
|
||||||
ok = close_file(State#state.handle,
|
ok = close_file(State#state.handle,
|
||||||
State#state.hashtree,
|
State#state.hashtree,
|
||||||
State#state.last_position);
|
State#state.last_position),
|
||||||
|
%% Rename file
|
||||||
|
NewName = filename:rootname(State#state.filename, ".pnd")
|
||||||
|
++ ".cdb",
|
||||||
|
io:format("Renaming file from ~s to ~s~n", [State#state.filename, NewName]),
|
||||||
|
ok = file:rename(State#state.filename, NewName),
|
||||||
|
{stop, normal, {ok, NewName}, State};
|
||||||
false ->
|
false ->
|
||||||
ok = file:close(State#state.handle)
|
ok = file:close(State#state.handle),
|
||||||
end,
|
{stop, normal, {ok, State#state.filename}, State}
|
||||||
{stop, normal, ok, State}.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -260,7 +326,7 @@ dump(FileName, CRCCheck) ->
|
||||||
open_active_file(FileName) when is_list(FileName) ->
|
open_active_file(FileName) when is_list(FileName) ->
|
||||||
{ok, Handle} = file:open(FileName, [binary, raw, read, write]),
|
{ok, Handle} = file:open(FileName, [binary, raw, read, write]),
|
||||||
{ok, Position} = file:position(Handle, {bof, 256*?DWORD_SIZE}),
|
{ok, Position} = file:position(Handle, {bof, 256*?DWORD_SIZE}),
|
||||||
{LastPosition, HashTree} = scan_over_file(Handle, Position),
|
{LastPosition, HashTree, LastKey} = scan_over_file(Handle, Position),
|
||||||
case file:position(Handle, eof) of
|
case file:position(Handle, eof) of
|
||||||
{ok, LastPosition} ->
|
{ok, LastPosition} ->
|
||||||
ok = file:close(Handle);
|
ok = file:close(Handle);
|
||||||
|
@ -272,7 +338,7 @@ open_active_file(FileName) when is_list(FileName) ->
|
||||||
ok = file:truncate(Handle),
|
ok = file:truncate(Handle),
|
||||||
ok = file:close(Handle)
|
ok = file:close(Handle)
|
||||||
end,
|
end,
|
||||||
{LastPosition, HashTree}.
|
{LastPosition, HashTree, LastKey}.
|
||||||
|
|
||||||
%% put(Handle, Key, Value, {LastPosition, HashDict}) -> {NewPosition, KeyDict}
|
%% put(Handle, Key, Value, {LastPosition, HashDict}) -> {NewPosition, KeyDict}
|
||||||
%% Append to an active file a new key/value pair returning an updated
|
%% Append to an active file a new key/value pair returning an updated
|
||||||
|
@ -298,18 +364,22 @@ put(Handle, Key, Value, {LastPosition, HashTree}) ->
|
||||||
%% get(FileName,Key) -> {key,value}
|
%% get(FileName,Key) -> {key,value}
|
||||||
%% Given a filename and a key, returns a key and value tuple.
|
%% Given a filename and a key, returns a key and value tuple.
|
||||||
%%
|
%%
|
||||||
|
get_withcache(Handle, Key, Cache) ->
|
||||||
|
get(Handle, Key, ?CRC_CHECK, Cache).
|
||||||
|
|
||||||
get(FileNameOrHandle, Key) ->
|
get(FileNameOrHandle, Key) ->
|
||||||
get(FileNameOrHandle, Key, ?CRC_CHECK).
|
get(FileNameOrHandle, Key, ?CRC_CHECK).
|
||||||
|
|
||||||
get(FileName, Key, CRCCheck) when is_list(FileName), is_list(Key) ->
|
get(FileNameOrHandle, Key, CRCCheck) ->
|
||||||
|
get(FileNameOrHandle, Key, CRCCheck, no_cache).
|
||||||
|
|
||||||
|
get(FileName, Key, CRCCheck, Cache) when is_list(FileName), is_list(Key) ->
|
||||||
{ok,Handle} = file:open(FileName,[binary, raw, read]),
|
{ok,Handle} = file:open(FileName,[binary, raw, read]),
|
||||||
get(Handle,Key, CRCCheck);
|
get(Handle,Key, CRCCheck, Cache);
|
||||||
get(Handle, Key, CRCCheck) when is_tuple(Handle), is_list(Key) ->
|
get(Handle, Key, CRCCheck, Cache) when is_tuple(Handle), is_list(Key) ->
|
||||||
Hash = hash(Key),
|
Hash = hash(Key),
|
||||||
Index = hash_to_index(Hash),
|
Index = hash_to_index(Hash),
|
||||||
{ok,_} = file:position(Handle, {bof, ?DWORD_SIZE * Index}),
|
{HashTable, Count} = get_index(Handle, Index, Cache),
|
||||||
% Get location of hashtable and number of entries in the hash
|
|
||||||
{HashTable, Count} = read_next_2_integers(Handle),
|
|
||||||
% If the count is 0 for that index - key must be missing
|
% If the count is 0 for that index - key must be missing
|
||||||
case Count of
|
case Count of
|
||||||
0 ->
|
0 ->
|
||||||
|
@ -326,14 +396,32 @@ get(Handle, Key, CRCCheck) when is_tuple(Handle), is_list(Key) ->
|
||||||
search_hash_table(Handle, lists:append(L2, L1), Hash, Key, CRCCheck)
|
search_hash_table(Handle, lists:append(L2, L1), Hash, Key, CRCCheck)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_index(Handle, Index, no_cache) ->
|
||||||
|
{ok,_} = file:position(Handle, {bof, ?DWORD_SIZE * Index}),
|
||||||
|
% Get location of hashtable and number of entries in the hash
|
||||||
|
read_next_2_integers(Handle);
|
||||||
|
get_index(_Handle, Index, Cache) ->
|
||||||
|
lists:keyfind(Index, 1, Cache).
|
||||||
|
|
||||||
%% Get a Key/Value pair from an active CDB file (with no hash table written)
|
%% Get a Key/Value pair from an active CDB file (with no hash table written)
|
||||||
%% This requires a key dictionary to be passed in (mapping keys to positions)
|
%% This requires a key dictionary to be passed in (mapping keys to positions)
|
||||||
%% Will return {Key, Value} or missing
|
%% Will return {Key, Value} or missing
|
||||||
get_mem(Key, Filename, HashTree) when is_list(Filename) ->
|
get_mem(Key, FNOrHandle, HashTree) ->
|
||||||
|
get_mem(Key, FNOrHandle, HashTree, ?CRC_CHECK).
|
||||||
|
|
||||||
|
get_mem(Key, Filename, HashTree, CRCCheck) when is_list(Filename) ->
|
||||||
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
||||||
get_mem(Key, Handle, HashTree);
|
get_mem(Key, Handle, HashTree, CRCCheck);
|
||||||
get_mem(Key, Handle, HashTree) ->
|
get_mem(Key, Handle, HashTree, CRCCheck) ->
|
||||||
extract_kvpair(Handle, get_hashtree(Key, HashTree), Key).
|
ListToCheck = get_hashtree(Key, HashTree),
|
||||||
|
case {CRCCheck, ListToCheck} of
|
||||||
|
{loose_presence, []} ->
|
||||||
|
missing;
|
||||||
|
{loose_presence, _L} ->
|
||||||
|
probably;
|
||||||
|
_ ->
|
||||||
|
extract_kvpair(Handle, ListToCheck, Key, CRCCheck)
|
||||||
|
end.
|
||||||
|
|
||||||
%% Get the next key at a position in the file (or the first key if no position
|
%% Get the next key at a position in the file (or the first key if no position
|
||||||
%% is passed). Will return both a key and the next position
|
%% is passed). Will return both a key and the next position
|
||||||
|
@ -433,6 +521,15 @@ fold_keys(Handle, FoldFun, Acc0) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
|
load_index(Handle) ->
|
||||||
|
Index = lists:seq(0, 255),
|
||||||
|
lists:map(fun(X) ->
|
||||||
|
file:position(Handle, {bof, ?DWORD_SIZE * X}),
|
||||||
|
{HashTablePos, Count} = read_next_2_integers(Handle),
|
||||||
|
{X, {HashTablePos, Count}} end,
|
||||||
|
Index).
|
||||||
|
|
||||||
|
|
||||||
%% Take an active file and write the hash details necessary to close that
|
%% Take an active file and write the hash details necessary to close that
|
||||||
%% file and roll a new active file if requested.
|
%% file and roll a new active file if requested.
|
||||||
%%
|
%%
|
||||||
|
@ -473,9 +570,6 @@ put_hashtree(Key, Position, HashTree) ->
|
||||||
|
|
||||||
%% Function to extract a Key-Value pair given a file handle and a position
|
%% Function to extract a Key-Value pair given a file handle and a position
|
||||||
%% Will confirm that the key matches and do a CRC check when requested
|
%% Will confirm that the key matches and do a CRC check when requested
|
||||||
extract_kvpair(Handle, Positions, Key) ->
|
|
||||||
extract_kvpair(Handle, Positions, Key, ?CRC_CHECK).
|
|
||||||
|
|
||||||
extract_kvpair(_, [], _, _) ->
|
extract_kvpair(_, [], _, _) ->
|
||||||
missing;
|
missing;
|
||||||
extract_kvpair(Handle, [Position|Rest], Key, Check) ->
|
extract_kvpair(Handle, [Position|Rest], Key, Check) ->
|
||||||
|
@ -497,12 +591,12 @@ extract_kvpair(Handle, [Position|Rest], Key, Check) ->
|
||||||
%% at that point return the position and the key dictionary scanned so far
|
%% at that point return the position and the key dictionary scanned so far
|
||||||
scan_over_file(Handle, Position) ->
|
scan_over_file(Handle, Position) ->
|
||||||
HashTree = array:new(256, {default, gb_trees:empty()}),
|
HashTree = array:new(256, {default, gb_trees:empty()}),
|
||||||
scan_over_file(Handle, Position, HashTree).
|
scan_over_file(Handle, Position, HashTree, empty).
|
||||||
|
|
||||||
scan_over_file(Handle, Position, HashTree) ->
|
scan_over_file(Handle, Position, HashTree, LastKey) ->
|
||||||
case saferead_keyvalue(Handle) of
|
case saferead_keyvalue(Handle) of
|
||||||
false ->
|
false ->
|
||||||
{Position, HashTree};
|
{Position, HashTree, LastKey};
|
||||||
{Key, ValueAsBin, KeyLength, ValueLength} ->
|
{Key, ValueAsBin, KeyLength, ValueLength} ->
|
||||||
case crccheck_value(ValueAsBin) of
|
case crccheck_value(ValueAsBin) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -510,14 +604,15 @@ scan_over_file(Handle, Position, HashTree) ->
|
||||||
+ ?DWORD_SIZE,
|
+ ?DWORD_SIZE,
|
||||||
scan_over_file(Handle,
|
scan_over_file(Handle,
|
||||||
NewPosition,
|
NewPosition,
|
||||||
put_hashtree(Key, Position, HashTree));
|
put_hashtree(Key, Position, HashTree),
|
||||||
|
Key);
|
||||||
false ->
|
false ->
|
||||||
io:format("CRC check returned false on key of ~w ~n",
|
io:format("CRC check returned false on key of ~w ~n",
|
||||||
[Key]),
|
[Key]),
|
||||||
{Position, HashTree}
|
{Position, HashTree, LastKey}
|
||||||
end;
|
end;
|
||||||
eof ->
|
eof ->
|
||||||
{Position, HashTree}
|
{Position, HashTree, LastKey}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
@ -531,7 +626,6 @@ saferead_keyvalue(Handle) ->
|
||||||
eof ->
|
eof ->
|
||||||
false;
|
false;
|
||||||
{KeyL, ValueL} ->
|
{KeyL, ValueL} ->
|
||||||
io:format("KeyL ~w ValueL ~w~n", [KeyL, ValueL]),
|
|
||||||
case safe_read_next_term(Handle, KeyL) of
|
case safe_read_next_term(Handle, KeyL) of
|
||||||
{error, einval} ->
|
{error, einval} ->
|
||||||
false;
|
false;
|
||||||
|
@ -540,7 +634,6 @@ saferead_keyvalue(Handle) ->
|
||||||
false ->
|
false ->
|
||||||
false;
|
false;
|
||||||
Key ->
|
Key ->
|
||||||
io:format("Found Key of ~s~n", [Key]),
|
|
||||||
case file:read(Handle, ValueL) of
|
case file:read(Handle, ValueL) of
|
||||||
{error, einval} ->
|
{error, einval} ->
|
||||||
false;
|
false;
|
||||||
|
@ -640,14 +733,25 @@ read_next_2_integers(Handle) ->
|
||||||
|
|
||||||
%% Seach the hash table for the matching hash and key. Be prepared for
|
%% Seach the hash table for the matching hash and key. Be prepared for
|
||||||
%% multiple keys to have the same hash value.
|
%% multiple keys to have the same hash value.
|
||||||
search_hash_table(_Handle, [], _Hash, _Key, _CRCCHeck) ->
|
%%
|
||||||
|
%% There are three possible values of CRCCheck:
|
||||||
|
%% true - check the CRC before returning key & value
|
||||||
|
%% false - don't check the CRC before returning key & value
|
||||||
|
%% loose_presence - confirm that the hash of the key is present
|
||||||
|
|
||||||
|
search_hash_table(_Handle, [], _Hash, _Key, _CRCCheck) ->
|
||||||
missing;
|
missing;
|
||||||
search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, CRCCheck) ->
|
search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, CRCCheck) ->
|
||||||
{ok, _} = file:position(Handle, Entry),
|
{ok, _} = file:position(Handle, Entry),
|
||||||
{StoredHash, DataLoc} = read_next_2_integers(Handle),
|
{StoredHash, DataLoc} = read_next_2_integers(Handle),
|
||||||
case StoredHash of
|
case StoredHash of
|
||||||
Hash ->
|
Hash ->
|
||||||
KV = extract_kvpair(Handle, [DataLoc], Key, CRCCheck),
|
KV = case CRCCheck of
|
||||||
|
loose_presence ->
|
||||||
|
probably;
|
||||||
|
_ ->
|
||||||
|
extract_kvpair(Handle, [DataLoc], Key, CRCCheck)
|
||||||
|
end,
|
||||||
case KV of
|
case KV of
|
||||||
missing ->
|
missing ->
|
||||||
search_hash_table(Handle, RestOfEntries, Hash, Key, CRCCheck);
|
search_hash_table(Handle, RestOfEntries, Hash, Key, CRCCheck);
|
||||||
|
@ -789,6 +893,13 @@ write_top_index_table(Handle, BasePos, List) ->
|
||||||
lists:foldl(FnWriteIndex, BasePos, CompleteList),
|
lists:foldl(FnWriteIndex, BasePos, CompleteList),
|
||||||
ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need).
|
ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need).
|
||||||
|
|
||||||
|
%% To make this compatible with original Bernstein format this endian flip
|
||||||
|
%% and also the use of the standard hash function required.
|
||||||
|
%%
|
||||||
|
%% Hash function contains mysterious constants, some explanation here as to
|
||||||
|
%% what they are -
|
||||||
|
%% http://stackoverflow.com/ ++
|
||||||
|
%% questions/10696223/reason-for-5381-number-in-djb-hash-function
|
||||||
|
|
||||||
endian_flip(Int) ->
|
endian_flip(Int) ->
|
||||||
<<X:32/unsigned-little-integer>> = <<Int:32>>,
|
<<X:32/unsigned-little-integer>> = <<Int:32>>,
|
||||||
|
@ -962,12 +1073,24 @@ activewrite_singlewrite_test() ->
|
||||||
InitialD1 = dict:store("0001", "Initial value", InitialD),
|
InitialD1 = dict:store("0001", "Initial value", InitialD),
|
||||||
ok = from_dict("../test/test_mem.cdb", InitialD1),
|
ok = from_dict("../test/test_mem.cdb", InitialD1),
|
||||||
io:format("New db file created ~n", []),
|
io:format("New db file created ~n", []),
|
||||||
{LastPosition, KeyDict} = open_active_file("../test/test_mem.cdb"),
|
{LastPosition, KeyDict, _} = open_active_file("../test/test_mem.cdb"),
|
||||||
io:format("File opened as new active file "
|
io:format("File opened as new active file "
|
||||||
"with LastPosition=~w ~n", [LastPosition]),
|
"with LastPosition=~w ~n", [LastPosition]),
|
||||||
{_, _, UpdKeyDict} = put("../test/test_mem.cdb", Key, Value, {LastPosition, KeyDict}),
|
{_, _, UpdKeyDict} = put("../test/test_mem.cdb",
|
||||||
|
Key, Value,
|
||||||
|
{LastPosition, KeyDict}),
|
||||||
io:format("New key and value added to active file ~n", []),
|
io:format("New key and value added to active file ~n", []),
|
||||||
?assertMatch({Key, Value}, get_mem(Key, "../test/test_mem.cdb", UpdKeyDict)),
|
?assertMatch({Key, Value},
|
||||||
|
get_mem(Key, "../test/test_mem.cdb",
|
||||||
|
UpdKeyDict)),
|
||||||
|
?assertMatch(probably,
|
||||||
|
get_mem(Key, "../test/test_mem.cdb",
|
||||||
|
UpdKeyDict,
|
||||||
|
loose_presence)),
|
||||||
|
?assertMatch(missing,
|
||||||
|
get_mem("not_present", "../test/test_mem.cdb",
|
||||||
|
UpdKeyDict,
|
||||||
|
loose_presence)),
|
||||||
ok = file:delete("../test/test_mem.cdb").
|
ok = file:delete("../test/test_mem.cdb").
|
||||||
|
|
||||||
search_hash_table_findinslot_test() ->
|
search_hash_table_findinslot_test() ->
|
||||||
|
@ -992,6 +1115,8 @@ search_hash_table_findinslot_test() ->
|
||||||
io:format("Slot 2 has Hash ~w Position ~w~n", [ReadH4, ReadP4]),
|
io:format("Slot 2 has Hash ~w Position ~w~n", [ReadH4, ReadP4]),
|
||||||
?assertMatch(0, ReadH4),
|
?assertMatch(0, ReadH4),
|
||||||
?assertMatch({"key1", "value1"}, get(Handle, Key1)),
|
?assertMatch({"key1", "value1"}, get(Handle, Key1)),
|
||||||
|
?assertMatch(probably, get(Handle, Key1, loose_presence)),
|
||||||
|
?assertMatch(missing, get(Handle, "Key99", loose_presence)),
|
||||||
{ok, _} = file:position(Handle, FirstHashPosition),
|
{ok, _} = file:position(Handle, FirstHashPosition),
|
||||||
FlipH3 = endian_flip(ReadH3),
|
FlipH3 = endian_flip(ReadH3),
|
||||||
FlipP3 = endian_flip(ReadP3),
|
FlipP3 = endian_flip(ReadP3),
|
||||||
|
@ -1029,7 +1154,7 @@ getnextkey_inclemptyvalue_test() ->
|
||||||
ok = file:delete("../test/hashtable2_test.cdb").
|
ok = file:delete("../test/hashtable2_test.cdb").
|
||||||
|
|
||||||
newactivefile_test() ->
|
newactivefile_test() ->
|
||||||
{LastPosition, _} = open_active_file("../test/activefile_test.cdb"),
|
{LastPosition, _, _} = open_active_file("../test/activefile_test.cdb"),
|
||||||
?assertMatch(256 * ?DWORD_SIZE, LastPosition),
|
?assertMatch(256 * ?DWORD_SIZE, LastPosition),
|
||||||
Response = get_nextkey("../test/activefile_test.cdb"),
|
Response = get_nextkey("../test/activefile_test.cdb"),
|
||||||
?assertMatch(nomorekeys, Response),
|
?assertMatch(nomorekeys, Response),
|
||||||
|
|
|
@ -1,11 +1,560 @@
|
||||||
%% -------- Inker ---------
|
%% -------- Inker ---------
|
||||||
%%
|
|
||||||
%%
|
%%
|
||||||
|
%% The Inker is responsible for managing access and updates to the Journal.
|
||||||
|
%%
|
||||||
|
%% The Inker maintains a manifest of what files make up the Journal, and which
|
||||||
|
%% file is the current append-only nursery log to accept new PUTs into the
|
||||||
|
%% Journal. The Inker also marshals GET requests to the appropriate database
|
||||||
|
%% file within the Journal (routed by sequence number). The Inker is also
|
||||||
|
%% responsible for scheduling compaction work to be carried out by the Inker's
|
||||||
|
%% clerk.
|
||||||
|
%%
|
||||||
|
%% -------- Journal ---------
|
||||||
|
%%
|
||||||
|
%% The Journal is a series of files originally named as <SQN>_nursery.cdb
|
||||||
|
%% where the sequence number is the first object sequence number (key) within
|
||||||
|
%% the given database file. The files will be named *.cdb at the point they
|
||||||
|
%% have been made immutable (through a rename operation). Prior to this, they
|
||||||
|
%% will originally start out as a *.pnd file.
|
||||||
|
%%
|
||||||
|
%% At some stage in the future compacted versions of old journal cdb files may
|
||||||
|
%% be produced. These files will be named <SQN>-<CompactionID>.cdb, and once
|
||||||
|
%% the manifest is updated the original <SQN>_nursery.cdb (or
|
||||||
|
%% <SQN>_<previous CompactionID>.cdb) files they replace will be erased.
|
||||||
|
%%
|
||||||
|
%% The current Journal is made up of a set of files referenced in the manifest,
|
||||||
|
%% combined with a set of files of the form <SQN>_nursery.[cdb|pnd] with
|
||||||
|
%% a higher Sequence Number compared to the files in the manifest.
|
||||||
|
%%
|
||||||
|
%% The Journal is ordered by sequence number from front to back both within
|
||||||
|
%% and across files.
|
||||||
|
%%
|
||||||
|
%% On startup the Inker should open the manifest with the highest sequence
|
||||||
|
%% number, and this will contain the list of filenames that make up the
|
||||||
|
%% non-recent part of the Journal. The Manifest is completed by opening these
|
||||||
|
%% files plus any other files with a higher sequence number. The file with
|
||||||
|
%% the highest sequence number is assumed to to be the active writer. Any file
|
||||||
|
%% with a lower sequence number and a *.pnd extension should be re-rolled into
|
||||||
|
%% a *.cdb file.
|
||||||
|
%%
|
||||||
|
%% -------- Objects ---------
|
||||||
|
%%
|
||||||
|
%% From the perspective of the Inker, objects to store are made up of:
|
||||||
|
%% - A Primary Key (as an Erlang term)
|
||||||
|
%% - A sequence number (assigned by the Inker)
|
||||||
|
%% - An object (an Erlang term)
|
||||||
|
%% - A set of Key Deltas associated with the change
|
||||||
|
%%
|
||||||
|
%% -------- Manifest ---------
|
||||||
|
%%
|
||||||
|
%% The Journal has a manifest which is the current record of which cdb files
|
||||||
|
%% are currently active in the Journal (i.e. following compaction). The
|
||||||
|
%% manifest holds this information through two lists - a list of files which
|
||||||
|
%% are definitely in the current manifest, and a list of files which have been
|
||||||
|
%% removed, but may still be present on disk. The use of two lists is to
|
||||||
|
%% avoid any circumsatnces where a compaction event has led to the deletion of
|
||||||
|
%% a Journal file with a higher sequence number than any in the remaining
|
||||||
|
%% manifest.
|
||||||
|
%%
|
||||||
|
%% A new manifest file is saved for every compaction event. The manifest files
|
||||||
|
%% are saved using the filename <ManifestSQN>.man once saved. The ManifestSQN
|
||||||
|
%% is incremented once for every compaction event.
|
||||||
|
%%
|
||||||
|
%% -------- Compaction ---------
|
||||||
|
%%
|
||||||
|
%% Compaction is a process whereby an Inker's clerk will:
|
||||||
|
%% - Request a snapshot of the Ledger, as well as the lowest sequence number
|
||||||
|
%% that is currently registerd by another snapshot owner
|
||||||
|
%% - Picks a Journal database file at random (not including the current
|
||||||
|
%% nursery log)
|
||||||
|
%% - Performs a random walk on keys and sequence numbers in the chosen CDB
|
||||||
|
%% file to extract a subset of 100 key and sequence number combinations from
|
||||||
|
%% the database
|
||||||
|
%% - Looks up the current sequence number for those keys in the Ledger
|
||||||
|
%% - If more than <n>% (default n=20) of the keys are now at a higher sequence
|
||||||
|
%% number, then the database file is a candidate for compaction. In this case
|
||||||
|
%% each of the next 8 files in sequence should be checked until all those 8
|
||||||
|
%% files have been checked or one of the files has been found to be below the
|
||||||
|
%% threshold.
|
||||||
|
%% - If a set of below-the-threshold files is found, the files are re-written
|
||||||
|
%% without any superceded values
|
||||||
|
%%- The clerk should then request that the Inker commit the manifest change
|
||||||
|
%%
|
||||||
|
%% -------- Inker's Clerk ---------
|
||||||
|
%%
|
||||||
%%
|
%%
|
||||||
%% -------- Ledger ---------
|
|
||||||
%%
|
%%
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
|
||||||
-module(leveled_inker).
|
-module(leveled_inker).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-include("../include/leveled.hrl").
|
||||||
|
|
||||||
|
-export([init/1,
|
||||||
|
handle_call/3,
|
||||||
|
handle_cast/2,
|
||||||
|
handle_info/2,
|
||||||
|
terminate/2,
|
||||||
|
code_change/3,
|
||||||
|
ink_start/1,
|
||||||
|
ink_put/4,
|
||||||
|
ink_get/3,
|
||||||
|
ink_snap/1,
|
||||||
|
build_dummy_journal/0,
|
||||||
|
simple_manifest_reader/2]).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-define(MANIFEST_FP, "journal_manifest").
|
||||||
|
-define(FILES_FP, "journal_files").
|
||||||
|
-define(JOURNAL_FILEX, "cdb").
|
||||||
|
-define(MANIFEST_FILEX, "man").
|
||||||
|
-define(PENDING_FILEX, "pnd").
|
||||||
|
|
||||||
|
|
||||||
|
-record(state, {manifest = [] :: list(),
|
||||||
|
manifest_sqn = 0 :: integer(),
|
||||||
|
journal_sqn = 0 :: integer(),
|
||||||
|
active_journaldb :: pid(),
|
||||||
|
removed_journaldbs = [] :: list(),
|
||||||
|
root_path :: string()}).
|
||||||
|
|
||||||
|
|
||||||
|
%%%============================================================================
|
||||||
|
%%% API
|
||||||
|
%%%============================================================================
|
||||||
|
|
||||||
|
ink_start(RootDir) ->
|
||||||
|
gen_server:start(?MODULE, [RootDir], []).
|
||||||
|
|
||||||
|
ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
|
||||||
|
gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity).
|
||||||
|
|
||||||
|
ink_get(Pid, PrimaryKey, SQN) ->
|
||||||
|
gen_server:call(Pid, {get, PrimaryKey, SQN}, infinity).
|
||||||
|
|
||||||
|
ink_snap(Pid) ->
|
||||||
|
gen_server:call(Pid, snapshot, infinity).
|
||||||
|
|
||||||
|
%%%============================================================================
|
||||||
|
%%% gen_server callbacks
|
||||||
|
%%%============================================================================
|
||||||
|
|
||||||
|
init([RootPath]) ->
|
||||||
|
JournalFP = filepath(RootPath, journal_dir),
|
||||||
|
{ok, JournalFilenames} = case filelib:is_dir(JournalFP) of
|
||||||
|
true ->
|
||||||
|
file:list_dir(JournalFP);
|
||||||
|
false ->
|
||||||
|
filelib:ensure_dir(JournalFP),
|
||||||
|
{ok, []}
|
||||||
|
end,
|
||||||
|
ManifestFP = filepath(RootPath, manifest_dir),
|
||||||
|
{ok, ManifestFilenames} = case filelib:is_dir(ManifestFP) of
|
||||||
|
true ->
|
||||||
|
file:list_dir(ManifestFP);
|
||||||
|
false ->
|
||||||
|
filelib:ensure_dir(ManifestFP),
|
||||||
|
{ok, []}
|
||||||
|
end,
|
||||||
|
{Manifest,
|
||||||
|
ActiveJournal,
|
||||||
|
JournalSQN,
|
||||||
|
ManifestSQN} = build_manifest(ManifestFilenames,
|
||||||
|
JournalFilenames,
|
||||||
|
fun simple_manifest_reader/2,
|
||||||
|
RootPath),
|
||||||
|
{ok, #state{manifest = Manifest,
|
||||||
|
manifest_sqn = ManifestSQN,
|
||||||
|
journal_sqn = JournalSQN,
|
||||||
|
active_journaldb = ActiveJournal,
|
||||||
|
root_path = RootPath}}.
|
||||||
|
|
||||||
|
|
||||||
|
handle_call({put, Key, Object, KeyChanges}, From, State) ->
|
||||||
|
case put_object(Key, Object, KeyChanges, State) of
|
||||||
|
{ok, UpdState} ->
|
||||||
|
{reply, {ok, UpdState#state.journal_sqn}, UpdState};
|
||||||
|
{rolling, UpdState} ->
|
||||||
|
gen_server:reply(From, {ok, UpdState#state.journal_sqn}),
|
||||||
|
{NewManifest,
|
||||||
|
NewManifestSQN} = roll_active_file(State#state.active_journaldb,
|
||||||
|
State#state.manifest,
|
||||||
|
State#state.manifest_sqn,
|
||||||
|
State#state.root_path),
|
||||||
|
{noreply, UpdState#state{manifest=NewManifest,
|
||||||
|
manifest_sqn=NewManifestSQN}};
|
||||||
|
{blocked, UpdState} ->
|
||||||
|
{reply, blocked, UpdState}
|
||||||
|
end;
|
||||||
|
handle_call({get, Key, SQN}, _From, State) ->
|
||||||
|
{reply, get_object(Key, SQN, State#state.manifest), State};
|
||||||
|
handle_call(snapshot, _From , State) ->
|
||||||
|
%% TODO: Not yet implemented registration of snapshot
|
||||||
|
%% Should return manifest and register the snapshot
|
||||||
|
{reply, State#state.manifest, State}.
|
||||||
|
|
||||||
|
handle_cast(_Msg, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info(_Info, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, _State) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
|
||||||
|
%%%============================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%============================================================================
|
||||||
|
|
||||||
|
put_object(PrimaryKey, Object, KeyChanges, State) ->
|
||||||
|
NewSQN = State#state.journal_sqn + 1,
|
||||||
|
Bin1 = term_to_binary({Object, KeyChanges}, [compressed]),
|
||||||
|
case leveled_cdb:cdb_put(State#state.active_journaldb,
|
||||||
|
{NewSQN, PrimaryKey},
|
||||||
|
Bin1) of
|
||||||
|
ok ->
|
||||||
|
{ok, State#state{journal_sqn=NewSQN}};
|
||||||
|
roll ->
|
||||||
|
FileName = filepath(State#state.root_path, NewSQN, new_journal),
|
||||||
|
{ok, NewJournalP} = leveled_cdb:cdb_open_writer(FileName),
|
||||||
|
case leveled_cdb:cdb_put(NewJournalP,
|
||||||
|
{NewSQN, PrimaryKey},
|
||||||
|
Bin1) of
|
||||||
|
ok ->
|
||||||
|
{rolling, State#state{journal_sqn=NewSQN,
|
||||||
|
active_journaldb=NewJournalP}};
|
||||||
|
roll ->
|
||||||
|
{blocked, State#state{journal_sqn=NewSQN,
|
||||||
|
active_journaldb=NewJournalP}}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) ->
|
||||||
|
{ok, NewFilename} = leveled_cdb:cdb_complete(OldActiveJournal),
|
||||||
|
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename),
|
||||||
|
JournalRegex2 = "nursery_(?<SQN>[0-9]+)\\." ++ ?JOURNAL_FILEX,
|
||||||
|
[JournalSQN] = sequencenumbers_fromfilenames([NewFilename],
|
||||||
|
JournalRegex2,
|
||||||
|
'SQN'),
|
||||||
|
NewManifest = lists:append(Manifest, {JournalSQN, NewFilename, PidR}),
|
||||||
|
NewManifestSQN = ManifestSQN + 1,
|
||||||
|
ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath),
|
||||||
|
{NewManifest, NewManifestSQN}.
|
||||||
|
|
||||||
|
get_object(PrimaryKey, SQN, Manifest) ->
|
||||||
|
JournalP = find_in_manifest(SQN, Manifest),
|
||||||
|
leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}).
|
||||||
|
|
||||||
|
|
||||||
|
build_manifest(ManifestFilenames,
|
||||||
|
JournalFilenames,
|
||||||
|
ManifestRdrFun,
|
||||||
|
RootPath) ->
|
||||||
|
%% Setup root paths
|
||||||
|
JournalFP = filepath(RootPath, journal_dir),
|
||||||
|
%% Find the manifest with a highest Manifest sequence number
|
||||||
|
%% Open it and read it to get the current Confirmed Manifest
|
||||||
|
ManifestRegex = "(?<MSQN>[0-9]+)\\." ++ ?MANIFEST_FILEX,
|
||||||
|
ValidManSQNs = sequencenumbers_fromfilenames(ManifestFilenames,
|
||||||
|
ManifestRegex,
|
||||||
|
'MSQN'),
|
||||||
|
{JournalSQN1,
|
||||||
|
ConfirmedManifest,
|
||||||
|
Removed,
|
||||||
|
ManifestSQN} = case length(ValidManSQNs) of
|
||||||
|
0 ->
|
||||||
|
{0, [], [], 0};
|
||||||
|
_ ->
|
||||||
|
PersistedManSQN = lists:max(ValidManSQNs),
|
||||||
|
{J1, M1, R1} = ManifestRdrFun(PersistedManSQN,
|
||||||
|
RootPath),
|
||||||
|
{J1, M1, R1, PersistedManSQN}
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% Find any more recent immutable files that have a higher sequence number
|
||||||
|
%% - the immutable files have already been rolled, and so have a completed
|
||||||
|
%% hashtree lookup
|
||||||
|
JournalRegex1 = "nursery_(?<SQN>[0-9]+)\\." ++ ?JOURNAL_FILEX,
|
||||||
|
UnremovedJournalFiles = lists:foldl(fun(FN, Acc) ->
|
||||||
|
case lists:member(FN, Removed) of
|
||||||
|
true ->
|
||||||
|
Acc;
|
||||||
|
false ->
|
||||||
|
Acc ++ [FN]
|
||||||
|
end end,
|
||||||
|
[],
|
||||||
|
JournalFilenames),
|
||||||
|
OtherSQNs_imm = sequencenumbers_fromfilenames(UnremovedJournalFiles,
|
||||||
|
JournalRegex1,
|
||||||
|
'SQN'),
|
||||||
|
Manifest1 = lists:foldl(fun(X, Acc) ->
|
||||||
|
if
|
||||||
|
X > JournalSQN1
|
||||||
|
->
|
||||||
|
FN = "nursery_" ++
|
||||||
|
integer_to_list(X)
|
||||||
|
++ "." ++
|
||||||
|
?JOURNAL_FILEX,
|
||||||
|
Acc ++ [{X, FN}];
|
||||||
|
true
|
||||||
|
-> Acc
|
||||||
|
end end,
|
||||||
|
ConfirmedManifest,
|
||||||
|
lists:sort(OtherSQNs_imm)),
|
||||||
|
|
||||||
|
%% Enrich the manifest so it contains the Pid of any of the immutable
|
||||||
|
%% entries
|
||||||
|
io:format("Manifest1 is ~w~n", [Manifest1]),
|
||||||
|
Manifest2 = lists:map(fun({X, Y}) ->
|
||||||
|
FN = filename:join(JournalFP, Y),
|
||||||
|
{ok, Pid} = leveled_cdb:cdb_open_reader(FN),
|
||||||
|
{X, Y, Pid} end,
|
||||||
|
Manifest1),
|
||||||
|
|
||||||
|
%% Find any more recent mutable files that have a higher sequence number
|
||||||
|
%% Roll any mutable files which do not have the highest sequence number
|
||||||
|
%% to create the hashtree and complete the header entries
|
||||||
|
JournalRegex2 = "nursery_(?<SQN>[0-9]+)\\." ++ ?PENDING_FILEX,
|
||||||
|
OtherSQNs_pnd = sequencenumbers_fromfilenames(JournalFilenames,
|
||||||
|
JournalRegex2,
|
||||||
|
'SQN'),
|
||||||
|
|
||||||
|
case length(OtherSQNs_pnd) of
|
||||||
|
0 ->
|
||||||
|
%% Need to create a new active writer, but also find the highest
|
||||||
|
%% SQN from within the confirmed manifest
|
||||||
|
TopSQNInManifest =
|
||||||
|
case length(Manifest2) of
|
||||||
|
0 ->
|
||||||
|
%% Manifest is empty and no active writers
|
||||||
|
%% can be found so database is empty
|
||||||
|
0;
|
||||||
|
_ ->
|
||||||
|
TM = lists:last(lists:keysort(1,Manifest2)),
|
||||||
|
{_SQN, _FN, TMPid} = TM,
|
||||||
|
{HighSQN, _HighKey} = leveled_cdb:cdb_lastkey(TMPid),
|
||||||
|
HighSQN
|
||||||
|
end,
|
||||||
|
ActiveFN = filepath(RootPath, TopSQNInManifest + 1, new_journal),
|
||||||
|
{ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN),
|
||||||
|
{Manifest2, ActiveJournal, TopSQNInManifest, ManifestSQN};
|
||||||
|
_ ->
|
||||||
|
|
||||||
|
{ActiveJournalSQN,
|
||||||
|
Manifest3} = roll_pending_journals(lists:sort(OtherSQNs_pnd),
|
||||||
|
Manifest2,
|
||||||
|
RootPath),
|
||||||
|
%% Need to work out highest sequence number in tail file to feed
|
||||||
|
%% into opening of pending journal
|
||||||
|
ActiveFN = filepath(RootPath, ActiveJournalSQN, new_journal),
|
||||||
|
{ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN),
|
||||||
|
{HighestSQN, _HighestKey} = leveled_cdb:cdb_lastkey(ActiveJournal),
|
||||||
|
{Manifest3, ActiveJournal, HighestSQN, ManifestSQN}
|
||||||
|
end.
|
||||||
|
|
||||||
|
close_allmanifest([], ActiveJournal) ->
|
||||||
|
leveled_cdb:cdb_close(ActiveJournal);
|
||||||
|
close_allmanifest([H|ManifestT], ActiveJournal) ->
|
||||||
|
{_, _, Pid} = H,
|
||||||
|
leveled_cdb:cdb_close(Pid),
|
||||||
|
close_allmanifest(ManifestT, ActiveJournal).
|
||||||
|
|
||||||
|
|
||||||
|
roll_pending_journals([TopJournalSQN], Manifest, _RootPath)
|
||||||
|
when is_integer(TopJournalSQN) ->
|
||||||
|
{TopJournalSQN, Manifest};
|
||||||
|
roll_pending_journals([JournalSQN|T], Manifest, RootPath) ->
|
||||||
|
Filename = filepath(RootPath, JournalSQN, new_journal),
|
||||||
|
PidW = leveled_cdb:cdb_open_writer(Filename),
|
||||||
|
{ok, NewFilename} = leveled_cdb:cdb_complete(PidW),
|
||||||
|
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename),
|
||||||
|
roll_pending_journals(T,
|
||||||
|
lists:append(Manifest,
|
||||||
|
{JournalSQN, NewFilename, PidR}),
|
||||||
|
RootPath).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
|
||||||
|
lists:foldl(fun(FN, Acc) ->
|
||||||
|
case re:run(FN,
|
||||||
|
Regex,
|
||||||
|
[{capture, [IntName], list}]) of
|
||||||
|
nomatch ->
|
||||||
|
Acc;
|
||||||
|
{match, [Int]} when is_list(Int) ->
|
||||||
|
Acc ++ [list_to_integer(Int)];
|
||||||
|
_ ->
|
||||||
|
Acc
|
||||||
|
end end,
|
||||||
|
[],
|
||||||
|
Filenames).
|
||||||
|
|
||||||
|
find_in_manifest(_SQN, []) ->
|
||||||
|
error;
|
||||||
|
find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN ->
|
||||||
|
Pid;
|
||||||
|
find_in_manifest(SQN, [_Head|Tail]) ->
|
||||||
|
find_in_manifest(SQN, Tail).
|
||||||
|
|
||||||
|
filepath(RootPath, journal_dir) ->
|
||||||
|
RootPath ++ "/" ++ ?FILES_FP ++ "/";
|
||||||
|
filepath(RootPath, manifest_dir) ->
|
||||||
|
RootPath ++ "/" ++ ?MANIFEST_FP ++ "/".
|
||||||
|
|
||||||
|
|
||||||
|
filepath(RootPath, NewSQN, new_journal) ->
|
||||||
|
filename:join(filepath(RootPath, journal_dir),
|
||||||
|
"nursery_"
|
||||||
|
++ integer_to_list(NewSQN)
|
||||||
|
++ "." ++ ?PENDING_FILEX).
|
||||||
|
|
||||||
|
|
||||||
|
simple_manifest_reader(SQN, RootPath) ->
|
||||||
|
ManifestPath = filepath(RootPath, manifest_dir),
|
||||||
|
{ok, MBin} = file:read_file(filename:join(ManifestPath,
|
||||||
|
integer_to_list(SQN)
|
||||||
|
++ ".man")),
|
||||||
|
binary_to_term(MBin).
|
||||||
|
|
||||||
|
|
||||||
|
simple_manifest_writer(Manifest, ManSQN, RootPath) ->
|
||||||
|
ManPath = filepath(RootPath, manifest_dir),
|
||||||
|
NewFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?MANIFEST_FILEX),
|
||||||
|
TmpFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?PENDING_FILEX),
|
||||||
|
MBin = term_to_binary(Manifest),
|
||||||
|
case file:is_file(NewFN) of
|
||||||
|
true ->
|
||||||
|
io:format("Error - trying to write manifest for"
|
||||||
|
++ " ManifestSQN=~w which already exists~n", [ManSQN]),
|
||||||
|
error;
|
||||||
|
false ->
|
||||||
|
io:format("Writing new version of manifest for "
|
||||||
|
++ " manifestSQN=~w~n", [ManSQN]),
|
||||||
|
ok = file:write_file(TmpFN, MBin),
|
||||||
|
ok = file:rename(TmpFN, NewFN),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
%%%============================================================================
|
||||||
|
%%% Test
|
||||||
|
%%%============================================================================
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
build_dummy_journal() ->
|
||||||
|
RootPath = "../test/inker",
|
||||||
|
JournalFP = filepath(RootPath, journal_dir),
|
||||||
|
ManifestFP = filepath(RootPath, manifest_dir),
|
||||||
|
ok = filelib:ensure_dir(RootPath),
|
||||||
|
ok = filelib:ensure_dir(JournalFP),
|
||||||
|
ok = filelib:ensure_dir(ManifestFP),
|
||||||
|
F1 = filename:join(JournalFP, "nursery_1.pnd"),
|
||||||
|
{ok, J1} = leveled_cdb:cdb_open_writer(F1),
|
||||||
|
{K1, V1} = {"Key1", "TestValue1"},
|
||||||
|
{K2, V2} = {"Key2", "TestValue2"},
|
||||||
|
ok = leveled_cdb:cdb_put(J1, {1, K1}, V1),
|
||||||
|
ok = leveled_cdb:cdb_put(J1, {2, K2}, V2),
|
||||||
|
{ok, _} = leveled_cdb:cdb_complete(J1),
|
||||||
|
F2 = filename:join(JournalFP, "nursery_3.pnd"),
|
||||||
|
{ok, J2} = leveled_cdb:cdb_open_writer(F2),
|
||||||
|
{K1, V3} = {"Key1", "TestValue3"},
|
||||||
|
{K4, V4} = {"Key4", "TestValue4"},
|
||||||
|
ok = leveled_cdb:cdb_put(J2, {3, K1}, V3),
|
||||||
|
ok = leveled_cdb:cdb_put(J2, {4, K4}, V4),
|
||||||
|
ok = leveled_cdb:cdb_close(J2),
|
||||||
|
Manifest = {2, [{1, "nursery_1.cdb"}], []},
|
||||||
|
ManifestBin = term_to_binary(Manifest),
|
||||||
|
{ok, MF1} = file:open(filename:join(ManifestFP, "1.man"),
|
||||||
|
[binary, raw, read, write]),
|
||||||
|
ok = file:write(MF1, ManifestBin),
|
||||||
|
ok = file:close(MF1).
|
||||||
|
|
||||||
|
|
||||||
|
clean_testdir(RootPath) ->
|
||||||
|
clean_subdir(filepath(RootPath, journal_dir)),
|
||||||
|
clean_subdir(filepath(RootPath, manifest_dir)).
|
||||||
|
|
||||||
|
clean_subdir(DirPath) ->
|
||||||
|
{ok, Files} = file:list_dir(DirPath),
|
||||||
|
lists:foreach(fun(FN) -> file:delete(filename:join(DirPath, FN)) end,
|
||||||
|
Files).
|
||||||
|
|
||||||
|
simple_buildmanifest_test() ->
|
||||||
|
RootPath = "../test/inker",
|
||||||
|
build_dummy_journal(),
|
||||||
|
Res = build_manifest(["1.man"],
|
||||||
|
["nursery_1.cdb", "nursery_3.pnd"],
|
||||||
|
fun simple_manifest_reader/2,
|
||||||
|
RootPath),
|
||||||
|
io:format("Build manifest output is ~w~n", [Res]),
|
||||||
|
{Man, ActJournal, HighSQN, ManSQN} = Res,
|
||||||
|
?assertMatch(HighSQN, 4),
|
||||||
|
?assertMatch(ManSQN, 1),
|
||||||
|
?assertMatch([{1, "nursery_1.cdb", _}], Man),
|
||||||
|
{ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal),
|
||||||
|
?assertMatch(ActSQN, 4),
|
||||||
|
close_allmanifest(Man, ActJournal),
|
||||||
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
another_buildmanifest_test() ->
|
||||||
|
%% There is a rolled jounral file which is not yet in the manifest
|
||||||
|
RootPath = "../test/inker",
|
||||||
|
build_dummy_journal(),
|
||||||
|
FN = filepath(RootPath, 3, new_journal),
|
||||||
|
{ok, FileToRoll} = leveled_cdb:cdb_open_writer(FN),
|
||||||
|
{ok, _} = leveled_cdb:cdb_complete(FileToRoll),
|
||||||
|
FN2 = filepath(RootPath, 5, new_journal),
|
||||||
|
{ok, NewActiveJN} = leveled_cdb:cdb_open_writer(FN2),
|
||||||
|
{K5, V5} = {"Key5", "TestValue5"},
|
||||||
|
{K6, V6} = {"Key6", "TestValue6"},
|
||||||
|
ok = leveled_cdb:cdb_put(NewActiveJN, {5, K5}, V5),
|
||||||
|
ok = leveled_cdb:cdb_put(NewActiveJN, {6, K6}, V6),
|
||||||
|
ok = leveled_cdb:cdb_close(NewActiveJN),
|
||||||
|
%% Test setup - now build manifest
|
||||||
|
Res = build_manifest(["1.man"],
|
||||||
|
["nursery_1.cdb",
|
||||||
|
"nursery_3.cdb",
|
||||||
|
"nursery_5.pnd"],
|
||||||
|
fun simple_manifest_reader/2,
|
||||||
|
RootPath),
|
||||||
|
io:format("Build manifest output is ~w~n", [Res]),
|
||||||
|
{Man, ActJournal, HighSQN, ManSQN} = Res,
|
||||||
|
?assertMatch(HighSQN, 6),
|
||||||
|
?assertMatch(ManSQN, 1),
|
||||||
|
?assertMatch([{1, "nursery_1.cdb", _}, {3, "nursery_3.cdb", _}], Man),
|
||||||
|
{ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal),
|
||||||
|
?assertMatch(ActSQN, 6),
|
||||||
|
close_allmanifest(Man, ActJournal),
|
||||||
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
|
||||||
|
empty_buildmanifest_test() ->
|
||||||
|
RootPath = "../test/inker/",
|
||||||
|
Res = build_manifest([],
|
||||||
|
[],
|
||||||
|
fun simple_manifest_reader/2,
|
||||||
|
RootPath),
|
||||||
|
io:format("Build manifest output is ~w~n", [Res]),
|
||||||
|
{Man, ActJournal, HighSQN, ManSQN} = Res,
|
||||||
|
?assertMatch(Man, []),
|
||||||
|
?assertMatch(ManSQN, 0),
|
||||||
|
?assertMatch(HighSQN, 0),
|
||||||
|
empty = leveled_cdb:cdb_lastkey(ActJournal),
|
||||||
|
FN = leveled_cdb:cdb_filename(ActJournal),
|
||||||
|
%% The filename should be based on the next journal SQN (1) not 0
|
||||||
|
?assertMatch(FN, filepath(RootPath, 1, new_journal)),
|
||||||
|
close_allmanifest(Man, ActJournal),
|
||||||
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
|
||||||
|
-endif.
|
|
@ -178,11 +178,8 @@
|
||||||
-define(MAX_WORK_WAIT, 300).
|
-define(MAX_WORK_WAIT, 300).
|
||||||
-define(MANIFEST_FP, "ledger_manifest").
|
-define(MANIFEST_FP, "ledger_manifest").
|
||||||
-define(FILES_FP, "ledger_files").
|
-define(FILES_FP, "ledger_files").
|
||||||
-define(SHUTDOWN_FP, "ledger_onshutdown").
|
|
||||||
-define(CURRENT_FILEX, "crr").
|
-define(CURRENT_FILEX, "crr").
|
||||||
-define(PENDING_FILEX, "pnd").
|
-define(PENDING_FILEX, "pnd").
|
||||||
-define(BACKUP_FILEX, "bak").
|
|
||||||
-define(ARCHIVE_FILEX, "arc").
|
|
||||||
-define(MEMTABLE, mem).
|
-define(MEMTABLE, mem).
|
||||||
-define(MAX_TABLESIZE, 32000).
|
-define(MAX_TABLESIZE, 32000).
|
||||||
-define(PROMPT_WAIT_ONL0, 5).
|
-define(PROMPT_WAIT_ONL0, 5).
|
||||||
|
@ -198,6 +195,7 @@
|
||||||
table_size = 0 :: integer(),
|
table_size = 0 :: integer(),
|
||||||
clerk :: pid(),
|
clerk :: pid(),
|
||||||
levelzero_pending = ?L0PEND_RESET :: tuple(),
|
levelzero_pending = ?L0PEND_RESET :: tuple(),
|
||||||
|
levelzero_snapshot = [] :: list(),
|
||||||
memtable,
|
memtable,
|
||||||
backlog = false :: boolean()}).
|
backlog = false :: boolean()}).
|
||||||
|
|
||||||
|
@ -471,7 +469,8 @@ push_to_memory(DumpList, State) ->
|
||||||
1,
|
1,
|
||||||
State#state.manifest,
|
State#state.manifest,
|
||||||
{0, [ManifestEntry]}),
|
{0, [ManifestEntry]}),
|
||||||
levelzero_pending=?L0PEND_RESET}};
|
levelzero_pending=?L0PEND_RESET,
|
||||||
|
levelzero_snapshot=[]}};
|
||||||
?L0PEND_RESET ->
|
?L0PEND_RESET ->
|
||||||
{State#state.table_size, State}
|
{State#state.table_size, State}
|
||||||
end,
|
end,
|
||||||
|
@ -479,17 +478,16 @@ push_to_memory(DumpList, State) ->
|
||||||
%% Prompt clerk to ask about work - do this for every push_mem
|
%% Prompt clerk to ask about work - do this for every push_mem
|
||||||
ok = leveled_clerk:clerk_prompt(UpdState#state.clerk, penciller),
|
ok = leveled_clerk:clerk_prompt(UpdState#state.clerk, penciller),
|
||||||
|
|
||||||
SW2 = os:timestamp(),
|
|
||||||
MemoryInsertion = do_push_to_mem(DumpList,
|
MemoryInsertion = do_push_to_mem(DumpList,
|
||||||
TableSize,
|
TableSize,
|
||||||
UpdState#state.memtable),
|
UpdState#state.memtable,
|
||||||
io:format("Push into memory timed at ~w microseconds~n",
|
UpdState#state.levelzero_snapshot),
|
||||||
[timer:now_diff(os:timestamp(),SW2)]),
|
|
||||||
|
|
||||||
case MemoryInsertion of
|
case MemoryInsertion of
|
||||||
{twist, ApproxTableSize} ->
|
{twist, ApproxTableSize, UpdSnapshot} ->
|
||||||
{ok, UpdState#state{table_size=ApproxTableSize}};
|
{ok, UpdState#state{table_size=ApproxTableSize,
|
||||||
{roll, ApproxTableSize} ->
|
levelzero_snapshot=UpdSnapshot}};
|
||||||
|
{roll, ApproxTableSize, UpdSnapshot} ->
|
||||||
L0 = get_item(0, UpdState#state.manifest, []),
|
L0 = get_item(0, UpdState#state.manifest, []),
|
||||||
case {L0, manifest_locked(UpdState)} of
|
case {L0, manifest_locked(UpdState)} of
|
||||||
{[], false} ->
|
{[], false} ->
|
||||||
|
@ -508,17 +506,20 @@ push_to_memory(DumpList, State) ->
|
||||||
L0Pid,
|
L0Pid,
|
||||||
os:timestamp()},
|
os:timestamp()},
|
||||||
table_size=ApproxTableSize,
|
table_size=ApproxTableSize,
|
||||||
manifest_sqn=MSN}};
|
manifest_sqn=MSN,
|
||||||
|
levelzero_snapshot=UpdSnapshot}};
|
||||||
{[], true} ->
|
{[], true} ->
|
||||||
{{pause,
|
{{pause,
|
||||||
"L0 file write blocked by change at sqn=~w~n",
|
"L0 file write blocked by change at sqn=~w~n",
|
||||||
[UpdState#state.manifest_sqn]},
|
[UpdState#state.manifest_sqn]},
|
||||||
UpdState#state{table_size=ApproxTableSize}};
|
UpdState#state{table_size=ApproxTableSize,
|
||||||
|
levelzero_snapshot=UpdSnapshot}};
|
||||||
_ ->
|
_ ->
|
||||||
{{pause,
|
{{pause,
|
||||||
"L0 file write blocked by L0 file in manifest~n",
|
"L0 file write blocked by L0 file in manifest~n",
|
||||||
[]},
|
[]},
|
||||||
UpdState#state{table_size=ApproxTableSize}}
|
UpdState#state{table_size=ApproxTableSize,
|
||||||
|
levelzero_snapshot=UpdSnapshot}}
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -556,20 +557,24 @@ fetch(Key, Manifest, Level, FetchFun) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_push_to_mem(DumpList, TableSize, MemTable) ->
|
do_push_to_mem(DumpList, TableSize, MemTable, Snapshot) ->
|
||||||
|
SW = os:timestamp(),
|
||||||
|
UpdSnapshot = lists:append(Snapshot, DumpList),
|
||||||
ets:insert(MemTable, DumpList),
|
ets:insert(MemTable, DumpList),
|
||||||
|
io:format("Push into memory timed at ~w microseconds~n",
|
||||||
|
[timer:now_diff(os:timestamp(), SW)]),
|
||||||
case TableSize + length(DumpList) of
|
case TableSize + length(DumpList) of
|
||||||
ApproxTableSize when ApproxTableSize > ?MAX_TABLESIZE ->
|
ApproxTableSize when ApproxTableSize > ?MAX_TABLESIZE ->
|
||||||
case ets:info(MemTable, size) of
|
case ets:info(MemTable, size) of
|
||||||
ActTableSize when ActTableSize > ?MAX_TABLESIZE ->
|
ActTableSize when ActTableSize > ?MAX_TABLESIZE ->
|
||||||
{roll, ActTableSize};
|
{roll, ActTableSize, UpdSnapshot};
|
||||||
ActTableSize ->
|
ActTableSize ->
|
||||||
io:format("Table size is actually ~w~n", [ActTableSize]),
|
io:format("Table size is actually ~w~n", [ActTableSize]),
|
||||||
{twist, ActTableSize}
|
{twist, ActTableSize, UpdSnapshot}
|
||||||
end;
|
end;
|
||||||
ApproxTableSize ->
|
ApproxTableSize ->
|
||||||
io:format("Table size is approximately ~w~n", [ApproxTableSize]),
|
io:format("Table size is approximately ~w~n", [ApproxTableSize]),
|
||||||
{twist, ApproxTableSize}
|
{twist, ApproxTableSize, UpdSnapshot}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue