Merge remote-tracking branch 'refs/remotes/origin/mas-sstblockv2-i42' into mas-pushmem-i46
# Conflicts:
#	src/leveled_penciller.erl

Commit 878ec41ffa
4 changed files with 480 additions and 252 deletions
src/leveled_codec.erl

@@ -61,7 +61,8 @@
         generate_uuid/0,
         integer_now/0,
         riak_extract_metadata/2,
         magic_hash/1]).
         magic_hash/1,
         to_lookup/1]).

-define(V1_VERS, 1).
-define(MAGIC, 53). % riak_kv -> riak_object

@@ -73,6 +74,14 @@
%% what they are -
%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function

to_lookup(Key) ->
    case element(1, Key) of
        ?IDX_TAG ->
            no_lookup;
        _ ->
            lookup
    end.

magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) ->
    magic_hash({Bucket, Key});
magic_hash({?STD_TAG, Bucket, Key, _SubKey}) ->

@@ -198,32 +207,41 @@ compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
    skip;
compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
    {Tag, _, _, _} = LK,
    {Tag, TagStrat} = lists:keyfind(Tag, 1, Strategy),
    case TagStrat of
    case get_tagstrategy(LK, Strategy) of
        skip ->
            skip;
        retain ->
            {retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}};
        TagStrat ->
            {TagStrat, null}
    end;
compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
    {Tag, _, _, _} = LK,
    case lists:keyfind(Tag, 1, Strategy) of
        {Tag, TagStrat} ->
            case TagStrat of
                retain ->
                    {_V, KeyDeltas} = split_inkvalue(V),
                    {TagStrat, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
                TagStrat ->
                    {TagStrat, null}
            end;
        false ->
            leveled_log:log("IC012", [Tag, Strategy]),
            skip
    case get_tagstrategy(LK, Strategy) of
        skip ->
            skip;
        retain ->
            {_V, KeyDeltas} = split_inkvalue(V),
            {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
        TagStrat ->
            {TagStrat, null}
    end;
compact_inkerkvc(_KVC, _Strategy) ->
    skip.

get_tagstrategy(LK, Strategy) ->
    case LK of
        {Tag, _, _, _} ->
            case lists:keyfind(Tag, 1, Strategy) of
                {Tag, TagStrat} ->
                    TagStrat;
                false ->
                    leveled_log:log("IC012", [Tag, Strategy]),
                    skip
            end;
        _ ->
            skip
    end.

split_inkvalue(VBin) ->
    case is_binary(VBin) of
        true ->

@@ -429,6 +447,26 @@ endkey_passed_test() ->
    ?assertMatch(true, endkey_passed(TestKey, K2)).


corrupted_ledgerkey_test() ->
    % When testing for compacted journal which has been corrupted, there may
    % be a corruptes ledger key. Always skip these keys
    % Key has become a 3-tuple not a 4-tuple
    TagStrat1 = compact_inkerkvc({{1,
                                   ?INKT_STND,
                                   {?STD_TAG, "B1", "K1andSK"}},
                                  {},
                                  true},
                                 [{?STD_TAG, retain}]),
    ?assertMatch(skip, TagStrat1),
    TagStrat2 = compact_inkerkvc({{1,
                                   ?INKT_KEYD,
                                   {?STD_TAG, "B1", "K1andSK"}},
                                  {},
                                  true},
                                 [{?STD_TAG, retain}]),
    ?assertMatch(skip, TagStrat2).


%% Test below proved that the overhead of performing hashes was trivial
%% Maybe 5 microseconds per hash
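For orientation, a minimal sketch (not part of the commit) of how the newly exported to_lookup/1 is meant to be used by callers such as leveled_sst below; the exact key shapes here are illustrative assumptions, only the tag in element 1 matters:

    %% Hypothetical usage sketch - key layouts assumed, tags are the
    %% ?IDX_TAG / ?STD_TAG macros already defined in leveled_codec.
    no_lookup = leveled_codec:to_lookup({?IDX_TAG, "Bucket", {"idx_field", "idx_term"}, "Key1"}),
    lookup    = leveled_codec:to_lookup({?STD_TAG, "Bucket", "Key1", null}).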
src/leveled_log.erl

@@ -166,7 +166,8 @@
    {"PC015",
        {info, "File created"}},
    {"PC016",
        {info, "Slow fetch from SFT ~w of ~w microseconds with result ~w"}},
        {info, "Slow fetch from SFT ~w of ~w microseconds at level ~w "
            ++ "with result ~w"}},
    {"PC017",
        {info, "Notified clerk of manifest change"}},
    {"PC018",

@@ -259,14 +260,14 @@
        {error, "False result returned from SST with filename ~s as "
            ++ "slot ~w has failed crc check"}},
    {"SST03",
        {info, "Opening SST file with filename ~s keys ~w slots ~w and"
        {info, "Opening SST file with filename ~s slots ~w and"
            ++ " max sqn ~w"}},
    {"SST04",
        {info, "Exit called for reason ~w on filename ~s"}},
    {"SST05",
        {warn, "Rename rogue filename ~s to ~s"}},
    {"SST06",
        {info, "File ~s has been set for delete"}},
        {debug, "File ~s has been set for delete"}},
    {"SST07",
        {info, "Exit called and now clearing ~s"}},
    {"SST08",
src/leveled_penciller.erl

@@ -378,7 +378,7 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
                leveled_tree:from_orderedset(LedgerTable,
                                             ?CACHE_TYPE)
        end,
    % Reply ust happen after the table has been converted
    % Reply must happen after the table has been converted
    gen_server:reply(From, ok),
    {noreply,
        update_levelzero(State#state.levelzero_size,

@@ -862,7 +862,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
    L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
    case L0Check of
        {false, not_found} ->
            fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3);
            fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4);
        {true, KV} ->
            {KV, 0}
    end.

@@ -874,7 +874,7 @@ fetch(Key, Hash, Manifest, Level, FetchFun) ->
        false ->
            fetch(Key, Hash, Manifest, Level + 1, FetchFun);
        FP ->
            case FetchFun(FP, Key, Hash) of
            case FetchFun(FP, Key, Hash, Level) of
                not_present ->
                    fetch(Key, Hash, Manifest, Level + 1, FetchFun);
                ObjectFound ->

@@ -882,21 +882,21 @@ fetch(Key, Hash, Manifest, Level, FetchFun) ->
            end
    end.

timed_sst_get(PID, Key, Hash) ->
timed_sst_get(PID, Key, Hash, Level) ->
    SW = os:timestamp(),
    R = leveled_sst:sst_get(PID, Key, Hash),
    T0 = timer:now_diff(os:timestamp(), SW),
    log_slowfetch(T0, R, PID, ?SLOW_FETCH).
    log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH).

log_slowfetch(T0, R, PID, FetchTolerance) ->
log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
    case {T0, R} of
        {T, R} when T < FetchTolerance ->
            R;
        {T, not_present} ->
            leveled_log:log("PC016", [PID, T, not_present]),
            leveled_log:log("PC016", [PID, T, Level, not_present]),
            not_present;
        {T, R} ->
            leveled_log:log("PC016", [PID, T, found]),
            leveled_log:log("PC016", [PID, T, Level, found]),
            R
    end.


@@ -1507,7 +1507,7 @@ create_file_test() ->
    ?assertMatch("hello", binary_to_term(Bin)).

slow_fetch_test() ->
    ?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 1)).
    ?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)).

checkready(Pid) ->
    try
src/leveled_sst.erl

@@ -66,11 +66,12 @@

-define(MAX_SLOTS, 256).
-define(SLOT_SIZE, 128). % This is not configurable
-define(NOLOOK_MULT, 2). % How much bigger is a slot/block with no lookups
-define(NOLOOK_SLOTSIZE, ?SLOT_SIZE * ?NOLOOK_MULT).
-define(COMPRESSION_LEVEL, 1).
-define(BINARY_SETTINGS, [{compressed, ?COMPRESSION_LEVEL}]).
% -define(LEVEL_BLOOM_BITS, [{0, 8}, {1, 10}, {2, 8}, {default, 6}]).
-define(MERGE_SCANWIDTH, 16).
-define(INDEX_MARKER_WIDTH, 16).
-define(MERGE_SCANWIDTH, 8).
-define(DISCARD_EXT, ".discarded").
-define(DELETE_TIMEOUT, 10000).
-define(TREE_TYPE, idxt).

@@ -143,21 +144,22 @@ sst_open(RootPath, Filename) ->

sst_new(RootPath, Filename, Level, KVList, MaxSQN) ->
    {ok, Pid} = gen_fsm:start(?MODULE, [], []),
    {[], [], SlotList, FK} = merge_lists(KVList),
    case gen_fsm:sync_send_event(Pid,
                                 {sst_new,
                                  RootPath,
                                  Filename,
                                  Level,
                                  KVList,
                                  {SlotList, FK},
                                  MaxSQN},
                                 infinity) of
        {ok, {SK, EK}} ->
            {ok, Pid, {SK, EK}}
    end.

sst_new(RootPath, Filename, KL1, KL2, IsBasement, Level, MaxSQN) ->
    {{Rem1, Rem2}, MergedList} = merge_lists(KL1, KL2, {IsBasement, Level}),
    case MergedList of
sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN) ->
    {Rem1, Rem2, SlotList, FK} = merge_lists(KVL1, KVL2, {IsBasement, Level}),
    case SlotList of
        [] ->
            empty;
        _ ->

@@ -167,7 +169,7 @@ sst_new(RootPath, Filename, KL1, KL2, IsBasement, Level, MaxSQN) ->
                                  RootPath,
                                  Filename,
                                  Level,
                                  MergedList,
                                  {SlotList, FK},
                                  MaxSQN},
                                 infinity) of
        {ok, {SK, EK}} ->
@@ -241,13 +243,13 @@ starting({sst_open, RootPath, Filename}, _From, State) ->
        {ok, {Summary#summary.first_key, Summary#summary.last_key}},
        reader,
        UpdState};
starting({sst_new, RootPath, Filename, Level, KVList, MaxSQN}, _From, State) ->
starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN},
                                                            _From, State) ->
    SW = os:timestamp(),
    {FirstKey,
        Length,
    {Length,
        SlotIndex,
        BlockIndex,
        SlotsBin} = build_all_slots(KVList),
        SlotsBin} = build_all_slots(SlotList),
    SummaryBin = build_table_summary(SlotIndex,
                                     Level,
                                     FirstKey,

@@ -268,15 +270,15 @@ starting({sst_newlevelzero, RootPath, Filename,
                    Slots, FetchFun, Penciller, MaxSQN}, State) ->
    SW = os:timestamp(),
    KVList = leveled_pmem:to_list(Slots, FetchFun),
    {FirstKey,
        Length,
    {[], [], SlotList, FirstKey} = merge_lists(KVList),
    {SlotCount,
        SlotIndex,
        BlockIndex,
        SlotsBin} = build_all_slots(KVList),
        SlotsBin} = build_all_slots(SlotList),
    SummaryBin = build_table_summary(SlotIndex,
                                     0,
                                     FirstKey,
                                     Length,
                                     SlotCount,
                                     MaxSQN),
    ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin),
    UpdState = read_file(ActualFilename, State#state{root_path=RootPath}),

@@ -415,19 +417,19 @@ fetch(LedgerKey, Hash, State) ->
    case CachedBlockIdx of
        none ->
            SlotBin = read_slot(State#state.handle, Slot),
            {Result, BlockIdx} = binaryslot_get(SlotBin,
                                                LedgerKey,
                                                Hash,
                                                none),
            {Result,
                BlockLengths,
                BlockIdx} = binaryslot_get(SlotBin, LedgerKey, Hash),
            BlockIndexCache = array:set(SlotID - 1,
                                        BlockIdx,
                                        <<BlockLengths/binary,
                                            BlockIdx/binary>>,
                                        State#state.blockindex_cache),
            {Result,
                slot_fetch,
                Slot#slot_index_value.slot_id,
                State#state{blockindex_cache = BlockIndexCache}};
        _ ->
            PosList = find_pos(CachedBlockIdx,
        <<BlockLengths:20/binary, BlockIdx/binary>> ->
            PosList = find_pos(BlockIdx,
                               double_hash(Hash, LedgerKey),
                               [],
                               0),

@@ -435,12 +437,12 @@ fetch(LedgerKey, Hash, State) ->
                [] ->
                    {not_present, slot_bloom, SlotID, State};
                _ ->
                    SlotBin = read_slot(State#state.handle, Slot),
                    Result = binaryslot_get(SlotBin,
                                            LedgerKey,
                                            Hash,
                                            {true, PosList}),
                    {element(1, Result), slot_fetch, SlotID, State}
                    Result = check_blocks(PosList,
                                          State#state.handle,
                                          Slot,
                                          BlockLengths,
                                          LedgerKey),
                    {Result, slot_fetch, SlotID, State}
            end
        end
    end.
@@ -534,14 +536,13 @@ read_file(Filename, State) ->
    {Handle, SummaryBin} = open_reader(filename:join(State#state.root_path,
                                                     Filename)),
    {Summary, SlotList} = read_table_summary(SummaryBin),
    SlotCount = length(SlotList),
    BlockIndexCache = array:new([{size, SlotCount}, {default, none}]),
    BlockIndexCache = array:new([{size, Summary#summary.size},
                                 {default, none}]),
    UpdState = State#state{blockindex_cache = BlockIndexCache},
    SlotIndex = from_list(SlotList),
    UpdSummary = Summary#summary{index = SlotIndex},
    leveled_log:log("SST03", [Filename,
                              Summary#summary.size,
                              SlotCount,
                              Summary#summary.max_sqn]),
    UpdState#state{summary = UpdSummary,
                   handle = Handle,

@@ -554,13 +555,13 @@ open_reader(Filename) ->
    {ok, SummaryBin} = file:pread(Handle, SlotsLength + 8, SummaryLength),
    {Handle, SummaryBin}.

build_table_summary(SlotList, _Level, FirstKey, L, MaxSQN) ->
    [{LastKey, _LastV}|_Rest] = SlotList,
build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN) ->
    [{LastKey, _LastV}|_Rest] = SlotIndex,
    Summary = #summary{first_key = FirstKey,
                       last_key = LastKey,
                       size = L,
                       size = SlotCount,
                       max_sqn = MaxSQN},
    SummBin = term_to_binary({Summary, lists:reverse(SlotList)},
    SummBin = term_to_binary({Summary, lists:reverse(SlotIndex)},
                             ?BINARY_SETTINGS),
    SummCRC = erlang:crc32(SummBin),
    <<SummCRC:32/integer, SummBin/binary>>.

@@ -574,15 +575,10 @@ read_table_summary(BinWithCheck) ->
            binary_to_term(SummBin)
    end.

build_all_slots(KVList) ->
    L = length(KVList),
    % The length is not a constant time command and the list may be large,
    % but otherwise length must be called each iteration to avoid exception
    % on split or sublist
    [{FirstKey, _FirstV}|_Rest] = KVList,
    SlotCount = L div ?SLOT_SIZE + 1,
    BuildResponse = build_all_slots(KVList,
                                    SlotCount,

build_all_slots(SlotList) ->
    SlotCount = length(SlotList),
    BuildResponse = build_all_slots(SlotList,
                                    8,
                                    1,
                                    [],
@@ -590,73 +586,27 @@ build_all_slots(KVList) ->
                                    {default, none}]),
                                    <<>>),
    {SlotIndex, BlockIndex, SlotsBin} = BuildResponse,
    {FirstKey, L, SlotIndex, BlockIndex, SlotsBin}.
    {SlotCount, SlotIndex, BlockIndex, SlotsBin}.

build_all_slots([], _SC, _Pos, _SlotID, SlotIdx, BlockIdxA, SlotsBin) ->
    {SlotIdx, BlockIdxA, SlotsBin};
build_all_slots(KVL, SC, Pos, SlotID, SlotIdx, BlockIdxA, SlotsBin) ->
    {SlotList, KVRem} =
        case SC of
            1 ->
                {lists:sublist(KVL, ?SLOT_SIZE), []};
            _N ->
                lists:split(?SLOT_SIZE, KVL)
        end,
    {LastKey, _V} = lists:last(SlotList),
    {BlockIndex, SlotBin, HashList} = generate_binary_slot(SlotList),
build_all_slots([], _Pos, _SlotID,
                SlotIdxAcc, BlockIdxAcc, SlotBinAcc) ->
    {SlotIdxAcc, BlockIdxAcc, SlotBinAcc};
build_all_slots([SlotD|Rest], Pos, SlotID,
                SlotIdxAcc, BlockIdxAcc, SlotBinAcc) ->
    {BlockIdx, SlotBin, HashList, LastKey} = SlotD,
    Length = byte_size(SlotBin),
    Bloom = leveled_tinybloom:create_bloom(HashList),
    SlotIndexV = #slot_index_value{slot_id = SlotID,
                                   start_position = Pos,
                                   length = Length,
                                   bloom = Bloom},
    build_all_slots(KVRem,
                    SC - 1,
    build_all_slots(Rest,
                    Pos + Length,
                    SlotID + 1,
                    [{LastKey, SlotIndexV}|SlotIdx],
                    array:set(SlotID - 1, BlockIndex, BlockIdxA),
                    <<SlotsBin/binary, SlotBin/binary>>).
                    [{LastKey, SlotIndexV}|SlotIdxAcc],
                    array:set(SlotID - 1, BlockIdx, BlockIdxAcc),
                    <<SlotBinAcc/binary, SlotBin/binary>>).

read_slot(Handle, Slot) ->
    {ok, SlotBin} = file:pread(Handle,
                               Slot#slot_index_value.start_position,
                               Slot#slot_index_value.length),
    SlotBin.

read_slots(Handle, SlotList) ->
    PointerMapFun =
        fun(Pointer) ->
            {Slot, SK, EK} =
                case Pointer of
                    {pointer, _Pid, Slot0, SK0, EK0} ->
                        {Slot0, SK0, EK0};
                    {pointer, Slot0, SK0, EK0} ->
                        {Slot0, SK0, EK0}
                end,

            {Slot#slot_index_value.start_position,
                Slot#slot_index_value.length,
                SK,
                EK}
        end,

    LengthList = lists:map(PointerMapFun, SlotList),
    StartPos = element(1, lists:nth(1, LengthList)),
    EndPos = element(1, lists:last(LengthList))
                + element(2, lists:last(LengthList)),
    {ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos),

    BinSplitMapFun =
        fun({SP, L, SK, EK}) ->
                Start = SP - StartPos,
                <<_Pre:Start/binary,
                    SlotBin:L/binary,
                    _Post/binary>> = MultiSlotBin,
                {SlotBin, SK, EK}
        end,

    lists:map(BinSplitMapFun, LengthList).

generate_filenames(RootFilename) ->
    Ext = filename:extension(RootFilename),

@@ -739,7 +689,7 @@ lookup_slots(StartKey, EndKey, Tree) ->
%% based on a 17-bit hash (so 0.0039 fpr).


generate_binary_slot(KVL) ->
generate_binary_slot(Lookup, KVL) ->

    HashFoldFun =
        fun({K, V}, {PosBinAcc, NoHashCount, HashAcc}) ->
@@ -772,85 +722,169 @@ generate_binary_slot(KVL) ->
            end

        end,

    {PosBinIndex0, NHC, HashL} = lists:foldr(HashFoldFun, {<<>>, 0, []}, KVL),
    PosBinIndex1 =
        case NHC of
            0 ->
                PosBinIndex0;
            _ ->
                N = NHC - 1,
                <<0:1/integer, N:7/integer, PosBinIndex0/binary>>

    {HashL, PosBinIndex} =
        case Lookup of
            lookup ->
                {PosBinIndex0,
                    NHC,
                    HashL0} = lists:foldr(HashFoldFun, {<<>>, 0, []}, KVL),
                PosBinIndex1 =
                    case NHC of
                        0 ->
                            PosBinIndex0;
                        _ ->
                            N = NHC - 1,
                            <<0:1/integer, N:7/integer, PosBinIndex0/binary>>
                    end,
                {HashL0, PosBinIndex1};
            no_lookup ->
                {[], <<0:1/integer, 127:7/integer>>}
        end,

    BlockSize =
        case Lookup of
            lookup ->
                ?SLOT_SIZE div 4;
            no_lookup ->
                ?NOLOOK_SLOTSIZE div 4
        end,


    {B1, B2, B3, B4} =
        case length(KVL) of
            L when L =< 32 ->
            L when L =< BlockSize ->
                {term_to_binary(KVL, ?BINARY_SETTINGS),
                    <<0:0>>,
                    <<0:0>>,
                    <<0:0>>};
            L when L =< 64 ->
                {KVLA_32, KVLB_32} = lists:split(32, KVL),
                {term_to_binary(KVLA_32, ?BINARY_SETTINGS),
                    term_to_binary(KVLB_32, ?BINARY_SETTINGS),
            L when L =< 2 * BlockSize ->
                {KVLA, KVLB} = lists:split(BlockSize, KVL),
                {term_to_binary(KVLA, ?BINARY_SETTINGS),
                    term_to_binary(KVLB, ?BINARY_SETTINGS),
                    <<0:0>>,
                    <<0:0>>};
            L when L =< 96 ->
                {KVLA_32, KVLB_64} = lists:split(32, KVL),
                {KVLB_32, KVLC_32} = lists:split(32, KVLB_64),
                {term_to_binary(KVLA_32, ?BINARY_SETTINGS),
                    term_to_binary(KVLB_32, ?BINARY_SETTINGS),
                    term_to_binary(KVLC_32, ?BINARY_SETTINGS),
            L when L =< 3 * BlockSize ->
                {KVLA, KVLB_Rest} = lists:split(BlockSize, KVL),
                {KVLB, KVLC} = lists:split(BlockSize, KVLB_Rest),
                {term_to_binary(KVLA, ?BINARY_SETTINGS),
                    term_to_binary(KVLB, ?BINARY_SETTINGS),
                    term_to_binary(KVLC, ?BINARY_SETTINGS),
                    <<0:0>>};
            L when L =< 128 ->
                {KVLA_32, KVLB_96} = lists:split(32, KVL),
                {KVLB_32, KVLC_64} = lists:split(32, KVLB_96),
                {KVLC_32, KVLD_32} = lists:split(32, KVLC_64),
                {term_to_binary(KVLA_32, ?BINARY_SETTINGS),
                    term_to_binary(KVLB_32, ?BINARY_SETTINGS),
                    term_to_binary(KVLC_32, ?BINARY_SETTINGS),
                    term_to_binary(KVLD_32, ?BINARY_SETTINGS)}
            L when L =< 4 * BlockSize ->
                {KVLA, KVLB_Rest} = lists:split(BlockSize, KVL),
                {KVLB, KVLC_Rest} = lists:split(BlockSize, KVLB_Rest),
                {KVLC, KVLD} = lists:split(BlockSize, KVLC_Rest),
                {term_to_binary(KVLA, ?BINARY_SETTINGS),
                    term_to_binary(KVLB, ?BINARY_SETTINGS),
                    term_to_binary(KVLC, ?BINARY_SETTINGS),
                    term_to_binary(KVLD, ?BINARY_SETTINGS)}
        end,

    B1P = byte_size(PosBinIndex1),
    B1P = byte_size(PosBinIndex),
    B1L = byte_size(B1),
    B2L = byte_size(B2),
    B3L = byte_size(B3),
    B4L = byte_size(B4),
    Lengths = <<B1P:32/integer,
    Lengths = <<B1P:32/integer,
                B1L:32/integer,
                B2L:32/integer,
                B3L:32/integer,
                B4L:32/integer>>,
    SlotBin = <<Lengths/binary,
                PosBinIndex1/binary,
                PosBinIndex/binary,
                B1/binary, B2/binary, B3/binary, B4/binary>>,
    CRC32 = erlang:crc32(SlotBin),
    FullBin = <<CRC32:32/integer, SlotBin/binary>>,

    {LastKey, _LV} = lists:last(KVL),

    {PosBinIndex1, FullBin, HashL}.
    {<<Lengths/binary, PosBinIndex/binary>>, FullBin, HashL, LastKey}.


binaryslot_get(FullBin, Key, Hash, CachedPosLookup) ->
check_blocks([], _Handle, _Slot, _BlockLengths, _LedgerKey) ->
    not_present;
check_blocks([Pos|Rest], Handle, Slot, BlockLengths, LedgerKey) ->
    {BlockNumber, BlockPos} = revert_position(Pos),
    BlockBin = read_block(Handle, Slot, BlockLengths, BlockNumber),
    BlockL = binary_to_term(BlockBin),
    {K, V} = lists:nth(BlockPos, BlockL),
    case K of
        LedgerKey ->
            {K, V};
        _ ->
            check_blocks(Rest, Handle, Slot, BlockLengths, LedgerKey)
    end.


read_block(Handle, Slot, BlockLengths, BlockID) ->
    {BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockID),
    {ok, BlockBin} = file:pread(Handle,
                                Slot#slot_index_value.start_position
                                    + BlockPos
                                    + Offset
                                    + 24,
                                    % 4-byte CRC, 4 byte pos, 4x4 byte lengths
                                Length),
    BlockBin.

read_slot(Handle, Slot) ->
    {ok, SlotBin} = file:pread(Handle,
                               Slot#slot_index_value.start_position,
                               Slot#slot_index_value.length),
    SlotBin.

read_slots(Handle, SlotList) ->
    PointerMapFun =
        fun(Pointer) ->
            {Slot, SK, EK} =
                case Pointer of
                    {pointer, _Pid, Slot0, SK0, EK0} ->
                        {Slot0, SK0, EK0};
                    {pointer, Slot0, SK0, EK0} ->
                        {Slot0, SK0, EK0}
                end,

            {Slot#slot_index_value.start_position,
                Slot#slot_index_value.length,
                SK,
                EK}
        end,

    LengthList = lists:map(PointerMapFun, SlotList),
    StartPos = element(1, lists:nth(1, LengthList)),
    EndPos = element(1, lists:last(LengthList))
                + element(2, lists:last(LengthList)),
    {ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos),

    BinSplitMapFun =
        fun({SP, L, SK, EK}) ->
                Start = SP - StartPos,
                <<_Pre:Start/binary,
                    SlotBin:L/binary,
                    _Post/binary>> = MultiSlotBin,
                {SlotBin, SK, EK}
        end,

    lists:map(BinSplitMapFun, LengthList).


binaryslot_get(FullBin, Key, Hash) ->
    case crc_check_slot(FullBin) of
        {Lengths, Rest} ->
            B1P = element(1, Lengths),
            case CachedPosLookup of
                {true, PosList} ->
                    <<_PosBinIndex:B1P/binary, Blocks/binary>> = Rest,
                    {fetch_value(PosList, Lengths, Blocks, Key), none};
                none ->
                    <<PosBinIndex:B1P/binary, Blocks/binary>> = Rest,
                    PosList = find_pos(PosBinIndex,
                                       double_hash(Hash, Key),
                                       [],
                                       0),
                    {fetch_value(PosList, Lengths, Blocks, Key), PosBinIndex}
            end;
        {BlockLengths, Rest} ->
            <<B1P:32/integer, _R/binary>> = BlockLengths,
            <<PosBinIndex:B1P/binary, Blocks/binary>> = Rest,
            PosList = find_pos(PosBinIndex,
                               double_hash(Hash, Key),
                               [],
                               0),
            {fetch_value(PosList, BlockLengths, Blocks, Key),
                BlockLengths,
                PosBinIndex};
        crc_wonky ->
            {not_present, none}
            {not_present,
                none,
                none}
    end.

binaryslot_tolist(FullBin) ->
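With the defines near the top of this file (?SLOT_SIZE of 128, ?NOLOOK_MULT of 2), the BlockSize chosen in generate_binary_slot/2 above works out to 32 keys per block for a lookup slot and 64 for a no_lookup slot, so the four blocks cover at most 128 or 256 keys respectively. A sketch of that arithmetic only, not new behaviour:

    %% Orientation sketch - mirrors the BlockSize case above.
    block_size(lookup)    -> 128 div 4;        % 32 keys/block, 4 blocks = 128 keys
    block_size(no_lookup) -> (128 * 2) div 4.  % 64 keys/block, 4 blocks = 256 keys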
@@ -867,8 +901,12 @@ binaryslot_tolist(FullBin) ->

    {Out, _Rem} =
        case crc_check_slot(FullBin) of
            {Lengths, RestBin} ->
                {B1P, B1L, B2L, B3L, B4L} = Lengths,
            {BlockLengths, RestBin} ->
                <<B1P:32/integer,
                    B1L:32/integer,
                    B2L:32/integer,
                    B3L:32/integer,
                    B4L:32/integer>> = BlockLengths,
                <<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin,
                lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L]);
            crc_wonky ->

@@ -919,8 +957,12 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey) ->

    {Out, _Rem} =
        case crc_check_slot(FullBin) of
            {Lengths, RestBin} ->
                {B1P, B1L, B2L, B3L, B4L} = Lengths,
            {BlockLengths, RestBin} ->
                <<B1P:32/integer,
                    B1L:32/integer,
                    B2L:32/integer,
                    B3L:32/integer,
                    B4L:32/integer>> = BlockLengths,
                <<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin,
                lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L]);
            crc_wonky ->
@@ -968,65 +1010,67 @@ trim_booleans(FirstKey, LastKey, StartKey, EndKey) ->
    end.


crc_check_slot(FullBin) ->
    <<CRC32:32/integer, SlotBin/binary>> = FullBin,
    case erlang:crc32(SlotBin) of
        CRC32 ->
            <<B1P:32/integer,
                B1L:32/integer,
                B2L:32/integer,
                B3L:32/integer,
                B4L:32/integer,
                Rest/binary>> = SlotBin,
            Lengths = {B1P, B1L, B2L, B3L, B4L},
            {Lengths, Rest};
            <<BlockLengths:20/binary, Rest/binary>> = SlotBin,
            {BlockLengths, Rest};
        _ ->
            leveled_log:log("SST09", []),
            crc_wonky
    end.

block_offsetandlength(BlockLengths, BlockID) ->
    <<BlocksPos:32/integer, BlockLengths0:16/binary>> = BlockLengths,
    case BlockID of
        1 ->
            <<B1L:32/integer, _BR/binary>> = BlockLengths0,
            {BlocksPos, 0, B1L};
        2 ->
            <<B1L:32/integer, B2L:32/integer, _BR/binary>> = BlockLengths0,
            {BlocksPos, B1L, B2L};
        3 ->
            <<B1L:32/integer,
                B2L:32/integer,
                B3L:32/integer,
                _BR/binary>> = BlockLengths0,
            {BlocksPos, B1L + B2L, B3L};
        4 ->
            <<B1L:32/integer,
                B2L:32/integer,
                B3L:32/integer,
                B4L:32/integer>> = BlockLengths0,
            {BlocksPos, B1L + B2L + B3L, B4L}
    end.

double_hash(Hash, Key) ->
    H2 = erlang:phash2(Key),
    (Hash bxor H2) band 32767.

fetch_value([], _Lengths, _Blocks, _Key) ->
fetch_value([], _BlockLengths, _Blocks, _Key) ->
    not_present;
fetch_value([Pos|Rest], Lengths, Blocks, Key) ->
    BlockNumber = (Pos div 32) + 1,
    BlockPos = (Pos rem 32) + 1,
    BlockL =
        case BlockNumber of
            1 ->
                B1L = element(2, Lengths),
                <<Block:B1L/binary, _Rest/binary>> = Blocks,
                binary_to_term(Block);
            2 ->
                B1L = element(2, Lengths),
                B2L = element(3, Lengths),
                <<_Pass:B1L/binary, Block:B2L/binary, _Rest/binary>> = Blocks,
                binary_to_term(Block);
            3 ->
                PreL = element(2, Lengths) + element(3, Lengths),
                B3L = element(4, Lengths),
                <<_Pass:PreL/binary, Block:B3L/binary, _Rest/binary>> = Blocks,
                binary_to_term(Block);
            4 ->
                {_B1P, B1L, B2L, B3L, B4L} = Lengths,
                PreL = B1L + B2L + B3L,
                <<_Pass:PreL/binary, Block:B4L/binary>> = Blocks,
                binary_to_term(Block)
        end,

fetch_value([Pos|Rest], BlockLengths, Blocks, Key) ->
    {BlockNumber, BlockPos} = revert_position(Pos),
    {_BlockPos,
        Offset,
        Length} = block_offsetandlength(BlockLengths, BlockNumber),
    <<_Pre:Offset/binary, Block:Length/binary, _Rest/binary>> = Blocks,
    BlockL = binary_to_term(Block),
    {K, V} = lists:nth(BlockPos, BlockL),
    case K of
        Key ->
            {K, V};
        _ ->
            fetch_value(Rest, Lengths, Blocks, Key)
            fetch_value(Rest, BlockLengths, Blocks, Key)
    end.


revert_position(Pos) ->
    BlockNumber = (Pos div 32) + 1,
    BlockPos = (Pos rem 32) + 1,
    {BlockNumber, BlockPos}.

find_pos(<<>>, _Hash, PosList, _Count) ->
    PosList;
find_pos(<<1:1/integer, Hash:15/integer, T/binary>>, Hash, PosList, Count) ->
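crc_check_slot/1, block_offsetandlength/2 and read_block/4 above all assume the slot binary built by generate_binary_slot/2. A comment-style sketch of that layout, derived from this diff and shown for orientation only:

    %% Slot binary implied by the code above (sketch, not new code):
    %%   <<CRC32:32,                                    % 4 bytes
    %%     B1P:32, B1L:32, B2L:32, B3L:32, B4L:32,      % 20 bytes of lengths
    %%     PosBinIndex:B1P/binary,                      % position index
    %%     B1/binary, B2/binary, B3/binary, B4/binary>> % four kv blocks
    %% Hence read_block/4 reads from start_position + 24 + BlocksPos + Offset,
    %% where 24 = 4-byte CRC + 4-byte pos + 4x4-byte lengths, BlocksPos is the
    %% position-index length and Offset the combined length of earlier blocks.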
@@ -1042,8 +1086,25 @@ find_pos(<<0:1/integer, NHC:7/integer, T/binary>>, Hash, PosList, Count) ->
%%% Merge Functions
%%%============================================================================

%% functions for merging two KV lists with pointers

%% The source lists are merged into lists of slots before the file is created
%% At Level zero, there will be a single source list - and this will always be
%% split into standard size slots
%%
%% At lower levels there will be two source lists and they will need to be
%% merged to ensure that the best conflicting answer survives and compactable
%% KV pairs are discarded.
%%
%% At lower levels slots can be larger if there are no lookup keys present in
%% the slot. This is to slow the growth of the manifest/number-of-files when
%% large numbers of index keys are present - as well as improving compression
%% ratios in the Ledger.
%%
%% The outcome of merge_lists/1 and merge_lists/3 should be an list of slots.
%% Each slot should be ordered by Key and be of the form {Flag, KVList}, where
%% Flag can either be lookup or no-lookup. The list of slots should also be
%% ordered by Key (i.e. the first key in the slot)
%%
%% For merging ...
%% Compare the keys at the head of the list, and either skip that "best" key or
%% identify as the next key.
%%
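A worked example of the sizes this comment implies, using the defines at the top of the file and offered only as orientation: a lookup slot holds at most ?SLOT_SIZE (128) keys, a no_lookup slot at most ?NOLOOK_SLOTSIZE (256) keys, and the new merge loop stops at ?MAX_SLOTS slots per file.

    %% Capacity sketch (assumes the defines above; not part of the commit):
    %% 256 slots * 128 lookup keys  = 32768 lookup keys per SST file, or
    %% 256 slots * 256 index keys   = 65536 keys where slots are no_lookup.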
@@ -1054,23 +1115,117 @@ find_pos(<<0:1/integer, NHC:7/integer, T/binary>>, Hash, PosList, Count) ->
%% there are matching keys then the highest sequence number must be chosen and
%% any lower sequence numbers should be compacted out of existence

merge_lists(KeyList1, KeyList2, LevelInfo) ->
    merge_lists(KeyList1, KeyList2, LevelInfo, [], ?MAX_SLOTS * ?SLOT_SIZE).
merge_lists(KVList1) ->
    SlotCount = length(KVList1) div ?SLOT_SIZE,
    {[],
        [],
        split_lists(KVList1, [], SlotCount),
        element(1, lists:nth(1, KVList1))}.

merge_lists([], [], _LevelR, MergedList, _MaxSize) ->
    {{[], []}, lists:reverse(MergedList)};
merge_lists(Rem1, Rem2, _LevelR, MergedList, 0) ->
    {{Rem1, Rem2}, lists:reverse(MergedList)};
merge_lists(KeyList1, KeyList2, {IsBasement, TS}, MergedList, MaxSize) ->
    case key_dominates(KeyList1, KeyList2, {IsBasement, TS}) of
        {{next_key, TopKey}, Rem1, Rem2} ->
            merge_lists(Rem1,
split_lists([], SlotLists, 0) ->
    lists:reverse(SlotLists);
split_lists(LastPuff, SlotLists, 0) ->
    SlotD = generate_binary_slot(lookup, LastPuff),
    lists:reverse([SlotD|SlotLists]);
split_lists(KVList1, SlotLists, N) ->
    {Slot, KVListRem} = lists:split(?SLOT_SIZE, KVList1),
    SlotD = generate_binary_slot(lookup, Slot),
    split_lists(KVListRem, [SlotD|SlotLists], N - 1).

merge_lists(KVList1, KVList2, LevelInfo) ->
    merge_lists(KVList1, KVList2, LevelInfo, [], null, 0).

merge_lists(KVList1, KVList2, _LI, SlotList, FirstKey, ?MAX_SLOTS) ->
    {KVList1, KVList2, lists:reverse(SlotList), FirstKey};
merge_lists([], [], _LI, SlotList, FirstKey, _SlotCount) ->
    {[], [], lists:reverse(SlotList), FirstKey};
merge_lists(KVList1, KVList2, LI, SlotList, FirstKey, SlotCount) ->
    {KVRem1, KVRem2, Slot, FK0} =
        form_slot(KVList1, KVList2, LI, no_lookup, 0, [], FirstKey),
    case Slot of
        {_, []} ->
            merge_lists(KVRem1,
                        KVRem2,
                        LI,
                        SlotList,
                        FK0,
                        SlotCount);
        {Lookup, KVL} ->
            SlotD = generate_binary_slot(Lookup, KVL),
            merge_lists(KVRem1,
                        KVRem2,
                        LI,
                        [SlotD|SlotList],
                        FK0,
                        SlotCount + 1)
    end.

form_slot([], [], _LI, Type, _Size, Slot, FK) ->
    {[], [], {Type, lists:reverse(Slot)}, FK};
form_slot(KVList1, KVList2, _LI, lookup, ?SLOT_SIZE, Slot, FK) ->
    {KVList1, KVList2, {lookup, lists:reverse(Slot)}, FK};
form_slot(KVList1, KVList2, _LI, no_lookup, ?NOLOOK_SLOTSIZE, Slot, FK) ->
    {KVList1, KVList2, {no_lookup, lists:reverse(Slot)}, FK};
form_slot(KVList1, KVList2, {IsBasement, TS}, lookup, Size, Slot, FK) ->
    case {key_dominates(KVList1, KVList2, {IsBasement, TS}), FK} of
        {{{next_key, TopKV}, Rem1, Rem2}, null} ->
            {TopK, _TopV} = TopKV,
            form_slot(Rem1,
                      Rem2,
                      {IsBasement, TS},
                        [TopKey|MergedList],
                        MaxSize - 1);
                      lookup,
                      Size + 1,
                      [TopKV|Slot],
                      TopK);
        {{{next_key, TopKV}, Rem1, Rem2}, _} ->
            form_slot(Rem1,
                      Rem2,
                      {IsBasement, TS},
                      lookup,
                      Size + 1,
                      [TopKV|Slot],
                      FK);
        {{skipped_key, Rem1, Rem2}, _} ->
            form_slot(Rem1, Rem2, {IsBasement, TS}, lookup, Size, Slot, FK)
    end;
form_slot(KVList1, KVList2, {IsBasement, TS}, no_lookup, Size, Slot, FK) ->
    case key_dominates(KVList1, KVList2, {IsBasement, TS}) of
        {{next_key, {TopK, TopV}}, Rem1, Rem2} ->
            FK0 =
                case FK of
                    null ->
                        TopK;
                    _ ->
                        FK
                end,
            case leveled_codec:to_lookup(TopK) of
                no_lookup ->
                    form_slot(Rem1,
                              Rem2,
                              {IsBasement, TS},
                              no_lookup,
                              Size + 1,
                              [{TopK, TopV}|Slot],
                              FK0);
                lookup ->
                    case Size >= ?SLOT_SIZE of
                        true ->
                            {KVList1,
                                KVList2,
                                {no_lookup, lists:reverse(Slot)},
                                FK};
                        false ->
                            form_slot(Rem1,
                                      Rem2,
                                      {IsBasement, TS},
                                      lookup,
                                      Size + 1,
                                      [{TopK, TopV}|Slot],
                                      FK0)
                    end
            end;
        {skipped_key, Rem1, Rem2} ->
            merge_lists(Rem1, Rem2, {IsBasement, TS}, MergedList, MaxSize)
            form_slot(Rem1, Rem2, {IsBasement, TS}, no_lookup, Size, Slot, FK)
    end.

key_dominates(KL1, KL2, Level) ->
@@ -1211,6 +1366,31 @@ generate_indexkeys(Count, IndexList) ->
    generate_indexkeys(Count - 1, IndexList ++ Changes).


form_slot_test() ->
    % If a skip key happens, mustn't switch to loookup by accident as could be
    % over the expected size
    SkippingKV = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}},
    Slot = [{{o, "B1", "K5", null}, {5, active, 99234567, {}}}],
    R1 = form_slot([SkippingKV], [],
                   {true, 99999999},
                   no_lookup,
                   ?SLOT_SIZE + 1,
                   Slot,
                   {o, "B1", "K5", null}),
    ?assertMatch({[], [], {no_lookup, Slot}, {o, "B1", "K5", null}}, R1).

merge_tombstonelist_test() ->
    % Merge lists wiht nothing but tombstones
    SkippingKV1 = {{o, "B1", "K9995", null}, {9995, tomb, 1234567, {}}},
    SkippingKV2 = {{o, "B1", "K9996", null}, {9996, tomb, 1234567, {}}},
    SkippingKV3 = {{o, "B1", "K9997", null}, {9997, tomb, 1234567, {}}},
    SkippingKV4 = {{o, "B1", "K9998", null}, {9998, tomb, 1234567, {}}},
    SkippingKV5 = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}},
    R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5],
                    [SkippingKV2, SkippingKV4],
                    {true, 9999999}),
    ?assertMatch({[], [], [], null}, R).

indexed_list_test() ->
    io:format(user, "~nIndexed list timing test:~n", []),
    N = 150,
@@ -1219,7 +1399,7 @@ indexed_list_test() ->

    SW0 = os:timestamp(),

    {_PosBinIndex1, FullBin, _HL} = generate_binary_slot(KVL1),
    {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, KVL1),
    io:format(user,
              "Indexed list created slot in ~w microseconds of size ~w~n",
              [timer:now_diff(os:timestamp(), SW0), byte_size(FullBin)]),

@@ -1247,7 +1427,7 @@ indexed_list_mixedkeys_test() ->
    KVL1 = lists:sublist(KVL0, 33),
    Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1),

    {_PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys),
    {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys),

    {TestK1, TestV1} = lists:nth(4, KVL1),
    MH1 = leveled_codec:magic_hash(TestK1),

@@ -1273,7 +1453,7 @@ indexed_list_mixedkeys2_test() ->
    IdxKeys2 = lists:ukeysort(1, generate_indexkeys(30)),
    % this isn't actually ordered correctly
    Keys = IdxKeys1 ++ KVL1 ++ IdxKeys2,
    {_PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys),
    {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys),
    lists:foreach(fun({K, V}) ->
                        MH = leveled_codec:magic_hash(K),
                        test_binary_slot(FullBin, K, MH, {K, V})

@@ -1282,8 +1462,8 @@ indexed_list_mixedkeys2_test() ->

indexed_list_allindexkeys_test() ->
    Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), 128),
    {PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys),
    ?assertMatch(<<127:8/integer>>, PosBinIndex1),
    {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys),
    ?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1),
    % SW = os:timestamp(),
    BinToList = binaryslot_tolist(FullBin),
    % io:format(user,

@@ -1292,11 +1472,23 @@ indexed_list_allindexkeys_test() ->
    ?assertMatch(Keys, BinToList),
    ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all)).

indexed_list_allindexkeys_nolookup_test() ->
    Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(1000)),
                         128 * ?NOLOOK_MULT),
    {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(no_lookup, Keys),
    ?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1),
    % SW = os:timestamp(),
    BinToList = binaryslot_tolist(FullBin),
    % io:format(user,
    %           "Indexed list flattened in ~w microseconds ~n",
    %           [timer:now_diff(os:timestamp(), SW)]),
    ?assertMatch(Keys, BinToList),
    ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all)).

indexed_list_allindexkeys_trimmed_test() ->
    Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), 128),
    {PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys),
    ?assertMatch(<<127:8/integer>>, PosBinIndex1),
    {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys),
    ?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1),
    ?assertMatch(Keys, binaryslot_trimmedlist(FullBin,
                                              {i,
                                                  "Bucket",

@@ -1333,7 +1525,8 @@ indexed_list_mixedkeys_bitflip_test() ->
    KVL0 = lists:ukeysort(1, generate_randomkeys(1, 50, 1, 4)),
    KVL1 = lists:sublist(KVL0, 33),
    Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1),
    {_PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys),
    {_PosBinIndex1, FullBin, _HL, LK} = generate_binary_slot(lookup, Keys),
    ?assertMatch(LK, element(1, lists:last(Keys))),
    L = byte_size(FullBin),
    Byte1 = random:uniform(L),
    <<PreB1:Byte1/binary, A:8/integer, PostByte1/binary>> = FullBin,

@@ -1362,7 +1555,7 @@ indexed_list_mixedkeys_bitflip_test() ->

test_binary_slot(FullBin, Key, Hash, ExpectedValue) ->
    % SW = os:timestamp(),
    {ReturnedValue, _} = binaryslot_get(FullBin, Key, Hash, none),
    {ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash),
    ?assertMatch(ExpectedValue, ReturnedValue).
    % io:format(user, "Fetch success in ~w microseconds ~n",
    %           [timer:now_diff(os:timestamp(), SW)]).

@@ -1387,16 +1580,12 @@ merge_test() ->
    ?assertMatch(ExpLK2, LK2),
    ML1 = [{next, #manifest_entry{owner = P1}, FK1}],
    ML2 = [{next, #manifest_entry{owner = P2}, FK2}],
    {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = sst_new("../test/",
                                                 "level2_merge",
                                                 ML1,
                                                 ML2,
                                                 false,
                                                 2,
                                                 N * 2),
    NewR = sst_new("../test/", "level2_merge", ML1, ML2, false, 2, N * 2),
    {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = NewR,
    ?assertMatch([], Rem1),
    ?assertMatch([], Rem2),
    ?assertMatch(true, FK3 == min(FK1, FK2)),
    io:format("LK1 ~w LK2 ~w LK3 ~w~n", [LK1, LK2, LK3]),
    ?assertMatch(true, LK3 == max(LK1, LK2)),
    io:format(user,
              "Created and merged two files of size ~w in ~w microseconds~n",